Vert.x リファレンスガイド
Vert.xは、リアクティブなアプリケーションを構築するためのツールキットです。 Quarkus Reactive Architectureに記載されているように、QuarkusではVert.xを下地に使用しています。
このガイドは、 QuarkusアプリケーションからEclipse Vert.x APIの使用 ガイドの姉妹編です。Quarkusで使用されるVert.xインスタンスの使用方法や設定について、より詳細に説明しています。
Vert.xインスタンスへのアクセス
マネージドVert.xインスタンスにアクセスするには、 quarkus-vertx
エクステンションをプロジェクトに追加してください。この依存関係がすでにインストールされている可能性があることに注意してください(推移的依存関係として)。
このエクステンションでは、フィールドまたはコンストラクタ・インジェクションのいずれかを使用して、Vert.xのマネージドインスタンスを取得できます。
@ApplicationScoped
public class MyBean {
// Field injection
@Inject Vertx vertx;
// Constructor injection
MyBean(Vertx vertx) {
// ...
}
}
以下のどちらかを注入することができます。
-
素の Vert.x APIを公開する
io.vertx.core.Vertx
インスタンス -
Mutiny APIを公開する
io.vertx.mutiny.core.Vertx
インスタンス
Mutiny版は、Quarkusが提供する他のリアクティブAPIと統合されているため、Mutiny版の使用をお勧めします。
Mutiny
Mutinyに慣れていない方は、 Mutiny - 直感的なリアクティブプログラミングライブラリをご覧ください。 |
Vert.x Mutiny版に関するドキュメントは https://smallrye.io/smallrye-mutiny-vertx-bindings にあります。
Vert.xインスタンスの設定
application.properties
ファイルから Vert.x インスタンスを設定することができます。次の表は、サポートされているプロパティの一覧です。
ビルド時に固定される設定プロパティ - その他の設定プロパティはランタイムでオーバーライド可能です。
型 |
デフォルト |
|
---|---|---|
Enables or disables the Vert.x cache. Environment variable: |
boolean |
|
Enables or disabled the Vert.x classpath resource resolver. Environment variable: |
boolean |
|
The number of event loops. By default, it matches the number of CPUs detected on the system. Environment variable: |
int |
|
The maximum amount of time the event loop can be blocked. Environment variable: |
|
|
The amount of time before a warning is displayed if the event loop is blocked. Environment variable: |
|
|
The size of the worker thread pool. Environment variable: |
int |
|
The maximum amount of time the worker thread can be blocked. Environment variable: |
|
|
The size of the internal thread pool (used for the file system). Environment variable: |
int |
|
The queue size. For most applications this should be unbounded Environment variable: |
int |
|
The executor growth resistance. A resistance factor applied after the core pool is full; values applied here will cause that fraction of submissions to create new threads when no idle thread is available. A value of Environment variable: |
float |
|
The amount of time a thread will stay alive with no work. Environment variable: |
|
|
Prefill thread pool when creating a new Executor. When io.vertx.core.spi.ExecutorServiceFactory.createExecutor is called, initialise with the number of defined threads at startup Environment variable: |
boolean |
|
Enables the async DNS resolver. Environment variable: |
boolean |
|
PEM Key/cert config is disabled by default. Environment variable: |
boolean |
|
Comma-separated list of the path to the key files (Pem format). Environment variable: |
list of string |
|
Comma-separated list of the path to the certificate files (Pem format). Environment variable: |
list of string |
|
JKS config is disabled by default. Environment variable: |
boolean |
|
Path of the key file (JKS format). Environment variable: |
string |
|
Password of the key file. Environment variable: |
string |
|
PFX config is disabled by default. Environment variable: |
boolean |
|
Path to the key file (PFX format). Environment variable: |
string |
|
Password of the key. Environment variable: |
string |
|
PEM Trust config is disabled by default. Environment variable: |
boolean |
|
Comma-separated list of the trust certificate files (Pem format). Environment variable: |
list of string |
|
JKS config is disabled by default. Environment variable: |
boolean |
|
Path of the key file (JKS format). Environment variable: |
string |
|
Password of the key file. Environment variable: |
string |
|
PFX config is disabled by default. Environment variable: |
boolean |
|
Path to the key file (PFX format). Environment variable: |
string |
|
Password of the key. Environment variable: |
string |
|
The accept backlog. Environment variable: |
int |
|
The client authentication. Environment variable: |
string |
|
The connect timeout. Environment variable: |
|
|
The idle timeout in milliseconds. Environment variable: |
||
The receive buffer size. Environment variable: |
int |
|
The number of reconnection attempts. Environment variable: |
int |
|
The reconnection interval in milliseconds. Environment variable: |
|
|
Whether to reuse the address. Environment variable: |
boolean |
|
Whether to reuse the port. Environment variable: |
boolean |
|
The send buffer size. Environment variable: |
int |
|
The so linger. Environment variable: |
int |
|
Enables or Disabled SSL. Environment variable: |
boolean |
|
Whether to keep the TCP connection opened (keep-alive). Environment variable: |
boolean |
|
Configure the TCP no delay. Environment variable: |
boolean |
|
Configure the traffic class. Environment variable: |
int |
|
Enables or disables the trust all parameter. Environment variable: |
boolean |
|
The host name. Environment variable: |
string |
|
The port. Environment variable: |
int |
|
The public host name. Environment variable: |
string |
|
The public port. Environment variable: |
int |
|
Enables or disables the clustering. Environment variable: |
boolean |
|
The ping interval. Environment variable: |
|
|
The ping reply interval. Environment variable: |
|
|
The maximum amount of time in seconds that a successfully resolved address will be cached. If not set explicitly, resolved addresses may be cached forever. Environment variable: |
int |
|
The minimum amount of time in seconds that a successfully resolved address will be cached. Environment variable: |
int |
|
The amount of time in seconds that an unsuccessful attempt to resolve an address will be cached. Environment variable: |
int |
|
The maximum number of queries to be sent during a resolution. Environment variable: |
int |
|
The duration after which a DNS query is considered to be failed. Environment variable: |
|
|
Enable or disable native transport Environment variable: |
boolean |
|
期間フォーマットについて
期間のフォーマットは標準の 数値で始まる期間の値を指定することもできます。この場合、値が数値のみで構成されている場合、コンバーターは値を秒として扱います。そうでない場合は、 |
Vert.xクライアントの使用
Vert.xコアに加えて、ほとんどのVert.xエコシステムライブラリを使用することができます。いくつかのQuarkusエクステンションは、すでにVert.xライブラリをラップしています。
利用可能なAPI
次の表は、Vert.xエコシステムで最も使用されているライブラリの一覧です。これらのAPIにアクセスするには、指定されたエクステンションまたは依存関係をプロジェクトに追加します。使用方法については、関連ドキュメントを参照してください。
API |
エクステンションか依存関係 |
ドキュメント |
AMQPクライアント |
|
|
サーキットブレーカー |
|
|
Consul クライアント |
|
|
DB2クライアント |
|
|
Kafkaクライアント |
|
|
メールクライアント |
|
|
MQTTクライアント |
|
ガイドはまだありません |
MS SQLクライアント |
|
|
MySQLクライアント |
|
|
Oracleクライアント |
|
|
PostgreSQLクライアント |
|
|
RabbitMQクライアント |
|
|
Redisクライアント |
|
|
ウェブクライアント |
|
Vert.x Mutiny APIの使用方法について詳しくは、 https://smallrye.io/smallrye-mutiny-vertx-bindings を参照してください。
使用例
このセクションでは、RESTEasy Reactiveアプリケーションの文脈でVert.x WebClient
を使った例を紹介します。上の表にあるように、プロジェクトに以下の依存関係を追加します。
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")
さて、コードの中で、 WebClient
のインスタンスを作成することが可能です。
package org.acme.vertx;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
@Path("/fruit-data")
public class ResourceUsingWebClient {
private final WebClient client;
@Inject
VertxResource(Vertx vertx) {
this.client = WebClient.create(vertx);
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{name}")
public Uni<JsonObject> getFruitData(String name) {
return client.getAbs("https://.../api/fruit/" + name)
.send()
.onItem().transform(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
return new JsonObject()
.put("code", resp.statusCode())
.put("message", resp.bodyAsString());
}
});
}
}
このリソースは、 WebClient
を作成し、リクエストに応じて、このクライアントを使用してリモート HTTP API を呼び出します。結果に応じて、レスポンスは受信したまま転送されるか、エラーをラップしたJSONオブジェクトが作成されます。 WebClient
は非同期(かつノンブロッキング)で、エンドポイントからは Uni
が返されます。
このアプリケーションは、ネイティブ実行可能ファイルとしても実行できます。しかし、その前に、Quarkusに ssl を有効にするよう指示する必要があります(リモートAPIがHTTPSを使用している場合)。 src/main/resources/application.properties
を開き、以下を追加します。
quarkus.ssl.native=true
そして、ネイティブ実行可能ファイルを作成します。
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.package.type=native
Vert.x JSONの使用
Vert.xのAPIはしばしばJSONに依存しています。Vert.xは、JSONドキュメントを操作する2つの便利なクラスを提供しています: io.vertx.core.json.JsonObject
および io.vertx.core.json.JsonArray
。
JsonObject
は、オブジェクトをJSON表現にマッピングしたり、JSONドキュメントからオブジェクトを構築するために使用できます。
// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);
// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);
なお、これらの機能は、 quarkus-jackson
エクステンションで管理されているマッパーを使用しています。マッピングをカスタマイズするには Jacksonの設定 を参照してください。
JSON ObjectとJSON Arrayは、QuarkusのHTTPエンドポイントのリクエストとレスポンスボディとしてサポートされています(従来のRESTEasyとRESTEasy Reactiveを使用)。これらのエンドポイントについて考えてみましょう。
package org.acme.vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {
@GET
@Path("{name}/object")
public JsonObject jsonObject(String name) {
return new JsonObject().put("Hello", name);
}
@GET
@Path("{name}/array")
public JsonArray jsonArray(String name) {
return new JsonArray().add("Hello").add(name);
}
}
{"Hello":"Quarkus"}
["Hello","Quarkus"]
これは、JSONコンテンツがリクエストボディである場合や、 Uni
, Multi
, CompletionStage
, Publisher
で包まれている場合にも同様に機能します。
verticleの使用
Verticles は、 _Vert.x が提供する「シンプルでスケーラブルな、アクターのようなデプロイ・並行性モデル」です。このモデルは、厳密なアクターモデルの実装を主張するものではありませんが、特に並行性、スケーリング、デプロイに関する類似性を共有しています。このモデルを使用するには、バーティクルを書いて デプロイ し、イベントバスにメッセージを送信することで通信します。
Quarkusで verticles をデプロイすることができます。次をサポートしています:
-
生の verticle -
io.vertx.core.AbstractVerticle
を拡張するJavaクラス -
Mutiny verticle -
io.smallrye.mutiny.vertx.core.AbstractVerticle
を拡張するJavaクラス
verticleのデプロイ
verticleをデプロイするには、deployVerticle
メソッドを使用します。
@Inject Vertx vertx;
// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });
Vert.x の Mutiny 版を使用する場合、 deployVerticle
メソッドは Uni
を返すため、実際のデプロイメントを行うにはサブスクリプションをトリガーする必要があることに注意してください。
アプリケーションの初期化時にVerticleをデプロイする方法については、次の例で説明します。 |
Verticleとしての@ApplicationScoped Beansの使用
一般的に、Vert.xのバーティクルはCDI Beanではありません。そのため、依存性注入は使用できません。しかし、QuarkusではVerticleをBeanとしてデプロイすることができます。この場合、CDI(QuarkusではArc)がインスタンスの作成を担当することに注意してください。
次のスニペットはその例です。
package io.quarkus.vertx.verticles;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {
@ConfigProperty(name = "address") String address;
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer(address)
.handler(m -> m.replyAndForget("hello"))
.completionHandler();
}
}
vertx
のインスタンスを注入する必要はなく、代わりに AbstractVerticle
の protected フィールドを利用します。
そして、Verticleインスタンスをデプロイします。
package io.quarkus.vertx.verticles;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
@ApplicationScoped
public class VerticleDeployer {
public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
露出しているすべての AbstractVerticle
をデプロイしたい場合は、次のようにします。
public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
for (AbstractVerticle verticle : verticles) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
そして、Verticleインスタンスをデプロイします。
@ApplicationScoped
を使用する場合、Verticleのインスタンスは1つになります。複数のVerticleのインスタンスを持つことは、それらの間で負荷を共有するのに役立ちます。各々のインスタンスは、異なるI/Oスレッド(Vert.xイベントループ)に関連付けられます。
Verticleの複数のインスタンスをデプロイするには、 @ApplicationScoped
の代わりに @Dependent
スコープを使用します。
package org.acme.verticle;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
@Dependent
public class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer("address")
.handler(m -> m.reply("Hello from " + this))
.completionHandler();
}
}
そして、verticleをデプロイします。
package org.acme.verticle;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
@ApplicationScoped
public class MyApp {
void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
vertx
.deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
.await().indefinitely();
}
}
init
メソッドは、 Instance<MyVerticle>
を受け取ります。そして、 deployVerticle
メソッドにサプライヤーを渡します。サプライヤーは get()
メソッドを呼び出しているだけです。 @Dependent
スコープのおかげで、呼び出すたびに新しいインスタンスが返されます。最後に、希望する数のインスタンスを DeploymentOptions
に渡します。例えば、前の例では2つです。サプライヤーを2回呼び出し、Verticleの2つのインスタンスを作成します。
イベントバスの利用
Vert.xには、Quarkusアプリケーションから使用できる イベントバスが組み込まれています。そのため、アプリケーションコンポーネント(CDI Bean、リソース…)は、非同期イベントを使用して相互に作用することができ、疎結合を促進します。
イベントバスでは、 仮想アドレス に メッセージ を送信します。イベントバスには、3種類の配送メカニズムが用意されています。
-
point-to-point - メッセージを送信すると、1人の消費者がそれを受信します。複数の消費者がそのアドレスを聞く場合は、ラウンドロビンが適用されます。
-
publish/subscribe - メッセージを発行し、そのアドレスを聞いているすべてのコンシューマーがメッセージを受信します。
-
request/reply - メッセージを送信し、応答を期待します。受信者は非同期的にメッセージに応答することができます。
これらの配信メカニズムはすべてノンブロッキングであり、リアクティブなアプリケーションを構築するための基本的な要素の一つとなっています。
イベントの消費
Vert.x APIを使用してコンシューマーを登録することができますが、Quarkusには宣言型のサポートがあります。イベントを消費するには、 io.quarkus.vertx.ConsumeEvent
アノテーションを使用します。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
1 | 設定されていない場合、アドレスはBeanの完全修飾名となります。例えば、このスニペットでは、 org.acme.vertx.GreetingService となります。 |
2 | メソッドのパラメータはメッセージボディです。メソッドが 何か を返す場合は、それがメッセージのレスポンスとなります。 |
アドレスを設定
@ConsumeEvent
アノテーションでアドレスを設定することができます。
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | greeting アドレスに送信されたメッセージを受信します。 |
非同期処理
これまでの例では、同期処理を行っています。 io.smallrye.mutiny.Uni
または java.util.concurrent.CompletionStage
を返却することで、非同期処理も可能です。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public CompletionStage<String> consume(String name) {
// return a CompletionStage completed when the processing is finished.
// You can also fail the CompletionStage explicitly
}
@ConsumeEvent
public Uni<String> process(String name) {
// return an Uni completed when the processing is finished.
// You can also fail the Uni explicitly
}
}
Mutiny
前の例はMutinyリアクティブ型を使用しています。Mutinyに慣れていない方は、 Mutiny - 直感的なリアクティブプログラミングライブラリをご覧ください。 |
ブロッキング処理
デフォルトでは、イベントを消費するコードは、I/Oスレッドで呼び出されるため、 ノンブロッキング でなければなりません。処理がブロッキングの場合は、 @io.smallrye.common.annotation.Blocking
アノテーションを使用してください。
@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
// Something blocking
}
あるいは、 @ConsumeEvent
アノテーションの blocking
属性を使用することもできます。
@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
// Something blocking
}
@Blocking
を使用する場合、 @ConsumeEvent
の blocking
属性の値は無視されます。
メッセージへの返信
@ConsumeEvent
でアノテーションされたメソッドの 戻り 値は、受信したメッセージへの応答に使用されます。たとえば、次のスニペットでは、返された String
が応答となります。
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
また、 Uni<T>
や CompletionStage<T>
を返すことで、非同期応答を扱うこともできます:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
コンテキスト伝搬エクステンションを使えば、
|
fire and forget インタラクションの実装
受信したメッセージに返信する必要はありません。通常、 fire and forget のインタラクションでは、メッセージは消費され、送信者はそのことを知る必要はありません。このパターンを実装するために、コンシューマー・メソッドは void
を返します。
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
メッセージの処理
ペイロード を直接使用する前の例とは異なり、 Message
を直接使用することもできます。
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
System.out.println(msg.address());
System.out.println(msg.body());
}
失敗のハンドリング
@ConsumeEvent
でアノテーションされたメソッドが例外を発生させた場合、
-
返信ハンドラが設定されている場合、失敗はコード
ConsumeEvent#FAILURE_CODE
と例外メッセージを含むio.vertx.core.eventbus.ReplyException
を通じて送信者に伝えられます。 -
応答ハンドラが設定されていない場合、例外は再スローされ(必要に応じて
RuntimeException
でラップされる)、デフォルトの例外ハンドラ、すなわちio.vertx.core.Vertx#exceptionHandler()
で処理することができます。
メッセージの送信
メッセージの送信と公開にはVert.x Event busを使用します。
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
1 | Event busの注入 |
2 | アドレス greeting にメッセージを送信します。メッセージのペイロードは name です |
EventBus
オブジェクトは、以下のメソッドを提供します。
-
send
特定のアドレスへのメッセージ - 一人のコンシューマーがメッセージを受信する。 -
publish
特定のアドレスにメッセージを送る - すべてのコンシューマーがメッセージを受け取る。 -
request
メッセージを送って返信を期待する
// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
コーデックの使用
Vert.xイベントバス では、コーデックを使用してオブジェクトの シリアライズ と デシリアライズ を行います。Quarkusでは、ローカル配信用のデフォルトのコーデックを提供しています。そのため、以下のようにオブジェクトを交換することができます。
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
特定のコーデックを使用したい場合は、両サイドで明示的に設定する必要があります。
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 | メッセージの送信に使用するコーデックの名前を設定 |
2 | メッセージの受信に使用するコーデックの設定 |
HTTPとイベントバスの融合
greeting HTTPエンドポイントに戻り、非同期メッセージパッシングを使用して呼び出しを別のBeanに委譲しましょう。 要求/応答ディスパッチメカニズムを使用します。 JAX-RSエンドポイント内にビジネスロジックを実装する代わりに、メッセージを送信しています。 別のBeanがこのメッセージを消費し、応答 メカニズムを使用して応答が送信されます。
HTTPエンドポイントクラスでは、イベントバスを注入し、 request
メソッドを使用して、イベントバスにメッセージを送信し、応答を期待します。
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/bus")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
1 | greeting アドレスに name を送信し、レスポンスを要求します。 |
2 | レスポンスを受け取ったら、ボディを抽出してユーザーに送信 |
HTTP メソッドは Uni を返します。RESTEasy Reactive を使用している場合は、 Uni のサポートが組み込まれています。 従来の RESTEasyを使用している場合は、 quarkus resteasy-mutiny エクステンションをプロジェクトに追加する必要があります。
|
greeting
のアドレスをリッスンするコンシューマーが必要です。このコンシューマーは、同じクラスでも、次のような別のBeanでも構いません。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
このBeanは、名前を受け取り、グリーティングメッセージを返します。
これにより、 /bus/quarkus
のすべてのHTTPリクエストは、イベントバスにメッセージを送信し、応答を待ち、その応答が到着すると、HTTPレスポンスを書き込みます。
Hello Quarkus
より理解しやすくするために、HTTP リクエスト/レスポンスがどのように処理されたかを詳しく見てみましょう。
-
リクエストは、
greeting
メソッドで受信されます。 -
name を含むメッセージがイベントバスに送信される
-
別のBeanがこのメッセージを受信し、応答を計算します
-
この応答は、返信メカニズムを使って送り返されます
-
送信者が返信を受信すると、その内容がHTTPレスポンスに書き込まれます
SockJSによるブラウザとの双方向通信
Vert.xが提供するSockJSブリッジは、ブラウザアプリケーションとQuarkusアプリケーションがイベントバスを使って通信できるようにします。双方を接続します。そのため、双方が相手側で受信したメッセージを送信することができます。3つの配信メカニズムをサポートしています。
SockJSは、Quarkusアプリケーションとブラウザの間の通信チャネルをネゴシエートします。WebSocketがサポートされている場合はそれを使用し、そうでない場合はSSEや長いポーリングなどにダウングレードします。
SockJSを使用するためには、ブリッジの設定、特に通信に使用されるアドレスの設定が必要です。
package org.acme.vertx;
import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;
@ApplicationScoped
public class SockJsExample {
@Inject
Vertx vertx;
public void init(@Observes Router router) {
SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
sockJSHandler.bridge(new SockJSBridgeOptions()
.addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
router.route("/eventbus/*").handler(sockJSHandler);
}
}
このコードは、ticks
アドレスをターゲットとするすべてのメッセージを接続されたブラウザに送信するように SockJS ブリッジを設定します。 設定に関するより詳細な説明は、 Vert.x SockJS Bridge ドキュメント に記載されています。
ブラウザは、メッセージを消費するために、 vertx-eventbus
JavaScriptライブラリを使用する必要があります。
<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>SockJS example - Quarkus</title>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"
integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
<script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>
<h1>SockJS Examples</h1>
<p><strong>Last Tick:</strong> <span id="tick"></span></p>
</body>
<script>
var eb = new EventBus('/eventbus');
eb.onopen = function () {
eb.registerHandler('ticks', function (error, message) {
$("#tick").html(message.body);
});
}
</script>
</html>
ネイティブ・トランスポート
GraalVMで作られたバイナリでは、ネイティブトランスポートはサポートされていません。 |
Vert.x は Nettyのネイティブトランスポート を使用することができます。これは特定のプラットフォームでパフォーマンスを向上させます。これらを有効にするには、お使いのプラットフォームに適切な依存関係を含める必要があります。通常、アプリケーションをプラットフォームに依存しないようにするために、両方を持つことは良い考えです。Nettyは賢いので、サポートされていないプラットフォームでは全く使用しないことも含めて、正しい方を使用します。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")
implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")
また、Vert.xでネイティブトランスポートを使用するように明示的に設定する必要があります。 application.properties
に次を追加します。
quarkus.vertx.prefer-native-transport=true
あるいは、 application.yml
で次のように設定します。
quarkus:
vertx:
prefer-native-transport: true
順調にいけばquarkusは以下のようにログ出力します。
[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true
Unixドメインソケットのリスニング
Unixドメインソケットでリスニングすることにより、quarkusサービスへの接続が同じホストから確立されている場合、TCPのオーバーヘッドを省略することができます。これは、サービスへのアクセスにEnvoyのようなプロキシを使ってサービスメッシュを設定している場合によく見られます。
これは、 ネイティブ・トランスポート をサポートしているプラットフォームでのみ動作します。 |
適切な ネイティブ・トランスポート を有効にし、以下の環境プロパティを設定します。
quarkus.http.domain-socket=/var/run/io.quarkus.app.socket quarkus.http.domain-socket-enabled=true quarkus.vertx.prefer-native-transport=true
これだけでは、デフォルトで 0.0.0.0:8080
で開かれる tcp ソケットを無効にすることはできません。これは次のように明示的に無効にすることができます。
quarkus.http.host-enabled=false
これらのプロパティは、Javaの -D
コマンドライン・パラメータまたは application.properties
で設定できます。
ネイティブのトランスポート依存関係を追加することを忘れないでください。 詳細については、ネイティブ・トランスポート を参照してください。 |
アプリケーションがソケットに書き込むための適切なアクセス許可を持っていることを確認してください。 |
読み取り専用のデプロイメント環境
ファイルシステムが読み取り専用の環境では、次のようなエラーが発生することがあります。
java.lang.IllegalStateException: Failed to create cache dir
/tmp/
が書き込み可能である場合、これは vertx.cacheDirBase
プロパティが /tmp/
のディレクトリを指すように設定することで修正できます。例えば、OpenShift では環境変数 JAVA_OPTS
を作成して -Dvertx.cacheDirBase=/tmp/vertx
という値を設定します。
Vert.xインスタンスの設定
マネージド Vert.x インスタンスの設定は、application.properties
ファイルを使用して提供できますが、special bean を使用することもできます。 io.quarkus.vertx.VertxOptionsCustomizer
インターフェイスを公開する CDI Bean を使用して、Vert.x 設定をカスタマイズできます。 たとえば、次のカスタマイザは tmp
ベース ディレクトリを変更します。
@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {
@Override
public void accept(VertxOptions options) {
options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
}
}
customizer Beanは、(アプリケーションの設定に由来する) VertxOptions
を受け取り、それを修正することができます。