Quarkus Messaging エクステンション
イベント駆動型メッセージングシステムは、ほとんどの最新アプリケーションのバックボーンになっています。 これにより、メッセージ駆動型マイクロサービスや複雑なデータストリーミングパイプラインのビルドが可能になります。
Quarkus では、主要なメッセージングテクノロジーと簡単に同期できるように設計された包括的なメッセージングエクステンションスイートを提供しています。 これにより、開発者はコアアプリケーションロジックの作成に集中できるようになり、個々の API やメッセージングインフラストラクチャーの複雑さを詳しく調べる必要がなくなります。
このページでは、すべてのメッセージングエクステンションに共通する機能と開発モデルに焦点を当てています。
これらのエクステンションの一部は、コア Quarkus リポジトリーで管理されています。
-
メッセージング: コアエクステンションは、メッセージングアプリケーションを開発するための基本的な概念と API を定義します。
-
Messaging - MQTT Connector
いくつかのエクステンションはコミュニティーによって提供され、維持されています。
-
リンク:https://docs.quarkiverse.io/quarkus-reactive-messsaging-nats-jetstream/dev/index.html[Nats Jetstream Connector]
-
リンク:https://solacelabs.github.io/solace-quarkus/[Solace Messaging Connector]
-
リンク:https://quarkus.io/extensions/io.quarkiverse.reactivemessaging.http/quarkus-reactive-messaging-http/[Reactive HTTP および WebSocket Connector]
-
AWS SQS Connector
JMS Connector や Google PubSub Connector などの他のコネクターでは、同じレベルのインテグレーションのメリットが得られず、セットアップには手動での設定がさらに必要となります。
一方、メッセージング関連のエクステンションの中には、低レベルのプロバイダー固有のインテグレーションを提案するものもあります。 このページで説明されているサポートレベルには、これらの低レベルのエクステンションは含まれません。 以下は、この種のエクステンションの一部を含むリストです。
-
リンク:https://quarkus.io/guides/kafka-streams[Kafka Streams エクステンション]
-
リンク:https://docs.quarkiverse.io/quarkus-rabbitmq-client/dev/index.html[RabbitMQ Client]
-
リンク:https://docs.quarkiverse.io/quarkus-hivemq-client/dev/index.html[HiveMQ Client]
-
リンク:https://docs.quarkiverse.io/quarkus-artemis/dev/quarkus-artemis-jms.html[Artemis Core & JMS]
-
リンク:https://docs.quarkiverse.io/quarkus-google-cloud-services/main/pubsub.html[Google Cloud Pubsub]
Quarkus Messaging 開発モデル
Quarkus は、基盤となるブローカーテクノロジーがメッセージキューイングまたはイベントストリーミングを使用するかどうかに関係なく、メッセージの公開、消費、および処理のための統一モデルを確立することで、メッセージ駆動型アプリケーションの開発を簡素化します。 MicroProfile Reactive Messaging 仕様に基づいてビルドされた Quarkus Messaging エクステンションは、シームレスかつ確実にこれらのテクノロジーと統合します。 重要なのは、これらの機能を活用するために、リアクティブプログラミングを習熟していることが前提条件ではないという点です。
Reactive Messaging 仕様は、イベント駆動型およびメッセージ駆動型のアプリケーションを実装するための CDI ベースのプログラミングモデルを定義します。 少数のアノテーションセットを使用することで、CDI Bean はメッセージブローカーとの対話を実装するための設定要素になります。 これらのやり取りは、アプリケーションコンポーネントがメッセージを読み書きする チャネル を通じて行われます。
チャネル は一意の名前で識別され、一連のアノテーションを使用して宣言されます。
@Incoming
および @Outgoing
アノテーション
@Incoming
および @Outgoing
メソッドアノテーションは、メッセージブローカーからのメッセージの消費とメッセージブローカーへのメッセージの生成を可能にする チャネル を定義します。
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class MessageProcessingBean {
@Incoming("source")
@Outgoing("sink")
public String process(String consumedPayload) {
// Process the incoming message payload and return an updated payload
return consumedPayload.toUpperCase();
}
}
@Outgoing
は、メッセージを生成するメソッドで単独で使用できます。
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class MessageGeneratorBean {
@Outgoing("sink")
public Multi<String> generate() {
return Multi.createFrom().items("a", "b", "c");
}
}
@Incoming
は、メッセージを消費するために単独で使用できます。
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class MessageProcessingBean {
@Incoming("source")
public void process(String consumedPayload) {
// process the payload
consumedPayload.toUpperCase();
}
}
コードから直接 |
サポートされるメソッド署名の詳細は、リンク:https://smallrye.io/smallrye-reactive-messaging/latest/concepts/signatures/[SmallRye Reactive Messaging – サポートされる署名] を参照してください。
Emitter と @Channel
アノテーション
アプリケーションでは、多くの場合、メッセージングをアプリケーションの他の部分と組み合わせる必要があります。たとえば、HTTP エンドポイントからメッセージを生成したり、消費されたメッセージをレスポンスとしてストリーミングしたりする必要があります。
命令型コードから特定のチャネルにメッセージを送信するには、@Channel
アノテーションで識別される Emitter
オブジェクトを注入する必要があります。
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
@ApplicationScoped
@Path("/")
public class MyImperativeBean {
@Channel("prices")
Emitter<Double> emitter;
@GET
@Path("/send")
public CompletionStage<Void> send(double d) {
return emitter.send(d);
}
}
@Channel
アノテーションを使用すると、ペイロードまたはメッセージを送信するチャネルを指定できます。
Emitter
を使用すると、チャネルに送信されたメッセージをバッファリングできます。
さらに制御するには、リンク:https://smallrye.io/smallrye-mutiny/latest/[Mutiny] APIs を使用して、MutinyEmitter
エミッターインターフェイスを使用できます。
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.MutinyEmitter;
@ApplicationScoped
@Path("/")
public class MyImperativeBean {
@Channel("prices")
MutinyEmitter<Double> emitter;
@GET
@Path("/send")
public void send(double d) {
emitter.sendAndAwait(d);
}
}
@Channel
アノテーションは、着信チャネルからのメッセージストリームを注入するためにも使用できます。
import org.eclipse.microprofile.reactive.messaging.Channel;
@ApplicationScoped
@Path("/")
public class SseResource {
@Channel("prices")
Multi<Double> prices;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<Double> stream() {
return prices;
}
}
@Channel
でメッセージを消費する場合、アプリケーションコードがストリームへのサブスクライブを行います。
上記の例では、Quarkus REST (旧称 RESTEasy Reactive) エンドポイントがサブスクリプションを処理します。
エミッターとチャネルの詳細は、リンク:https://smallrye.io/smallrye-reactive-messaging/latest/concepts/emitter/[SmallRye Reactive Messaging – エミッターとチャネル] のドキュメントを参照してください。
メッセージとメタデータ
Message
はペイロードを包むエンベロープです。
上記の例ではペイロードのみが使用されていましたが、Quarkus Messaging ではすべてのペイロードが内部的に Message
でラップされています。
Message<T>
インターフェイスは、確認応答 (ack) および否定応答 (nack) 用に <T>
タイプのペイロードを Metadata
、
任意のオブジェクトと非同期アクションのセットに関連付けます。
import org.eclipse.microprofile.reactive.messaging.Message;
@Incoming("source")
@Outgoing("sink")
public Message<String> process(Message<String> consumed) {
// Access the metadata
MyMetadata my = consumed.getMetadata(MyMetadata.class).get();
// Process the incoming message and return an updated message
return consumed.withPayload(consumed.getPayload().toUpperCase());
}
メッセージの処理または受信が成功すると、ブローカーに確認応答が返されます。
メッセージ間の確認応答は連鎖しており、メッセージを処理する際には、
発信メッセージの確認は、着信メッセージの確認をトリガーします。
ほとんどの場合、確認応答と否定応答は自動的に管理され、コネクターを使用するとチャネルごとに異なるストラテジーを設定できます。
したがって、通常は Message
インターフェイスと直接やり取りする必要はありません。
高度なユースケースでのみ、Message と直接処理する必要があります。
一方、Metadata
にアクセスすることは、多くの場合実用的です。
コネクターは、メッセージヘッダー、プロパティー、およびその他のコネクター固有の情報にアクセスできるように、特定のメタデータオブジェクトをメッセージに追加します。
コネクター固有のメタデータにアクセスするために、Message
インターフェイスを操作する必要はありません。
ペイロードパラメーターの後に、メタデータオブジェクトをメソッドパラメーターとして挿入するだけになります。
import org.eclipse.microprofile.reactive.messaging.Metadata;
@Incoming("source")
@Outgoing("sink")
public String process(String payload, MyMetadata my) {
// Access the metadata
Map<String, Object> props = my.getProperties();
// Process the payload and return an updated payload
return payload.toUpperCase();
}
コネクターに応じて、処理方法で使用できるペイロードタイプが異なります。
カスタム MessageConverter
を実装して、ペイロードをアプリケーションで受け入れられる型に変換できます。
チャネル設定
チャネル属性は、mp.messaging.incoming.<channel-name>
および mp.messaging.outgoing.<channel-name>
設定プロパティーを使用して設定できます。
たとえば、カスタムデシリアライザーを使用して my-topic
トピックからのメッセージを消費するように Kafka コネクターを設定するには、以下のようにします。
mp.messaging.incoming.source.connector=smallrye-kafka
mp.messaging.incoming.source.topic=my-topic
mp.messaging.incoming.source.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.source.auto.offset.reset=earliest
connector
属性はすべてのチャネルに必須であり、使用するコネクターを指定します。
クラスパスにコネクターが 1 つしかない場合は、Quarkus が自動的にコネクターを選択するため、この設定を省略できます。
グローバルチャネル属性は、コネクター名を使用して設定できます。
mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092
コネクター固有の属性は、コネクターのドキュメントに記載されています。
チャネルワイヤリングとメッセージングパターン
起動時に、Quarkus は宣言されたチャネルを分析してそれらをワイヤリングし、すべてのチャネルが接続されていることを確認します。 具体的には、各チャネルは、別のチャネルのメッセージの リアクティブストリーム に接続されたメッセージの リアクティブストリーム を作成します。 リアクティブストリームプロトコルに準拠し、チャネル間でバックプレッシャーメカニズムが適用され、アプリケーションリソースの使用を制御し、過剰なコミットやシステムの一部への過負荷を防ぐことができます。
その一方で、ランタイム時にプログラムで新しいチャネルを作成することはできません。 ただし、すべてではないにしても、ほとんどのメッセージングおよびインテグレーションユースケースを実装できるパターンは多数あります。
一部のメッセージングテクノロジーでは、コンシューマーがトピックまたはキューのセットをサブスクライブし、プロデューサーがメッセージベースで特定のトピックにメッセージを送信できます。 ランタイム時にクライアントを動的に設定および作成する必要があることが確実な場合は、低レベルのクライアントを直接使用することを検討する必要があります。 |
内部チャネル
ユースケースによっては、メッセージングパターンを使用して同じアプリケーション内でメッセージを転送すると便利です。 チャネルをメッセージングバックエンド、つまりコネクターに接続しない場合は、すべてがアプリケーション内部で行われます。 そして、ストリームはメソッドを連鎖することで作成されます。 各チェーンは引き続きリアクティブストリームで、バックプレッシャープロトコルが適用されます。
フレームワークは、プロデューサー/コンシューマーチェーンが完全であることを確認します。
つまり、アプリケーションがメッセージをインターナルチャネルに書き込む場合 (@Outgoing
のみを持つメソッド、または Emitter
を使用)、
アプリケーション内からメッセージを消費する必要もあります (@Incoming
のみを持つメソッド、またはアンマネージドストリームを使用)。
チャネルを有効化/無効化
定義されたすべてのチャネルはデフォルトで有効化されていますが、以下の設定でチャネルを無効化することもできます。
mp.messaging.incoming.my-channel.enabled=false
これを Quarkus ビルドプロファイルと一緒に使用して、ターゲット環境などのビルド時の条件に基づいてチャネルを有効化/無効化することができます。 チャネルを無効化するときは、以下の 2 つの点を確認する必要があります。
-
無効化されたチャネルの使用方法が、ビルド時にフィルタリングできる Bean 内に設定されていること。
-
このチャネルがなくても、残りのチャネルが引き続き正常に動作すること。
@ApplicationScoped
@IfBuildProfile("my-profile")
public class MyProfileBean {
@Outgoing("my-channel")
public Multi<String> generate() {
return Multi.createFrom().items("a", "b", "c");
}
}
Multiple Outgoings と @Broadcast
デフォルトでは、チャネルで送信されるメッセージは単一のコンシューマーにのみディスパッチされます。 複数のコンシューマーが存在するとエラーとみなされ、デプロイメント時に報告されます。
@Broadcast
アノテーションはこの動作を変更し、チャネルを通過するメッセージがすべてのコンシューマーにディスパッチされることを示します。
@Broadcast
は @Outgoing
アノテーションと一緒に使用する必要があります。
import org.eclipse.microprofile.reactive.messaging.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@Incoming("in")
@Outgoing("out")
@Broadcast
public int increment(int i) {
return i + 1;
}
@Incoming("out")
public void consume1(int i) {
//...
}
@Incoming("out")
public void consume2(int i) {
//...
}
@Broadcast
と同様に、同じメソッドで @Outgoing
アノテーションを複数回使用して、このメソッドが複数のチャネルにメッセージを生成することを示すことができます。
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
public String process(String s) {
// send messages from channel in to both channels out1 and out2
return s.toUpperCase();
}
Multiple Outgoings を使用すると、単一のメッセージが複数のターゲットチャネルによって処理されるファンアウトパターンを実装する場合に役立ちます。
処理メソッドから Targeted
を返すことで、メッセージを複数の送信先に選択的にディスパッチできます。
@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
@Outgoing("out3")
public Targeted process(double price) {
// send messages from channel-in to both channel-out1 and channel-out2
Targeted targeted = Targeted.of("out1", "Price: " + price, "out2", "Quote: " + price);
if (price > 90.0) {
return targeted.with("out3", price);
}
return targeted;
}
Multiple Incomings と @Merge
デフォルトでは、単一のプロデューサーがチャネル内でメッセージを送信できます。
複数のプロデューサーが存在するとエラーとみなされ、デプロイメント時に報告されます。
@Merge
アノテーションはこの動作を変更し、チャネルに複数のプロデューサーが存在する可能性があることを示します。
@Merge
は @Incoming
アノテーションと一緒に使用する必要があります。
@Incoming("in1")
@Outgoing("out")
public int increment(int i) {
return i + 1;
}
@Incoming("in2")
@Outgoing("out")
public int multiply(int i) {
return i * 2;
}
@Incoming("out")
@Merge
public void getAll(int i) {
//...
}
@Merge
と同様に、同じメソッドで @Incoming
アノテーションを複数回使用して、メソッドが複数のチャネルからのメッセージを消費することを示すことができます。
@Incoming("in1")
@Incoming("in2")
public String process(String s) {
// get messages from channel-1 and channel-2
return s.toUpperCase();
}
ストリーム処理
一部の高度なシナリオでは、個々のメッセージではなく、メッセージのストリームを直接操作できます。
着信署名と発信署名でリンク:https://smallrye.io/smallrye-mutiny/latest/[Mutiny APIs] を使用すると、メッセージのストリームを処理できます。
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class StreamProcessor {
@Incoming("source")
@Outgoing("sink")
public Multi<String> process(Multi<String> in) {
return in.map(String::toUpperCase);
}
}
実行モデル
Quarkus Messaging は、Quarkus の リアクティブエンジン の上に配置され、リンク:https://vertx.io/[Eclipse Vert.x] を活用して処理用にメッセージをディスパッチします。 以下の 3 つの実行モードをサポートしています。
-
イベントループ: メッセージが Vert.x I/O スレッドにディスパッチされます。 イベントループではブロッキング操作を実行しないでください。
-
ワーカースレッド: メッセージはワーカースレッドプールにディスパッチされます。
-
仮想スレッド: メッセージは仮想スレッドにディスパッチされます (Java 21 以降が必要です)。 仮想スレッドはプールされないため、メッセージごとに新しい仮想スレッドが作成されます。 詳細は、専用の Quarkus Virtual Thread サポート ガイドを参照してください。
Quarkus はメソッド署名に基づいてデフォルトの実行モードを選択します。 メソッド署名が synchronous の場合、メッセージは ワーカースレッド にディスパッチされ、それ以外の場合はデフォルトで イベントループ にディスパッチされます。
メソッド署名 | デフォルトの実行モード |
---|---|
@Incoming("source") void process(String payload) |
ワークスレッド |
@Incoming("source") Uni<Void> process(String payload) |
イベントループ |
@Incoming("source") CompletionStage<Void> process(Message<String> message) |
イベントループ |
@Incoming("source") @Outgoing("sink") Multi<R> process(Multi<T> in) |
ストリーム処理メソッドは起動時に実行され、各メッセージはイベントループにディスパッチされます。 |
アノテーションを使用すると、実行モデルを細かく制御できます。
-
リンク:https://javadoc.io/doc/io.smallrye.reactive/smallrye-reactive-messaging-api/latest/io/smallrye/reactive/messaging/annotations/Blocking.html [
@Blocking
] は、メソッドをワーカースレッドプールで強制的に実行します。 ワーカースレッドのデフォルトプールは、すべてのチャネル間で共有されます。@Blocking("my-custom-pool")
を使用すると、カスタムスレッドプールを使用してチャネルを設定できます。 設定プロパティーsmallrye.messaging.worker.my-custom-pool.max-concurrency
は、プール内のスレッドの最大数を指定します。 ブロッキング処理の詳細は、リンク:http://smallrye.io/smallrye-reactive-messaging/4.8.0/concepts/blocking/[SmallRye Reactive Messaging のドキュメント] を参照してください。 -
@NonBlocking
は、メソッドをイベントループスレッドで強制的に実行します。 -
@RunOnVirtualThread
は、メソッドを仮想スレッド上で強制的に実行します。 仮想スレッドの軽量な性質を活用するために、@RunOnVirtualThread
アノテーションが付けられたメソッドのデフォルトの最大同時実行数は 1024 です。 これは、smallrye.messaging.worker.<virtual-thread>.max-concurrency
設定プロパティーを設定することで変更できます。 または、@Blocking("my-custom-pool")
アノテーションと一緒に使用することで変更できます。
@Transactional
アノテーションの存在は、実行をブロックすることを意味します。
メッセージングアプリケーションでは、生成および消費されるメッセージは順序付けられたイベントのストリームを構成します。
これは、(トピックまたはキュー内に) ブローカーによって強制されるか、
アプリケーションの受付および発行の順序によって強制されます。
この順序を維持するために、Quarkus Messaging はデフォルトでメッセージを順番にディスパッチします。
@Blocking(ordered = false)`または `@RunOnVirtualThread
アノテーションを使用して、この動作をオーバーライドできます。
着信チャネルの同時実行
一部のコネクターは、着信チャネルの同時実行レベルの設定をサポートしています。
mp.messaging.incoming.my-channel.concurrency=4
これにより、内部的に着信チャネルのコピーが 4 つ作成され、同じ処理方法にワイヤリングされます。 これは、ブローカーテクノロジーによっては、異なるコピーで受信されたメッセージの部分的な順序を保持したまま、 複数のメッセージを同時に処理することでアプリケーションのスループットを向上させる際に役立つ場合があります。 たとえば、複数のコンシューマーが異なるトピックパーティションを消費できる Kafka の場合がこれに該当します。
ヘルスチェック
Together with the SmallRye Health extension, Quarkus Messaging extensions provide health check support per channel. The implementation of startup, readiness and liveness checks depends on the connector. Some connectors allow configuring the health check behavior or disabling them completely or per channel.
チャネルのヘルスチェックは、quarkus.messaging.health.<channel-name>.enabled
を使用して無効化できます。またはヘルスチェックタイプごとに無効化できます。
(例: quarkus.messaging.health.<channel-name>.liveness.enabled
)
quarkus.messaging.health.enabled
設定プロパティーを false
に設定すると、メッセージングのヘルスチェックが完全に無効になります。
可観測性
Micrometer メトリクス
Quarkus Messaging エクステンションは、メッセージングシステムのヘルスを監視するためのシンプルかつ便利なメトリクスを提供します。 Micrometer エクステンション はこれらのメトリクスを公開します。
channel
タグで識別されるチャネルごとに、以下のメトリクスを収集できます。
-
quarkus.messaging.message.count
: 生成または受信したメッセージの数 -
quarkus.messaging.message.acks
: 正常に処理されたメッセージの数 -
quarkus.messaging.message.failures
: 処理に失敗したメッセージの数 -
quarkus.messaging.message.duration
: メッセージの処理時間
下位互換性のため、チャネルメトリクスはデフォルトでは有効化されていませんが、smallrye.messaging.observation.enabled=true
で有効化することができます。
OpenTelemetry トレース
一部の Quarkus Messaging コネクターは、OpenTelemetry Tracing とすぐに統合されます。 OpenTelemetry エクステンション が存在する場合、発信メッセージは現在のトレース範囲を伝播します。 着信チャネルでは、受信したメッセージにトレース情報が含まれている場合、メッセージ処理はメッセージスパンを親として継承します。
次の設定を使用して、特定のチャネルのトレースを無効にすることができます。
mp.messaging.incoming.data.tracing-enabled=false
TLS Configuration
Some messaging extensions integrate with the Quarkus TLS Registry to configure the underlying client.
To configure the TLS on a channel, you need to provide the named TLS configuration to the tls-configuration-name
property:
quarkus.tls.my-tls-config.trust-store=truststore.jks
quarkus.tls.my-tls-config.trust-store-password=secret
mp.messaging.incoming.my-channel.tls-configuration-name=my-tls-config
Or you can configure it globally on all channels of a connector:
mp.messaging.connector.smallrye-pulsar.tls-configuration-name=my-tls-config
Currently, the following messaging extensions support configuration through the Quarkus TLS Registry:
-
Kafka: Provides the
ssl.engine.factory.class
property for the Kafka client. -
Pulsar: Only mTLS authentication is supported.
-
RabbitMQ
-
AMQP 1.0
-
MQTT
テスト
Dev Services を使用したテスト
ほとんどの Quarkus Messaging エクステンションは、アプリケーションの開発とテストを簡素化する Dev Service を提供します。 Dev Service は、Quarkus Messaging エクステンションですぐに使用できるように設定されたブローカーインスタンスを作成します。
During testing Quarkus creates a separate broker instance to run the tests against it.
プラットフォームエクステンションによって提供される Dev Services のリストを含む Dev Services の詳細は、Dev Services ガイドを参照してください。
InMemoryConnector を使用したテスト
ブローカーを起動せずにアプリケーションをテストすると便利です。 これを実現するには、コネクターで管理しているチャンネルを in-memory に 切り替え できます。
このアプローチは、JVM テストでのみ機能します。インジェクションには対応していないため、ネイティブテストには使用できません。 |
以下のサンプルアプリケーションをテストするとします。
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class MyMessagingApplication {
@Inject
@Channel("words-out")
Emitter<String> emitter;
public void sendMessage(String out) {
emitter.send(out);
}
@Incoming("words-in")
@Outgoing("uppercase")
public Message<String> toUpperCase(Message<String> message) {
return message.withPayload(message.getPayload().toUpperCase());
}
}
まず、以下のテスト依存関係をアプリケーションに追加します。
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-in-memory</artifactId>
<scope>test</scope>
</dependency>
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")
そして、以下のように Quarkus Test Resource を作成します。
public class InMemoryConnectorLifecycleManager implements QuarkusTestResourceLifecycleManager {
@Override
public Map<String, String> start() {
Map<String, String> env = new HashMap<>();
Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("words-in"); (1)
Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("uppercase"); (2)
Map<String, String> props3 = InMemoryConnector.switchOutgoingChannelsToInMemory("words-out"); (3)
env.putAll(props1);
env.putAll(props2);
env.putAll(props3);
return env; (4)
}
@Override
public void stop() {
InMemoryConnector.clear(); (5)
}
}
1 | 着信チャネルl words-in (消費されたメッセージ) をin-memory に切り替えます。 |
2 | 発信チャネル words-out (生成されたメッセージ) を in-memory に切り替えます。 |
3 | 発信チャネル uppercase (処理済みメッセージ) を in-memory に切り替えます。 |
4 | in-memory チャネルを使用するためのアプリケーション設定に必要なすべてのプロパティーを含む Map をビルドして返します。 |
5 | テストが停止したら、InMemoryConnector をクリアします (受信したメッセージと送信したメッセージをすべて破棄してください)。 |
上記で作成したテストリソースを使用して @QuarkusTest
を作成します。
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.junit.jupiter.api.Test;
import jakarta.inject.Inject;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.awaitility.Awaitility.await;
@QuarkusTest
@QuarkusTestResource(InMemoryConnectorLifecycleManager.class)
class MyMessagingApplicationTest {
@Inject
@Connector("smallrye-in-memory")
InMemoryConnector connector; (1)
@Inject
MyMessagingApplication app;
@Test
void test() {
InMemorySink<String> wordsOut = connector.sink("words-out"); (2)
InMemorySource<String> wordsIn = connector.source("words-in"); (3)
InMemorySink<String> uppercaseOut = connector.sink("uppercase"); (4)
app.sendMessage("Hello"); (5)
assertEquals("Hello", wordsOut.received().get(0).getPayload()); (6)
wordsIn.send("Bonjour"); (7)
await().untilAsserted(() -> assertEquals("BONJOUR", uppercaseOut.received().get(0).getPayload())); (8)
}
}
1 | @Connector または @Any 修飾子を使用して、テストクラスに in-memory コネクターを注入します。 |
2 | 発信チャネルの取得 (words-out ) - テストリソース内でチャネルが in-memory に切り替えられている必要があります。 |
3 | 着信チャネルの取得 (words-in ) |
4 | 発信チャネルの取得 (uppercase ) |
5 | 注入されたアプリケーション Bean を使用して sendMessage メソッドを呼び出し、チャネル words-out を持つエミッターを使用してメッセージを送信します。 |
6 | アプリケーションによって生成されたメッセージを確認するには、words-out in-memory チャネルの received メソッドを使用します。 |
7 | メッセージを送信するには、words-in in-memory チャネルの send メソッドを使用します。
アプリケーションはこのメッセージを処理し、uppercase チャネルにメッセージを送信します。 |
8 | アプリケーションによって生成されたメッセージを確認するには、uppercase チャネルの received メソッドを使用します。 |
in-memory コネクターはテスト目的のみに使用されます。 in-memory コネクターを使用する際に考慮すべき注意事項がいくつかあります。
テストがコンテキストの伝播に依存している場合は、 |