The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.
このページを編集

Quarkus Messaging エクステンション

イベント駆動型メッセージングシステムは、ほとんどの最新アプリケーションのバックボーンになっています。 これにより、メッセージ駆動型マイクロサービスや複雑なデータストリーミングパイプラインのビルドが可能になります。

Quarkus では、主要なメッセージングテクノロジーと簡単に同期できるように設計された包括的なメッセージングエクステンションスイートを提供しています。 これにより、開発者はコアアプリケーションロジックの作成に集中できるようになり、個々の API やメッセージングインフラストラクチャーの複雑さを詳しく調べる必要がなくなります。

Quarkus Messaging

このページでは、すべてのメッセージングエクステンションに共通する機能と開発モデルに焦点を当てています。

これらのエクステンションの一部は、コア Quarkus リポジトリーで管理されています。

いくつかのエクステンションはコミュニティーによって提供され、維持されています。

  • リンク:https://camel.apache.org/camel-quarkus/3.8.x/reference/extensions/smallrye-reactive-messaging.html[Camel Smallrye Reactive Messaging]

  • リンク:https://docs.quarkiverse.io/quarkus-reactive-messsaging-nats-jetstream/dev/index.html[Nats Jetstream Connector]

  • リンク:https://solacelabs.github.io/solace-quarkus/[Solace Messaging Connector]

  • リンク:https://quarkus.io/extensions/io.quarkiverse.reactivemessaging.http/quarkus-reactive-messaging-http/[Reactive HTTP および WebSocket Connector]

  • AWS SQS Connector

JMS ConnectorGoogle PubSub Connector などの他のコネクターでは、同じレベルのインテグレーションのメリットが得られず、セットアップには手動での設定がさらに必要となります。

一方、メッセージング関連のエクステンションの中には、低レベルのプロバイダー固有のインテグレーションを提案するものもあります。 このページで説明されているサポートレベルには、これらの低レベルのエクステンションは含まれません。 以下は、この種のエクステンションの一部を含むリストです。

  • リンク:https://quarkus.io/guides/kafka-streams[Kafka Streams エクステンション]

  • リンク:https://docs.quarkiverse.io/quarkus-rabbitmq-client/dev/index.html[RabbitMQ Client]

  • リンク:https://docs.quarkiverse.io/quarkus-hivemq-client/dev/index.html[HiveMQ Client]

  • リンク:https://docs.quarkiverse.io/quarkus-artemis/dev/quarkus-artemis-jms.html[Artemis Core & JMS]

  • リンク:https://docs.quarkiverse.io/quarkus-google-cloud-services/main/pubsub.html[Google Cloud Pubsub]

Quarkus Messaging 開発モデル

Quarkus は、基盤となるブローカーテクノロジーがメッセージキューイングまたはイベントストリーミングを使用するかどうかに関係なく、メッセージの公開、消費、および処理のための統一モデルを確立することで、メッセージ駆動型アプリケーションの開発を簡素化します。 MicroProfile Reactive Messaging 仕様に基づいてビルドされた Quarkus Messaging エクステンションは、シームレスかつ確実にこれらのテクノロジーと統合します。 重要なのは、これらの機能を活用するために、リアクティブプログラミングを習熟していることが前提条件ではないという点です。

Reactive Messaging 仕様は、イベント駆動型およびメッセージ駆動型のアプリケーションを実装するための CDI ベースのプログラミングモデルを定義します。 少数のアノテーションセットを使用することで、CDI Bean はメッセージブローカーとの対話を実装するための設定要素になります。 これらのやり取りは、アプリケーションコンポーネントがメッセージを読み書きする チャネル を通じて行われます。

チャネル は一意の名前で識別され、一連のアノテーションを使用して宣言されます。

@Incoming および @Outgoing アノテーション

@Incoming および @Outgoing メソッドアノテーションは、メッセージブローカーからのメッセージの消費とメッセージブローカーへのメッセージの生成を可能にする チャネル を定義します。

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MessageProcessingBean {

   @Incoming("source")
   @Outgoing("sink")
   public String process(String consumedPayload) {
       // Process the incoming message payload and return an updated payload
       return consumedPayload.toUpperCase();
   }

}

@Outgoing は、メッセージを生成するメソッドで単独で使用できます。

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MessageGeneratorBean {

   @Outgoing("sink")
   public Multi<String> generate() {
       return Multi.createFrom().items("a", "b", "c");
   }

}

@Incoming は、メッセージを消費するために単独で使用できます。

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MessageProcessingBean {

   @Incoming("source")
   public void process(String consumedPayload) {
       // process the payload
       consumedPayload.toUpperCase();
   }

}

コードから直接 @Incoming@Outgoing のアノテーションが付けられたメソッドを呼び出してはならない点に注意してください。 これらはフレームワークによって呼び出されます。 ユーザーコードがそれらを呼び出すと、期待どおりの結果は得られません。

サポートされるメソッド署名の詳細は、リンク:https://smallrye.io/smallrye-reactive-messaging/latest/concepts/signatures/[SmallRye Reactive Messaging – サポートされる署名] を参照してください。

Emitter と @Channel アノテーション

アプリケーションでは、多くの場合、メッセージングをアプリケーションの他の部分と組み合わせる必要があります。たとえば、HTTP エンドポイントからメッセージを生成したり、消費されたメッセージをレスポンスとしてストリーミングしたりする必要があります。

命令型コードから特定のチャネルにメッセージを送信するには、@Channel アノテーションで識別される Emitter オブジェクトを注入する必要があります。

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@ApplicationScoped
@Path("/")
public class MyImperativeBean {

   @Channel("prices")
   Emitter<Double> emitter;

   @GET
   @Path("/send")
   public CompletionStage<Void> send(double d) {
       return emitter.send(d);
   }
}

@Channel アノテーションを使用すると、ペイロードまたはメッセージを送信するチャネルを指定できます。 Emitter を使用すると、チャネルに送信されたメッセージをバッファリングできます。

さらに制御するには、リンク:https://smallrye.io/smallrye-mutiny/latest/[Mutiny] APIs を使用して、MutinyEmitter エミッターインターフェイスを使用できます。

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.MutinyEmitter;

@ApplicationScoped
@Path("/")
public class MyImperativeBean {

   @Channel("prices")
   MutinyEmitter<Double> emitter;

   @GET
   @Path("/send")
   public void send(double d) {
       emitter.sendAndAwait(d);
   }

}

@Channel アノテーションは、着信チャネルからのメッセージストリームを注入するためにも使用できます。

import org.eclipse.microprofile.reactive.messaging.Channel;

@ApplicationScoped
@Path("/")
public class SseResource {

    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }

}

@Channel でメッセージを消費する場合、アプリケーションコードがストリームへのサブスクライブを行います。 上記の例では、Quarkus REST (旧称 RESTEasy Reactive) エンドポイントがサブスクリプションを処理します。

エミッターとチャネルの詳細は、リンク:https://smallrye.io/smallrye-reactive-messaging/latest/concepts/emitter/[SmallRye Reactive Messaging – エミッターとチャネル] のドキュメントを参照してください。

メッセージとメタデータ

Message はペイロードを包むエンベロープです。 上記の例ではペイロードのみが使用されていましたが、Quarkus Messaging ではすべてのペイロードが内部的に Message でラップされています。

Message<T> インターフェイスは、確認応答 (ack) および否定応答 (nack) 用に <T> タイプのペイロードを Metadata、 任意のオブジェクトと非同期アクションのセットに関連付けます。

import org.eclipse.microprofile.reactive.messaging.Message;

@Incoming("source")
@Outgoing("sink")
public Message<String> process(Message<String> consumed) {
    // Access the metadata
    MyMetadata my = consumed.getMetadata(MyMetadata.class).get();
    // Process the incoming message and return an updated message
    return consumed.withPayload(consumed.getPayload().toUpperCase());
}

メッセージの処理または受信が成功すると、ブローカーに確認応答が返されます。 メッセージ間の確認応答は連鎖しており、メッセージを処理する際には、 発信メッセージの確認は、着信メッセージの確認をトリガーします。 ほとんどの場合、確認応答と否定応答は自動的に管理され、コネクターを使用するとチャネルごとに異なるストラテジーを設定できます。 したがって、通常は Message インターフェイスと直接やり取りする必要はありません。 高度なユースケースでのみ、Message と直接処理する必要があります。

一方、Metadata にアクセスすることは、多くの場合実用的です。 コネクターは、メッセージヘッダー、プロパティー、およびその他のコネクター固有の情報にアクセスできるように、特定のメタデータオブジェクトをメッセージに追加します。 コネクター固有のメタデータにアクセスするために、Message インターフェイスを操作する必要はありません。 ペイロードパラメーターの後に、メタデータオブジェクトをメソッドパラメーターとして挿入するだけになります。

import org.eclipse.microprofile.reactive.messaging.Metadata;
@Incoming("source")
@Outgoing("sink")
public String process(String payload, MyMetadata my) {
    // Access the metadata
    Map<String, Object> props = my.getProperties();
    // Process the payload and return an updated payload
    return payload.toUpperCase();
}

コネクターに応じて、処理方法で使用できるペイロードタイプが異なります。 カスタム MessageConverter を実装して、ペイロードをアプリケーションで受け入れられる型に変換できます。

チャネル設定

チャネル属性は、mp.messaging.incoming.<channel-name> および mp.messaging.outgoing.<channel-name> 設定プロパティーを使用して設定できます。

たとえば、カスタムデシリアライザーを使用して my-topic トピックからのメッセージを消費するように Kafka コネクターを設定するには、以下のようにします。

mp.messaging.incoming.source.connector=smallrye-kafka
mp.messaging.incoming.source.topic=my-topic
mp.messaging.incoming.source.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.source.auto.offset.reset=earliest

connector 属性はすべてのチャネルに必須であり、使用するコネクターを指定します。 クラスパスにコネクターが 1 つしかない場合は、Quarkus が自動的にコネクターを選択するため、この設定を省略できます。

グローバルチャネル属性は、コネクター名を使用して設定できます。

mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092

コネクター固有の属性は、コネクターのドキュメントに記載されています。

チャネルワイヤリングとメッセージングパターン

起動時に、Quarkus は宣言されたチャネルを分析してそれらをワイヤリングし、すべてのチャネルが接続されていることを確認します。 具体的には、各チャネルは、別のチャネルのメッセージの リアクティブストリーム に接続されたメッセージの リアクティブストリーム を作成します。 リアクティブストリームプロトコルに準拠し、チャネル間でバックプレッシャーメカニズムが適用され、アプリケーションリソースの使用を制御し、過剰なコミットやシステムの一部への過負荷を防ぐことができます。

その一方で、ランタイム時にプログラムで新しいチャネルを作成することはできません。 ただし、すべてではないにしても、ほとんどのメッセージングおよびインテグレーションユースケースを実装できるパターンは多数あります。

一部のメッセージングテクノロジーでは、コンシューマーがトピックまたはキューのセットをサブスクライブし、プロデューサーがメッセージベースで特定のトピックにメッセージを送信できます。 ランタイム時にクライアントを動的に設定および作成する必要があることが確実な場合は、低レベルのクライアントを直接使用することを検討する必要があります。

内部チャネル

ユースケースによっては、メッセージングパターンを使用して同じアプリケーション内でメッセージを転送すると便利です。 チャネルをメッセージングバックエンド、つまりコネクターに接続しない場合は、すべてがアプリケーション内部で行われます。 そして、ストリームはメソッドを連鎖することで作成されます。 各チェーンは引き続きリアクティブストリームで、バックプレッシャープロトコルが適用されます。

フレームワークは、プロデューサー/コンシューマーチェーンが完全であることを確認します。 つまり、アプリケーションがメッセージをインターナルチャネルに書き込む場合 (@Outgoing のみを持つメソッド、または Emitter を使用)、 アプリケーション内からメッセージを消費する必要もあります (@Incoming のみを持つメソッド、またはアンマネージドストリームを使用)。

チャネルを有効化/無効化

定義されたすべてのチャネルはデフォルトで有効化されていますが、以下の設定でチャネルを無効化することもできます。

mp.messaging.incoming.my-channel.enabled=false

これを Quarkus ビルドプロファイルと一緒に使用して、ターゲット環境などのビルド時の条件に基づいてチャネルを有効化/無効化することができます。 チャネルを無効化するときは、以下の 2 つの点を確認する必要があります。

  • 無効化されたチャネルの使用方法が、ビルド時にフィルタリングできる Bean 内に設定されていること。

  • このチャネルがなくても、残りのチャネルが引き続き正常に動作すること。

@ApplicationScoped
@IfBuildProfile("my-profile")
public class MyProfileBean {

    @Outgoing("my-channel")
    public Multi<String> generate() {
        return Multi.createFrom().items("a", "b", "c");
    }

}

Multiple Outgoings と @Broadcast

デフォルトでは、チャネルで送信されるメッセージは単一のコンシューマーにのみディスパッチされます。 複数のコンシューマーが存在するとエラーとみなされ、デプロイメント時に報告されます。

@Broadcast アノテーションはこの動作を変更し、チャネルを通過するメッセージがすべてのコンシューマーにディスパッチされることを示します。 @Broadcast@Outgoing アノテーションと一緒に使用する必要があります。

import org.eclipse.microprofile.reactive.messaging.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@Incoming("in")
@Outgoing("out")
@Broadcast
public int increment(int i) {
    return i + 1;
}

@Incoming("out")
public void consume1(int i) {
    //...
}

@Incoming("out")
public void consume2(int i) {
    //...
}

@Broadcast と同様に、同じメソッドで @Outgoing アノテーションを複数回使用して、このメソッドが複数のチャネルにメッセージを生成することを示すことができます。

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
public String process(String s) {
    // send messages from channel in to both channels out1 and out2
    return s.toUpperCase();
}

Multiple Outgoings を使用すると、単一のメッセージが複数のターゲットチャネルによって処理されるファンアウトパターンを実装する場合に役立ちます。

処理メソッドから Targeted を返すことで、メッセージを複数の送信先に選択的にディスパッチできます。

@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
@Outgoing("out3")
public Targeted process(double price) {
    // send messages from channel-in to both channel-out1 and channel-out2
    Targeted targeted = Targeted.of("out1", "Price: " + price, "out2", "Quote: " + price);
    if (price > 90.0) {
        return targeted.with("out3", price);
    }
    return targeted;
}

Multiple Incomings と @Merge

デフォルトでは、単一のプロデューサーがチャネル内でメッセージを送信できます。 複数のプロデューサーが存在するとエラーとみなされ、デプロイメント時に報告されます。 @Merge アノテーションはこの動作を変更し、チャネルに複数のプロデューサーが存在する可能性があることを示します。 @Merge@Incoming アノテーションと一緒に使用する必要があります。

@Incoming("in1")
@Outgoing("out")
public int increment(int i) {
    return i + 1;
}

@Incoming("in2")
@Outgoing("out")
public int multiply(int i) {
    return i * 2;
}

@Incoming("out")
@Merge
public void getAll(int i) {
    //...
}

@Merge と同様に、同じメソッドで @Incoming アノテーションを複数回使用して、メソッドが複数のチャネルからのメッセージを消費することを示すことができます。

@Incoming("in1")
@Incoming("in2")
public String process(String s) {
    // get messages from channel-1 and channel-2
    return s.toUpperCase();
}

ストリーム処理

一部の高度なシナリオでは、個々のメッセージではなく、メッセージのストリームを直接操作できます。

着信署名と発信署名でリンク:https://smallrye.io/smallrye-mutiny/latest/[Mutiny APIs] を使用すると、メッセージのストリームを処理できます。

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class StreamProcessor {

    @Incoming("source")
    @Outgoing("sink")
    public Multi<String> process(Multi<String> in) {
        return in.map(String::toUpperCase);
    }

}

実行モデル

Quarkus Messaging は、Quarkus の リアクティブエンジン の上に配置され、リンク:https://vertx.io/[Eclipse Vert.x] を活用して処理用にメッセージをディスパッチします。 以下の 3 つの実行モードをサポートしています。

  • イベントループ: メッセージが Vert.x I/O スレッドにディスパッチされます。 イベントループではブロッキング操作を実行しないでください。

  • ワーカースレッド: メッセージはワーカースレッドプールにディスパッチされます。

  • 仮想スレッド: メッセージは仮想スレッドにディスパッチされます (Java 21 以降が必要です)。 仮想スレッドはプールされないため、メッセージごとに新しい仮想スレッドが作成されます。 詳細は、専用の Quarkus Virtual Thread サポート ガイドを参照してください。

Quarkus はメソッド署名に基づいてデフォルトの実行モードを選択します。 メソッド署名が synchronous の場合、メッセージは ワーカースレッド にディスパッチされ、それ以外の場合はデフォルトで イベントループ にディスパッチされます。

メソッド署名 デフォルトの実行モード

@Incoming("source") void process(String payload)

ワークスレッド

@Incoming("source") Uni<Void> process(String payload)

イベントループ

@Incoming("source") CompletionStage<Void> process(Message<String> message)

イベントループ

@Incoming("source") @Outgoing("sink") Multi<R> process(Multi<T> in)

ストリーム処理メソッドは起動時に実行され、各メッセージはイベントループにディスパッチされます。

アノテーションを使用すると、実行モデルを細かく制御できます。

  • リンク:https://javadoc.io/doc/io.smallrye.reactive/smallrye-reactive-messaging-api/latest/io/smallrye/reactive/messaging/annotations/Blocking.html [@Blocking] は、メソッドをワーカースレッドプールで強制的に実行します。 ワーカースレッドのデフォルトプールは、すべてのチャネル間で共有されます。 @Blocking("my-custom-pool") を使用すると、カスタムスレッドプールを使用してチャネルを設定できます。 設定プロパティー smallrye.messaging.worker.my-custom-pool.max-concurrency は、プール内のスレッドの最大数を指定します。 ブロッキング処理の詳細は、リンク:http://smallrye.io/smallrye-reactive-messaging/4.8.0/concepts/blocking/[SmallRye Reactive Messaging のドキュメント] を参照してください。

  • @NonBlocking は、メソッドをイベントループスレッドで強制的に実行します。

  • @RunOnVirtualThread は、メソッドを仮想スレッド上で強制的に実行します。 仮想スレッドの軽量な性質を活用するために、@RunOnVirtualThread アノテーションが付けられたメソッドのデフォルトの最大同時実行数は 1024 です。 これは、smallrye.messaging.worker.<virtual-thread>.max-concurrency 設定プロパティーを設定することで変更できます。 または、@Blocking("my-custom-pool") アノテーションと一緒に使用することで変更できます。

@Transactional アノテーションの存在は、実行をブロックすることを意味します。

メッセージングアプリケーションでは、生成および消費されるメッセージは順序付けられたイベントのストリームを構成します。 これは、(トピックまたはキュー内に) ブローカーによって強制されるか、 アプリケーションの受付および発行の順序によって強制されます。 この順序を維持するために、Quarkus Messaging はデフォルトでメッセージを順番にディスパッチします。 @Blocking(ordered = false)`または `@RunOnVirtualThread アノテーションを使用して、この動作をオーバーライドできます。

着信チャネルの同時実行

一部のコネクターは、着信チャネルの同時実行レベルの設定をサポートしています。

mp.messaging.incoming.my-channel.concurrency=4

これにより、内部的に着信チャネルのコピーが 4 つ作成され、同じ処理方法にワイヤリングされます。 これは、ブローカーテクノロジーによっては、異なるコピーで受信されたメッセージの部分的な順序を保持したまま、 複数のメッセージを同時に処理することでアプリケーションのスループットを向上させる際に役立つ場合があります。 たとえば、複数のコンシューマーが異なるトピックパーティションを消費できる Kafka の場合がこれに該当します。

ヘルスチェック

Quarkus Messaging エクステンションは、Smalllye Health エクステンションと組み合わせることで、チャネルごとにヘルスチェックのサポートを提供します。 startupreadiness、および liveness チェックの実装はコネクターによって異なります。 一部のコネクターでは、ヘルスチェックの動作を設定したり、完全にまたはチャネルごとに無効にしたりできます。

チャネルのヘルスチェックは、quarkus.messaging.health.<channel-name>.enabled を使用して無効化できます。またはヘルスチェックタイプごとに無効化できます。 (例: quarkus.messaging.health.<channel-name>.liveness.enabled)

quarkus.messaging.health.enabled 設定プロパティーを false に設定すると、メッセージングのヘルスチェックが完全に無効になります。

可観測性

Micrometer メトリクス

Quarkus Messaging エクステンションは、メッセージングシステムのヘルスを監視するためのシンプルかつ便利なメトリクスを提供します。 Micrometer エクステンション はこれらのメトリクスを公開します。

channel タグで識別されるチャネルごとに、以下のメトリクスを収集できます。

  • quarkus.messaging.message.count : 生成または受信したメッセージの数

  • quarkus.messaging.message.acks : 正常に処理されたメッセージの数

  • quarkus.messaging.message.failures : 処理に失敗したメッセージの数

  • quarkus.messaging.message.duration : メッセージの処理時間

下位互換性のため、チャネルメトリクスはデフォルトでは有効化されていませんが、smallrye.messaging.observation.enabled=true で有効化することができます。

OpenTelemetry トレース

一部の Quarkus Messaging コネクターは、OpenTelemetry Tracing とすぐに統合されます。 OpenTelemetry エクステンション が存在する場合、発信メッセージは現在のトレース範囲を伝播します。 着信チャネルでは、受信したメッセージにトレース情報が含まれている場合、メッセージ処理はメッセージスパンを親として継承します。

次の設定を使用して、特定のチャネルのトレースを無効にすることができます。

mp.messaging.incoming.data.tracing-enabled=false

TLS Configuration

Some messaging extensions integrate with the Quarkus TLS Registry to configure the underlying client. To configure the TLS on a channel, you need to provide the named TLS configuration to the tls-configuration-name property:

quarkus.tls.my-tls-config.trust-store=truststore.jks
quarkus.tls.my-tls-config.trust-store-password=secret
mp.messaging.incoming.my-channel.tls-configuration-name=my-tls-config

Or you can configure it globally on all channels of a connector:

mp.messaging.connector.smallrye-pulsar.tls-configuration-name=my-tls-config

Currently, the following messaging extensions support configuration through the Quarkus TLS Registry:

  • Kafka: Provides the ssl.engine.factory.class property for the Kafka client.

  • Pulsar: Only mTLS authentication is supported.

  • RabbitMQ

  • AMQP 1.0

  • MQTT

テスト

Dev Services を使用したテスト

ほとんどの Quarkus Messaging エクステンションは、アプリケーションの開発とテストを簡素化する Dev Service を提供します。 Dev Service は、Quarkus Messaging エクステンションですぐに使用できるように設定されたブローカーインスタンスを作成します。

During testing Quarkus creates a separate broker instance to run the tests against it.

プラットフォームエクステンションによって提供される Dev Services のリストを含む Dev Services の詳細は、Dev Services ガイドを参照してください。

InMemoryConnector を使用したテスト

ブローカーを起動せずにアプリケーションをテストすると便利です。 これを実現するには、コネクターで管理しているチャンネルを in-memory切り替え できます。

このアプローチは、JVM テストでのみ機能します。インジェクションには対応していないため、ネイティブテストには使用できません。

以下のサンプルアプリケーションをテストするとします。

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

@ApplicationScoped
public class MyMessagingApplication {

    @Inject
    @Channel("words-out")
    Emitter<String> emitter;

    public void sendMessage(String out) {
        emitter.send(out);
    }

    @Incoming("words-in")
    @Outgoing("uppercase")
    public Message<String> toUpperCase(Message<String> message) {
        return message.withPayload(message.getPayload().toUpperCase());
    }

}

まず、以下のテスト依存関係をアプリケーションに追加します。

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")

そして、以下のように Quarkus Test Resource を作成します。

public class InMemoryConnectorLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("words-in");   (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("uppercase");  (2)
        Map<String, String> props3 = InMemoryConnector.switchOutgoingChannelsToInMemory("words-out");  (3)
        env.putAll(props1);
        env.putAll(props2);
        env.putAll(props3);
        return env;  (4)
    }

    @Override
    public void stop() {
        InMemoryConnector.clear();  (5)
    }
}
1 着信チャネルl words-in (消費されたメッセージ) をin-memory に切り替えます。
2 発信チャネル words-out (生成されたメッセージ) を in-memory に切り替えます。
3 発信チャネル uppercase (処理済みメッセージ) を in-memory に切り替えます。
4 in-memory チャネルを使用するためのアプリケーション設定に必要なすべてのプロパティーを含む Map をビルドして返します。
5 テストが停止したら、InMemoryConnector をクリアします (受信したメッセージと送信したメッセージをすべて破棄してください)。

上記で作成したテストリソースを使用して @QuarkusTest を作成します。

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;

import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.junit.jupiter.api.Test;

import jakarta.inject.Inject;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.awaitility.Awaitility.await;

@QuarkusTest
@QuarkusTestResource(InMemoryConnectorLifecycleManager.class)
class MyMessagingApplicationTest {

    @Inject
    @Connector("smallrye-in-memory")
    InMemoryConnector connector; (1)

    @Inject
    MyMessagingApplication app;

    @Test
    void test() {
        InMemorySink<String> wordsOut = connector.sink("words-out"); (2)
        InMemorySource<String> wordsIn = connector.source("words-in"); (3)
        InMemorySink<String> uppercaseOut = connector.sink("uppercase"); (4)

        app.sendMessage("Hello"); (5)
        assertEquals("Hello", wordsOut.received().get(0).getPayload()); (6)

        wordsIn.send("Bonjour"); (7)
        await().untilAsserted(() -> assertEquals("BONJOUR", uppercaseOut.received().get(0).getPayload())); (8)
    }
}
1 @Connector または @Any 修飾子を使用して、テストクラスに in-memory コネクターを注入します。
2 発信チャネルの取得 (words-out) - テストリソース内でチャネルが in-memory に切り替えられている必要があります。
3 着信チャネルの取得 (words-in)
4 発信チャネルの取得 (uppercase)
5 注入されたアプリケーション Bean を使用して sendMessage メソッドを呼び出し、チャネル words-out を持つエミッターを使用してメッセージを送信します。
6 アプリケーションによって生成されたメッセージを確認するには、words-out in-memory チャネルの received メソッドを使用します。
7 メッセージを送信するには、words-in in-memory チャネルの send メソッドを使用します。 アプリケーションはこのメッセージを処理し、uppercase チャネルにメッセージを送信します。
8 アプリケーションによって生成されたメッセージを確認するには、uppercase チャネルの received メソッドを使用します。

in-memory コネクターはテスト目的のみに使用されます。 in-memory コネクターを使用する際に考慮すべき注意事項がいくつかあります。

  • in-memory コネクターは、InMemorySource#send メソッドを使用して送信されたオブジェクト (ペイロードまたは設定されたメッセージ) のみを送信します。 アプリケーションメソッドによって受信されるメッセージには、コネクター固有のメタデータは含まれません。

  • デフォルトでは、in-memory チャネルは、ユニットテストのメインスレッドとなる InMemorySource#send メソッドの呼び出し元スレッドでメッセージをディスパッチします。 ただし、他のほとんどのコネクターは、個別の複製された Vert.x コンテキストでコンテキスト伝播ディスパッチメッセージを処理します。

quarkus-test-vertx 依存関係は @io.quarkus.test.vertx.RunOnVertxContext アノテーションを提供します。これをテストメソッドで使用すると、Vert.x コンテキストでテストが実行されます。

テストがコンテキストの伝播に依存している場合は、run-on-vertx-context 属性を使用して in-memory コネクターチャネルを設定し、メッセージや確認応答などのイベントを Vert.x コンテキストにディスパッチできます。 あるいは、InMemorySource#runOnVertxContext メソッドを使用して、この動作を切り替えることもできます。

さらに詳しく

このガイドでは、Quarkus Messaging エクステンションの一般的な原則について説明します。

さらに詳しく知りたい場合は、リンク:http://smallrye.io/smallrye-reactive-messaging/[SmallRye Reactive Messaging] のドキュメントを参照してください。 これには、それぞれのコンセプトなどに関する詳細なドキュメントが含まれています。

関連コンテンツ