Vert.x リファレンスガイド
Vert.xは、リアクティブなアプリケーションを構築するためのツールキットです。 Quarkus Reactive Architectureに記載されているように、QuarkusではVert.xを下地に使用しています。
このガイドは、 QuarkusアプリケーションからEclipse Vert.x APIの使用 ガイドの姉妹編です。Quarkusで使用されるVert.xインスタンスの使用方法や設定について、より詳細に説明しています。
Vert.xインスタンスへのアクセス
マネージド Vert.x インスタンスにアクセスするには、 quarkus-vertx
エクステンションをプロジェクトに追加します。
この依存関係は、プロジェクトで既に利用可能な場合があります(推移的依存関係として)。
このエクステンションでは、フィールドまたはコンストラクタ・インジェクションのいずれかを使用して、Vert.xのマネージドインスタンスを取得できます。
@ApplicationScoped
public class MyBean {
// Field injection
@Inject Vertx vertx;
// Constructor injection
MyBean(Vertx vertx) {
// ...
}
}
以下のどちらかを注入することができます。
-
素の Vert.x APIを公開する
io.vertx.core.Vertx
インスタンス -
Mutiny APIを公開する
io.vertx.mutiny.core.Vertx
インスタンス
Mutiny版は、Quarkusが提供する他のリアクティブAPIと統合されているため、Mutiny版の使用をお勧めします。
Mutiny
Mutinyに慣れていない方は、 Mutiny - 直感的なリアクティブプログラミングライブラリをご覧ください。 |
Vert.x Mutiny版に関するドキュメントは https://smallrye.io/smallrye-mutiny-vertx-bindings にあります。
Vert.xインスタンスの設定
application.properties
ファイルから Vert.x インスタンスを設定することができます。次の表は、サポートされているプロパティの一覧です。
ビルド時に固定される構成プロパティ - 他のすべての構成プロパティは実行時にオーバーライド可能
Configuration property |
タイプ |
デフォルト |
---|---|---|
Enables or disables the Vert.x cache. Environment variable: Show more |
ブーリアン |
|
Configure the file cache directory. When not set, the cache is stored in the system temporary directory (read from the Note that this property is ignored if the Environment variable: Show more |
string |
|
Enables or disabled the Vert.x classpath resource resolver. Environment variable: Show more |
ブーリアン |
|
The number of event loops. By default, it matches the number of CPUs detected on the system. Environment variable: Show more |
int |
|
The maximum amount of time the event loop can be blocked. Environment variable: Show more |
|
|
The amount of time before a warning is displayed if the event loop is blocked. Environment variable: Show more |
|
|
The maximum amount of time the worker thread can be blocked. Environment variable: Show more |
|
|
The size of the internal thread pool (used for the file system). Environment variable: Show more |
int |
|
The queue size. For most applications this should be unbounded Environment variable: Show more |
int |
|
The executor growth resistance. A resistance factor applied after the core pool is full; values applied here will cause that fraction of submissions to create new threads when no idle thread is available. A value of Environment variable: Show more |
float |
|
The amount of time a thread will stay alive with no work. Environment variable: Show more |
|
|
Prefill thread pool when creating a new Executor. When Environment variable: Show more |
ブーリアン |
|
Enables the async DNS resolver. Environment variable: Show more |
ブーリアン |
|
PEM Key/cert config is disabled by default. Environment variable: Show more |
ブーリアン |
|
Comma-separated list of the path to the key files (Pem format). Environment variable: Show more |
list of string |
|
Comma-separated list of the path to the certificate files (Pem format). Environment variable: Show more |
list of string |
|
JKS config is disabled by default. Environment variable: Show more |
ブーリアン |
|
Path of the key file (JKS format). Environment variable: Show more |
string |
|
Password of the key file. Environment variable: Show more |
string |
|
PFX config is disabled by default. Environment variable: Show more |
ブーリアン |
|
Path to the key file (PFX format). Environment variable: Show more |
string |
|
Password of the key. Environment variable: Show more |
string |
|
PEM Trust config is disabled by default. Environment variable: Show more |
ブーリアン |
|
Comma-separated list of the trust certificate files (Pem format). Environment variable: Show more |
list of string |
|
JKS config is disabled by default. Environment variable: Show more |
ブーリアン |
|
Path of the key file (JKS format). Environment variable: Show more |
string |
|
Password of the key file. Environment variable: Show more |
string |
|
PFX config is disabled by default. Environment variable: Show more |
ブーリアン |
|
Path to the key file (PFX format). Environment variable: Show more |
string |
|
Password of the key. Environment variable: Show more |
string |
|
The accept backlog. Environment variable: Show more |
int |
|
The client authentication. Environment variable: Show more |
string |
|
The connect timeout. Environment variable: Show more |
|
|
The idle timeout in milliseconds. Environment variable: Show more |
||
The receive buffer size. Environment variable: Show more |
int |
|
The number of reconnection attempts. Environment variable: Show more |
int |
|
The reconnection interval in milliseconds. Environment variable: Show more |
|
|
Whether to reuse the address. Environment variable: Show more |
ブーリアン |
|
Whether to reuse the port. Environment variable: Show more |
ブーリアン |
|
The send buffer size. Environment variable: Show more |
int |
|
The so linger. Environment variable: Show more |
int |
|
Enables or Disabled SSL. Environment variable: Show more |
ブーリアン |
|
Whether to keep the TCP connection opened (keep-alive). Environment variable: Show more |
ブーリアン |
|
Configure the TCP no delay. Environment variable: Show more |
ブーリアン |
|
Configure the traffic class. Environment variable: Show more |
int |
|
Enables or disables the trust all parameter. Environment variable: Show more |
ブーリアン |
|
The host name. Environment variable: Show more |
string |
|
int |
||
The public host name. Environment variable: Show more |
string |
|
The public port. Environment variable: Show more |
int |
|
Enables or disables the clustering. Environment variable: Show more |
ブーリアン |
|
The ping interval. Environment variable: Show more |
|
|
The ping reply interval. Environment variable: Show more |
|
|
The maximum amount of time in seconds that a successfully resolved address will be cached. If not set explicitly, resolved addresses may be cached forever. Environment variable: Show more |
int |
|
The minimum amount of time in seconds that a successfully resolved address will be cached. Environment variable: Show more |
int |
|
The amount of time in seconds that an unsuccessful attempt to resolve an address will be cached. Environment variable: Show more |
int |
|
The maximum number of queries to be sent during a resolution. Environment variable: Show more |
int |
|
The duration after which a DNS query is considered to be failed. Environment variable: Show more |
|
|
Set the path of an alternate hosts configuration file to use instead of the one provided by the os. The default value is Environment variable: Show more |
string |
|
Set the hosts configuration refresh period in millis, The resolver caches the hosts configuration (configured using Environment variable: Show more |
int |
|
Set the list of DNS server addresses, an address is the IP of the dns server, followed by an optional colon and a port, e.g Environment variable: Show more |
list of string |
|
Set to true to enable the automatic inclusion in DNS queries of an optional record that hints the remote DNS server about how much data the resolver can read per response. Environment variable: Show more |
ブーリアン |
|
Set the DNS queries Recursion Desired flag value. Environment variable: Show more |
ブーリアン |
|
Set the lists of DNS search domains. When the search domain list is null, the effective search domain list will be populated using the system DNS search domains. Environment variable: Show more |
list of string |
|
Set the ndots value used when resolving using search domains, the default value is Environment variable: Show more |
int |
|
Set to Environment variable: Show more |
ブーリアン |
|
Set to Environment variable: Show more |
ブーリアン |
|
Enable or disable native transport Environment variable: Show more |
ブーリアン |
|
期間フォーマットについて
To write duration values, use the standard 数字で始まる簡略化した書式を使うこともできます:
その他の場合は、簡略化されたフォーマットが解析のために
|
プログラム的なアプローチで Vert.x インスタンスを構成するには、 Vert.x設定のカスタマイズ を参照してください。
vert.xクライアントを使用
Vert.xコアに加えて、ほとんどのVert.xエコシステムライブラリを使用することができます。いくつかのQuarkusエクステンションは、すでにVert.xライブラリをラップしています。
利用可能なAPI
次の表は、Vert.xエコシステムで 最も 使用されているライブラリの一覧です。 これらのAPIにアクセスするには、指定されたエクステンションまたは依存関係をプロジェクトに追加します。 使用方法については、関連ドキュメントを参照してください。
API |
エクステンションか依存関係 |
ドキュメント |
AMQPクライアント |
|
|
サーキットブレーカー |
|
|
Consul クライアント |
|
|
DB2クライアント |
|
|
Kafkaクライアント |
|
|
メールクライアント |
|
|
MQTTクライアント |
|
ガイドはまだありません |
MS SQLクライアント |
|
|
MySQLクライアント |
|
|
Oracleクライアント |
|
|
PostgreSQLクライアント |
|
|
RabbitMQクライアント |
|
|
Redisクライアント |
|
|
ウェブクライアント |
|
Vert.x Mutiny APIの使用方法について詳しくは、 https://smallrye.io/smallrye-mutiny-vertx-bindings を参照してください。
Vert.xウェブクライアントの使用
このセクションでは、Quarkus REST(旧 RESTEasy Reactive)アプリケーションのコンテキストで Vert.x WebClient
を使用する例を示します。
上の表に示すように、次の依存関係をプロジェクトに追加します:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")
これで、コードの中で、 WebClient
のインスタンスを作成することが可能です。
package org.acme.vertx;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
@Path("/fruit-data")
public class ResourceUsingWebClient {
private final WebClient client;
@Inject
VertxResource(Vertx vertx) {
this.client = WebClient.create(vertx);
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{name}")
public Uni<JsonObject> getFruitData(String name) {
return client.getAbs("https://.../api/fruit/" + name)
.send()
.onItem().transform(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
return new JsonObject()
.put("code", resp.statusCode())
.put("message", resp.bodyAsString());
}
});
}
}
このリソースは、 WebClient
を作成し、リクエストに応じて、このクライアントを使用してリモート HTTP API を呼び出します。結果に応じて、レスポンスは受信したまま転送されるか、エラーをラップしたJSONオブジェクトが作成されます。 WebClient
は非同期(かつノンブロッキング)で、エンドポイントからは Uni
が返されます。
このアプリケーションは、ネイティブ実行可能ファイルとしても実行できます。しかし、その前に、Quarkusに ssl を有効にするよう指示する必要があります(リモートAPIがHTTPSを使用している場合)。 src/main/resources/application.properties
を開き、以下を追加します。
quarkus.ssl.native=true
そして、ネイティブ実行可能ファイルを作成します。
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
Vert.x JSONの使用
Vert.xのAPIはしばしばJSONに依存しています。Vert.xは、JSONドキュメントを操作する2つの便利なクラスを提供しています: io.vertx.core.json.JsonObject
および io.vertx.core.json.JsonArray
。
JsonObject
は、オブジェクトをJSON表現にマッピングしたり、JSONドキュメントからオブジェクトを構築するために使用できます。
// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);
// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);
なお、これらの機能は、 quarkus-jackson
エクステンションで管理されているマッパーを使用しています。マッピングをカスタマイズするには Jacksonの設定 を参照してください。
JSONオブジェクトとJSON配列は、Quarkus HTTPエンドポイントのリクエストとレスポンスボディとしてサポートされています(従来のRESTEasyとQuarkus RESTを使用)。 これらのエンドポイントを考えてみましょう:
package org.acme.vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {
@GET
@Path("{name}/object")
public JsonObject jsonObject(String name) {
return new JsonObject().put("Hello", name);
}
@GET
@Path("{name}/array")
public JsonArray jsonArray(String name) {
return new JsonArray().add("Hello").add(name);
}
}
{"Hello":"Quarkus"}
["Hello","Quarkus"]
これは、JSONコンテンツがリクエストボディである場合や、 Uni
, Multi
, CompletionStage
, Publisher
で包まれている場合にも同様に機能します。
Verticlesの使用
Verticles は、 _Vert.x が提供する「シンプルでスケーラブルな、アクターのようなデプロイ・並行性モデル」です。このモデルは、厳密なアクターモデルの実装を主張するものではありませんが、特に並行性、スケーリング、デプロイに関する類似性を共有しています。このモデルを使用するには、バーティクルを書いて デプロイ し、イベントバスにメッセージを送信することで通信します。
Quarkusで verticles をデプロイすることができます。次をサポートしています:
-
生の verticle -
io.vertx.core.AbstractVerticle
を拡張するJavaクラス -
Mutiny verticle -
io.smallrye.mutiny.vertx.core.AbstractVerticle
を拡張するJavaクラス
Verticleのデプロイ
verticleをデプロイするには、deployVerticle
メソッドを使用します。
@Inject Vertx vertx;
// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });
Vert.x の Mutiny 版を使用する場合、 deployVerticle
メソッドは Uni
を返すため、実際のデプロイメントを行うにはサブスクリプションをトリガーする必要があることに注意してください。
アプリケーションの初期化時にVerticleをデプロイする方法については、次の例で説明します。 |
@ApplicationScopedのBeanをVerticleとして使用する
一般的に、Vert.xのバーティクルはCDI Beanではありません。そのため、依存性注入は使用できません。しかし、QuarkusではVerticleをBeanとしてデプロイすることができます。この場合、CDI(QuarkusではArc)がインスタンスの作成を担当することに注意してください。
次のスニペットはその例です。
package io.quarkus.vertx.verticles;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {
@ConfigProperty(name = "address") String address;
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer(address)
.handler(m -> m.replyAndForget("hello"))
.completionHandler();
}
}
vertx
のインスタンスを注入する必要はなく、代わりに AbstractVerticle
の protected フィールドを利用します。
そして、Verticleインスタンスをデプロイします。
package io.quarkus.vertx.verticles;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class VerticleDeployer {
public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
露出しているすべての AbstractVerticle
をデプロイしたい場合は、次のようにします。
public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
for (AbstractVerticle verticle : verticles) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
複数のvirticleのインスタンスの作成
@ApplicationScoped
を使用する場合、Verticleのインスタンスは1つになります。複数のVerticleのインスタンスを持つことは、それらの間で負荷を共有するのに役立ちます。各々のインスタンスは、異なるI/Oスレッド(Vert.xイベントループ)に関連付けられます。
Verticleの複数のインスタンスをデプロイするには、 @ApplicationScoped
の代わりに @Dependent
スコープを使用します。
package org.acme.verticle;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
@Dependent
public class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer("address")
.handler(m -> m.reply("Hello from " + this))
.completionHandler();
}
}
そして、verticleを次のようにデプロイします:
package org.acme.verticle;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
@ApplicationScoped
public class MyApp {
void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
vertx
.deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
.await().indefinitely();
}
}
init
メソッドは、 Instance<MyVerticle>
を受け取ります。そして、 deployVerticle
メソッドにサプライヤーを渡します。サプライヤーは get()
メソッドを呼び出しているだけです。 @Dependent
スコープのおかげで、呼び出すたびに新しいインスタンスが返されます。最後に、希望する数のインスタンスを DeploymentOptions
に渡します。例えば、前の例では2つです。サプライヤーを2回呼び出し、Verticleの2つのインスタンスを作成します。
Event Busの使用
Vert.xには、Quarkusアプリケーションから使用できる イベントバスが組み込まれています。そのため、アプリケーションコンポーネント(CDI Bean、リソース…)は、非同期イベントを使用して相互に作用することができ、疎結合を促進します。
イベントバスでは、 仮想アドレス に メッセージ を送信します。イベントバスには、3種類の配送メカニズムが用意されています。
-
point-to-point - メッセージを送信すると、1人の消費者がそれを受信します。複数の消費者がそのアドレスを聞く場合は、ラウンドロビンが適用されます。
-
publish/subscribe - メッセージを発行し、そのアドレスを聞いているすべてのコンシューマーがメッセージを受信します。
-
request/reply - メッセージを送信し、応答を期待します。受信者は非同期的にメッセージに応答することができます。
これらの配信メカニズムはすべてノンブロッキングであり、リアクティブなアプリケーションを構築するための基本的な要素の一つとなっています。
イベントの消費
Vert.x APIを使用してコンシューマーを登録することができますが、Quarkusには宣言型のサポートがあります。イベントを消費するには、 io.quarkus.vertx.ConsumeEvent
アノテーションを使用します。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
1 | 設定されていない場合、アドレスはBeanの完全修飾名となります。例えば、このスニペットでは、 org.acme.vertx.GreetingService となります。 |
2 | メソッドのパラメータはメッセージボディです。メソッドが 何か を返す場合は、それがメッセージのレスポンスとなります。 |
アドレスの設定
@ConsumeEvent
アノテーションでアドレスを設定することができます。
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | greeting アドレスに送信されたメッセージを受信します。 |
The address value can be a property expression.
In this case, the configured value is used instead: @ConsumeEvent("${my.consumer.address}")
.
Additionally, the property expression can specify a default value: @ConsumeEvent("${my.consumer.address:defaultAddress}")
.
@ConsumeEvent("${my.consumer.address}") (1)
public String consume(String name) {
return name.toLowerCase();
}
1 | Receive the messages sent to the address configured with the my.consumer.address key. |
If no config property with the specified key exists and no default value is set then the application startup fails. |
イベントの非同期処理
これまでの例では、同期処理を行っています。 io.smallrye.mutiny.Uni
または java.util.concurrent.CompletionStage
を返却することで、非同期処理も可能です。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public Uni<String> process(String name) {
// return an Uni completed when the processing is finished.
// You can also fail the Uni explicitly
}
}
Mutiny
前の例はMutinyリアクティブ型を使用しています。Mutinyに慣れていない方は、 Mutiny - 直感的なリアクティブプログラミングライブラリをご覧ください。 |
イベントのブロッキング処理
デフォルトでは、イベントを消費するコードは、I/Oスレッドで呼び出されるため、 ノンブロッキング でなければなりません。処理がブロッキングの場合は、 @io.smallrye.common.annotation.Blocking
アノテーションを使用してください。
@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
// Something blocking
}
あるいは、 @ConsumeEvent
アノテーションの blocking
属性を使用することもできます。
@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
// Something blocking
}
@Blocking
を使用する場合、 @ConsumeEvent
の blocking
属性の値は無視されます。
イベントへの返信
@ConsumeEvent
でアノテーションされたメソッドの 戻り 値は、受信したメッセージへの応答に使用されます。たとえば、次のスニペットでは、返された String
が応答となります。
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
また、 Uni<T>
や CompletionStage<T>
を返すことで、非同期応答を扱うこともできます:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
コンテキスト伝搬エクステンションを使えば、
|
ファイヤー・アンド・フォーゲット・インタラクションの実装
受信したメッセージに返信する必要はありません。通常、 fire and forget のインタラクションでは、メッセージは消費され、送信者はそのことを知る必要はありません。このパターンを実装するために、コンシューマー・メソッドは void
を返します。
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
(イベントの代わりに)メッセージを消費
ペイロード を直接使用する前の例とは異なり、 Message
を直接使用することもできます。
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
System.out.println(msg.address());
System.out.println(msg.body());
}
失敗のハンドリング
@ConsumeEvent
でアノテーションされたメソッドが例外を発生させた場合、
-
返信ハンドラが設定されている場合、失敗はコード
ConsumeEvent#FAILURE_CODE
と例外メッセージを含むio.vertx.core.eventbus.ReplyException
を通じて送信者に伝えられます。 -
リプライ・ハンドラが設定されていない場合、例外は再スローされ(必要であれば
RuntimeException
でラップされる)、デフォルトの例外ハンドラ、 すなわちio.vertx.core.Vertx#exceptionHandler()
で処理されます。
メッセージの送信
メッセージの送信と公開にはVert.x Event busを使用します。
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
1 | Event busの注入 |
2 | アドレス greeting にメッセージを送信します。メッセージのペイロードは name です |
EventBus
オブジェクトは、以下のメソッドを提供します。
-
send
特定のアドレスへのメッセージの送信 - 一人のコンシューマーがメッセージを受信する。 -
publish
特定のアドレスへのメッセージの送信 - 全てのコンシューマーがメッセージを受け取る。 -
request
メッセージを送信し、返信を期待する
// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
仮想スレッドのイベント処理
@ConsumeEvent
でアノテーションされたメソッドが @RunOnVirtualThread
でもアノテーションされている場合、
メソッドは仮想スレッド上で呼び出されます。
各イベントは異なる仮想スレッド上で起動されます。
この機能を使うには以下が必要です:
-
Javaランタイムが仮想スレッドをサポートしていること。
-
メソッドがブロッキング・シグネチャを使っていること。
2つ目のポイントは、オブジェクトまたは void
を返すメソッドだけが @RunOnVirtualThread
を使えるということです。
Uni
や CompletionStage
を返すメソッドは、仮想スレッドでは実行 できません 。
詳しくは バーチャル・スレッド・ガイド をお読みください。
コーデックの使用
The https://vertx.io/docs/vertx-core/java/event_bus[Vert.x Event Bus] uses https://vertx.io/docs/vertx-core/java/message_codecs[codecs] to _serialize and deserialize message objects.
Quarkus provides a default codec for local delivery.
This codec is automatically used for return types and message body parameters of local consumers, i.e. methods annotated with @ConsumeEvent
whete ConsumeEvent#local() == true
(which is the default).
So that you can exchange the message objects as follows:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
特定のコーデックを使用したい場合は、両サイドで明示的に設定する必要があります。
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 | メッセージの送信に使用するコーデックの名前を設定 |
2 | メッセージの受信に使用するコーデックの設定 |
HTTPとEvent Busの組み合わせ
グリーティングHTTPエンドポイントに戻りましょう。非同期メッセージパッシングを使用して、別のBeanへの呼出に委譲してみましょう。これは、request/replyディスパッチメカニズムを使用しています。Jakarta RESTエンドポイント内でビジネスロジックを実装する代わりに、メッセージを送信しています。別の Bean がこのメッセージを消費し、応答は reply メカニズムを使用して送信されます。
HTTPエンドポイントクラスでは、イベントバスを注入し、 request
メソッドを使用して、イベントバスにメッセージを送信し、応答を期待します。
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/bus")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
1 | greeting アドレスに name を送信し、レスポンスを要求します。 |
2 | レスポンスを受け取ったら、ボディを抽出してユーザーに送信 |
the HTTP method returns a Uni .
If you are using Quarkus REST, Uni support is built-in.
If you are using classic RESTEasy, you need to add the quarkus resteasy-mutiny extension to your project.
|
greeting
のアドレスをリッスンするコンシューマーが必要です。このコンシューマーは、同じクラスでも、次のような別のBeanでも構いません。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
このBeanは、名前を受け取り、グリーティングメッセージを返します。
これにより、 /bus/quarkus
のすべてのHTTPリクエストは、イベントバスにメッセージを送信し、応答を待ち、その応答が到着すると、HTTPレスポンスを書き込みます。
Hello Quarkus
より理解しやすくするために、HTTP リクエスト/レスポンスがどのように処理されたかを詳しく見てみましょう。
-
リクエストは、
greeting
メソッドで受信されます。 -
name を含むメッセージがイベントバスに送信される
-
別のBeanがこのメッセージを受信し、応答を計算します
-
この応答は、返信メカニズムを使って送り返されます
-
送信者が返信を受信すると、その内容がHTTPレスポンスに書き込まれます
SockJSによるブラウザとの双方向通信
Vert.xが提供するSockJSブリッジは、ブラウザアプリケーションとQuarkusアプリケーションがイベントバスを使って通信できるようにします。双方を接続します。そのため、双方が相手側で受信したメッセージを送信することができます。3つの配信メカニズムをサポートしています。
SockJSは、Quarkusアプリケーションとブラウザの間の通信チャネルをネゴシエートします。WebSocketがサポートされている場合はそれを使用し、そうでない場合はSSEや長いポーリングなどにダウングレードします。
SockJSを使用するためには、ブリッジの設定、特に通信に使用されるアドレスの設定が必要です。
package org.acme;
import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;
@ApplicationScoped
public class SockJsExample {
@Inject
Vertx vertx;
public void init(@Observes Router router) {
SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
Router bridge = sockJSHandler.bridge(new SockJSBridgeOptions()
.addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
router.route("/eventbus/*").subRouter(bridge);
AtomicInteger counter = new AtomicInteger();
vertx.setPeriodic(1000,
ignored -> vertx.eventBus().publish("ticks", counter.getAndIncrement()));
}
}
このコードは、ticks
アドレスをターゲットとするすべてのメッセージを接続されたブラウザに送信するように SockJS ブリッジを設定します。 設定に関するより詳細な説明は、 Vert.x SockJS Bridge ドキュメント に記載されています。
ブラウザは、メッセージを消費するために、 vertx-eventbus
JavaScriptライブラリを使用する必要があります。
<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>SockJS example - Quarkus</title>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"
integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
<script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>
<h1>SockJS Examples</h1>
<p><strong>Last Tick:</strong> <span id="tick"></span></p>
</body>
<script>
var eb = new EventBus('/eventbus');
eb.onopen = function () {
eb.registerHandler('ticks', function (error, message) {
$("#tick").html(message.body);
});
}
</script>
</html>
ネイティブ・トランスポートの使用
ネイティブ・トランスポートは、ネイティブ実行可能ファイルではサポートされていません。 |
io_uring を使用するには、 io_uringを使用 のセクションを参照してください。
|
Vert.xでは、 Nettyのネイティブトランスポート を使用することができ、特定のプラットフォームでパフォーマンスが向上します。 これを有効にするには、プラットフォームに適した依存関係を含める必要があります。 通常、アプリケーションをプラットフォームに依存しないようにするためには、両方用意するのがよいでしょう。 Netty は賢いので、未対応のプラットフォームには全く依存しないなど、正しい方を使用します:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")
implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")
また、Vert.xでネイティブトランスポートを使用するように明示的に設定する必要があります。 application.properties
に次を追加します。
quarkus.vertx.prefer-native-transport=true
あるいは、 application.yml
で次のように設定します。
quarkus:
vertx:
prefer-native-transport: true
順調にいけばquarkusは以下のようにログ出力します。
[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true
Vert.x コンテキスト対応スケジューラの使用
Mutiny オペレータの中には、エクゼキュータ・スレッドプール上で作業をスケジュー ルする必要があるものがあります。
その良い例が .onItem().delayIt().by(Duration.ofMillis(10)
で、排出を遅らせるためにそのようなエクゼキュータを必要とします。
デフォルトのエクゼキュータは、 io.smallrye.mutiny.infrastructure.Infrastructure
によって返却され、Quarkusによってすでに設定、管理されています。
That being said, there are cases where you need to make sure that an operation is run on a Vert.x (duplicated) context and not just on any random thread.
The io.smallrye.mutiny.vertx.core.ContextAwareScheduler
interface offers an API to obtain context-aware schedulers.
Such a scheduler is configured with:
-
a delegate
ScheduledExecutorService
of your choice (hint: you can reuseInfrastructure.getDefaultWorkerPool()
), and -
a context fetching strategy among:
-
an explicit
Context
, or -
calling
Vertx::getOrCreateContext()
either on the current thread or later when the scheduling request happens, or -
calling
Vertx::currentContext()
, which fails if the current thread is not a Vert.x thread.
-
Here is a sample where ContextAwareScheduler
is used:
class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
vertx.getOrCreateContext().put("foo", "bar");
var delegate = Infrastructure.getDefaultWorkerPool();
var scheduler = ContextAwareScheduler.delegatingTo(delegate)
.withCurrentContext();
return Uni.createFrom().voidItem()
.onItem().delayIt().onExecutor(scheduler).by(Duration.ofMillis(10))
.onItem().invoke(() -> {
// Prints "bar"
var ctx = vertx.getOrCreateContext();
System.out.println(ctx.get("foo"));
});
}
}
In this example a scheduler is created by capturing the context of the Vert.x event-loop that calls asyncStart()
.
The delayIt
operator uses that scheduler, and we can check that the context that we get in invoke
is a Vert.x duplicated context where the data for key "foo"
has been propagated.
Unixドメインソケットの使用
Unixドメインソケットでリスニングすることにより、quarkusサービスへの接続が同じホストから確立されている場合、TCPのオーバーヘッドを省略することができます。これは、サービスへのアクセスがプロキシを経由する場合に起こります。Envoyのようなプロキシを使ってサービスメッシュを設定している場合によく見られます。
これは、 [native-transport] をサポートするプラットフォームでのみ動作します。 |
適切な ネイティブ・トランスポートの使用 を有効にし、以下の環境プロパティを設定します。
quarkus.http.domain-socket=/var/run/io.quarkus.app.socket quarkus.http.domain-socket-enabled=true quarkus.vertx.prefer-native-transport=true
これだけでは、デフォルトで 0.0.0.0:8080
で開かれる tcp ソケットを無効にすることはできません。これは次のように明示的に無効にすることができます。
quarkus.http.host-enabled=false
これらのプロパティは、Javaの -D
コマンドライン・パラメータまたは application.properties
で設定できます。
ネイティブのトランスポート依存関係を追加することを忘れないでください。 詳細については、ネイティブ・トランスポートの使用 を参照してください。 |
アプリケーションがソケットに書き込むための適切なアクセス許可を持っていることを確認してください。 |
io_uringを使用
io_uring はネイティブ実行可能ファイルではサポートされていません。
|
io_uring サポートは実験的です。
|
io_uring
は、非同期にデータを送受信できる Linux カーネルインターフェースです。
ファイルI/OとネットワークI/Oの両方に統一されたセマンティクスを提供します。
もともとはブロックデバイスとファイルをターゲットに設計されましたが、その後、ネットワークソケットのようなものと連携できるようになりました。
ネットワークI/O単体ではそこそこの性能上の利点を発揮し、ファイルI/OとネットワークI/Oが混在するアプリケーション負荷ではより大きな効果を発揮する可能性があります。
io_uring
についてもっと知るには、以下のリンクをお勧めします:
-
Why you should use io_uring for network I/O: The main benefit of io_uring for network I/O is a modern asynchronous API that is straightforward to use and provides unified semantics for file and network I/O. A potential performance benefit of io_uring for network I/O is reducing the number of syscalls. This could provide the biggest benefit for high volumes of small operations where the overhead of system calls can be significant.
-
The Backend Revolution and Why io_uring Is So Important: The io_uring API uses two ring buffers for communication between application and kernel (hence the API name) and designed in a way that enables natural batching of requests and responses. Besides, it provides a way to submit multiple requests in one system call, which can reduce overhead.
-
What exactly is io_uring?: io_uring is a Linux kernel interface to efficiently allow you to send and receive data asynchronously. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets.
To use io_uring
, you need to add two dependencies to your project and enable native transport.
First add the following dependencies to your project:
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>0.0.21.Final</version> <!-- Update this version (https://github.com/netty/netty-incubator-transport-io_uring/tags) -->
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-io_uring-incubator</artifactId>
</dependency>
// Update the io_uring version by picking the latest from https://github.com/netty/netty-incubator-transport-io_uring/tags
implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final")
implementation("io.vertx:vertx-io_uring-incubator")
Then, in the application.properties
, add:
quarkus.vertx.prefer-native-transport=true
Linuxマシンでio_uringは使えますか?
Linuxマシンで
もし上記のように表示されたら、 |
トラブルシューティング
|
ドメイン・ソケットはio_uringではまだサポートされていません。 |
Vert.xの非同期ファイルシステムAPIは、まだio_uringを使用していません。 |
読み取り専用環境へのデプロイ
ファイルシステムが読み取り専用の環境では、次のようなエラーが発生することがあります。
java.lang.IllegalStateException: Failed to create cache dir
Assuming /tmp/
is writable this can be fixed by setting the vertx.cacheDirBase
property to point to a directory in /tmp/
for instance in Kubernetes by creating an environment variable JAVA_OPTS
with the value -Dvertx.cacheDirBase=/tmp/vertx
, or setting the quarkus.vertx.cache-directory
property in application.properties
:
quarkus.vertx.cache-directory=/tmp/vertx
Vert.x設定のカスタマイズ
マネージド Vert.x インスタンスの設定は、application.properties
ファイルを使用して提供できますが、special bean を使用することもできます。 io.quarkus.vertx.VertxOptionsCustomizer
インターフェイスを公開する CDI Bean を使用して、Vert.x 設定をカスタマイズできます。 たとえば、次のカスタマイザは tmp
ベース ディレクトリを変更します。
@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {
@Override
public void accept(VertxOptions options) {
options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
}
}
customizer Beanは、(アプリケーションの設定に由来する) VertxOptions
を受け取り、それを修正することができます。