Using the event bus
Quarkusでは、異なる Bean が非同期イベントを使用して相互作用することができるため、疎結合化が促進されます。メッセージは 仮想アドレス に送信されます。3種類の配信メカニズムを提供しています。
-
point-to-point - send the message, one consumer receives it. If several consumers listen to the address, a round-robin is applied;
-
publish/subscribe - メッセージをパブリッシュし、アドレスをリスンしているすべてのコンシューマがメッセージを受信します。
-
request/reply - メッセージを送信し、応答を待ちます。受信者は非同期的にメッセージに応答することができます。
All these delivery mechanisms are non-blocking, and are providing one of the fundamental brick to build reactive applications.
非同期メッセージパッシング機能は、Reactive Messagingではサポートされていないメッセージへの返信を可能にします。ただし、シングルイベント動作(ストリームなし)とローカルメッセージに限定されます。 |
インストール
この仕組みはVert.x EventBusを利用しているので、この機能を利用するには vertx
エクステンションを有効にする必要があります。新規プロジェクトを作成する場合は、 extensions
パラメーターを以下のように設定します。
すでに作成済みのプロジェクトがある場合は、 vertx
エクステンションを既存のQuarkusプロジェクトに add-extension
コマンドで追加することができます。
quarkus extension add 'vertx'
./mvnw quarkus:add-extension -Dextensions="vertx"
./gradlew addExtension --extensions="vertx"
Otherwise, you can manually add this to the dependencies section of your build file:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
</dependency>
implementation("io.quarkus:quarkus-vertx")
イベントのコンシューム
イベントをコンシュームするには、 io.quarkus.vertx.ConsumeEvent
アノテーションを使用します。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
1 | 設定されていない場合、アドレスは Bean の完全修飾名となり、例えばこのスニペットでは org.acme.vertx.GreetingService となります。 |
2 | メソッドのパラメーターはメッセージ本体です。メソッドが 何か を返す場合は、それがメッセージのレスポンスになります。 |
デフォルトでは、Vert.x イベントループで呼び出されるため、イベントをコンシュームするコードは ノンブロッキング でなければなりません。処理がブロッキングである場合は、
あるいは、メソッドに
When using |
io.smallrye.mutiny.Uni
または java.util.concurrent.CompletionStage
のどちらかを返すことで、非同期処理も可能です。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public CompletionStage<String> consume(String name) {
// return a CompletionStage completed when the processing is finished.
// You can also fail the CompletionStage explicitly
}
@ConsumeEvent
public Uni<String> process(String name) {
// return an Uni completed when the processing is finished.
// You can also fail the Uni explicitly
}
}
Mutiny
The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library. |
アドレスの設定
@ConsumeEvent
アノテーションでアドレスを設定することができます。
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | greeting アドレスに送信されたメッセージを受信する |
応答
@ConsumeEvent
でアノテーションされたメソッドの 戻り値 が、着信メッセージに対するレスポンスとして使用されます。例えば、次のスニペットでは、 String
がレスポンスとして返されます。
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
また、 Uni<T>
や CompletionStage<T>
を返すことで、非同期応答を扱うこともできます。。
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
Context Propagation エクステンションを使えば、
Alternatively, you can use the default Quarkus worker pool using:
|
fire and forget インタラクションの実装
You don’t have to reply to received messages. Typically, for a fire and forget interaction, the messages are consumed and the sender does not need to know about it. To implement this, your consumer method just returns void
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
Message の取り扱い
上でも述べたように、この仕組みはVert.xイベントバスをベースにしています。なので、直接 Message
を使うこともできます。
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
System.out.println(msg.address());
System.out.println(msg.body());
}
失敗のハンドリング
If a method annotated with @ConsumeEvent
throws an exception then:
-
if a reply handler is set then the failure is propagated back to the sender via an
io.vertx.core.eventbus.ReplyException
with codeConsumeEvent#FAILURE_CODE
and the exception message, -
if no reply handler is set then the exception is rethrown (and wrapped in a
RuntimeException
if necessary) and can be handled by the default exception handler, i.e.io.vertx.core.Vertx#exceptionHandler()
.
メッセージの送信
さて、メッセージを受信する方法を見てきましたが、次は送信者 側 に切り替えましょう。メッセージの送信とパブリッシュは Vert.x イベントバスを使います。
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
1 | イベントバスのインジェクト |
2 | アドレス greeting にメッセージを送信。メッセージのペイロードは name |
EventBus
オブジェクトは、以下のメソッドを提供します。
-
send
特定のアドレスへのメッセージ - 一人のコンシューマーがメッセージを受信する。 -
publish
特定のアドレスにメッセージを送る - すべてのコンシューマーがメッセージを受け取る。 -
send
メッセージを送って返信を待つ
// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
物事をまとめる - HTTPとメッセージのブリッジング
greeting HTTP エンドポイントを再訪し、非同期メッセージパッシングを使用して分離された Bean に呼び出しを委譲してみましょう。リクエスト/レスポンスディスパッチの仕組みを利用しています。JAX-RS エンドポイント内にビジネスロジックを実装する代わりに、メッセージを送信しています。このメッセージは別の Bean によってコンシュームされ、応答は 返信 機構を使用して送信されます。
まず、新しいプロジェクトを作成します。
You can already start the application in dev mode using:
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
そして、以下の内容の JAX-RS リソースを新規に作成します。
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
1 | greeting アドレスに name を送信し、レスポンスを要求します。 |
2 | レスポンスを取得したら、ボディを抽出してユーザーに送信します。 |
このエンドポイントを呼び出すと、しばらく待ってからタイムアウトになります。実際、誰もリスンしていません。そこで、 greeting
アドレスをリスンするコンシューマーが必要です。以下の内容の GreetingService
Bean を作成します。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
この Bean は name を受け取り、 greeting メッセージを返します.
さて、ブラウザで http://localhost:8080/async/Quarkus を開くと、以下が表示されるはずです。
Hello Quarkus
より理解しやすくするために、HTTP リクエスト/レスポンスがどのように処理されたかを詳しく見てみましょう。
-
リクエストは
hello
メソッドが受け取ります。 -
name を含むメッセージがイベントバスに送信されます。
-
別の Bean がこのメッセージを受信して、レスポンスを計算します.
-
このレスポンスは、応答機構を使用して返信されます。
-
送信者が応答を受信すると、HTTP レスポンスに内容が書き込まれます。
このアプリケーションは、以下の方法でパッケージングできます。
quarkus build
./mvnw clean package
./gradlew build
以下でネイティブ実行可能ファイルとしてコンパイルすることもできます。
quarkus build --native
./mvnw package -Dnative
./gradlew build -Dquarkus.package.type=native
コーデックを使う
Vert.xイベントバス では、コーデックを使用してオブジェクトの シリアライズ と デシリアライズ を行います。Quarkusでは、ローカル配信用のデフォルトのコーデックを提供しています。そのため、以下のようにオブジェクトを交換することができます。
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
特定のコーデックを使用したい場合は、両サイドで明示的に設定する必要があります。
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 | メッセージの送信に使用するコーデックの名前を設定します。 |
2 | メッセージの受信に使用するコーデックを設定します。 |