The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.
このページを編集

Apache Kafka リファレンスガイド

This reference guide demonstrates how your Quarkus application can utilize Quarkus Messaging to interact with Apache Kafka.

1. はじめに

Apache Kafkaは、人気の高いオープンソースの分散型イベントストリーミングプラットフォームです。高性能なデータパイプライン、ストリーミング分析、データ統合、ミッションクリティカルなアプリケーションなどによく利用されています。メッセージキューやエンタープライズメッセージングプラットフォームに似ており、以下のことが可能です。

  • レコード と呼ばれるイベントのストリームを 発行 (書き込み)したり、 購読 (読み込み)したりすることができます。

  • レコードのストリームを トピック 内に永続的かつ確実に 保存 します。

  • レコードのストリームを発生時または遡及的に 処理 します。

そして、これらの機能はすべて、分散型で、拡張性が高く、弾力性があり、耐障害性があり、安全な方法で提供されます。

2. Apache Kafka のための Quarkus エクステンション

Quarkus は、 SmallRye Reactive Messaging フレームワークを通じて Apache Kafka のサポートを提供します。Eclipse MicroProfile Reactive Messaging 仕様 2.0 に基づいて、CDI とイベント駆動型を橋渡しする柔軟なプログラミングモデルを提案します。

This guide provides an in-depth look on Apache Kafka and SmallRye Reactive Messaging framework. For a quick start take a look at Getting Started to Quarkus Messaging with Apache Kafka.

You can add the messaging-kafka extension to your project by running the following command in your project base directory:

コマンドラインインタフェース
quarkus extension add messaging-kafka
Maven
./mvnw quarkus:add-extension -Dextensions='messaging-kafka'
Gradle
./gradlew addExtension --extensions='messaging-kafka'

これにより、 pom.xml に以下が追加されます:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-kafka")

このエクステンションには、kafka-clients バージョン 3.2.1 が推移的依存関係として含まれており、Kafka ブローカーバージョン 2.x と互換性があります。

3. Smallrye Kafka コネクターの設定

Smallrye Reactive Messaging フレームワークは、Apache Kafka、AMQP、Apache Camel、JMS、MQTT など、さまざまなメッセージングバックエンドをサポートしているため、汎用的な語彙を使用しています。

  • アプリケーションは メッセージ を送受信します。メッセージは payload をラップし、いくつかの metadata で拡張できます。Kafka コネクターを使用すると、メッセージ は Kafka レコード に対応します。

  • メッセージは チャネル を通過します。アプリケーションコンポーネントはチャネルに接続して、メッセージを公開および消費します。Kafka コネクターは チャネル を Kafka トピック にマップします。

  • チャネルは、 コネクター を使用してメッセージバックエンドに接続されます。コネクターは、着信メッセージを特定のチャネル (アプリケーションによって消費される) にマッピングし、特定のチャネルに送信された発信メッセージを収集するように設定されています。各コネクターは、特定のメッセージングテクノロジーに特化しています。たとえば、Kafka を処理するコネクターの名前は smallrye-kafka となっています。

着信チャンネルを持つ Kafka コネクターの最小設定は次のようになります。

%prod.kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.incoming.prices.connector=smallrye-kafka (2)
1 プロダクションプロファイルのブローカーの場所を設定します。mp.messaging.incoming.$channel.bootstrap.servers プロパティーを使用して、グローバルまたはチャネルごとに設定できます。dev モードとテスト実行時には、Dev Services for Kafka が自動的に Kafka ブローカーを開始します。指定しない場合、このプロパティーのデフォルトは localhost:9092 になります。
2 prices チャネルを管理するようにコネクターを設定します。デフォルトでは、トピック名はチャネル名と同じです。トピック属性を設定することで、それを上書きすることができます。
%prod 接頭辞は、アプリケーションが prod モードで実行されている場合にのみプロパティーが使用されることを示します (つまり、dev または test モードでは使用されません)。詳細は、プロファイルに関するドキュメント を参照してください。
コネクターの自動アタッチ

クラスパスに単一のコネクターがある場合は、connector 属性の設定を省略できます。Quarkus は、orphan チャネルをクラスパスにある (一意の) コネクターに自動的に関連付けます。Orphans チャネルは、ダウンストリームコンシューマーのない outgoing チャネル、またはアップストリームプロデューサーのない incoming チャネルです。

この自動アタッチは、以下を使用して無効にできます。

quarkus.messaging.auto-connector-attachment=false

4. Kafka からのメッセージの受信

直前の最小設定によって、Quarkus アプリケーションはすぐにメッセージペイロードを受信できます。

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

アプリケーションが受信したメッセージを消費する方法は、他にもいくつかあります。

Message
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(IncomingKafkaRecordMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (commit the offset)
    return msg.ack();
}

Message タイプを使用すると、消費するメソッドは着信メッセージのメタデータにアクセスし、確認応答を手動で処理することができます。コミットストラテジー で、さまざまな確認応答ストラテジーを検討します。

Kafka レコードオブジェクトに直接アクセスする場合は、次を使用します。

ConsumerRecord
@Incoming("prices")
public void consume(ConsumerRecord<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    String value = record.value(); // Can be `null` if the incoming record has no value
    String topic = record.topic();
    int partition = record.partition();
    // ...
}

ConsumerRecord は、基盤となる Kafka クライアントによって提供され、コンシューマーメソッドに直接注入することができます。Record の使用に際して、以下のような別の簡単なアプローチがあります。

Record
@Incoming("prices")
public void consume(Record<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    String value = record.value(); // Can be `null` if the incoming record has no value
}

Record は、着信 Kafka レコードのキーとペイロードの単純なラッパーです。

@Channel

または、以下の例のように、アプリケーションで Bean に Multi を注入し、そのイベントをサブスクライブすることもできます。

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

これは、Kafka コンシューマーを別のダウンストリームと統合する方法の良い例で、この例では Server-Sent Events エンドポイントとして公開しています。

@Channel でメッセージを消費する場合、アプリケーションコードがサブスクリプションを行います。 上記の例では、Quarkus REST (旧称 RESTEasy Reactive) エンドポイントがサブスクリプションを処理します。

チャンネルとして注入できるのは、以下のタイプです。

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

前出の Message の例と同様に、注入されたチャネルがペイロードを受信した場合 (Multi<T>)、メッセージを自動的に確認応答し、複数のサブスクライバーをサポートします。注入したチャネルが Message (Multi<Message<T>>) を受信した場合は、確認応答とブロードキャストを行う必要があります。ブロードキャストメッセージの送信については、複数のコンシューマーでのメッセージのブロードキャスト で説明します。

@Channel("prices") を注入したり、@Incoming("prices") を持っていたりしても、Kafka からのメッセージを消費するようにアプリケーションが自動的に設定されるわけではありません。mp.messaging.incoming.prices... を使用してインバウンドコネクターを設定するか、アプリケーションのどこかに @Outgoing("prices") メソッドを持っている必要があります (その場合、prices はインメモリーチャネルになります)。

4.1. ブロッキング処理

リアクティブメッセージングは、I/O スレッドでメソッドを呼び出します。このトピックの詳細については、xQuarkus リアクティブアーキテクチャーのドキュメント を参照してください。ただし、多くの場合、リアクティブメッセージングとデータベースインタラクションなどのブロック処理を組み合わせる必要があります。このためには、処理が ブロッキング であり、呼び出し元のスレッドで実行するべきではないことを示す @Blocking アノテーションを使用する必要があります。

例えば、以下のコードは、Hibernate with Panacheを 使用してデータベースに受信ペイロードを格納する方法を示しています。

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

The complete example is available in the kafka-panache-quickstart directory.

@Blocking アノテーションは 2 つあります。

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

効果はどちらも同じです。したがって、両方を使うことができます。最初のものは、使用するワーカープールや順序を保持するかどうかなど、より細かい調整が可能です。2 番目のものは、Quarkus の他のリアクティブ機能でも使用され、デフォルトのワーカープールを使用し、順序を保持します。

@Blocking アノテーションの使用法の詳細については、 SmallRye Reactive Messaging – Handling blocking execution を参照してください。

@RunOnVirtualThread

Java 仮想スレッドでの ブロッキング処理の実行については、 Quarkus Virtual Thread support with Reactive Messagingのドキュメント を参照してください。

@Transactional

メソッドに @Transactional アノテーションが付けられている場合、メソッドに @Blocking アノテーションが付けられていなくても、自動的に blocking と見なされます。

4.2. 確認応答ストラテジー

コンシューマーが受信したすべてのメッセージは確認応答する必要があります。確認応答されない場合、処理はエラーと見なされます。コンシューマーメソッドが Record またはペイロードを受信した場合、メッセージはメソッドの戻り時に確認応答されます。これは、Strategy.POST_PROCESSING としても知られています。コンシューマーメソッドが別のリアクティブストリームまたは CompletionStage を返す場合、ダウンストリームメッセージが確認応答されたときにメッセージが確認応答されます。以下の例のように、デフォルトの動作をオーバーライドして、到着時にメッセージを確認応答する (Strategy.PRE_PROCESSING) か、コンシューマーメソッドでメッセージをまったく確認応答しない (Strategy.NONE) ことができます。

@Incoming("prices")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void process(double price) {
    // process price
}

コンシューマーメソッドが Message を受信した場合、確認応答ストラテジーは Strategy.MANUAL で、コンシューマーメソッドがメッセージの ack/nack を行います。

@Incoming("prices")
public CompletionStage<Void> process(Message<Double> msg) {
    // process price
    return msg.ack();
}

前述のように、このメソッドは確認応答ストラテジーを PRE_PROCESSING または NONE にオーバーライドすることも可能です。

4.3. コミットストラテジー

Kafka レコードから生成されたメッセージが確認応答されると、コネクターはコミットストラテジーを呼び出します。これらのストラテジーは、特定のトピック/パーティションのコンシューマーオフセットがいつコミットされるかを決定します。オフセットをコミットすると、以前のすべてのレコードが処理されたことを示します。また、これは、クラッシュリカバリーまたは再起動後にアプリケーションが処理を再開する位置でもあります。

オフセットを毎回コミットすることは、Kafkaのオフセット管理のオーバーヘッドを増やすため、パフォーマンスペナルティにつながります。ただし、オフセットを十分な頻度でコミットしないと、2 つのコミットの間にアプリケーションがクラッシュした場合に、メッセージが重複する可能性があります。

Kafka コネクターは、以下の 3 つのストラテジーをサポートします。

  • throttled は受信したメッセージを追跡し、最新の確認応答済みメッセージのオフセットを順番にコミットします (つまり、以前のすべてのメッセージも確認応答済みです)。このストラテジーは、チャネルが非同期処理を実行する場合でも、少なくとも 1 回の配信を保証します。コネクターは受信したレコードを追跡し、定期的に (auto.commit.interval.ms で指定された期間、デフォルト: 5000 ms) 最大の連続オフセットをコミットします。レコードに関連付けられたメッセージが throttled.unprocessed-record-max-age.ms (デフォルト: 60000 ms) で確認応答されない場合、コネクターは異常としてマークされます。実際、このストラテジーでは、1 つのレコード処理が失敗してすぐにオフセットをコミットすることはできません (処理失敗時の動作を設定するには、エラー処理ストラテジー を参照)。throttled.unprocessed-record-max-age.ms0 以下に設定されている場合、ヘルスチェックの検証は実行されません。このような設定では、(決して確認応答されない) "poison pill" メッセージがある場合、メモリーが不足する可能性があります。enable.auto.commit が明示的に true に設定されていない場合、このストラテジーがデフォルトになります。

  • checkpoint allows persisting consumer offsets on a state store, instead of committing them back to the Kafka broker. Using the CheckpointMetadata API, consumer code can persist a processing state with the record offset to mark the progress of a consumer. When the processing continues from a previously persisted offset, it seeks the Kafka consumer to that offset and also restores the persisted state, continuing the stateful processing from where it left off. The checkpoint strategy holds locally the processing state associated with the latest offset, and persists it periodically to the state store (period specified by auto.commit.interval.ms (default: 5000)). The connector will be marked as unhealthy if no processing state is persisted to the state store in checkpoint.unsynced-state-max-age.ms (default: 10000). If checkpoint.unsynced-state-max-age.ms is set to less than or equal to 0, it does not perform any health check verification. For more information, see チェックポイントによるステートフル処理

  • latest commits the record offset received by the Kafka consumer as soon as the associated message is acknowledged (if the offset is higher than the previously committed offset). This strategy provides at-least-once delivery if the channel processes the message without performing any asynchronous processing. Specifically, the offset of the most recent acknowledged message will always be committed, even if older messages have not finished being processed. In case of an incident such as a crash, processing would restart after the last commit, leading to older messages never being successfully and fully processed, which would appear as message loss. This strategy should not be used in high load environment, as offset commit is expensive. However, it reduces the risk of duplicates.

  • ignore はコミットを実行しません。このストラテジーは、コンシューマーが enable.auto.commit を true に明示的に設定されている場合のデフォルトのストラテジーです。これは、オフセットコミットを基盤となる Kafka クライアントに委任します。enable.auto.committrue の場合、このストラテジーは少なくとも 1 回の配信を保証しません。SmallRye Reactive Messaging はレコードを非同期で処理するため、ポーリングされたがまだ処理されていないレコードに対してオフセットがコミットされる場合があります。エラーが発生した場合、まだコミットされていないレコードのみが再処理されます。

Kafka コネクターは、明示的に有効にされていない場合、Kafka 自動コミットを無効にします。この動作は、従来の Kafka コンシューマーとは異なります。高スループットが重要であり、ダウンストリームに制限されていない場合は、次のいずれかをお勧めします。

  • throttled ポリシーの使用

  • または、enable.auto.commit を true に設定し、消費メソッドに @Acknowledgment(Acknowledgment.Strategy.NONE) のアノテーションを付けます。

Smallrye Reactive Messaging を使用すると、カスタム コミット戦略を実装できます。 詳細については、https://smallrye.io/smallrye-reactive-messaging/latest/kafka/recoming-kafka-records/#acknowledgement[SmallRye Reactive Messaging のドキュメント] を参照してください。

4.4. エラー処理ストラテジー

Kafka レコードから生成されたメッセージが nack された場合、エラーストラテジーが適用されます。Kafka コネクターは、次の 3 つのストラテジーをサポートしています。

  • fail: アプリケーションを失敗させ、それ以上のレコードは処理されません (デフォルトストラテジー)。正しく処理されなかったレコードのオフセットはコミットされません。

  • ignore: エラーはログに記録されますが、処理は続行されます。正しく処理されなかったレコードのオフセットがコミットされます。

  • dead-letter-queue: 正しく処理されなかったレコードのオフセットはコミットされますが、レコードは Kafka デッドレタートピックに書き込まれます。

ストラテジーは failure-strategy 属性を使用して選択します。

dead-letter-queue の場合、以下の属性を設定することができます。

  • dead-letter-queue.topic: 正しく処理されなかったレコードを書き込むために使用するトピック。デフォルトは dead-letter-topic-$channel で、$channel はチャンネルの名前になります。

  • dead-letter-queue.key.serializer: デッドレターキューにレコードキーを書き込むために使用されるシリアライザー。デフォルトでは、キーデシリアライザーからシリアライザーを推測します。

  • dead-letter-queue.value.serializer: デッドレターキューにレコード値を書き込むために使用されるシリアライザー。デフォルトでは、値デシリアライザーからシリアライザーを推測します。

デッドレターキューに書き込まれたレコードには、元のレコードに関する一連の追加ヘッダーが含まれています。

  • dead-letter-reason: 失敗の理由

  • dead-letter-cause: エラーの原因 (エラーがある場合)

  • dead-letter-topic: レコードの元のトピック

  • dead-letter-partition: レコードの元のパーティション (String にマップされた integer)

  • dead-letter-offset: レコードの元のオフセット (String にマップされた long)

Smallrye Reactive Messaging を使用すると、カスタム 失敗戦略を実装できます。 詳細については、https://smallrye.io/smallrye-reactive-messaging/latest/kafka/recoming-kafka-records/#acknowledgement[SmallRye Reactive Messaging のドキュメント] を参照してください。

4.4.1. 処理のリトライ

Reactive Messaging を SmallRye Fault Tolerance と組み合わせて、失敗した場合は処理をリトライできます。

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
public void consume(String v) {
   // ... retry if this method throws an exception
}

遅延、再試行回数、ジッターなどを設定できます。

メソッドが Uni または CompletionStage を返す場合は、@NonBlocking アノテーションを追加する必要があります。

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
@NonBlocking
public Uni<String> consume(String v) {
   // ... retry if this method throws an exception or the returned Uni produce a failure
}
`@NonBlocking`アノテーションは、 SmallRye Fault Tolerance 5.1.0 以前でのみ必要です。SmallRye Fault Tolerance 5.2.0 以降 (Quarkus 2.1.0.Final 以降で使用可能) では必要ありません。詳細は、 SmallRye Fault Tolerance documentation を参照してください。

着信メッセージは、処理が正常に完了したときにのみ確認応答されます。したがって、着信メッセージは、処理が成功した後にオフセットをコミットします。それでも処理が失敗する場合は、すべての再試行後でも、メッセージは nack され、エラーストラテジーが適用されます。

4.4.2. デシリアライゼーション失敗時の処理

デシリアライゼーションがエラーが発生したとき、それをインターセプトしてエラーストラテジーを提供することができます。これを実現するには、DeserializationFailureHandler<T> インターフェイスを実装した Bean を作成する必要があります。

@ApplicationScoped
@Identifier("failure-retry") // Set the name of the failure handler
public class MyDeserializationFailureHandler
    implements DeserializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public JsonObject decorateDeserialization(Uni<JsonObject> deserialization, String topic, boolean isKey,
            String deserializer, byte[] data, Headers headers) {
        return deserialization
                    .onFailure().retry().atMost(3)
                    .await().atMost(Duration.ofMillis(200));
    }
}

このエラーハンドラーを使用するには、Bean を @Identifier 修飾子で公開し、コネクター設定で属性 mp.messaging.incoming.$channel.[key|value]-deserialization-failure-handler を指定する必要があります (キーまたは値のデシリアライザー用)。

ハンドラーは、Uni<T> として表されるアクションを含む、デシリアライゼーションの詳細とともに呼び出されます。再試行などのデシリアライゼーション Uni エラーストラテジーでは、フォールバック値の提供やタイムアウトの適用を実装することができます。

If you don’t configure a deserialization failure handler and a deserialization failure happens, the application is marked unhealthy. You can also ignore the failure, which will log the exception and produce a null value. To enable this behavior, set the mp.messaging.incoming.$channel.fail-on-deserialization-failure attribute to false.

If the fail-on-deserialization-failure attribute is set to false and the failure-strategy attribute is dead-letter-queue the failed record will be sent to the corresponding dead letter queue topic.

4.5. コンシューマーグループ

Kafka では、コンシューマーグループは、トピックからのデータを消費するために協力する一連のコンシューマーです。トピックは一連のパーティションに分割されます。トピックのパーティションは、グループ内のコンシューマー間で割り当てられ、消費スループットを効果的にスケーリングできます。各パーティションは、グループからの単一のコンシューマーに割り当てられることに注意してください。ただし、パーティションの数がグループ内のコンシューマーの数よりも多い場合は、コンシューマーに複数のパーティションを割り当てることができます。

ここでは、さまざまなプロデューサー/コンシューマーパターンと、Quarkus を使用したその実装方法について簡単に説明します。

  1. コンシューマーグループ内の単一のコンシューマースレッド

    これは、Kafka トピックをサブスクライブするアプリケーションのデフォルトの動作です。各 Kafka コネクターは、単一のコンシューマースレッドを作成し、それを単一のコンシューマーグループ内に配置します。コンシューマグループ ID のデフォルトは、quarkus.application.name 設定プロパティーで設定されたアプリケーション名です。これは、kafka.group.id プロパティーを使用して設定することもできます。

    Architecture
  2. コンシューマーグループ内の複数のコンシューマースレッド

    For a given application instance, the number of consumers inside the consumer group can be configured using mp.messaging.incoming.$channel.concurrency property. The partitions of the subscribed topic will be divided among the consumer threads. Note that if the concurrency value exceed the number of partitions of the topic, some consumer threads won’t be assigned any partitions.

    Architecture
    非推奨

    The concurrency attribute provides a connector agnostic way for non-blocking concurrent channels and replaces the Kafka connector specific partitions attribute. The partitions attribute is therefore deprecated and will be removed in future releases.

  3. コンシューマーグループ内の複数のコンシューマーアプリケーション

    前の例と同様に、アプリケーションの複数のインスタンスは、mp.messaging.incoming.$channel.group.id プロパティーを介して設定された単一のコンシューマーグループにサブスクライブすることも、アプリケーション名をデフォルトのままにすることもできます。これにより、トピックのパーティションがアプリケーションインスタンス間で分割されます。

    Architecture
  4. Pub/Sub: トピックにサブスクライブしている複数のコンシューマーグループ

    最後に、異なるアプリケーションは、異なる コンシューマーグループ ID を使用して同じトピックに個別にサブスクライブすることができます。たとえば、orders というトピックに公開されたメッセージは、2 つのコンシューマーアプリケーションで個別に消費することができます。1 つは mp.messaging.incoming.orders.group.id=invoicing で、もう 1 つは mp.messaging.incoming.orders.group.id=shipping で消費されます。したがって、さまざまなコンシューマーグループが、メッセージの消費要件に応じて独立してスケーリングすることができます。

    Architecture

一般的なビジネス要件は、Kafka レコードを順番に消費して処理することです。Kafka ブローカーは、トピック内ではなく、パーティション内のレコードの順序を保持します。したがって、レコードがトピック内でどのようにパーティショニングされるかを考えることが重要となります。デフォルトのパーティショナーは、レコードキーハッシュを使用してレコードのパーティションを計算するか、キーが定義されていない場合は、バッチまたはレコードごとにランダムにパーティションを選択します。

通常の操作中、Kafka コンシューマーは、割り当てられた各パーティション内のレコードの順序を保持します。Smallrye Reactive Messaging は、@Blocking(ordered = false) が使用されていない限り、この順序を処理のために保持します (ブロッキング処理を参照)。

コンシューマーのリバランスにより、Kafka コンシューマーは、単一レコードの少なくとも 1 回の処理のみを保証します。つまり、コミットされていないレコードは、コンシューマーによって再度処理 できます

4.5.1. コンシューマーリバランスリスナー

コンシューマーグループ内では、新しいグループメンバーが到着し、古いメンバーが離れると、パーティションが再割り当てされ、各メンバーにパーティションが比例配分されます。これは、グループのリバランスとして知られています。オフセットコミットとパーティション割り当てを自分で処理するために、コンシューマーリバランスリスナーを提供することができます。これを実現するには、io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener インターフェイスを実装し、@Idenfier 修飾子を使用して CDI Bean として公開します。一般的な使用例として、オフセットを別のデータストアに格納して exactly-once セマンティックを実装したり、特定のオフセットで処理を開始したりすることが挙げられます。

リスナーは、コンシューマートピック/パーティションの割り当てが変更されるたびに呼び出されます。たとえば、アプリケーションが起動すると、コンシューマーに関連付けられたトピック/パーティションの初期セットを使用して partitionsAssigned コールバックが呼び出されます。後でこのセットが変更された場合、partitionsRevoked および partitionsAssigned コールバックが再度呼び出されるため、カスタムロジックを実装することができます。

リバランスリスナーメソッドは Kafka ポーリングスレッドから呼び出され、完了するまで呼び出し元のスレッドをブロックすることに注意してください。これは、リバランスプロトコルに同期バリアがあり、リバランスリスナーで非同期コードを使用すると、同期バリアの後に実行される可能性があるためです。

トピック/パーティションがコンシューマーから割り当てられるか取り消されると、メッセージの配信が一時停止され、リバランスが完了すると再開されます。

リバランスリスナーがユーザーに代わってオフセットコミットを処理する場合 ( NONE コミットストラテジーを使用)、リバランスリスナーは partitionsRevoked コールバックでオフセットを同期的にコミットする必要があります。また、アプリケーションが停止したときに同じロジックを適用することをお勧めします。

Apache Kafka の ConsumerRebalanceListener とは異なり、io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener メソッドは、Kafka コンシューマーとトピック/パーティションのセットを渡します。

以下の例では、最大 10 分前 (またはオフセット 0) からのメッセージで常に開始するコンシューマーを設定します。まず、 io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener を実装し、 io.smallrye.common.annotation.Identifier でアノテーションが付けられた Bean を提供する必要があります。次に、この Bean を使用するようにインバウンドコネクターを設定する必要があります。

package inbound;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.TopicPartition;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

@ApplicationScoped
@Identifier("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

    private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());

    /**
     * When receiving a list of partitions, will search for the earliest offset within 10 minutes
     * and seek the consumer to it.
     *
     * @param consumer   underlying consumer
     * @param partitions set of assigned topic partitions
     */
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long now = System.currentTimeMillis();
        long shouldStartAt = now - 600_000L; //10 minute ago

        Map<TopicPartition, Long> request = new HashMap<>();
        for (TopicPartition partition : partitions) {
            LOGGER.info("Assigned " + partition);
            request.put(partition, shouldStartAt);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(request);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> position : offsets.entrySet()) {
            long target = position.getValue() == null ? 0L : position.getValue().offset();
            LOGGER.info("Seeking position " + target + " for " + position.getKey());
            consumer.seek(position.getKey(), target);
        }
    }

}
package inbound;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class KafkaRebalancedConsumer {

    @Incoming("rebalanced-example")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public CompletionStage<Void> consume(Message<ConsumerRecord<Integer, String>> message) {
        // We don't need to ACK messages because in this example,
        // we set offset during consumer rebalance
        return CompletableFuture.completedFuture(null);
    }

}

提供されたリスナーを使用するようにインバウンドコネクターを設定するには、コンシューマリバランスリスナーの識別子を設定します: mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer

または、リスナーの名前をグループ ID と同じにします:

mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer

コンシューマーリバランスリスナーの名前の設定は、グループ ID の使用よりも優先されます。

4.5.2. 一意のコンシューマーグループの活用

あるトピックの (先頭からの) すべてのレコードを処理したい場合は、以下を実行してください。

  1. auto.offset.reset = earliest の設定

  2. 他のアプリケーションで使用されていないコンシューマーグループへのコンシューマーの割り当て

Quarkus は、実行のたびに変更される UUID を生成します (dev モードを含む)。したがって、他のコンシューマーがそれを使用していないことを確認すると、アプリケーションが起動するたびに新しい一意のグループ ID を受け取ることになります。

その生成された UUID をコンシューマーグループとして、以下のように使用することができます。

mp.messaging.incoming.your-channel.auto.offset.reset=earliest
mp.messaging.incoming.your-channel.group.id=${quarkus.uuid}
group.id 属性が設定されていない場合、quarkus.application.name 設定プロパティーがデフォルトになります。

4.5.3. Manual topic-partition assignment

The assign-seek channel attribute allows manually assigning topic-partitions to a Kafka incoming channel, and optionally seek to a specified offset in the partition to start consuming records. If assign-seek is used, the consumer will not be dynamically subscribed to topics, but instead will statically assign the described partitions. In manual topic-partition rebalancing doesn’t happen and therefore rebalance listeners are never called.

The attribute takes a list of triplets separated by commas: <topic>:<partition>:<offset>.

For example, the configuration

mp.messaging.incoming.data.assign-seek=topic1:0:10, topic2:1:20

assigns the consumer to:

  • Partition 0 of topic 'topic1', setting the initial position at offset 10.

  • Partition 1 of topic 'topic2', setting the initial position at offset 20.

The topic, partition, and offset in each triplet can have the following variations:

  • If the topic is omitted, the configured topic will be used.

  • If the offset is omitted, partitions are assigned to the consumer but won’t be sought to offset.

  • If offset is 0, it seeks to the beginning of the topic-partition.

  • If offset is -1, it seeks to the end of the topic-partition.

4.6. バッチでの Kafka レコードの受信

デフォルトでは、着信メソッドは各 Kafka レコードを個別に受信します。内部的には、Kafka コンシューマークライアントはブローカーを絶えずポーリングし、 ConsumerRecords コンテナー内に表示されるレコードをバッチで受信します。

バッチ モードでは、アプリケーションは、コンシューマー ポーリング から返されたすべてのレコードを一度に受信できます。

これを実現するには、すべてのデータを受信するための互換性のあるコンテナータイプを指定する必要があります。

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

The incoming method can also receive Message<List<Payload>>, Message<ConsumerRecords<Key, Payload>>, and ConsumerRecords<Key, Payload> types. They give access to record details such as offset or timestamp:

@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<ConsumerRecords<String, Double>> records) {
    for (ConsumerRecord<String, Double> record : records.getPayload()) {
        String payload = record.getPayload();
        String topic = record.getTopic();
        // process messages
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return records.ack();
}

着信レコードバッチの処理が成功すると、バッチ内で受信した各パーティションの最新のオフセットがコミットされることに注意してください。設定されたコミットストラテジーは、これらのレコードにのみ適用されます。

逆に、処理が例外をスローした場合、すべてのメッセージは nack され、バッチ内のすべてのレコードにエラーストラテジーが適用されます。

Quarkus は、着信チャネルのバッチタイプを自動検出し、バッチ設定を自動的に設定します。mp.messaging.incoming.$channel.batch プロパティーを使用して、バッチモードを明示的に設定できます。

4.7. チェックポイントによるステートフル処理

checkpoint コミットストラテジーは実験的な API であり、将来変更される可能性があります。

Smallrye Reactive Messaging checkpoint commit strategy allows consumer applications to process messages in a stateful manner, while also respecting Kafka consumer scalability. An incoming channel with checkpoint commit strategy persists consumer offsets on an external state store, such as a relational database or a key-value store. As a result of processing consumed records, the consumer application can accumulate an internal state for each topic-partition assigned to the Kafka consumer. This local state will be periodically persisted to the state store and will be associated with the offset of the record that produced it.

この戦略では、Kafkaブローカーにオフセットをコミットしないため、新しいパーティションがコンシューマーに割り当てられたとき、つまりコンシューマーの再起動やコンシューマーグループのインスタンスがスケールしたときに、コンシューマーは保存した状態で最新の チェックポイント済み オフセットから処理を再開する。

@Incoming チャネルのコンシューマーコードは、 CheckpointMetadata API を通して処理状態を操作することができます。例えば、Kafkaトピックで受信した価格の移動平均を計算するコンシューマは、以下のようになります:

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointMetadata;

@ApplicationScoped
public class MeanCheckpointConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> record) {
        // Get the `CheckpointMetadata` from the incoming message
        CheckpointMetadata<AveragePrice> checkpoint = CheckpointMetadata.fromMessage(record);

        // `CheckpointMetadata` allows transforming the processing state
        // Applies the given function, starting from the value `0.0` when no previous state exists
        checkpoint.transform(new AveragePrice(), average -> average.update(record.getPayload()), /* persistOnAck */ true);

        // `persistOnAck` flag set to true, ack will persist the processing state
        // associated with the latest offset (per partition).
        return record.ack();
    }

    static class AveragePrice {
        long count;
        double mean;

        AveragePrice update(double newPrice) {
            mean += ((newPrice - mean) / ++count);
            return this;
        }
    }
}

transform メソッドは、現在の状態に変換関数を適用し、変更された状態を生成して、チェックポイントのためにローカルに登録します。デフォルトでは、ローカルの状態は、 auto.commit.interval.ms で指定された期間、定期的にステートストアに永続化されます(デフォルト:5000)。 persistOnAck フラグが与えられている場合、最新の状態は、メッセージの確認応答時にステートストアにeagerlyに永続化されます。 setNext メソッドも同様に、最新の状態を直接設定するように動作します。

チェックポイントコミット戦略は、各トピックパーティションの処理状態が最後に永続化されたタイミングを追跡します。未解決の状態変更が checkpoint.unsynced-state-max-age.ms (デフォルト: 10000) の間、永続化できない場合、チャネルは不健全とマークされます。

4.7.1. ステートストア

ステートストアの実装は、処理状態をどこに、どのように永続化するかを決定します。これは、 mp.messaging.incoming.[channel-name].checkpoint.state-store プロパティによって設定されます。ステートオブジェクトのシリアライゼーションは、ステートストアの実装に依存します。ステートストアにシリアライズを指示するためには、 mp.messaging.incoming.[channel-name].checkpoint.state-type プロパティを使用してステートオブジェクトのクラス名を設定する必要があります。

Quarkusは、以下のステートストアの実装を提供しています:

  • quarkus-redis: Uses the quarkus-redis-client extension to persist processing states. Jackson is used to serialize processing state in Json. For complex objects it is required to configure the checkpoint.state-type property with the class name of the object. By default, the state store uses the default redis client, but if a named client is to be used, the client name can be specified using the mp.messaging.incoming.[channel-name].checkpoint.quarkus-redis.client-name property. Processing states will be stored in Redis using the key naming scheme [consumer-group-id]:[topic]:[partition].

例えば先のコードの設定は次のようになります:

mp.messaging.incoming.prices.group.id=prices-checkpoint
# ...
mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-redis
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.MeanCheckpointConsumer.AveragePrice
# ...
# if using a named redis client
mp.messaging.incoming.prices.checkpoint.quarkus-redis.client-name=my-redis
quarkus.redis.my-redis.hosts=redis://localhost:7000
quarkus.redis.my-redis.password=<redis-pwd>
  • quarkus-hibernate-reactive: Uses the quarkus-hibernate-reactive extension to persist processing states. Processing state objects are required to be a Jakarta Persistence entity and extend the CheckpointEntity class, which handles object identifiers composed of the consumer group id, topic and partition. Therefore, the class name of the entity needs to be configured using the checkpoint.state-type property.

例えば先のコードの設定は次のようになります:

mp.messaging.incoming.prices.group.id=prices-checkpoint
# ...
mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-hibernate-reactive
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.AveragePriceEntity

With AveragePriceEntity being a Jakarta Persistence entity extending CheckpointEntity:

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;

@Entity
public class AveragePriceEntity extends CheckpointEntity {
    public long count;
    public double mean;

    public AveragePriceEntity update(double newPrice) {
        mean += ((newPrice - mean) / ++count);
        return this;
    }
}
  • quarkus-hibernate-orm : quarkus-hibernate-orm エクステンションを使用して、処理状態を永続化します。先ほどのステートストアと似ていますが、Hibernate Reactiveの代わりにHibernate ORMを使用します。

設定されている場合、チェックポイントの状態保存に名前付き persistence-unit を使用することができます。

mp.messaging.incoming.prices.commit-strategy=checkpoint
mp.messaging.incoming.prices.checkpoint.state-store=quarkus-hibernate-orm
mp.messaging.incoming.prices.checkpoint.state-type=org.acme.AveragePriceEntity
mp.messaging.incoming.prices.checkpoint.quarkus-hibernate-orm.persistence-unit=prices
# ... Setup "prices" persistence unit
quarkus.datasource."prices".db-kind=postgresql
quarkus.datasource."prices".username=<your username>
quarkus.datasource."prices".password=<your password>
quarkus.datasource."prices".jdbc.url=jdbc:postgresql://localhost:5432/hibernate_orm_test
quarkus.hibernate-orm."prices".datasource=prices
quarkus.hibernate-orm."prices".packages=org.acme

カスタムステートストアの実装方法については、https://smallrye.io/smallrye-reactive-messaging/3.22.0/kafka/receiving-kafka-records/#implementing-state-stores[Implementing State Stores]を参照してください。

5. Kafka へのメッセージの送信

Kafka コネクターの送信チャネルの設定は、受信の設定と似ています。

%prod.kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.outgoing.prices-out.connector=smallrye-kafka (2)
mp.messaging.outgoing.prices-out.topic=prices (3)
1 プロダクションプロファイルのブローカーの場所を設定します。mp.messaging.outgoing.$channel.bootstrap.servers プロパティーを使用して、グローバルまたはチャネルごとに設定できます。dev モードとテスト実行時には、Dev Services for Kafka が自動的に Kafka ブローカーを開始します。指定しない場合、このプロパティーのデフォルトは localhost:9092 になります。
2 prices-out チャネルを管理するためのコネクターを設定します。
3 デフォルトでは、トピック名はチャネル名と同じです。トピック属性を設定することで、それを上書きすることができます。

アプリケーション設定内では、チャネル名は一意です。したがって、同じトピックで着信チャネルと送信チャネルを設定する場合は、チャネルに異なる名前を付ける必要があります (たとえば、このガイドの例のように、mp.messaging.incoming.pricesmp.messaging.outgoing.prices-out など)。

次に、アプリケーションはメッセージを生成し、それらを prices-out チャネルに公開できます。以下のスニペットのように、double ペイロードを使用することができます。

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class KafkaPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

コードから直接 @Incoming@Outgoing のアノテーションが付けられたメソッドを呼び出さないでください。これらはフレームワークによって呼び出されます。ユーザーコードがそれらを呼び出すと、期待された結果にはなりません。

generate メソッドは Multi<Double> を返すことに注意してください。これは、Reactive Streams Publisher インターフェイスを実装します。このパブリッシャーは、メッセージを生成し、設定された Kafka トピックに送信するためにフレームワークによって使用されます。

ペイロードを返す代わりに、io.smallrye.reactive.messaging.kafka.Record を返して、キーと値のペアを送信できます。

@Outgoing("out")
public Multi<Record<String, Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> Record.of("my-key", random.nextDouble()));
}

ペイロードを org.eclipse.microprofile.reactive.messaging.Message 内にラップして、書き込まれたレコードをより詳細に制御できます。

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                            .withKey("my-key")
                            .withTopic("my-key-prices")
                            .withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
                            .build()));
}

OutgoingKafkaRecordMetadata を使用すると、Kafka レコードのメタデータ属性 (keytopicpartitiontimestamp など) を設定できます。1 つの使用例は、メッセージの宛先トピックを動的に選択することです。この場合、アプリケーション設定ファイル内でトピックを設定する代わりに、送信メタデータを使用してトピックの名前を設定する必要があります。

Reactive Stream Publisher (MultiPublisher の実装) を返すメソッドシグネチャー以外に、送信メソッドは単一のメッセージを返すこともできます。この場合、プロデューサーはこのメソッドをジェネレーターとして使用して、無限のストリームを作成します。

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

5.1. Sending messages with Emitter

時には、命令的な方法でメッセージを送ることが必要になる場合もあります。

たとえば、REST エンドポイント内で POST リクエストを受信した際に、ストリームにメッセージを送信する必要があるとします。この場合、メソッドにパラメーターがあるため、@Outgoing を使用することはできません。

この場合には Emitter が利用できます。

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        CompletionStage<Void> ack = priceEmitter.send(price);
    }
}

ペイロードを送信すると、メッセージが確認されたときに完了する CompletionStage が返されます。メッセージの送信が失敗した場合、nack の理由を伴って例外扱いで CompletionStage が完了します。

Emitter の設定は、@Incoming@Outgoing で使用される他のストリーム設定と同じ方法で行われます。

Emitter を使用すると、命令型コードから Reactive Messaging にメッセージを送信します。これらのメッセージは送信されるまでキューに保存されます。Kafka プロデューサークライアントが Kafka に送信しようとするメッセージに対応できない場合、このキューはメモリを大量に消費し、メモリ不足になる可能性があります。 @OnOverflow を使用して、バックプレッシャーストラテジーを設定することができます。これにより、キューのサイズ(デフォルトは 256)およびバッファーサイズに達したときに適用するストラテジーを設定できます。利用可能なストラテジーは、DROPLATESTFAILBUFFERUNBOUNDED_BUFFER および NONE です。

Emitter API を使用すると、Message<T> 内に送信ペイロードをカプセル化することもできます。前出の例のように、Message では、ack/nack のケースを異なる方法で処理することができます。

import java.util.concurrent.CompletableFuture;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
    }
}

Reactive Stream API を使いたい場合は、send メソッドから Uni<Void> を返す MutinyEmitter を使用することができます。これにより、Mutiny API を使用してダウンストリームのメッセージとエラーを処理することができます。

import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.core.MediaType;

import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    MutinyEmitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Uni<String> addPrice(Double price) {
        return quoteRequestEmitter.send(price)
                .map(x -> "ok")
                .onFailure().recoverWithItem("ko");
    }
}

sendAndAwait メソッドを使用して、イベントをエミッターに送信することをブロックすることもできます。受信者がイベントを ack または nack したときのみ、このメソッドから戻ります。

非推奨

io.smallrye.reactive.messaging.annotations.Emitterio.smallrye.reactive.messaging.annotations.Channelio.smallrye.reactive.messaging.annotations.OnOverflow クラスは現在非推奨となっており、以下のように置き換えられています。

  • org.eclipse.microprofile.reactive.messaging.Emitter

  • org.eclipse.microprofile.reactive.messaging.Channel

  • org.eclipse.microprofile.reactive.messaging.OnOverflow

新しい Emitter.send メソッドは、生成されたメッセージが確認応答されると、CompletionStage の完了を返します。

非推奨

MutinyEmitter#send(Message msg)` メソッドは非推奨となり、以下のメソッドが Message を受信して発信するようになりました。

  • <M extends Message<? extends T>> Uni<Void> sendMessage(M msg)

  • <M extends Message<? extends T>> void sendMessageAndAwait(M msg)

  • <M extends Message<? extends T>> Cancellable sendMessageAndForget(M msg)

Emitter の使用方法の詳細については、https://smallrye.io/smallrye-reactive-messaging/latest/concepts/emitter/[SmallRye Reactive Messaging – Emitters and Channels] を参照してください。

5.2. 確認応答の書き込み

Kafka ブローカーがレコードを受信すると、設定に応じてその確認応答に時間がかかることがあります。また、書き込めないレコードをインメモリーに保存します。

デフォルトでは、コネクターは Kafka がレコードを確認応答するのを待ち、処理を続行します (受信したメッセージの確認応答)。これを無効にするには、waitForWriteCompletion 属性を false に設定します。

acks 属性は、レコードの確認応答に大きく影響することに注意してください。

レコードを書き込めない場合は、メッセージは nack になります。

5.3. バックプレッシャー

Kafka アウトバウンドコネクターはバックプレッシャーを処理し、Kafka ブローカーへの書き込みを待機しているインフライトメッセージの数を監視します。インフライトメッセージの数は、max-inflight-messages 属性を使用して設定され、デフォルトは 1024 になります。

コネクターは、その量のメッセージのみを同時に送信します。少なくとも 1 つのインフライトメッセージがブローカーによって確認応答されるまで、他のメッセージは送信されません。次に、ブローカーのインフライトメッセージの 1 つが確認応答されると、コネクターは Kafka に新しいメッセージを書き込みます。それに応じて、Kafka の batch.sizelinger.ms を設定してください。

max-inflight-messages0 に設定することで、インフライトメッセージの制限を解除することもできます。ただし、リクエスト数が max.in.flight.requests.per.connection に達すると、Kafka プロデューサーがブロックする可能性があることに注意してください。

5.4. メッセージディスパッチの再試行

Kafka プロデューサーがサーバーからエラーを受信した場合、それが一時的な回復可能なエラーである場合、クライアントはメッセージのバッチの送信を再試行します。この動作は、retries および retry.backoff.ms パラメーターによって制御されます。これに加えて、SmallRye Reactive Messaging は、retries および delivery.timeout.ms パラメーターに応じて、回復可能なエラーで個々のメッセージを再試行します。

信頼性の高いシステムにおいては再試行するのがベストプラクティスですが、max.in.flight.requests.per.connection パラメーターのデフォルトは 5 で、これはメッセージの順序が保証されていないことを意味する点に注意してください。使用例でメッセージの順序が必須である場合、max.in.flight.requests.per.connection1 に設定すると、一度に送信されるメッセージのバッチが 1 つになり、その分プロデューサーのスループットが制限されることになります。

エラーの処理に再試行メカニズムを適用する場合は、 [処理のリトライ] のセクションを参照してください。

5.5. シリアライゼーション失敗時の処理

Kafka プロデューサーの場合、クライアントのシリアライゼーションのエラーは回復できないため、メッセージディスパッチは再試行されません。このような場合、シリアライザーのエラーストラテジーを適用する必要があるかもしれません。これを実現するには、SerializationFailureHandler<T> インターフェイスを実装する Bean を作成する必要があります。

@ApplicationScoped
@Identifier("failure-fallback") // Set the name of the failure handler
public class MySerializationFailureHandler
    implements SerializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public byte[] decorateSerialization(Uni<byte[]> serialization, String topic, boolean isKey,
        String serializer, Object data, Headers headers) {
        return serialization
                    .onFailure().retry().atMost(3)
                    .await().indefinitely();
    }
}

このエラーハンドラーを使用するには、Bean を @Identifier 修飾子で公開し、コネクター設定で属性 mp.messaging.outgoing.$channel.[key|value]-serialization-failure-handler を指定する必要があります (キーまたは値のデシリアライザー用)。

ハンドラーは、Uni<byte[]> として表されるアクションを含むシリアライゼーションの詳細とともに呼び出されます。メソッドは結果を待機し、シリアライズされたバイト配列を返す必要があることに注意してください。

5.6. インメモリーチャンネル

ユースケースによっては、メッセージングパターンを使って同じアプリケーション内でメッセージを転送することが便利な場合があります。Kafka のようなメッセージングバックエンドにチャネルを接続しない場合、すべてがインメモリーで行われ、ストリームはメソッドをチェーンすることで作成されます。各チェーンは依然としてリアクティブストリームであり、バックプレッシャープロトコルが適用されます。

フレームワークは、プロデューサー/コンシューマーチェーンが完全であることを確認します。つまり、アプリケーションがメッセージをインメモリーチャネルに書き込む場合 (@Outgoing のみを持つメソッド、または Emitter を使用)、アプリケーション内からメッセージを消費する必要もあります (@Incoming のみを持つメソッド、またはアンマネージドストリームを使用)。

5.7. 複数のコンシューマーでのメッセージのブロードキャスト

デフォルトでは、@Incoming メソッドまたは @Channel リアクティブストリームを使用して、チャネルを単一のコンシューマーにリンクすることができます。アプリケーションの起動時に、チャネルが検証され、単一のコンシューマーとプロデューサーを持つコンシューマーとプロデューサーのチェーンが形成されます。この動作は、チャネルで mp.messaging.$channel.broadcast=true を設定することで、オーバーライドすることができます。

インメモリーチャネルの場合、@Broadcast アノテーションを @Outgoing メソッドで使用できます。以下に例を示します。

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class MultipleConsumer {

    private final Random random = new Random();

    @Outgoing("in-memory-channel")
    @Broadcast
    double generate() {
        return random.nextDouble();
    }

    @Incoming("in-memory-channel")
    void consumeAndLog(double price) {
        System.out.println(price);
    }

    @Incoming("in-memory-channel")
    @Outgoing("prices2")
    double consumeAndSend(double price) {
        return price;
    }
}

反対に、mp.messaging.incoming.$channel.merge=true を設定することにより、同じチャネル上の複数のプロデューサーをマージすることができます。@Incoming メソッドでは、@Merge アノテーションを使用して複数のチャネルをマージする方法を制御できます。

送信メソッドや処理メソッドで @Outgoing アノテーションを繰り返すことで、複数の送信チャネルにメッセージをディスパッチする別の方法が可能になる:

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MultipleProducers {

    private final Random random = new Random();

    @Outgoing("generated")
    @Outgoing("generated-2")
    double priceBroadcast() {
        return random.nextDouble();
    }

}

In the previous example generated price will be broadcast to both outbound channels. The following example selectively sends messages to multiple outgoing channels using the Targeted container object, containing key as channel name and value as message payload.

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.Targeted;

@ApplicationScoped
public class TargetedProducers {

    @Incoming("in")
    @Outgoing("out1")
    @Outgoing("out2")
    @Outgoing("out3")
    public Targeted process(double price) {
        Targeted targeted = Targeted.of("out1", "Price: " + price,
                "out2", "Quote: " + price);
        if (price > 90.0) {
            return targeted.with("out3", price);
        }
        return targeted;
    }

}

Note that the auto-detection for Kafka serializers doesn’t work for signatures using the Targeted.

複数出庫の詳細については、 SmallRye Reactive Messaging のドキュメント を参照してください。

5.8. Kafka トランザクション

Kafka トランザクションにより、複数の Kafka トピックおよびパーティションへのアトミックな書き込みが可能になります。Kafka コネクターは、トランザクション内に Kafka レコードを書き込むための KafkaTransactions カスタムエミッターを提供します。これは、通常のエミッター @Channel として注入することができます。

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class KafkaTransactionalProducer {

    @Channel("tx-out-example")
    KafkaTransactions<String> txProducer;

    public Uni<Void> emitInTransaction() {
        return txProducer.withTransaction(emitter -> {
            emitter.send(KafkaRecord.of(1, "a"));
            emitter.send(KafkaRecord.of(2, "b"));
            emitter.send(KafkaRecord.of(3, "c"));
            return Uni.createFrom().voidItem();
        });
    }

}

withTransaction メソッドに指定された関数は、レコードの生成用に TransactionalEmitter を受け取り、トランザクションの結果を提供する Uni を返します。

  • 処理が正常に完了すると、プロデューサーはフラッシュされ、トランザクションはコミットされます。

  • 処理が例外を投げるか、失敗した Uni を返すか、あるいは TransactionalEmitter に中止のマークを付けると、トランザクションは中止されます。

Kafka トランザクションプロデューサーでは、acks=all クライアントプロパティーと transactional.id の一意の ID を設定する必要があります。これは、enable.idempotence=true を意味します。Quarkus が送信チャネルの KafkaTransactions の使用を検出すると、チャンネルでこれらのプロパティーを設定し、transactional.id プロパティーに "${quarkus.application.name}-${channelName}" のデフォルト値を提供します。

本番環境で使用する場合、transactional.id はすべてのアプリケーションインスタンスで一意である必要があることに注意してください。

通常のメッセージエミッターは send メソッドへの同時呼び出しをサポートし、結果として Kafka に書き込まれる送信メッセージをキューに入れますが、KafkaTransactions エミッターは一度に 1 つのトランザクションのみをサポートします。トランザクションは、withTransaction の呼び出しから、返された Uni が成功または失敗するまで進行中であると見なされます。トランザクションの進行中に、指定された関数内のネストされたものを含む withTransaction への後続の呼び出しは、IllegalStateException をスローします。

Note that in Reactive Messaging, the execution of processing methods, is already serialized, unless @Blocking(ordered = false) is used. If withTransaction can be called concurrently, for example from a REST endpoint, it is recommended to limit the concurrency of the execution. This can be done using the @Bulkhead annotation from Microprofile Fault Tolerance.

5.8.1. トランザクションを意識したコンシューマー

Kafka トランザクション内で書き込まれ、コミットされたレコードのみを使用する場合は、着信チャネルの isolation.level プロパティーを以下のように設定する必要があります。

mp.messaging.incoming.prices-in.isolation.level=read_committed

6. Kafka Request-Reply

The Kafka Request-Reply pattern allows to publish a request record to a Kafka topic and then await for a reply record that responds to the initial request. The Kafka connector provides the KafkaRequestReply custom emitter that implements the requestor (or the client) of the request-reply pattern for Kafka outbound channels:

It can be injected as a regular emitter @Channel:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;

@ApplicationScoped
@Path("/kafka")
public class KafkaRequestReplyEmitter {

    @Channel("request-reply")
    KafkaRequestReply<Integer, String> requestReply;

    @POST
    @Path("/req-rep")
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<String> post(Integer request) {
        return requestReply.request(request);
    }

}

The request method publishes the record to the configured target topic of the outgoing channel, and polls a reply topic (by default, the target topic with -replies suffix) for a reply record. When the reply is received the returned Uni is completed with the record value. The request send operation generates a correlation id and sets a header (by default REPLY_CORRELATION_ID), which it expects to be sent back in the reply record.

The replier can be implemented using a Reactive Messaging processor (see メッセージの処理).

For more information on Kafka Request Reply feature and advanced configuration options, see the Smallrye Reactive Messaging Documentation.

7. メッセージの処理

多くの場合、データをストリーミングするアプリケーションは、トピックからいくつかのイベントを消費し、それらを処理して、結果を別のトピックに公開する必要があります。プロセッサーメソッドは、@Incoming アノテーションと @Outgoing アノテーションの両方を使用して簡単に実装することができます。

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public double process(double price) {
        return price * CONVERSION_RATE;
    }

}

process メソッドのパラメーターは着信メッセージのペイロードですが、戻り値は送信メッセージのペイロードとして使用されます。Message<T>Record<K, V> など、前述のパラメーターとリターンタイプのシグネチャーもサポートされています。

リアクティブストリーム Multi<T> タイプを消費して返すことにより、非同期ストリーム処理を適用することができます。

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public Multi<Double> process(Multi<Integer> prices) {
        return prices.filter(p -> p > 100).map(p -> p * CONVERSION_RATE);
    }

}

7.1. レコードキーの伝播

メッセージを処理するときに、着信レコードキーを送信レコードに伝播できます。

mp.messaging.outgoing.$channel.propagate-record-key=true の設定で有効にすると、レコードキーの伝播が受信レコードと同じ key を持つ送信レコードを生成します。

送信レコードにすでに key が含まれている場合、着信レコードキーによって オーバーライドされません 。着信レコードに null キーがある場合は、mp.messaging.outgoing.$channel.key プロパティーが使用されます。

7.2. Exactly-Once 処理

Kafka Transactions を使用すると、生成されたメッセージとともに、トランザクション内のコンシューマーオフセットを管理できます。これにより、コンシューマーとトランザクションプロデューサーを consume-transform-produce パターンでカップリングすることができます。これは exactly-once 処理 としても知られています。

KafkaTransactions カスタムエミッターは、トランザクション内の着信 Kafka メッセージに exactly-once 処理を適用する方法を提供します。

次の例には、トランザクション内の Kafka レコードのバッチが含まれています。

import jakarta.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class KafkaExactlyOnceProcessor {

    @Channel("prices-out")
    @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 500) (3)
    KafkaTransactions<Integer> txProducer;

    @Incoming("prices-in")
    public Uni<Void> emitInTransaction(Message<ConsumerRecords<String, Integer>> batch) { (1)
        return txProducer.withTransactionAndAck(batch, emitter -> { (2)
            for (ConsumerRecord<String, Integer> record : batch.getPayload()) {
                emitter.send(KafkaRecord.of(record.key(), record.value() + 1)); (3)
            }
            return Uni.createFrom().voidItem();
        });
    }

}
1 バッチ消費モードと一緒に exactly-once 処理を使用することが推奨されます。単一の Kafka メッセージで使用することは可能ですが、パフォーマンスに大きな影響を与えることになります。
2 The consumed message is passed to the KafkaTransactions#withTransactionAndAck in order to handle the offset commits and message acks.
3 send メソッドは、ブローカーからの送信受信を待たずに、トランザクション内で Kafka にレコードを書き込みます。Kafka への書き込みが保留されているメッセージはバッファーリングされ、トランザクションをコミットする前にフラッシュされます。したがって、十分なメッセージ (たとえば、バッチで返されるレコードの最大量である max.poll.records) に適合するように、@OnOverflow bufferSize を設定することを推奨します。
  • トランザクションをコミットする前に 処理が正常に完了すると、指定されたバッチメッセージのトピックパーティションオフセットがトランザクションにコミットされます。

  • トランザクションを中止した後に 、処理を中止する必要がある場合、コンシューマーの位置は最後にコミットされたオフセットにリセットされ、そのオフセットから消費を効果的に再開します。トピックパーティションにコンシューマーオフセットがコミットされていない場合は、オフセットリセットポリシーが latest であっても、コンシューマーの位置はトピックパーティションの先頭にリセットされます。

exactly-once 処理を使用する場合、消費されたメッセージオフセットコミットはトランザクションによって処理されるため、アプリケーションは他の方法でオフセットをコミットしてはいけません。コンシューマーには enable.auto.commit=false (デフォルト) があり、明示的に commit-strategy=ignore を設定する必要があります。

mp.messaging.incoming.prices-in.commit-strategy=ignore
mp.messaging.incoming.prices-in.failure-strategy=ignore

7.2.1. exactly-once 処理のエラー処理

KafkaTransactions#withTransaction から返された Uni は、トランザクションが失敗して中止された場合に失敗します。アプリケーションはエラーケースの処理を選択できますが、失敗した Uni@Incoming メソッドから返された場合、着信チャネルは事実上失敗し、リアクティブストリームを停止します。

KafkaTransactions#withTransactionAndAck メソッドはメッセージを確認して nack しますが、失敗した Uni返しません 。nack されたメッセージは、着信チャネルのエラーストラテジーによって処理されます (エラー処理ストラテジー を参照)。failure-strategy=ignore を設定すると、Kafka コンシューマーは最後にコミットされたオフセットにリセットされ、そこから消費が再開されます。

8. Kafka クライアントへの直接アクセス

まれに、基盤となる Kafka クライアントにアクセスしなければならない場合があります。KafkaClientService は、ProducerConsumer へのスレッドセーフなアクセスを提供します。

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import org.apache.kafka.clients.producer.ProducerRecord;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;

@ApplicationScoped
public class PriceSender {

    @Inject
    KafkaClientService clientService;

    void onStartup(@Observes StartupEvent startupEvent) {
        KafkaProducer<String, Double> producer = clientService.getProducer("generated-price");
        producer.runOnSendingThread(client -> client.send(new ProducerRecord<>("prices", 2.4)))
            .await().indefinitely();
    }
}

KafkaClientService は実験的な API であり、将来変更される可能性があります。

Kafka 設定をアプリケーションに注入して、Kafka プロデューサー、コンシューマー、および管理クライアントを直接作成することもできます。

import io.smallrye.common.annotation.Identifier;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Map;

@ApplicationScoped
public class KafkaClients {

    @Inject
    @Identifier("default-kafka-broker")
    Map<String, Object> config;

    @Produces
    AdminClient getAdmin() {
        Map<String, Object> copy = new HashMap<>();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            if (AdminClientConfig.configNames().contains(entry.getKey())) {
                copy.put(entry.getKey(), entry.getValue());
            }
        }
        return KafkaAdminClient.create(copy);
    }

}

default-kafka-broker 設定マップには、接頭辞 kafka. または KAFKA_ が付いたすべてのアプリケーションプロパティーが含まれています。設定オプションの詳細については、Kafka 設定の解決 を参照してください。

9. JSON シリアライゼーション

Quarkus には、JSON Kafka メッセージを扱う機能が組み込まれています。

以下のように Fruit のデータクラスがあると想像してみてください。

public class Fruit {

    public String name;
    public int price;

    public Fruit() {
    }

    public Fruit(String name, int price) {
        this.name = name;
        this.price = price;
    }
}

そして、Kafka からメッセージを受信して、何らかの価格変換を行い、Kafka にメッセージを送り返すために使いたいと考えています。

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;

/**
* A bean consuming data from the "fruit-in" channel and applying some price conversion.
* The result is pushed to the "fruit-out" channel.
*/
@ApplicationScoped
public class FruitProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("fruit-in")
    @Outgoing("fruit-out")
    @Broadcast
    public Fruit process(Fruit fruit) {
        fruit.price = fruit.price * CONVERSION_RATE;
        return fruit;
    }

}

そのためには、Jackson や JSON-B で JSON シリアライゼーションを設定する必要があります。

JSON シリアライゼーションが正しく設定されていれば、Publisher<Fruit>Emitter<Fruit> も利用できます。

9.1. Jackson を介したシリアライズ

Quarkus has built-in support for JSON serialization and deserialization based on Jackson. It will also generate the serializer and deserializer for you, so you do not have to configure anything. When generation is disabled, you can use the provided ObjectMapperSerializer and ObjectMapperDeserializer as explained below.

Jackson を介してすべてのデータオブジェクトをシリアライズするために使用できる既存の ObjectMapperSerializer があります。シリアライザー/デシリアライザーの自動検出 を使用する場合は、空のサブクラスを作成することができます。

デフォルトでは、ObjectMapperSerializer は null を "null" 文字列としてシリアライズします。これは、null を null としてシリアライズする Kafka 設定プロパティー json.serialize.null-as-null=true を設定することでカスタマイズできます。これは、圧縮されたトピックを使用する場合に便利です。なぜなら、null は、圧縮フェーズで削除されるメッセージを知るためのトゥームストーンとし使用されるからです。

対応するデシリアライザークラスはサブクラス化する必要があります。そこで、ObjectMapperDeserializer を拡張する FruitDeserializer を作成しましょう。

package com.acme.fruit.jackson;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

最後に、Jackson シリアライザーとデシリアライザーを使用するようにチャンネルを設定します。

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

これで、Kafka メッセージには、Fruit データオブジェクトの Jackson シリアライズ表現が含まれます。この場合、シリアライザー/デシリアライザーの自動検出 がデフォルトで有効になっているので、deserializer の設定は必要ありません。

fruits のリストをデシリアライズしたい場合は、使用する一般的なコレクションを表す Jackson TypeReference を持つデシリアライザーを作成する必要があります。

package com.acme.fruit.jackson;

import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class ListOfFruitDeserializer extends ObjectMapperDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new TypeReference<List<Fruit>>() {});
    }
}

9.2. JSON-B を介したシリアライズ

まず、quarkus-jsonb エクステンションをインクルードする必要があります。

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jsonb</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-jsonb")

JSON-B を介してすべてのデータオブジェクトをシリアライズするために使用できる既存の JsonbSerializer があります。シリアライザー/デシリアライザーの自動検出 を使用する場合は、空のサブクラスを作成することができます。

デフォルトでは、JsonbSerializer は null を "null" 文字列としてシリアライズします。これは、null を null としてシリアライズする Kafka 設定プロパティー json.serialize.null-as-null=true を設定することでカスタマイズできます。これは、圧縮されたトピックを使用する場合に便利です。なぜなら、null は、圧縮フェーズで削除されるメッセージを知るためのトゥームストーンとし使用されるからです。

対応するデシリアライザークラスはサブクラス化する必要があります。そこで、一般的な JsonbDeserializer を拡張する FruitDeserializer を作成しましょう。

package com.acme.fruit.jsonb;

import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class FruitDeserializer extends JsonbDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

最後に、JSON-B シリアライザーとデシリアライザーを使用するようにチャネルを設定します。

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jsonb.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

これで、Kafka のメッセージには、JSON-B でシリアライズされた Fruit データオブジェクトの表現が含まれます。

fruits のリストをデシリアライズしたい場合は、使用する一般的なコレクションを表す Type を持つデシリアライザーを作成する必要があります。

package com.acme.fruit.jsonb;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class ListOfFruitDeserializer extends JsonbDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new ArrayList<MyEntity>() {}.getClass().getGenericSuperclass());
    }
}
各データオブジェクトにデシリアライザーを作成したくない場合は、io.vertx.core.json.JsonObject にデシリアライズする汎用の io.vertx.kafka.client.serialization.JsonObjectDeserializer を使用することができます。対応するシリアライザーの io.vertx.kafka.client.serialization.JsonObjectSerializer も使用できます。

10. Avro シリアライゼーション

これは、専用ガイド Schema RegistryとAvroと共にApache Kafkaを使用 で説明されています。

11. JSON Schema Serialization

This is described in a dedicated guide: Using Apache Kafka with Schema Registry and JSON Schema.

12. シリアライザー/デシリアライザーの自動検出

When using Quarkus Messaging with Kafka (io.quarkus:quarkus-messaging-kafka), Quarkus can often automatically detect the correct serializer and deserializer class. This autodetection is based on declarations of @Incoming and @Outgoing methods, as well as injected @Channels.

たとえば、以下のように宣言した場合

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

設定において generated-price チャネルが smallrye-kafka コネクターを使用することを示している場合、Quarkus は自動的に value.serializer を Kafka の組み込みの IntegerSerializer に設定します。

同様に、以下を宣言した場合

@Incoming("my-kafka-records")
public void consume(Record<Long, byte[]> record) {
    ...
}

設定において my-kafka-records チャネルが smallrye-kafka コネクターを使用することを示している場合、Quarkus は自動的に key.deserializer を Kafka の組み込み LongDeserializer に設定し、同様に value.deserializerByteArrayDeserializer に設定します。

最後に、以下を宣言した場合

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

設定において price-create チャネルが smallrye-kafka コネクターを使用することを示している場合、Quarkus は自動的に value.serializer を Kafka の組み込みの DoubleSerializer に設定します。

シリアライザー/デシリアライザーの自動検出でサポートされるタイプの完全なセットは以下のとおりです。

  • short および java.lang.Short

  • int および java.lang.Integer

  • long および java.lang.Long

  • float および java.lang.Float

  • double および`java.lang.Double`

  • byte[]

  • java.lang.String

  • java.util.UUID

  • java.nio.ByteBuffer

  • org.apache.kafka.common.utils.Bytes

  • io.vertx.core.buffer.Buffer

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • org.apache.kafka.common.serialization.Serializer<T> / org.apache.kafka.common.serialization.Deserializer<T> の直接実装があるクラス。

    • この実装は、タイプ引数 T を (デ) シリアライズタイプとして指定する必要があります。

  • Confluent または Apicurio Registry serde が存在する場合、Avro スキーマから生成されるクラスと Avro GenericRecord から生成されるクラス

    • 複数の Avro serde が存在する場合、自動検出は使用できないため、Avro が生成するクラスに対してシリアライザー/デシリアライザーを手動で設定する必要があります

    • Confluent または Apicurio Registry ライブラリーの使用に関する詳細は、 Schema RegistryとAvroと共にApache Kafkaを使用 を参照してください

  • Jackson を介したシリアライズ で説明されているように、ObjectMapperSerializer / ObjectMapperDeserializer のサブクラスが存在するクラス

    • 技術的には ObjectMapperSerializer をサブクラスにする必要はありませんが、その場合は自動検出ができません

  • JSON-B を介したシリアライズ で説明されているように、JsonbSerializer / JsonbDeserializer のサブクラスが存在するクラス

    • 技術的には JsonbSerializer をサブクラスにする必要はありませんが、その場合は自動検出ができません

シリアライザー/デシリアライザーが設定されている場合、自動検出によって置き換えられることはありません。

In case you have any issues with serializer autodetection, you can switch it off completely by setting quarkus.messaging.kafka.serializer-autodetection.enabled=false. If you find you need to do this, please file a bug in the Quarkus issue tracker so we can fix whatever problem you have.

13. JSON シリアライザー/デシリアライザーの生成

Quarkus は、以下の場合のチャネルのシリアライザーおよびデシリアライザーを自動的に生成します。

  1. シリアライザー/デシリアライザーが設定されていない場合

  2. 自動検出が、一致するシリアライザー/デシリアライザーを見つけられなかった場合

これは、水面下で Jackson を使用しています。

この生成を無効にするには、以下を使用します。

quarkus.messaging.kafka.serializer-generation.enabled=false
生成は List<Fruit> などのコレクションをサポートしません。このケースでは、Jackson を介したシリアライズ を参照して、独自のシリアライザー/デシリアライザーを作成してください。

14. スキーマレジストリーの使用

This is described in a dedicated guide for Avro: Using Apache Kafka with Schema Registry and Avro. And a different one for JSON Schema: Using Apache Kafka with Schema Registry and JSON Schema.

15. ヘルスチェック

Quarkusは、Kafkaのヘルスチェックをいくつか提供しています。これらのチェックは、 quarkus-smallrye-health エクステンションと組み合わせて使用します。

15.1. Kafka ブローカー rediness チェック

quarkus-kafka-client エクステンションを使用している場合、application.propertiesquarkus.kafka.health.enabled プロパティーを true に設定することで、readiness ヘルスチェックを有効にすることができます。このチェックでは、default Kafka ブローカー (kafka.bootstrap.servers を使用して設定) とのインタラクションのステータスが報告されます。これには Kafka ブローカーとの admin connection が必要ですが、これはデフォルトでは無効になっています。有効にすると、アプリケーションの /q/health/ready エンドポイントにアクセスしたときに、接続検証のステータスに関する情報が得られます。

15.2. Kafka Reactive Messaging ヘルスチェック

Reactive Messaging と Kafka コネクターを使用する場合、設定済みの各チャンネル(着信または送信)は、startupliveness、および readiness チェックを提供します。

  • startup check は、Kafka クラスターとの通信が確立されていることを確認します。

  • liveness チェックは、Kafka との通信中に発生する回復不可能なエラーをキャプチャーします。

  • readiness チェックは、Kafka コネクターが設定済みの Kafka トピックに対してメッセージを消費/生成する準備ができていることを確認します。

チャネルごとに、以下を使用してチェックを無効にできます。

# Disable both liveness and readiness checks with `health-enabled=false`:

# Incoming channel (receiving records form Kafka)
mp.messaging.incoming.your-channel.health-enabled=false
# Outgoing channel (writing records to Kafka)
mp.messaging.outgoing.your-channel.health-enabled=false

# Disable only the readiness check with `health-readiness-enabled=false`:

mp.messaging.incoming.your-channel.health-readiness-enabled=false
mp.messaging.outgoing.your-channel.health-readiness-enabled=false
mp.messaging.incoming|outgoing.$channel.bootstrap.servers プロパティーを使用して、各チャンネルに bootstrap.servers を設定できます。デフォルトは kafka.bootstrap.servers です。

Reactive Messaging の startup および readiness チェックには、2 つのストラテジーがあります。デフォルトのストラテジーでは、ブローカーとの間にアクティブな接続が確立されていることを確認します。この方法は、組み込みの Kafka クライアントメトリクスに基づいているため、邪魔になることはありません。

health-topic-verification-enabled=true 属性を使用すると、startup プローブは admin client を使用してトピックのリストをチェックします。readiness プローブの場合、受信チャンネル用は、少なくとも 1 つのパーティションが消費のために割り当てられていることをチェックし、送信チャンネル用は、プロデューサーが使用するトピックがブローカーに存在していることをチェックします。

これを行うには、admin connection が必要です。health-topic-verification-timeout 設定を使用して、ブローカーへのトピック検証呼び出しのタイムアウトを調整することができます。

16. Observability

If the OpenTelemetry extension is present, then the Kafka connector channels work out-of-the-box with the OpenTelemetry Tracing. Messages written to Kafka topics propagate the current tracing span. On incoming channels, if a consumed Kafka record contains tracing information the message processing inherits the message span as parent.

Tracing can be disabled explicitly per channel:

mp.messaging.incoming.data.tracing-enabled=false

If the Micrometer extension is present, then Kafka producer and consumer clients metrics are exposed as Micrometer meters.

16.1. Channel metrics

Per channel metrics can also be gathered and exposed as Micrometer meters. Following metrics can be gathered per channel, identified with the channel tag:

  • quarkus.messaging.message.count : 生成または受信したメッセージの数

  • quarkus.messaging.message.acks : 正常に処理されたメッセージの数

  • quarkus.messaging.message.failures : 処理に失敗したメッセージの数

  • quarkus.messaging.message.duration : The duration of the message processing.

For backwards compatibility reasons channel metrics are not enabled by default and can be enabled with:

The message observation depends on intercepting messages and therefore doesn’t support channels consuming messages with a custom message type such as IncomingKafkaRecord, KafkaRecord, IncomingKafkaRecordBatch or KafkaRecordBatch.

The message interception, and observation, still work with channels consuming the generic Message type, or custom payloads enabled by converters.

smallrye.messaging.observation.enabled=true

17. Kafka Streams

詳細は、専用ガイドの Apache Kafka Streamsの使用 で説明されています。

18. メッセージ圧縮での Snappy の使用

outgoing チャンネルでは、 compression.type 属性を snappy に設定することで、Snappy 圧縮を有効にすることができます。

mp.messaging.outgoing.fruit-out.compression.type=snappy

In JVM mode, it will work out of the box. However, to compile your application to a native executable, you need to add quarkus.kafka.snappy.enabled=true to your application.properties.

ネイティブモードでは、Snappyはデフォルトで無効になっています。Snappyを使用するには、ネイティブライブラリを埋め込み、アプリケーションの起動時にそれを展開する必要があるからです。

19. OAuth を使用した認証

Kafka ブローカーが認証メカニズムとして OAuth を使用している場合は、この認証プロセスを有効にするために Kafka コンシューマーを設定する必要があります。まず、以下の依存関係をアプリケーションに追加します。

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-client</artifactId>
</dependency>
<!-- if compiling to native you'd need also the following dependency -->
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-common</artifactId>
</dependency>
build.gradle
implementation("io.strimzi:kafka-oauth-client")
// if compiling to native you'd need also the following dependency
implementation("io.strimzi:kafka-oauth-common")

この依存関係は、OAuth ワークフローを処理するために必要なコールバックハンドラーを提供します。そして、application.properties で追加します。

mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
mp.messaging.connector.smallrye-kafka.sasl.mechanism=OAUTHBEARER
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="team-a-client" \
  oauth.client.secret="team-a-client-secret" \
  oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" ;
mp.messaging.connector.smallrye-kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

quarkus.ssl.native=true

oauth.client.idoauth.client.secretoauth.token.endpoint.uri の値を更新します。

OAuth 認証は、JVM とネイティブモードの両方で動作します。SSL はネイティブモードでデフォルトで有効になっていないため、SSL を使用する JaasClientOauthLoginCallbackHandler をサポートするために、quarkus.ssl.native=true を追加する必要があります(詳細は、ネイティブイメージでのSSLの利用 ガイドを参照)。

20. TLS Configuration

Kafka client extension integrates with the Quarkus TLS registry to configure clients.

To configure the TLS for the default Kafka configuration, you need to provide a named TLS configuration in the application.properties:

quarkus.tls.your-tls-config.trust-store.pem.certs=target/certs/kafka.crt,target/certs/kafka-ca.crt
# ...
kafka.tls-configuration-name=your-tls-config
# enable ssl security protocol
kafka.security.protocol=ssl

This will in turn provide the Kafka client with a ssl.engine.factory.class implementation.

Make sure also to enable the SSL channel security protocol using the security.protocol property configured to SSL or SASL_SSL.

Quarkus Messaging channels can be configured individually to use a specific TLS configuration:

mp.messaging.incoming.your-channel.tls-configuration-name=your-tls-config
mp.messaging.incoming.your-channel.security.protocol=ssl

21. Kafka アプリケーションのテスト

21.1. ブローカーなしでのテスト

Kafka ブローカーを起動しなくてもアプリケーションをテストできるのは便利です。これを行うには、Kafka コネクターで管理しているチャンネルを インメモリー切り替え できます。

このアプローチは、JVM テストでのみ機能します。インジェクションには対応していないため、ネイティブテストには使用できません。

以下のプロセッサーアプリケーションをテストするとします。

@ApplicationScoped
public class BeverageProcessor {

    @Incoming("orders")
    @Outgoing("beverages")
    Beverage process(Order order) {
        System.out.println("Order received " + order.getProduct());
        Beverage beverage = new Beverage();
        beverage.setBeverage(order.getProduct());
        beverage.setCustomer(order.getCustomer());
        beverage.setOrderId(order.getOrderId());
        beverage.setPreparationState("RECEIVED");
        return beverage;
    }

}

まず、以下のテスト依存関係をアプリケーションに追加します。

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")

そして、以下のように Quarkus Test Resource を作成します。

public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("orders");     (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("beverages");  (2)
        env.putAll(props1);
        env.putAll(props2);
        return env;  (3)
    }

    @Override
    public void stop() {
        InMemoryConnector.clear();  (4)
    }
}
1 (Kafka からのメッセージが想定される) 受信チャンネル orders をインメモリーに切り替えます。
2 (Kafka へのメッセージを書き込む) 送信チャネル beverages をインメモリーに切り替えます。
3 インメモリーチャネルを使用するためのアプリケーション設定に必要なすべてのプロパティを含む Map をビルドして返します。
4 テストが停止したら、InMemoryConnector をクリアします (受信したメッセージと送信したメッセージをすべて破棄してください)。

上記で作成したテストリソースを使用して Quarkus テストを作成します。

import static org.awaitility.Awaitility.await;

@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject
    @Connector("smallrye-in-memory")
    InMemoryConnector connector; (1)

    @Test
    void testProcessOrder() {
        InMemorySource<Order> ordersIn = connector.source("orders");     (2)
        InMemorySink<Beverage> beveragesOut = connector.sink("beverages");  (3)

        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");

        ordersIn.send(order);  (4)

        await().<List<? extends Message<Beverage>>>until(beveragesOut::received, t -> t.size() == 1); (5)

        Beverage queuedBeverage = beveragesOut.received().get(0).getPayload();
        Assertions.assertEquals(Beverage.State.READY, queuedBeverage.getPreparationState());
        Assertions.assertEquals("coffee", queuedBeverage.getBeverage());
        Assertions.assertEquals("Coffee lover", queuedBeverage.getCustomer());
        Assertions.assertEquals("1234", queuedBeverage.getOrderId());
    }

}
1 テストクラスにインメモリーコネクタ-を注入します。
2 受信チャンネルを取得します (orders) - テストリソース内でチャンネルがインメモリーに切り替えられている必要があります。
3 送信チャネルを取得します (beverages) - テストリソース内でチャネルがインメモリーに切り替えられている必要があります。
4 send メソッドを使用して、orders チャンネルにメッセージを送信します。アプリケーションはこのメッセージを処理し、beverages チャンネルにメッセージを送信します。
5 beverages チャンネルで received メソッドを使用して、アプリケーションによって生成されたメッセージを確認します。

If your Kafka consumer is batch based, you will need to send a batch of messages to the channel as by creating them manually.

例えば

@ApplicationScoped
public class BeverageProcessor {

    @Incoming("orders")
    CompletionStage<Void> process(KafkaRecordBatch<String, Order> orders) {
        System.out.println("Order received " + orders.getPayload().size());
        return orders.ack();
    }
}
import static org.awaitility.Awaitility.await;

@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject
    @Connector("smallrye-in-memory")

    InMemoryConnector connector;

    @Test
    void testProcessOrder() {
        InMemorySource<IncomingKafkaRecordBatch<String, Order>> ordersIn = connector.source("orders");
        var committed = new AtomicBoolean(false);  (1)
        var commitHandler = new KafkaCommitHandler() {
            @Override
            public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record) {
                committed.set(true);  (2)
                return null;
            }
        };
        var failureHandler = new KafkaFailureHandler() {
            @Override
            public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reason, Metadata metadata) {
                return null;
            }
        };

        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");
        var record = new ConsumerRecord<>("topic", 0, 0, "key", order);
        var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 1), List.of(record)));
        var batch = new IncomingKafkaRecordBatch<>(
            records, "kafka", 0, commitHandler, failureHandler, false, false);  (3)

        ordersIn.send(batch);

        await().until(committed::get);  (4)
    }
}
1 Create an AtomicBoolean to track if the batch has been committed.
2 Update committed when the batch is committed.
3 Create a IncomingKafkaRecordBatch with a single record.
4 Wait until the batch is committed.

インメモリーチャネルを使用すると、Kafka ブローカーを開始せずにメッセージ処理のアプリケーションコードをテストできました。異なるインメモリーチャネルは独立しており、チャネルコネクターをインメモリーに切り替えても、同じ Kafka トピックに設定されたチャネル間でメッセージ配信をシミュレートしないことに注意してください。

21.1.1. Context propagation with InMemoryConnector

By default, in-memory channels dispatch messages on the caller thread, which would be the main thread in unit tests.

quarkus-test-vertx 依存関係は @io.quarkus.test.vertx.RunOnVertxContext アノテーションを提供します。これをテストメソッドで使用すると、Vert.x コンテキストでテストが実行されます。

However, most of the other connectors handle context propagation dispatching messages on separate duplicated Vert.x contexts.

If your tests are dependent on context propagation, you can configure the in-memory connector channels with the run-on-vertx-context attribute to dispatch events, including messages and acknowledgements, on a Vert.x context. Alternatively you can switch this behaviour using the InMemorySource#runOnVertxContext method.

21.2. Kafka ブローカーを使用したテスト

Dev Services for Kafka を使用している場合、%test プロファイルで無効になっていない限り、Kafka ブローカーが起動し、テスト全体で利用することができます。Kafka Clients API を使用してこのブローカーに接続することは可能ですが、 Kafka Companion Library では、Kafka ブローカーと対話し、テスト内でコンシューマー、プロデューサー、および管理アクションを作成する簡単な方法を提案しています。

テストで KafkaCompanion API を使用するには、以下の依存関係を追加して開始します。

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-test-kafka-companion</artifactId>
    <scope>test</scope>
</dependency>

これは、io.quarkus.test.kafka.KafkaCompanionResource (io.quarkus.test.common.QuarkusTestResourceLifecycleManager の実装) を提供します。

次に、@QuarkusTestResource を使用して、テストで Kafka Companion を設定します。以下に例を示します。

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.kafka.InjectKafkaCompanion;
import io.quarkus.test.kafka.KafkaCompanionResource;
import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;

@QuarkusTest
@QuarkusTestResource(KafkaCompanionResource.class)
public class OrderProcessorTest {

    @InjectKafkaCompanion (1)
    KafkaCompanion companion;

    @Test
    void testProcessor() {
        companion.produceStrings().usingGenerator(i -> new ProducerRecord<>("orders", UUID.randomUUID().toString())); (2)

        // Expect that the tested application processes orders from 'orders' topic and write to 'orders-processed' topic

        ConsumerTask<String, String> orders = companion.consumeStrings().fromTopics("orders-processed", 10); (3)
        orders.awaitCompletion(); (4)
        assertEquals(10, orders.count());
    }
}
1 @InjectKafkaCompanion は、テスト用に作成された Kafka ブローカーにアクセスするように設定された KafkaCompanion インスタンスを注入します。
2 KafkaCompanion を使用して、10 のレコードを 'orders' トピックに書き込むプロデューサータスクを作成します。
3 'orders-processed' トピックをサブスクライブし、10 のレコードを消費するコンシューマータスクを作成します。
4 コンシューマタスクの完了を待ちます。

テスト中に Kafka Dev Service が利用可能な場合、KafkaCompanionResource は作成された Kafka ブローカーを使用します。そうでない場合は、 Strimzi Test Container を使用して Kafka ブローカーを作成します。

作成された Kafka ブローカーの設定は、@ResourceArg を使用してカスタマイズすることができます。以下に例を示します。

@QuarkusTestResource(value = KafkaCompanionResource.class, initArgs = {
        @ResourceArg(name = "strimzi.kafka.image", value = "quay.io/strimzi-test-container/test-container:0.106.0-kafka-3.7.0"), // Image name
        @ResourceArg(name = "kafka.port", value = "9092"), // Fixed port for kafka, by default it will be exposed on a random port
        @ResourceArg(name = "kraft", value = "true"), // Enable Kraft mode
        @ResourceArg(name = "num.partitions", value = "3"), // Other custom broker configurations
})
public class OrderProcessorTest {
    // ...
}

21.2.1. カスタムテストリソース

あるいは、テストリソースで Kafka ブローカを起動することもできます。次のスニペットは、https://www.testcontainers.org/modules/kafka/[Testcontainers] 使用して Kafka ブローカを起動するテストリソースを示しています。

public class KafkaResource implements QuarkusTestResourceLifecycleManager {

    private final KafkaContainer kafka = new KafkaContainer();

    @Override
    public Map<String, String> start() {
        kafka.start();
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());  (1)
    }

    @Override
    public void stop() {
        kafka.close();
    }
}
1 アプリケーションがこのブローカーに接続するように、Kafka ブートストラップの場所を設定します。

22. Dev Services for Kafka

Kafka関連のエクステンション(例: quarkus-messaging-kafka )がある場合、Dev Services for Kafkaは開発モードやテスト実行時に自動的にKafkaブローカーを起動します。 そのため、手動でブローカーを起動する必要はありません。 アプリケーションは自動的に設定されます。

Kafkaブローカーの起動には時間がかかることがあるため、Dev Services for Kafkaは、~1秒で起動するKafka互換ブローカーである Redpanda を使用しています。

22.1. Dev Services for Kafkaの有効化/無効化

以下の場合を除き、Dev Services for Kafkaが自動的に有効になります:

  • quarkus.kafka.devservices.enabledfalse に設定されている場合

  • kafka.bootstrap.servers が設定されている場合

  • すべてのReactive Messaging Kafkaチャンネルに bootstrap.servers 属性が設定されている場合

Dev Services for Kafkaでは、ブローカーの起動にDockerを使用しています。お使いの環境でDockerがサポートされていない場合は、ブローカーを手動で起動するか、すでに稼働しているブローカーに接続する必要があります。ブローカーのアドレスは、 kafka.bootstrap.servers を使用して設定できます。

22.2. 共有ブローカー

ほとんどの場合、アプリケーション間でブローカーを共有する必要があります。Dev Services for Kafkaは、 開発 モードで動作する複数のQuarkusアプリケーションが1つのブローカーを共有するための サービス発見 メカニズムを実装しています。

Dev Services for Kafka は、コンテナを識別するために使用される quarkus-dev-service-kafka のラベルでコンテナを開始します。

複数の(共有)ブローカーが必要な場合は、 quarkus.kafka.devservices.service-name 属性を設定し、ブローカー名を示します。同じ値のコンテナを探し、見つからない場合は新しいコンテナを開始します。デフォルトのサービス名は kafka です。

共有は、devモードではデフォルトで有効ですが、testモードでは無効です。 quarkus.kafka.devservices.shared=false で共有を無効に設定可能です。

22.3. ポートの設定

デフォルトでは、Kafka向けDev Services はランダムなポートを選択してアプリケーションを構成します。ポートは、 quarkus.kafka.devservices.port プロパティを構成することで設定できます。

Kafkaのアドバタイズされたアドレスは、選択したポートで自動的に設定されることに注意してください。

22.4. イメージの設定

Dev Services for Kafkaは、 Redpanda とhttps://github/ozangunalp/kafka-native[kafka-native]、 StrimziKraft モード)イメージをサポートしています。

Redpanda is a Kafka compatible event streaming platform. Because it provides a fast startup times, Dev Services defaults to Redpanda images from vectorized/redpanda. You can select any version from https://hub.docker.com/r/vectorized/redpanda.

kafka-native はQuarkusとGraalVMを使用してネイティブバイナリにコンパイルされた通常のApache Kafkaディストリビューションのイメージを提供します。 まだ experimental ですが、非常に高速な起動時間と小さなフットプリントを提供します。

次のようにイメージの種類を設定することができます。

quarkus.kafka.devservices.provider=kafka-native

Strimzi は、Kubernetes上でApache Kafkaを動作させるためのコンテナイメージとOperatorを提供します。 StrimziはKubernetesに最適化されていますが、イメージは従来のコンテナ環境でも完全に動作します。 Strimziのコンテナイメージは、JVM上で “純正の” Kafkaブローカーを動作させますが、起動が遅くなっています。

quarkus.kafka.devservices.provider=strimzi

Strimzi では、Kafka のバージョンが Kraft に対応しているもの(2.8.1 以上)であれば、 https://quay.io/repository/strimzi-test-container/test-container?tab=tags から任意のイメージを選択することができます。

quarkus.kafka.devservices.image-name=quay.io/strimzi-test-container/test-container:0.106.0-kafka-3.7.0

22.5. Kafkaトピックの設定

ブローカーの起動時にトピックを作成するように、Kafka向けDev Services を構成することができます。トピックは、指定された数のパーティションと1つのレプリカで作成されます。

次の例では、 test という名前のトピックを3つのパーティションで作成し、 messages という名前の2つ目のトピックを2つのパーティションで作成しています。

quarkus.kafka.devservices.topic-partitions.test=3
quarkus.kafka.devservices.topic-partitions.messages=2

指定された名前のトピックがすでに存在する場合、既存のトピックを異なる数のパーティションに再分割しようとはせず、作成はスキップされます。

quarkus.kafka.devservices.topic-partitions-timeout を使用して、トピック作成時に使用される Kafka admin クライアントコールのタイムアウトを設定できます。デフォルトは 2 秒です。

22.6. トランザクションとべき等プロデューサーのサポート

デフォルトではRedpanda ブローカーはトランザクションと冪等機能を有効化するように設定されています。 以下の設定でそれらを無効にすることができます:

quarkus.kafka.devservices.redpanda.transaction-enabled=false
Redpandaのトランザクションは正確に一回(exactly once)の処理をサポートしません。

23. Kafka Dev UI

Kafka関連のエクステンションがある場合(例: quarkus-messaging-kafka )、Quarkus Dev UIはKafkaブローカー管理UIで拡張されます。 アプリケーション用に設定されたKafkaブローカーに自動的に接続されます。

Kafka Dev UI link

Kafka Dev UI を使用すると、Kafkaクラスターを直接管理し、次のようなタスクを実行することができます。

  • トピックの一覧表示と作成

  • レコードの可視化

  • 新レコードの公開

  • コンシューマーグループの一覧とその消費ラグの閲覧

Kafka Dev UI records
Kafka Dev UIはQuarkus Dev UIの一部で、開発モードでのみ利用可能です。

24. Kubernetes サービスバインディング

Quarkus Kafka エクステンションは、Service Binding Specification for Kubernetes をサポートしています。アプリケーションに quarkus-kubernetes-service-binding エクステンションを追加することで、これを有効にすることができます。

適切に設定された Kubernetes クラスターで実行すると、Kafka エクステンションはユーザー設定を必要とせずに、クラスター内で利用可能なサービスバインディングから Kafka ブローカー接続設定を取得します。

25. 実行モデル

Reactive Messaging は、I/O スレッドでユーザーのメソッドを呼び出します。したがって、デフォルトではメソッドはブロックされません。ブロッキング処理 で説明されているように、このメソッドが呼び出し元スレッドをブロックする場合は、メソッドに @Blocking アノテーションを追加する必要があります。

このトピックの詳細については、Quarkus リアクティブアーキテクチャのドキュメント を参照してください。

26. チャンネルデコレーター

SmallRye Reactive Messagingは、監視、トレース、メッセージの傍受などの横断的な関心事を実装するために、送受信チャネルのデコレータをサポートしています。デコレータやメッセージインターセプタの実装に関する詳細は、 SmallRye Reactive Messagingのドキュメント を参照してください。

27. 設定リファレンス

SmallRye Reactive Messaging 設定に関する詳細は、 SmallRye Reactive Messaging - Kafka Connector Documentation を参照してください。

各チャネルは、以下を使用した設定で無効にできます。

mp.messaging.[incoming|outgoing].[channel].enabled=false

最も重要な属性を以下の表に記載しています。

27.1. 着信チャネル設定 (Kafka からのポーリング)

以下の属性は以下のように設定します:

mp.messaging.incoming.your-channel-name.attribute=value

一部のプロパティには、グローバルに設定可能なエイリアスがあります。

kafka.bootstrap.servers=...

基盤となる Kafka consumer でサポートされる任意のプロパティーを渡すこともできます。

たとえば、 max.poll.records プロパティーを設定するには、次を使用します。

mp.messaging.incoming.[channel].max.poll.records=1000

一部のコンシューマクライアントプロパティーは、適切なデフォルト値に設定されています。

設定されていない場合、切断時の高負荷を回避するために、reconnect.backoff.max.ms10000 に設定されます。

設定されていない場合、key.deserializerorg.apache.kafka.common.serialization.StringDeserializer に設定されます。

コンシューマーの client.id は、mp.messaging.incoming.[channel].partitions プロパティーを使用して作成するクライアントの数に応じて設定されます。

  • client.id が指定されている場合は、そのまま使用されるか、partitions プロパティーが設定されている場合はクライアントインデックスの接尾辞が付けられます。

  • client.id が指定されていない場合、[client-id-prefix][channel-name][-index]. として生成されます。

Table 1. 'smallrye-kafka' connector の Incoming 属性
属性 (alias) 説明 必須 デフォルト

bootstrap.servers

(kafka.bootstrap.servers)

Kafka クラスターへの初期接続を確立するために使用する host:port のコンマ区切りリスト

Type: string

false

localhost:9092

topic

消費/投入されるKafkaトピック。このプロパティも topics のプロパティも設定されていない場合は、チャネル名が使用されます。

Type: string

false

health-enabled

ヘルスレポートが有効(デフォルト)か無効か

Type: boolean

false

true

health-readiness-enabled

レディネスレポートが有効(デフォルト)か無効か

Type: boolean

false

true

health-readiness-topic-verification

deprecated - レディネスチェックでトピックがブローカーに存在することを確認する必要があるかどうか。デフォルトは false です。有効にするには、管理者接続が必要です。非推奨: 代わりに health-topic-verification-enabled を使用します。

Type: boolean

false

health-readiness-timeout

deprecated - レディネスヘルスチェック中に、コネクターはブローカーに接続し、トピックのリストを取得します。この属性は、取得の最大期間 (ミリ秒単位) を指定します。超過した場合、チャネルは準備ができていないと見なされます。非推奨: 代わりに health-topic-verification-timeout を使用します。

Type: long

false

health-topic-verification-enabled

ブローカーにトピックが存在するかどうかをスタートアップおよび レディネスチェックで確認するかどうか。デフォルトは false です。これを有効にするには、admin 接続が必要です。

Type: boolean

false

false

health-topic-verification-timeout

スタートアップおよび Readines チェックの間、コネクタはブローカーに接続し、トピックのリストを取得します。この属性では、検索にかける最大時間 (ms) を指定します。これを超えると、チャネルは準備ができていないとみなされます。

Type: long

false

2000

tracing-enabled

トレースを有効(デフォルト)にするか、無効にするか

Type: boolean

false

true

client-id-prefix

Kafka クライアント client.id 属性のプレフィックス。定義されている場合は、設定されたか自動生成された client.id にプリフィックスされます。そうでない場合は kafka-consumer- がプレフィックスとなります。

Type: string

false

checkpoint.state-store

checkpoint コミット戦略を使用している間、io.smallrye.reactive.messaging.kafka.StateStore.Factory を実装する Bean の @Identifier に設定された名前は、状態ストアの実装を指定します。

Type: string

false

checkpoint.state-type

checkpoint commit-strategy を使用する場合、ステートストアに永続化するステートオブジェクトの完全修飾型名を指定します。指定されると、処理ステートオブジェクトの永続化を助けるために、ステートストア実装によって使用されることができます。

false

checkpoint.unsynced-state-max-age.ms

checkpoint` コミット戦略を使用する場合、コネクタが不健全とマークされる前に処理状態を保持する必要がある最大時間をミリ秒単位で指定します。この属性を 0 に設定すると、この監視を無効にすることができます。

Type: int

false

10000

cloud-events

クラウド イベント サポートを有効(デフォルト)または無効にします。 incoming チャネルで有効にすると、コネクタは受信レコードを分析し、Cloud Event メタデータの作成を試みます。 outgoing 側で有効にすると、メッセージに Cloud Event Metadata が含まれている場合、コネクタはoutgoingメッセージを Cloud Event として送信します。

Type: boolean

false

true

kafka-configuration

このチャネルのデフォルトの Kafka コンシューマー/プロデューサー設定を提供する CDIBean の ID。チャネル設定は、引き続き任意の属性をオーバーライドできます。Bean には、ある種のマップ<String, Object> が必要です。また、識別子を設定するには、@io.smallrye.common.annotation.Identifier 修飾子を使用する必要があります。

Type: string

false

topics

消費されるトピックのコンマ区切りのリスト。 topic または pattern のプロパティとは併用できません。

Type: string

false

pattern

topic プロパティが正規表現であることを示す。 topic プロパティと併用する必要があります。 topics プロパティとは併用できません。

Type: boolean

false

false

key.deserializer

レコードのキーをデシリアライズするために使用されるデシリアライザのクラス名

Type: string

false

org.apache.kafka.common.serialization.StringDeserializer

lazy-client

Kafkaクライアントを遅延作成するか(lazy)、即時作成するか(eagerly)。

Type: boolean

false

false

value.deserializer

レコードの値のデシリアライズに使用されるデシリアライザのクラス名

Type: string

true

fetch.min.bytes

フェッチ・リクエストに対してサーバーが返すべきデータの最小量。デフォルトの1バイトの設定は、1バイトのデータが利用可能になるか、データの到着を待ってフェッチリクエストがタイムアウトするとすぐにフェッチリクエストに応答することを意味します。

Type: int

false

1

group.id

アプリケーションが所属するコンシューマーグループを識別するための一意の文字列。

設定されていない場合、デフォルトでは、 quarkus.application.name 設定プロパティで設定されたアプリケーション名になります。

それも設定されていない場合は、生成された一意のIDが使用されます。

常に group.id を定義することをお勧めします。自動生成は、開発用の便利機能にすぎません。 このプロパティを ${quarkus.uuid} に設定することで、自動的に生成される一意の ID を明示的に要求することができます。

Type: string

false

enable.auto.commit

この設定を有効にすると、コンシューマーのオフセットは、レコードの実際の処理結果を無視して、基礎となるKafkaクライアントによってバックグラウンドで定期的にコミットされます。この設定を有効にしないで、Reactive Messaging にコミットを任せることをお勧めします。

Type: boolean

false

false

retry

障害発生時にブローカーへの接続を再試行するかどうか

Type: boolean

false

true

retry-attempts

失敗するまでの最大再接続回数を指定します。-1は無限再試行を意味します。

Type: int

false

-1

retry-max-wait

2回の再接続の間の最大遅延時間(秒)

Type: int

false

30

broadcast

Kafkaレコードが複数のコンシューマーにディスパッチされるべきか

Type: boolean

false

false

auto.offset.reset

Kafka に初期オフセットがない場合の対処方法受け入れられる値は、earliest、latest、none

Type: string

false

latest

failure-strategy

レコードから生成されたメッセージが否定的に確認された(nack)場合に適用する失敗戦略を指定します。値は、 fail (デフォルト)、 ignore 、または dead-letter-queue

Type: string

false

fail

commit-strategy

レコードから生成されたメッセージが確認されたときに適用するコミットストラテジーを指定します。値は、 latestignorethrottled のいずれかです。 enable.auto.commit がtrueであれば、デフォルトは ignore です。そうでなければ throttled です。

Type: string

false

throttled.unprocessed-record-max-age.ms

throttled commit-strategy を使用している場合の、コネクタが不健全であるとマークされるまでの未処理メッセージの最大時間をミリ秒単位で指定します。この属性を 0 に設定すると、このモニタリングが無効になります。

Type: int

false

60000

dead-letter-queue.topic

failure-strategydead-letter-queue が設定されている場合、どのトピックにレコードが送信されるかを示します。デフォルトは dead-letter-topic-$channel

Type: string

false

dead-letter-queue.key.serializer

failure-strategydead-letter-queue が設定されている場合、 使用するキーシリアライザを示します。設定されていない場合は、キーデシリアライザに関連付けられたシリアライザが使用されます。

Type: string

false

dead-letter-queue.value.serializer

failure-strategydead-letter-queue が設定されている場合、使用する値のシリアライザを示します。設定されていない場合は、値のデシリアライザに関連付けられたシリアライザが使用されます。

Type: string

false

partitions

同時に消費されるパーティションの数です。コネクタは、指定された数のKafkaコンシューマーを作成します。これは、対象となるトピックのパーティション数と一致する必要があります。

Type: int

false

1

requests

partitions が 1 より大きい場合、この属性を使用すると、各コンシューマーが毎回要求するレコードの数を設定できます。

Type: int

false

128

consumer-rebalance-listener.name

io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener を実装する Bean の @Identifier で設定された名前です。設定された場合、このリバランスリスナーはコンシューマーに適用されます。

Type: string

false

key-deserialization-failure-handler

io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler を実装する Beanの @Identifier で設定された名前です。設定されている場合、キーをデシリアライズする際に起こるデシリアライズの失敗は、フォールバック値を再試行または提供することができるこのハンドラに委ねられます。

Type: string

false

value-deserialization-failure-handler

io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler を実装する Beanの @Identifier で設定された名前です。設定されている場合、値をデシリアライズする際に起こるデシリアライズの失敗は、フォールバック値を再試行または提供することができるこのハンドラに委ねられます。

Type: string

false

fail-on-deserialization-failure

デシリアライズ失敗ハンドラーが設定されておらず、デシリアライズ失敗が発生した場合は、失敗を報告し、アプリケーションを異常としてマークします。false に設定され、逆シリアル化の失敗が発生した場合、null 値が転送されます。

Type: boolean

false

true

graceful-shutdown

アプリケーションの終了時に、グレースフルシャットダウンを行うかどうか。

Type: boolean

false

true

poll-timeout

ミリ秒単位のポーリングタイムアウト。レコードをポーリングする場合、ポーリングは最大でその期間待機してからレコードを返します。デフォルトは 1000ms です

Type: int

false

1000

pause-if-no-requests

アプリケーションがアイテムを要求しないときにポーリングを一時停止し、要求したときに再開する必要があるかどうか。これにより、アプリケーションの容量に基づいてバックプレッシャを実装できます。ポーリングは停止されませんが、一時停止されたときにレコードを取得しないことに注意してください。

Type: boolean

false

true

batch

Kafka レコードがバッチで消費されるかどうか。チャネルインジェクションポイントは、List<Payload> または KafkaRecordBatch<Payload> などの互換性のあるタイプを消費する必要があります。

Type: boolean

false

false

max-queue-size-factor

max.poll.records * max-queue-size-factor を使用して、処理のためにキューに入れられるレコードの最大数を決定する乗数係数。デフォルトは 2 です。batch モードでは、max.poll.records1 と見なされます。

Type: int

false

2

27.2. outgoingチャンネルの設定(Kafkaへの書き込み)

以下の属性は以下のように設定します:

mp.messaging.outgoing.your-channel-name.attribute=value

一部のプロパティには、グローバルに設定可能なエイリアスがあります。

kafka.bootstrap.servers=...

基盤となる Kafka producer でサポートされている任意のプロパティーを渡すこともできます。

たとえば、max.block.ms プロパティーを設定するには、次を使用します。

mp.messaging.incoming.[channel].max.block.ms=10000

一部のプロデューサークライアントプロパティーは、適切なデフォルト値に設定されています。

設定されていない場合、切断時の高負荷を回避するために、reconnect.backoff.max.ms10000 に設定されます。

設定されていない場合、key.serializerorg.apache.kafka.common.serialization.StringSerializer に設定されます。

設定されていない場合、プロデューサー client.id[client-id-prefix][channel-name] として生成されます。

Table 2. 'smallrye-kafka' connector の Outgoing 属性
属性 (alias) 説明 必須 デフォルト

acks

リクエストを完了とみなす前に、プロデューサーがリーダーに受信したことを要求する確認応答の数。これは、送信されるレコードの耐久性を制御します。許容される値は 0 、 1 、 または all です。

Type: string

false

1

bootstrap.servers

(kafka.bootstrap.servers)

Kafka クラスターへの初期接続を確立するために使用する host:port のコンマ区切りリスト

Type: string

false

localhost:9092

client-id-prefix

Kafka クライアント client.id 属性のプレフィックス。定義されている場合は、設定されたか自動生成された client.id にプリフィックスされます。そうでない場合は kafka-producer- がプレフィックスとなります。

Type: string

false

buffer.memory

サーバーへの送信待ちのレコードをバッファリングするために、プロデューサーが使用できるメモリの総バイト数

Type: long

false

33554432

close-timeout

Kafkaプロデューサーのグレースフルシャットダウンを待つミリ秒の量

Type: int

false

10000

cloud-events

クラウド イベント サポートを有効(デフォルト)または無効にします。 incoming チャネルで有効にすると、コネクタは受信レコードを分析し、Cloud Event メタデータの作成を試みます。 outgoing 側で有効にすると、メッセージに Cloud Event Metadata が含まれている場合、コネクタはoutgoingメッセージを Cloud Event として送信します。

Type: boolean

false

true

cloud-events-data-content-type

(cloud-events-default-data-content-type)

outgoing Cloud Eventのデフォルトの datacontenttype 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが datacontenttype 属性を設定していない場合に使用されます。

タイプ: string

false

cloud-events-data-schema

(cloud-events-default-data-schema)

outgoing Cloud Eventのデフォルトの dataschema 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが dataschema 属性を設定していない場合に使用されます。

タイプ: string

false

cloud-events-insert-timestamp

(cloud-events-default-timestamp)

コネクターが、送信するクラウドイベントに自動的に time 属性を挿入するかどうかを指定します。cloud-eventstrue に設定されていることが必要です。この値は、メッセージに time 属性が設定されていない場合に使用されます。

タイプ: boolean

false

true

cloud-events-mode

Cloud Eventのモード( structured または binary (デフォルト))。outgoing レコードにCloud Eventをどのように書き込むかを示します

タイプ: string

false

binary

cloud-events-source

(cloud-events-default-source)

outgoing Cloud Eventのデフォルトの source 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが source 属性を設定していない場合に使用されます。

タイプ: string

false

cloud-events-subject

(cloud-events-default-subject)

outgoing Cloud Eventのデフォルトの subject 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが subject 属性を設定していない場合に使用されます。

タイプ: string

false

cloud-events-type

(cloud-events-default-type)

outgoing Cloud Eventのデフォルトの type 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが type 属性を設定していない場合に使用されます。

タイプ: string

false

health-enabled

ヘルスレポートが有効(デフォルト)か無効か

Type: boolean

false

true

health-readiness-enabled

レディネスレポートが有効(デフォルト)か無効か

Type: boolean

false

true

health-readiness-timeout

deprecated - レディネスヘルスチェック中に、コネクターはブローカーに接続し、トピックのリストを取得します。この属性は、取得の最大期間 (ミリ秒単位) を指定します。超過した場合、チャネルは準備ができていないと見なされます。非推奨: 代わりに health-topic-verification-timeout を使用します。

Type: long

false

health-readiness-topic-verification

deprecated - レディネスチェックでトピックがブローカーに存在することを確認する必要があるかどうか。デフォルトは false です。有効にするには、管理者接続が必要です。非推奨: 代わりに health-topic-verification-enabled を使用します。

Type: boolean

false

health-topic-verification-enabled

ブローカーにトピックが存在するかどうかをスタートアップおよび レディネスチェックで確認するかどうか。デフォルトは false です。これを有効にするには、admin 接続が必要です。

Type: boolean

false

false

health-topic-verification-timeout

スタートアップおよび Readines チェックの間、コネクタはブローカーに接続し、トピックのリストを取得します。この属性では、検索にかける最大時間 (ms) を指定します。これを超えると、チャネルは準備ができていないとみなされます。

Type: long

false

2000

kafka-configuration

このチャネルのデフォルトの Kafka コンシューマー/プロデューサー設定を提供する CDIBean の ID。チャネル設定は、引き続き任意の属性をオーバーライドできます。Bean には、ある種のマップ<String, Object> が必要です。また、識別子を設定するには、@io.smallrye.common.annotation.Identifier 修飾子を使用する必要があります。

Type: string

false

key

レコードを書き込む際に使用するキー

Type: string

false

key-serialization-failure-handler

io.smallrye.reactive.messaging.kafka.SerializationFailureHandler を実装する Bean の @Identifier で設定された名前です。設定されている場合、キーをシリアライズする際に起こるシリアライズの失敗は、フォールバック値を再試行または提供することができるこのハンドラに委ねられます。

Type: string

false

key.serializer

レコードのキーをシリアライズするために使用されるシリアライザのクラス名

Type: string

false

org.apache.kafka.common.serialization.StringSerializer

lazy-client

Kafkaクライアントを遅延作成するか(lazy)、即時作成するか(eagerly)。

Type: boolean

false

false

max-inflight-messages

Kafkaに同時に書き込まれるメッセージの最大数。ブローカーが書き込みと確認を待っているメッセージの数を制限します。この属性を 0 に設定することで、制限を取り除くことができます。

Type: long

false

1024

merge

コネクタが複数のアップストリームを許可するかどうか

タイプ: boolean

false

false

partition

対象のパーティション ID 。 -1 を設定すると、クライアントがパーティションを決定します。

Type: int

false

-1

propagate-headers

送信レコードに伝搬される、受信レコードヘッダーのカンマ区切りのリスト

Type: string

false

propagate-record-key

受信レコードのキーを送信レコードに伝搬させるかどうか

Type: boolean

false

false

retries

正の数を設定すると、コネクターは、正常に配信されなかったレコード ( 一過性のエラーが発生する可能性があります ) を、再試行回数に達するまで再送しようとします。 0 に設定すると、再試行は無効になります。設定されていない場合、コネクターは、 delivery.timeout.ms で設定された時間の間、 ( 潜在的な一時的エラーのために ) 配信に失敗したすべてのレコードの再送を試みます。

Type: long

false

2147483647

topic

消費/投入されるKafkaトピック。このプロパティも topics のプロパティも設定されていない場合は、チャネル名が使用されます。

Type: string

false

tracing-enabled

トレースを有効(デフォルト)にするか、無効にするか

Type: boolean

false

true

value-serialization-failure-handler

io.smallrye.reactive.messaging.kafka.SerializationFailureHandler を実装する Bean の @Identifier で設定された名前です。設定されている場合、値をシリアライズする際に起こるシリアライズの失敗は、フォールバック値を再試行または提供することができるこのハンドラに委ねられます。

Type: string

false

value.serializer

レコードの値のシリアライズに使用されるシリアライザのクラス名

Type: string

true

waitForWriteCompletion

クライアントがメッセージを確認する前に、Kafkaが書き込まれた記録を確認するのを待つかどうか

Type: boolean

false

true

27.3. Kafka 設定の解決

Quarkus は、default-kafka-broker 名の設定マップ内に接頭辞 kafka. または KAFKA_ が付いたすべての Kafka 関連アプリケーションプロパティーを公開します。この設定は、Kafka ブローカーとの接続を確立するために使用されます。

このデフォルト設定に加えて、kafka-configuration 属性を使用して Map プロデューサーの名前を設定することができます。

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.kafka-configuration=my-configuration

この場合、コネクターは my-configuration 名に関連付けられた Map を探します。kafka-configuration が設定されていない場合、チャネル名 (前の例では my-channel) で公開された Map のオプションの検索が実行されます。

@Produces
@ApplicationScoped
@Identifier("my-configuration")
Map<String, Object> outgoing() {
    return Map.ofEntries(
            Map.entry("value.serializer", ObjectMapperSerializer.class.getName())
    );
}
kafka-configuration が設定されていて、Map が見つからない場合、デプロイメントは失敗します。

属性値は次のように解決されます。

  1. 属性はチャネル設定に直接設定されます (mp.messaging.incoming.my-channel.attribute=value)。

  2. 設定されていない場合、コネクターはチャネル名または設定された kafka-configuration (設定されている場合) を含む Map を検索し、その Map から値を取得します。

  3. 解決された Map に値が含まれていない場合、デフォルトの Map が使用されます (default-kafka-broker 名で公開)

27.4. 条件付きでチャンネルを設定する

特定のプロファイルを使用してチャンネルを設定できます。 そのため、指定したプロファイルが有効な場合にのみ、チャンネルが設定されます(アプリケーションに追加されます)。

To achieve this, you need:

  1. Prefix the mp.messaging.[incoming|outgoing].$channel entries with %my-profile such as %my-profile.mp.messaging.[incoming|outgoing].$channel.key=value

  2. Use the @IfBuildProfile("my-profile") on the CDI beans containing @Incoming(channel) and @Outgoing(channel) annotations that need only to be enabled when the profile is enabled.

Note that reactive messaging verifies that the graph is complete. So, when using such a conditional configuration, ensure the application works with and without the profile enabled.

Note that this approach can also be used to change the channel configuration based on a profile.

28. Kafka との統合 - 一般的なパターン

28.1. HTTP エンドポイントから Kafka への書き込み

HTTP エンドポイントから Kafka にメッセージを送信するには、エンドポイントに Emitter (または MutinyEmitter) を注入します。

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<String> emitter;          (1)

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) { (2)
        return emitter.send(payload);                   (3)
    }
}
1 Emitter<String> を注入します。
2 HTTPメソッドはメッセージがKafkaに書き込まれると、ペイロードを受け取り、CompletionStage の完了を返します。
3 メッセージをKafkaに送信し、 send メソッドは CompletionStage を返却します。

エンドポイントは、渡されたペイロードを (POST HTTP リクエストから) エミッターに送信します。エミッターのチャネルは、application.properties ファイルの Kafka トピックにマップされます。

mp.messaging.outgoing.kafka.connector=smallrye-kafka
mp.messaging.outgoing.kafka.topic=my-topic

エンドポイントは、メソッドの非同期性を示す CompletionStage を返します。emitter.send メソッドは CompletionStage<Void> を返します。メッセージが Kafka に書き込まれると、返される future は完了します。書き込みが失敗した場合、返された CompletionStage が例外扱いで完了します。

エンドポイントが CompletionStage を返さない場合、メッセージが Kafka に送信される前に HTTP 応答が書き込まれる可能性があるため、ユーザーに失敗が報告されることはありません。

Kafka レコードを送信する必要がある場合は、次を使用します。

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.reactive.messaging.kafka.Record;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Record<String,String>> emitter;  (1)


    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) {
        return emitter.send(Record.of("my-key", payload));    (2)
    }
}
1 Emitter<Record<K, V>> の使用法に注意してください
2 Record.of(k, v) を使用してレコードを作成します

28.2. Hibernate with Panache での Kafka メッセージの永続化

Kafka から受信したオブジェクトをデータベースに永続化するには、Hibernate with Panache を使用することができます。

Hibernate Reactive を使用する場合は、Hibernate Reactive を使用した Kafka メッセージの永続化 を参照してください。

Fruit オブジェクトを受け取ったと想像してみましょう。簡単にするために、Fruit クラスは非常に単純です。

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.hibernate.orm.panache.PanacheEntity;

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}

Kafka トピックに保存されている Fruit インスタンスを消費し、それらをデータベースに永続化するには、次のアプローチを使用できます。

package org.acme;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.common.annotation.Blocking;

@ApplicationScoped
public class FruitConsumer {

    @Incoming("fruits")                                     (1)
    @Transactional                                          (2)
    public void persistFruits(Fruit fruit) {                (3)
        fruit.persist();                                    (4)
    }
}
1 着信チャネルの設定。このチャンネルは Kafka から読み取ります。
2 データベースに書き込んでいるので、トランザクション内である必要があります。このアノテーションは新しいトランザクションを開始し、メソッドが返されたときにそれをコミットします。Quarkus は、このメソッドを自動的に blocking と見なします。実際、従来の Hibernate を使用したデータベースへの書き込みはブロックされています。そこで、Quarkus は、ブロックできるワーカースレッド (I/O スレッドではない) でメソッドを呼び出します。
3 メソッドは各 Fruit を受け取ります。Kafka レコードから Fruit インスタンスを再構築するには、デシリアライザーが必要になることに注意してください。
4 受信した fruit オブジェクトを永続化します。

上記 <4> で述べたように、レコードから Fruit を作成できるデシリアライザーが必要です。これは、Jackson デシリアライザーを使用して実行できます。

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

関連する設定は次のようになります。

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

Kafka を使用した Jackson の使い方の詳細については、Jackson を介したシリアライズ を確認してください。また、Avro を使用することもできます。

28.3. Hibernate Reactive を使用した Kafka メッセージの永続化

Kafka から受信したオブジェクトをデータベースに永続化するには、Hibernate Reactive with Panache を使用することができます。

Fruit オブジェクトを受け取ったと想像してみましょう。簡単にするために、Fruit クラスは非常に単純です。

package org.acme;

import jakarta.persistence.Entity;

import io.quarkus.hibernate.reactive.panache.PanacheEntity;  (1)

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}
1 必ずリアクティブバリアントを使用してください

Kafka トピックに保存されている Fruit インスタンスを消費し、それらをデータベースに永続化するには、次のアプローチを使用できます。

package org.acme;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class FruitStore {

    @Inject
    Mutiny.Session session;                    (1)

    @Incoming("in")
    @ActivateRequestContext (2)
    public Uni<Void> consume(Fruit entity) {
        return session.withTransaction(t -> {  (3)
            return entity.persistAndFlush()    (4)
                    .replaceWithVoid();        (5)
        }).onTermination().call(() -> session.close()); (6)
    }

}
1 Hibernate Reactive Session を注入します。
2 Hibernate Reactive Session および Panache API は、アクティブな CDI リクエストコンテキストを必要とします。 @ActivateRequestContext アノテーションは新しいリクエストコンテキストを作成し、メソッドから返された Uni が完了すると、それを破棄します。 Panache を使用しない場合は、 Mutiny.SessionFactory を注入して同様に使用することができ、 リクエストコンテキストをアクティブにしたりセッションを手動で閉じたりする必要はありません。
3 新しいトランザクションを要求します。渡されたアクションが完了すると、トランザクションが完了します。
4 エンティティーを永続化します。これは Uni<Fruit> を返します。
5 Uni<Void> に切り替えます。
6 セッションを閉じる - これは、データベースとの接続を閉じることです。その後、接続を再利用することができます。

classic Hibernate とは異なり、@Transactional は使用できません。代わりに、 session.withTransaction を使用して、エンティティーを永続化します。 map は、 Uni<Fruit> ではなく Uni<Void> を返すために使用されます。

レコードから Fruit を作成できるデシリアライザーが必要です。これは、Jackson デシリアライザーを使用して実行できます。

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

関連する設定は次のようになります。

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

Kafka を使用した Jackson の使い方の詳細については、Jackson を介したシリアライズ を確認してください。また、Avro を使用することもできます。

28.4. Hibernate が管理するエンティティーの Kafka への書き込み

以下のプロセスを想像してみましょう。

  1. ペイロードを含む HTTP リクエストを受信します。

  2. このペイロードから Hibernate エンティティーインスタンスを作成します。

  3. そのエンティティーをデータベースに永続化します。

  4. エンティティーを Kafka トピックに送信します。

Hibernate Reactive を使用する場合は、Hibernate Reactive が管理するエンティティーの Kafka への書き込み を参照してください。

データベースに書き込むため、トランザクションでこのメソッドを実行する必要があります。しかし、エンティティーを Kafka に送信することは非同期で行われます。操作が完了すると、操作は CompletionStage (または MutinyEmitter を使用する場合は Uni ) レポートを返します。オブジェクトが書き込まれるまで、トランザクションがまだ実行されていることを確認する必要があります。そうしないと、トランザクションの外側でオブジェクトにアクセスする可能性がありますが、これは許可されていません。

このプロセスを実装するには、次のアプローチが必要です。

package org.acme;

import java.util.concurrent.CompletionStage;

import jakarta.transaction.Transactional;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Fruit> emitter;

    @POST
    @Path("/fruits")
    @Transactional                                                      (1)
    public CompletionStage<Void> storeAndSendToKafka(Fruit fruit) {     (2)
        fruit.persist();
        return emitter.send(new FruitDto(fruit));                       (3)
    }
}
1 データベースに書き込んでいるときは、トランザクション内で実行していることを確認してください
2 メソッドは、永続化する fruit インスタンスを受け取ります。そして、トランザクション区切りに使用される CompletionStage を返します。トランザクションは、返された CompletionStage が完了するとコミットされます。この例の場合は、メッセージが Kafka に書き込まれるときになります。
3 管理対象エンティティをData transferオブジェクトの中に包み込み、Kafkaに送信します。これにより、Kafkaのシリアライズの影響をマネージドエンティティが受けないようにします。

28.5. Hibernate Reactive が管理するエンティティーの Kafka への書き込み

Hibernate Reactive によって管理されている Kafka エンティティーに送信するには、以下を使用することをお勧めします。

  • Quarkus REST to serve HTTP requests

  • チャネルにメッセージを送信するための MutinyEmitter 。これにより、Hibernate Reactive または Hibernate Reactive with Panache によって公開される Mutiny API と簡単に統合できます。

次の例は、ペイロードを受信し、Hibernate Reactive with Panache を使用してデータベースに保存し、永続化されたエンティティーを Kafka に送信する方法を示しています。

package org.acme;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/")
public class ReactiveGreetingResource {

    @Channel("kafka") MutinyEmitter<Fruit> emitter;     (1)

    @POST
    @Path("/fruits")
    public Uni<Void> sendToKafka(Fruit fruit) {         (2)
        return Panache.withTransaction(() ->            (3)
            fruit.<Fruit>persist()
        )
            .chain(f -> emitter.send(f));               (4)
    }
}
1 Mutiny API を公開する MutinyEmitter を注入します。これにより、Hibernate Reactive with Panache によって公開された Mutiny API との統合が簡素化されます。
2 ペイロードを受信する HTTP メソッドは Uni<Void> を返します。HTTP 応答は、操作が完了すると書き込まれます (エンティティーは永続化され、Kafka に書き込まれます)。
3 トランザクションでエンティティーをデータベースに書き込む必要があります。
4 永続化操作が完了すると、エンティティーを Kafka に送信します。send メソッドは Uni<Void> を返します。

28.6. サーバー送信イベントとしての Kafka トピックのストリーミング

Kafka トピックをサーバー送信イベント (SSE) としてストリーミングするのは簡単です。

  1. Kafka トピックを表すチャネルを HTTP エンドポイントに注入します

  2. そのチャネルを HTTP メソッドから Publisher または Multi として返します

以下のコードはその一例です。

@Channel("fruits")
Multi<Fruit> fruits;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<Fruit> stream() {
    return fruits;
}

一部の環境では、十分なアクティビティーがない場合に SSE 接続が切断されます。回避策として、定期的に ping メッセージ (または空のオブジェクト) を送信することが挙げられます。

@Channel("fruits")
Multi<Fruit> fruits;

@Inject
ObjectMapper mapper;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    return Multi.createBy().merging()
            .streams(
                    fruits.map(this::toJson),
                    emitAPeriodicPing()
            );
}

Multi<String> emitAPeriodicPing() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(10))
            .onItem().transform(x -> "{}");
}

private String toJson(Fruit f) {
    try {
        return mapper.writeValueAsString(f);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

Kafka からの fruits を送信する以外に、定期的に ping を送信する必要があるため、回避策は少し複雑になっています。これを実現するために、Kafka からのストリームと、10 秒ごとに {} を放出する定期的なストリームをマージします。

28.7. Kafka トランザクションと Hibernate Reactive トランザクションとのチェーン

Kafka トランザクションを Hibernate Reactive トランザクションとチェーンすることにより、Kafka トランザクションにレコードを送信し、データベースの更新を実行して、データベーストランザクションが成功した場合にのみ Kafka トランザクションをコミットすることができます。

以下の例は、次のことを示しています。

  • Receive a payload by serving HTTP requests using Quarkus REST,

  • Smallrye Fault Tolerance を使用して、その HTTP エンドポイントの同時実行を制限します。

  • Kafka トランザクションを開始し、ペイロードを Kafka レコードに送信します。

  • Hibernate Reactive with Panache を使用して、ペイロードをデータベースに保存します。

  • エンティティーが正常に永続化された場合にのみ、Kafka トランザクションをコミットします。

package org.acme;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1) (2)
    public Uni<Void> post(Fruit fruit) { (3)
        return kafkaTx.withTransaction(emitter -> { (4)
            emitter.send(fruit); (5)
            return Panache.withTransaction(() -> { (6)
                return fruit.<Fruit>persist(); (7)
            });
        }).replaceWithVoid();
    }
}
1 Mutiny API を公開する KafkaTransactions を注入します。これにより、Hibernate Reactive with Panache によって公開された Mutiny API との統合が可能になります。
2 HTTP エンドポイントの同時実行を "1" に制限し、特定の時間に複数のトランザクションを開始しないようにします。
3 ペイロードを受信する HTTP メソッドは Uni<Void> を返します。HTTP 応答は、操作が完了すると書き込まれます (エンティティーは永続化され、Kafka トランザクションはコミットされます)。
4 Kafka トランザクションを開始します。
5 Kafka トランザクション内でペイロードを Kafka に送信します。
6 Hibernate Reactive トランザクションでエンティティーをデータベースに永続化します。
7 永続化操作が完了し、エラーが発生しない場合は、Kafka トランザクションがコミットされます。結果は省略され、HTTP 応答として返されます。

前の例では、データベーストランザクション (内部) がコミットされ、続いて Kafka トランザクション (外部) がコミットされます。最初に Kafka トランザクションをコミットし、次にデータベーストランザクションをコミットしたい場合は、それらを逆の順序でネストする必要があります。

以下は、Hibernate Reactive API (Panache なし) を使用する場合の例になります。

import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx;

    @Inject Mutiny.SessionFactory sf; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1)
    public Uni<Void> post(Fruit fruit) {
        Context context = Vertx.currentContext(); (2)
        return sf.withTransaction(session -> (3)
                kafkaTx.withTransaction(emitter -> (4)
                        session.persist(fruit).invoke(() -> emitter.send(fruit)) (5)
                ).emitOn(context::runOnContext) (6)
        );
    }
}
1 Hibernate Reactive SessionFactory を注入します。
2 呼び出し元の Vert.x コンテキストをキャプチャーします。
3 Hibernate Reactive トランザクションを開始します。
4 Kafka トランザクションを開始します。
5 ペイロードを永続化し、エンティティーを Kafka に送信します。
6 Kafka トランザクションは、Kafka プロデューサー送信者スレッドで終了します。開始したのと同じコンテキストで Hibernate Reactive トランザクションを終了するために、以前にキャプチャーした Vert.x コンテキストに切り替える必要があります。

29. ロギング

Kafkaクライアントによって書き込まれるログの量を減らすために、Quarkusは以下のログカテゴリーのレベルを WARNING に設定しています。

  • org.apache.kafka.clients

  • org.apache.kafka.common.utils

  • org.apache.kafka.common.metrics

以下の行を application.properties に追加することで、設定を上書きすることができます。

quarkus.log.category."org.apache.kafka.clients".level=INFO
quarkus.log.category."org.apache.kafka.common.utils".level=INFO
quarkus.log.category."org.apache.kafka.common.metrics".level=INFO

30. マネージド Kafka クラスターへの接続

このセクションでは、ちょっとクセのある Kafka Cloud Services に接続する方法について説明します。

30.1. Azure Event Hub

Azure Event Hub は、Apache Kafka と互換性のあるエンドポイントを提供します。

Azure Event Hubs for Kafka は、basic 層では使用できません。Kafka を使用するには、少なくとも standard 層が必要です。他のオプションについては、 Azure Event Hubs Pricing を参照してください。

TLS で Kafka プロトコルを使用して Azure Event Hub に接続するには、以下の設定が必要です。

kafka.bootstrap.servers=my-event-hub.servicebus.windows.net:9093 (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ (2)
    username="$ConnectionString" \ (3)
    password="<YOUR.EVENTHUBS.CONNECTION.STRING>"; (4)
1 ポートは 9093 です。
2 JAAS の PlainLoginModule を使用する必要があります。
3 ユーザー名は $ConnectionString 文字列になります。
4 Azure が提供する Event Hub 接続文字列。

<YOUR.EVENTHUBS.CONNECTION.STRING> を Event Hubs 名前空間の接続文字列に置き換えます。接続文字列を取得する手順については、 Get an Event Hubs connection string を参照してください。結果は、以下のようになります。

kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

この設定は、(上記のように) グローバルにすることも、チャネル設定で設定することもできます。

mp.messaging.incoming.$channel.bootstrap.servers=my-event-hub.servicebus.windows.net:9093
mp.messaging.incoming.$channel.security.protocol=SASL_SSL
mp.messaging.incoming.$channel.sasl.mechanism=PLAIN
mp.messaging.incoming.$channel.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...";

30.2. Red Hat OpenShift Streams for Apache Kafka

Red Hat OpenShift Streams for Apache Kafka は、マネージド Kafka ブローカーを提供します。まず、 Getting started with the rhoas CLI for Red Hat OpenShift Streams for Apache Kafka の指示に従い、Kafka ブローカーインスタンスを作成します。作成した ServiceAccount に関連付けられているクライアント ID とクライアントシークレットをコピーしたことを確認してください。

続いて、以下のようにブローカーに接続するように Quarkus アプリケーションを設定できます。

kafka.bootstrap.servers=<connection url> (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="${KAFKA_USERNAME}" \ (2)
  password="${KAFKA_PASSWORD}"; (3)
1 管理コンソールに指定されている接続文字列 (例: demo-c—​bjsv-ldd-cvavkc-a.bf2.kafka.rhcloud.com:443)
2 kafka ユーザー名 (サービスアカウントのクライアント ID)
3 kafka パスワード (サービスアカウントからのクライアントシークレット)
一般に、これらのプロパティーには %prod という接頭辞を付け、本番モードで動作しているときのみ有効化します。
Getting started with the rhoas CLI for Red Hat OpenShift Streams for Apache Kafka で説明されているように、Red Hat OpenShift Streams for Apache Kafka を使用するには、事前にトピックを作成し、Service Account を作成し、そのサービスアカウントからトピックの読み取りと書き込みを行うためのパーミッションを提供する必要があります。認証データ (クライアント ID とシークレット) はサービスアカウントに関連しています。つまり、詳細なアクセスパーミッションを実装し、トピックへのアクセスを制限できます。

Kubernetes を使用する場合は、クライアント ID とシークレットを Kubernetes シークレットに設定することをお勧めします。

apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
stringData:
  KAFKA_USERNAME: "..."
  KAFKA_PASSWORD: "..."

Quarkus アプリケーションがそのシークレットを使用できるようにするには、application.properties ファイルに以下の行を追加します。

%prod.quarkus.openshift.env.secrets=kafka-credentials

30.2.1. Red Hat OpenShift Service Registry

Red Hat OpenShift Service Registry は、Kafka スキーマを処理するためのフルマネージドサービスレジストリーを提供します。

Getting started with Red Hat OpenShift Service Registry の説明に従って、または rhoas CLI を使用して、新しいサービスレジストリーのインスタンスを作成することができます。

rhoas service-registry create --name my-schema-registry

作成されたインスタンスの Registry URL を書き留めてください。認証には、以前に作成したものと同じ ServiceAccount を使用できます。サービスレジストリーにアクセスするために必要なパーミッションがあることを確認する必要があります。

たとえば、rhoas CLI を使用して、サービスアカウントに MANAGER ロールを付与することができます。

rhoas service-registry role add --role manager --service-account [SERVICE_ACCOUNT_CLIENT_ID]

続いて、以下のようにスキーマレジストリーに接続するように Quarkus アプリケーションを設定できます。

mp.messaging.connector.smallrye-kafka.apicurio.registry.url=${RHOAS_SERVICE_REGISTRY_URL} (1)
mp.messaging.connector.smallrye-kafka.apicurio.auth.service.token.endpoint=${RHOAS_OAUTH_TOKEN_ENDPOINT} (2)
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.id=${RHOAS_CLIENT_ID} (3)
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.secret=${RHOAS_CLIENT_ID} (4)
1 管理コンソールで指定されているサービスレジストリーの URL (例: https://bu98.serviceregistry.rhcloud.com/t/0e95af2c-6e11-475e-82ee-f13bd782df24/apis/registry/v2)
2 OAuth トークンエンドポイント URL (例: https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token)
3 クライアント ID (サービスアカウントから)
4 クライアントシークレット (サービスアカウントから)

30.2.2. Service Binding Operator を使用した Red Hat OpenShift マネージドサービスの Quarkus アプリケーションへのバインド

Quarkus アプリケーションが、Service Binding Operator および OpenShift Application Services オペレーターをインストールした Kubernetes または OpenShift クラスターにデプロイされている場合、Red Hat OpenShift Streams for Apache Kafka と Service Registry へのアクセスに必要な設定は、Kubernetes Service Binding を使用してアプリケーションに注入することができます。

Service Binding をセットアップするには、最初に OpenShift マネージドサービスをクラスターに接続する必要があります。OpenShift クラスターの場合は、Connecting a Kafka and Service Registry instance to your OpenShift cluster の指示に従います。

クラスタとRHOAS KafkaおよびService Registryインスタンスを接続したら、新しく作成したサービスアカウントに必要なパーミッションが付与されていることを確認してください。

次に、Kubernetes Service Binding 拡張を使用して、これらのサービス用の ServiceBinding リソースを生成するように Quarkus アプリケーションを構成できます。

quarkus.kubernetes-service-binding.detect-binding-resources=true

quarkus.kubernetes-service-binding.services.kafka.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.kafka.kind=KafkaConnection
quarkus.kubernetes-service-binding.services.kafka.name=my-kafka

quarkus.kubernetes-service-binding.services.serviceregistry.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.serviceregistry.kind=ServiceRegistryConnection
quarkus.kubernetes-service-binding.services.serviceregistry.name=my-schema-registry

この例では、Quarkus のビルドにより、次の ServiceBinding リソースが生成されます。

apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-kafka
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: KafkaConnection
      name: my-kafka
  detectBindingResources: true
  bindAsFiles: true
---
apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-serviceregistry
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: ServiceRegistryConnection
      name: my-schema-registry
  detectBindingResources: true
  bindAsFiles: true

Deploying to OpenShiftに従って、生成した ServiceBinding リソースを含むアプリケーションをデプロイすることが可能です。 KafkaとSchema Registryのインスタンスにアクセスするために必要な設定プロパティは、デプロイ時に自動的にアプリケーションに注入されます。

31. さらに詳しく

This guide has shown how you can interact with Kafka using Quarkus. It utilizes Quarkus Messaging to build data streaming applications.

さらに詳しく知りたい場合は、Quarkusで使用されている実装、 SmallRye Reactive Messaging のドキュメントを確認してください。

関連コンテンツ