イベントバスの利用
Quarkusでは、異なる Bean が非同期イベントを使用して相互作用することができるため、疎結合化が促進されます。メッセージは 仮想アドレス に送信されます。3種類の配信メカニズムを提供しています。
-
point-to-point - メッセージを送信し、1 つのコンシューマーがそのメッセージを受信します。複数のコンシューマーがアドレスで待ち受けている場合、ラウンドロビンが適用されます。
-
publish/subscribe - メッセージを発行し、アドレスで待ち受けているすべてのコンシューマーがメッセージを受信します。
-
request/reply - メッセージを送信し、応答を待ちます。受信者は非同期的にメッセージに応答することができます。
これらの配信メカニズムはすべてノンブロッキングであり、リアクティブアプリケーションを構築するための基本的な要素の1つを提供しています。
非同期メッセージパッシング機能は、Reactive Messagingではサポートされていないメッセージへの返信を可能にします。ただし、シングルイベント動作(ストリームなし)とローカルメッセージに限定されます。 |
インストール
このメカニズムは Vert.x EventBus を使用するため、この機能を使用するには vertx
エクステンションを有効にする必要があります。
新しいプロジェクトを作成する場合は、 extensions
パラメータを次のように設定します:
Windowsユーザーの場合:
-
cmdを使用する場合、(バックスラッシュ
\
を使用せず、すべてを同じ行に書かないでください)。 -
Powershellを使用する場合は、
-D
パラメータを二重引用符で囲んでください。例:"-DprojectArtifactId=vertx-quickstart"
すでに作成済みのプロジェクトがある場合は、 vertx
エクステンションを既存のQuarkusプロジェクトに add-extension
コマンドで追加することができます。
quarkus extension add vertx
./mvnw quarkus:add-extension -Dextensions='vertx'
./gradlew addExtension --extensions='vertx'
そうでない場合は、 pom.xml
ファイルの依存関係セクションに手動で追加することができます。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
</dependency>
implementation("io.quarkus:quarkus-vertx")
イベントの消費
イベントを消費するには、 io.quarkus.vertx.ConsumeEvent
アノテーションを使用します。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
1 | 設定されていない場合、アドレスは Bean の完全修飾名となり、例えばこのスニペットでは org.acme.vertx.GreetingService となります。 |
2 | メソッドのパラメーターはメッセージボディです。メソッドが 何か を返す場合は、それがメッセージのレスポンスになります。 |
デフォルトでは、Vert.x イベントループで呼び出されるため、イベントをコンシュームするコードは ノンブロッキング でなければなりません。処理がブロッキングである場合は、
あるいは、メソッドに
|
io.smallrye.mutiny.Uni
または java.util.concurrent.CompletionStage
のどちらかを返すことで、非同期処理を行うことも可能です。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public CompletionStage<String> consume(String name) {
// return a CompletionStage completed when the processing is finished.
// You can also fail the CompletionStage explicitly
}
@ConsumeEvent
public Uni<String> process(String name) {
// return an Uni completed when the processing is finished.
// You can also fail the Uni explicitly
}
}
Mutiny
前の例はMutinyリアクティブ型を使用しています。Mutinyに慣れていない方は、 Mutiny - 直感的なリアクティブプログラミングライブラリをご覧ください。 |
アドレスの設定
@ConsumeEvent
アノテーションでアドレスを設定することができます。
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | greeting アドレスに送信されたメッセージを受信 |
応答
@ConsumeEvent
でアノテーションされたメソッドの 戻り値 が、着信メッセージに対するレスポンスとして使用されます。例えば、次のスニペットでは、 String
がレスポンスとして返されます。
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
また、 Uni<T>
や CompletionStage<T>
を返すことで、非同期レスポンスを扱うことも可能です。
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
Context Propagation エクステンションを利用することで、
あるいは、デフォルトの Quarkus ワーカープールを使用することもできます。
|
fire and forget (撃ち放し) のやりとりの実装
受信したメッセージに返信する必要はありません。通常、 fire and forget(撃ち放し) のやりとりでは、送信者はメッセージが消費されたことを知る必要はありません。これを実装するためには、コンシューマーメソッドは void
を返します。
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
メッセージの取り扱い
先述のように、この仕組みは Vert.x イベントバスをベースにしているため、直接 Message
を使うこともできます。
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
System.out.println(msg.address());
System.out.println(msg.body());
}
失敗時の対応
@ConsumeEvent
がアノテートされたメソッドが例外を発生させた場合、以下のようになります。
-
返信ハンドラーが設定されている場合、失敗はコード
ConsumeEvent#FAILURE_CODE
と例外メッセージを含んだio.vertx.core.eventbus.ReplyException
を通して送信者に伝搬されます。 -
返答ハンドラーが設定されていない場合、例外は必要に応じて
RuntimeException
でラップされて再スローされ、デフォルトの例外ハンドラー、すなわちio.vertx.core.Vertx#exceptionHandler()
で処理されます。
メッセージの送信
さて、メッセージを受信する方法を見てきましたが、次は送信者 側 に切り替えましょう。メッセージの送信とパブリッシュは Vert.x イベントバスを使います。
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
1 | イベントバスの注入 |
2 | アドレス greeting に、ペイロード name を持つメッセージを送信します。 |
EventBus
オブジェクトは、以下のメソッドを提供します。
-
特定のアドレスにメッセージを
send
(送信) - 一つのコンシューマーがメッセージを受信する。 -
特定のアドレスにメッセージを
publish
(発行) - すべてのコンシューマーがメッセージを受信する。 -
メッセージを
send
(送信)し、返信を非同期に待つ -
メッセージを
send
(送信)し、返信を同期に待つ
// Case 1
bus.<String>requestAndForget("greeting", name);
// Case 2
bus.publish("greeting", name);
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
// Case 4
String response = bus.<String>requestAndAwait("greeting", name).body();
登場した要素の組み立て - HTTP とメッセージの橋渡し
グリーティングHTTPエンドポイントをもう一度修正し,非同期メッセージパッシングを使用して,呼び出しを別のBeanに委譲してみよう。これは、request/replyディスパッチ機構を使用しています。Jakarta RESTエンドポイント内でビジネスロジックを実装する代わりに、メッセージを送信しています。このメッセージは別の Bean によって消費され、応答は reply メカニズムを使用して送信されます。
最初に、以下を使用して新しいプロジェクトを作成します。
Windowsユーザーの場合:
-
cmdを使用する場合、(バックスラッシュ
\
を使用せず、すべてを同じ行に書かないでください)。 -
Powershellを使用する場合は、
-D
パラメータを二重引用符で囲んでください。例:"-DprojectArtifactId=vertx-http-quickstart"
./mvnw compile quarkus:dev
を実行すると、自動的に 開発者モード でアプリケーションを起動することができます。
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
次に、以下の内容で新しいJakarta RESTリソースを作成します:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
1 | greeting アドレスに name を送信し、レスポンスを要求します。 |
2 | レスポンスを取得したら、ボディを抽出してユーザーに送信します。 |
このエンドポイントを呼び出すと、しばらく待ってからタイムアウトになります。実際、誰もリスンしていません。そこで、 greeting
アドレスをリスンするコンシューマーが必要です。以下の内容の GreetingService
Bean を作成します。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
この Bean は name を受け取り、 greeting メッセージを返します.
ブラウザで http://localhost:8080/async/Quarkus を開くと、以下が表示されます。
Hello Quarkus
より理解しやすくするために、HTTP リクエスト/レスポンスがどのように処理されたかを詳しく見てみます。
-
リクエストは
hello
メソッドが受け取ります。 -
name を含むメッセージがイベントバスに送信されます。
-
別の Bean がこのメッセージを受信して、レスポンスを計算します。
-
このレスポンスは、応答メカニズムによって返信されます。
-
送信者が応答を受信すると、HTTP レスポンスに内容が書き込まれます。
このアプリケーションは、以下の方法でパッケージングすることができます。
quarkus build
./mvnw install
./gradlew build
以下でネイティブ実行可能ファイルとしてコンパイルすることもできます。
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
コーデックの利用
Vert.xイベントバス では、コーデックを使用してオブジェクトの シリアライズ と デシリアライズ を行います。Quarkusでは、ローカル配信用のデフォルトのコーデックを提供しています。そのため、以下のようにオブジェクトを交換することができます。
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
特定のコーデックを使用したい場合は、両側で明示的に設定する必要があります。
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 | メッセージの送信に使用するコーデックの名前を設定します。 |
2 | メッセージの受信に使用するコーデックを設定します。 |