The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Apache Pulsar リファレンスガイド

このリファレンスガイドでは、Quarkus アプリケーションが Quarkus Messaging を利用して Apache Pulsar と対話する仕組みを説明します。

1. はじめに

Apache Pulsar は、クラウド向けに構築されたオープンソースの分散メッセージングおよびストリーミングプラットフォームです。 階層型ストレージ機能により、マルチテナントの高性能ソリューションをサーバーメッセージングに提供します。

Pulsar はパブリッシュ - サブスクライブパターンを実装します。

  • プロデューサーは トピック にメッセージを公開します。

  • コンシューマーは、そのトピックへの サブスクリプション を作成し、着信メッセージを受信して処理します。処理が完了すると、ブローカーに 完了通知 を送信します。

  • サブスクリプションが作成されると、コンシューマーが切断されても、Pulsar はすべてのメッセージを保持します。 保持されたメッセージは、これらのメッセージがすべて正常に処理されたことをコンシューマーが確認した場合にのみ破棄されます。

Pulsar クラスターは、以下の要素で構成されます。

  • ステートレスコンポーネントである 1 つ以上のブローカー。

  • トピックのメタデータ、スキーマ、調整、クラスター設定を維持するための メタデータストア

  • メッセージの永続的な保存に使用される bookies のセット。

2. Apache Pulsar のための Quarkus エクステンション

Quarkus は、 SmallRye Reactive Messaging フレームワークを通じて Apache Pulsar のサポートを提供します。 Eclipse MicroProfile Reactive Messaging 仕様 3.0 に基づいて、CDI とイベント駆動型を橋渡しする柔軟なプログラミングモデルを提案します。

このガイドでは、Apache Pulsar と SmallRye Reactive Messaging フレームワークについて詳しく説明します。 クイックスタートについては、Apache Pulsar を使用した Quarkus Messaging の概要 を参照してください。

プロジェクトのベースディレクトリーで次のコマンドを実行すると、messaging-pulsar エクステンションをプロジェクトに追加できます。

コマンドラインインタフェース
quarkus extension add messaging-pulsar
Maven
./mvnw quarkus:add-extension -Dextensions='messaging-pulsar'
Gradle
./gradlew addExtension --extensions='messaging-pulsar'

これにより、ビルドファイルに次の内容が追加されます。

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-pulsar</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-pulsar")

このエクステンションには、pulsar-clients-original バージョン 3.0.0 が推移的依存関係として含まれており、Pulsar ブローカーバージョン 2.10.x との互換性があります。

3. SmallRye Pulsar コネクターの設定

SmallRye Reactive Messaging フレームワークは、Apache Kafka、Apache Pulsar、AMQP、Apache Camel、JMS、MQTT などのさまざまなメッセージングバックエンドをサポートしているため、汎用的な語彙を使用しています。

  • アプリケーションは メッセージ を送受信します。 Message は、ペイロード をラップし、いくつかの メタデータ で拡張できます。 これは、値とキーで構成される Pulsar Message と混同しないでください。 Pulsar コネクターでは、Reactive Messaging メッセージ が Pulsar メッセージ に対応します。

  • メッセージは チャネル を通過します。アプリケーションコンポーネントは、チャネルに接続してメッセージを公開および消費します。Pulsar コネクターは、チャネル を Pulsar トピック にマッピングします。

  • チャネルは コネクター を使用してメッセージバックエンドに接続します。 コネクターは、着信メッセージを特定のチャネル (アプリケーションが消費するチャネル) にマッピングし、特定のチャネルに送信された発信メッセージを収集するように設定されています。 各コネクターは、特定のメッセージングテクノロジーに特化しています。 たとえば、Pulsar を処理するコネクターの名前は、smallrye-pulsar となっています。

着信チャネルを持つ Pulsar コネクターの最小設定は次のようになります。

%prod.pulsar.client.serviceUrl=pulsar:6650 (1)
mp.messaging.incoming.prices.connector=smallrye-pulsar (2)
1 プロダクションプロファイルの Pulsar ブローカーサービス URL を設定します。 この URL は、mp.messaging.incoming.$channel.serviceUrl プロパティーを使用して、グローバルに、またはチャネルごとに設定できます。 開発モードおよびテスト実行時には、Dev Services for Pulsar が Pulsar ブローカーを自動的に起動します。
2 prices チャネルを管理するためのコネクターを設定します。 デフォルトでは、トピック 名はチャネル名と同じです。

トピック名は、トピック属性を設定することで上書きできます。

%prod 接頭辞は、アプリケーションが本番モードで実行される場合にのみプロパティーが使用されることを示します (つまり、開発モードまたはテストモードでは使用されません)。詳細は、プロファイルに関するドキュメント を参照してください。
コネクターの自動アタッチ

クラスパスに単一のコネクターがある場合は、connector 属性の設定を省略できます。 Quarkus は、孤立した チャネルをクラスパスにある (一意の) コネクターに自動的に関連付けます。 孤立した チャネルとは、ダウンストリームコンシューマーのない発信チャネル、またはアップストリームプロデューサーのない着信チャネルです。

この自動アタッチは、以下を使用して無効にできます。

quarkus.messaging.auto-connector-attachment=false

設定オプションの詳細は、Pulsar クライアントの設定 を参照してください。

4. Pulsar からのメッセージの受信

Pulsar コネクターは、Pulsar クライアントを使用して Pulsar ブローカーに接続し、コンシューマーを作成して Pulsar ブローカーからメッセージを受信します。また、各 Pulsar Message を Reactive Messaging Message にマッピングします。

4.1. 例

Pulsar ブローカーが実行中で、pulsar:6650 アドレスを使用してアクセスできるとします。 次のようにして、prices チャネルで Pulsar メッセージを受信するようにアプリケーションを設定します。

mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest (2)
  1. Pulsar ブローカーサービスの URL を設定します。

  2. コンシューマーのサブスクリプションが Earliest 位置からメッセージの受信を開始することを確認します。

Pulsar トピックやコンシューマー名を設定する必要はありません。 デフォルトでは、コネクターはチャネル名 (prices) を使用します。 トピックとコンシューマー名は、topic 属性と consumerName 属性を設定してオーバーライドすることができます。

Pulsar では、コンシューマーがトピックサブスクリプションの subscriptionName を指定する必要があります。 指定されていない場合、コネクターが一意の サブスクリプション名 を生成します。

すると、アプリケーションが double ペイロードを直接受信できるようになります。

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

または、Reactive Messaging タイプ Message<Double> を取得することもできます。

@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(PulsarIncomingMessageMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (acknowledge the Pulsar message back to the broker)
    return msg.ack();
}

Reactive Messaging の Message タイプを使用すると、コンシューマーメソッドが着信メッセージのメタデータにアクセスし、確認応答を手動で処理できます。

Pulsar メッセージオブジェクトに直接アクセスする場合は、次を使用します。

@Incoming("prices")
public void consume(org.apache.pulsar.client.api.Message<Double> msg) {
    String key = msg.getKey();
    String value = msg.getValue();
    String topic = msg.topicName();
    // ...
}

org.apache.pulsar.client.api.Message は、基盤となる Pulsar クライアントによって提供され、コンシューマーメソッドで直接使用できます。

または、次の例のように、アプリケーションで、チャネル名によって識別される Multi を Bean に注入し、そのイベントをサブスクライブすることもできます。

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

@Channel でメッセージを消費する場合、アプリケーションコードがサブスクリプションを行います。 上記の例では、Quarkus REST (旧称 RESTEasy Reactive) エンドポイントがサブスクリプションを処理します。

チャネルとして注入できるのは、以下のタイプです。

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

前出の Message の例と同様に、注入されたチャネルがペイロード (Multi<T>) を受信した場合は、そのチャネルが自動的にメッセージを確認応答し、複数のサブスクライバーをサポートします。 注入されたチャネルが Message (Multi<Message<T>>) を受信した場合は、手動で確認応答とブロードキャストを行う必要があります。

4.2. ブロッキング処理

リアクティブメッセージングは、I/O スレッドでメソッドを呼び出します。このトピックの詳細については、 Quarkus リアクティブアーキテクチャーのドキュメント を参照してください。ただし、多くの場合、リアクティブメッセージングとデータベースインタラクションなどのブロック処理を組み合わせる必要があります。このためには、処理が ブロッキング であり、呼び出し元のスレッドで実行するべきではないことを示す @Blocking アノテーションを使用する必要があります。

例えば、以下のコードは、Hibernate with Panacheを 使用してデータベースに受信ペイロードを格納する方法を示しています。

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

@Blocking アノテーションは 2 つあります。

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

効果はどちらも同じです。したがって、両方を使うことができます。1番目のものは、使用するワーカープールや順序を保持するかどうかなど、より細かい調整が可能です。2 番目のものは、Quarkus の他のリアクティブ機能でも使用され、デフォルトのワーカープールを使用し、順序を保持します。

@Blocking アノテーションの使用法の詳細については、 SmallRye Reactive Messaging – Handling blocking execution を参照してください。

@RunOnVirtualThread

Java 仮想スレッドでの ブロッキング処理の実行については、 Quarkus Virtual Thread support with Reactive Messagingのドキュメント を参照してください。

@Transactional

メソッドに @Transactional アノテーションが付けられている場合、メソッドに @Blocking アノテーションが付けられていなくても、自動的に blocking と見なされます。

4.3. Pulsar のサブスクリプションタイプ

Pulsar の subscriptionType コンシューマー設定は、パブリッシュ - サブスクライブやキューイングなど、さまざまなメッセージングシナリオを実現するために柔軟に使用できます。

  • Exclusive サブスクリプションタイプ使用すると、"ファンアウト型の pub/sub メッセージング" 用に 一意のサブスクリプション名 を指定できます。これがデフォルトのサブスクリプションタイプです。

  • SharedKey_Shared、または Failover サブスクリプションタイプを使用すると、複数のコンシューマーで 同じサブスクリプション名 を共有して、コンシューマー間で "メッセージキューイング" を実現できます。

サブスクリプション名が指定されていない場合、Quarkus は一意の ID を生成します。

4.4. デシリアライゼーションと Pulsar Schema

Pulsar コネクターを使用すると、基盤となる Pulsar コンシューマーのスキーマ設定を設定できます。 詳細は、Pulsar Schema 設定と自動スキーマ検出 を参照してください。

4.5. 確認応答ストラテジー

Pulsar メッセージから生成されたメッセージが 確認応答 されると、コネクターが 確認応答リクエスト を Pulsar ブローカーに送信します。 すべての Reactive Messaging メッセージが 確認応答 される必要があります。ほとんどの場合、これは自動的に処理されます。 確認応答リクエストは、次の 2 つのストラテジーを使用して Pulsar ブローカーに送信できます。

  • 個別確認応答 は、デフォルトのストラテジーです。確認応答リクエストが各メッセージに対してブローカーに送信されます。

  • 累積確認応答 は、ack-strategy=cumulative を使用して設定されるストラテジーです。コンシューマーが受信した最後のメッセージのみを確認します。 提供されたメッセージまでのストリーム内の全メッセージは、そのコンシューマーに再配信されません。

デフォルトでは、Pulsar コンシューマーは、ブローカーからの確認応答の確認を待たずに確認応答を検証します。 ackReceiptEnabled=true を使用してこれを有効にできます。

4.6. 失敗処理ストラテジー

Pulsar メッセージから生成されたメッセージが 否定応答 された場合、失敗ストラテジーが適用されます。 Quarkus Pulsar エクステンションは、4 つのストラテジーをサポートしています。

  • nack (デフォルト) は、ブローカーに 否定応答 を送信し、ブローカーがこのメッセージをコンシューマーに再配信するようにトリガーします。 否定応答は、negativeAckRedeliveryDelayMicros プロパティーと negativeAck.redeliveryBackoff プロパティーを使用してさらに設定できます。

  • fail の場合、アプリケーションが失敗します。メッセージはそれ以上処理されません。

  • ignore の場合、失敗がログに記録されますが、確認応答ストラテジーが適用され、処理が続行されます。

  • continue の場合、失敗がログに記録されますが、確認応答や否定応答を適用せずに処理が続行されます。このストラテジーは、確認応答タイムアウト 設定とともに使用できます。

  • reconsume-later は、reconsumeLater API を使用して、遅延して再消費されるメッセージを 再試行レタートピック に送信します。 遅延は reconsumeLater.delay プロパティーを使用して設定でき、デフォルトは 3 秒です。 失敗メタデータに io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata のインスタンスを追加することで、メッセージごとにカスタムの遅延またはプロパティーを設定できます。

4.6.1. 確認応答タイムアウト

Pulsar クライアントは、否定応答と同様に、 確認応答タイムアウト メカニズムにより、指定された ackTimeout 期間にわたって、確認応答されていないメッセージを追跡し、ブローカーに 確認応答されていないメッセージの再配信リクエスト を送信します。これを受けて、ブローカーは確認応答されていないメッセージをコンシューマーに再送信します。

タイムアウトと再配信バックオフメカニズムを設定するには、ackTimeoutMillis プロパティーと ackTimeout.redeliveryBackoff プロパティーを設定します。 ackTimeout.redeliveryBackoff には、最小遅延 (ミリ秒単位)、最大遅延 (ミリ秒単位)、および乗数をコンマ区切り値として指定できます。

mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2

4.6.2. 後の再消費と再試行レタートピック

再試行レタートピック は、正常に消費されなかったメッセージをデッドレタートピックにプッシュし、メッセージの消費を継続します。 デッドレタートピックは、確認応答タイムアウト、否定応答、再試行レタートピックなどのさまざまなメッセージ再配信方法で使用できることに注意してください。

mp.messaging.incoming.data.failure-strategy=reconsume-later
mp.messaging.incoming.data.reconsumeLater.delay=5000
mp.messaging.incoming.data.enableRetry=true
mp.messaging.incoming.data.negativeAck.redeliveryBackoff=1000,60000,2

4.6.3. デッドレタートピック

デッドレタートピック は、正常に消費されなかったメッセージをデッドレタートピックにプッシュし、メッセージの消費を継続します。 デッドレタートピックは、確認応答タイムアウト、否定応答、再試行レタートピックなどのさまざまなメッセージ再配信方法で使用できることに注意してください。

mp.messaging.incoming.data.failure-strategy=nack
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic
mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription
mp.messaging.incoming.data.subscriptionType=Shared

否定応答 または 確認応答タイムアウト 再配信方法では、少なくとも未処理のメッセージを含むメッセージのバッチ全体が再配信されます。 詳細は、プロデューサーのバッチ処理 を参照してください。

4.7. Pulsar メッセージの一括受信

デフォルトでは、incoming メソッドは各 Pulsar メッセージを個別に受信します。 batchReceive=true プロパティーを使用するか、コンシューマー設定で batchReceivePolicy を指定することで、バッチモードを有効にすることができます。

@Incoming("prices")
public CompletionStage<Void> consumeMessage(PulsarIncomingBatchMessage<Double> messages) {
    for (PulsarMessage<Double> msg : messages) {
        msg.getMetadata(PulsarIncomingMessageMetadata.class).ifPresent(metadata -> {
            String key = metadata.getKey();
            String topic = metadata.getTopicName();
            long timestamp = metadata.getEventTime();
            //... process messages
        });
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return messages.ack();
}

@Incoming("prices")
public void consumeRecords(Messages<Double> messages) {
    for (Message<Double> msg : messages) {
        //... process messages
    }
}

または、consume メソッドにペイロードのリストを直接受信することもできます。

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

Quarkus は、着信チャネルのバッチタイプを自動検出し、バッチ設定を自動的に設定します。 mp.messaging.incoming.$channel.batchReceive プロパティーを使用して、バッチモードを明示的に設定できます。

5. Pulsar へのメッセージの送信

Pulsar コネクターは、Reactive Messaging メッセージを Pulsar メッセージとして書き込むことができます。

5.1. 例

Pulsar ブローカーが実行中で、pulsar:6650 アドレスを使用してアクセスできるとします。 次のように、prices チャネルからのメッセージを Pulsar メッセージに書き込むようにアプリケーションを設定します。

mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 (1)
  1. Pulsar ブローカーサービスの URL を設定します。

Pulsar トピックやプロデューサー名を設定する必要はありません。 デフォルトでは、コネクターはチャネル名 (prices) を使用します。 トピックとコンシューマー名は、topic 属性と producerName 属性を設定してオーバーライドすることができます。

次に、アプリケーションは、Message<Double>prices チャネルに送信する必要があります。次のスニペットのように、double ペイロードを使用できます。

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class PulsarPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

generate メソッドは、Multi<Double> を返します。これは、Flow.Publisher インターフェイスを実装します。 このパブリッシャーは、メッセージを生成し、設定された Pulsar トピックに送信するために、フレームワークによって使用されます。

ペイロードを返す代わりに、io.smallrye.reactive.messaging.pulsar.OutgoingMessage を返して、Pulsar メッセージを送信することもできます。

@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}

ペイロードを org.eclipse.microprofile.reactive.messaging.Message 内にラップして、書き込まれたレコードをより詳細に制御できます。

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(PulsarOutgoingMessageMetadata.builder()
                            .withKey("my-key")
                            .withProperties(Map.of("property-key", "value"))
                            .build()));
}

Messages を送信するときに io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata のインスタンスを追加して、メッセージが Pulsar に書き込まれる方法に影響を与えることができます。

発信メソッドは、Flow.Publisher を返すメソッドシグネチャーの他に、単一のメッセージを返すこともできます。 この場合、プロデューサーがこのメソッドをジェネレーターとして使用して無限のストリームを作成します。

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

5.2. シリアライゼーションと Pulsar Schema

Pulsar コネクターを使用すると、基盤となる Pulsar プロデューサーのスキーマ設定を設定できます。 詳細は、Pulsar Schema 設定と自動スキーマ検出 を参照してください。

5.3. キー/値のペアの送信

キー/値のペアを Pulsar に送信するには、Pulsar プロデューサースキーマを KeyValue スキーマを使用して設定できます。

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarKeyValueExample {

    @Identifier("out")
    @Produces
    Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);

    @Incoming("in")
    @Outgoing("out")
    public KeyValue<String, Long> process(long in) {
        return new KeyValue<>("my-key", in);
    }

}

書き込まれたレコードをさらに制御する必要がある場合は、PulsarOutgoingMessageMetadata を使用します。

5.4. 確認応答

Pulsar ブローカーは、プロデューサーからメッセージを受信すると、メッセージに MessageId を割り当ててプロデューサーに送り返し、メッセージが公開されたことを確認します。

デフォルトでは、コネクターは Pulsar がレコードを確認応答するのを待ってから、処理 (受信した Message の確認応答) を続行します。 これを無効にするには、waitForWriteCompletion 属性を false に設定します。

レコードを書き込めない場合、メッセージは nacked になります。

Pulsar クライアントは、失敗した場合に、 送信タイムアウト に達するまで、自動的にメッセージの送信を再試行します。 送信タイムアウト は、 sendTimeoutMs 属性で設定可能で、デフォルトでは 30 秒です。

5.5. バックプレッシャーとインフライトレコード

Pulsar 発信コネクターは、バックプレッシャーを処理し、Pulsar ブローカーへの書き込みを待機している保留メッセージの数を監視します。 保留メッセージの数は、maxPendingMessages 属性を使用して設定され、デフォルトは 1000 です。

コネクターは、その数のメッセージのみを同時に送信します。 少なくとも 1 つの保留メッセージがブローカーによって確認応答されるまで、他のメッセージは送信されません。 次に、ブローカーの保留メッセージの 1 つが確認応答されると、コネクターが Pulsar に新しいメッセージを書き込みます。

また、maxPendingMessages0 に設定することで、保留メッセージの制限を解除することもできます。 Pulsar では、maxPendingMessagesAcrossPartitions を使用して、パーティションごとの保留メッセージの数を設定することもできます。

5.6. プロデューサーのバッチ処理

デフォルトでは、Pulsar プロデューサーは個々のメッセージをまとめてバッチ処理し、ブローカーに公開します。 バッチ処理のパラメーターは、batchingMaxPublishDelayMicrosbatchingPartitionSwitchFrequencyByPublishDelaybatchingMaxMessagesbatchingMaxBytes 設定プロパティーを使用して設定できます。batchingEnabled=false で完全に無効にすることもできます。

Key_Shared コンシューマーサブスクリプションを使用する場合、batcherBuilderBatcherBuilder.KEY_BASED に設定できます。

6. Pulsar トランザクションと exactly-once 処理

Pulsar トランザクション を使用すると、イベントストリーミングアプリケーションが 1 つのアトミック操作でメッセージを消費、処理、生成できます。

トランザクションを使用すると、1 つまたは複数のプロデューサーが複数のトピックにメッセージのバッチを送信できるようになります。バッチ内のすべてのメッセージが、最終的にすべてのコンシューマーにみえるか、またはコンシューマーに一切みえなくなります。

使用するには、ブローカー設定で transactionCoordinatorEnabled=true および systemTopicEnabled=true を使用して、トランザクションのサポートを有効にする必要があります。

クライアント側でも、PulsarClient 設定でトランザクションのサポートを有効にする必要があります。

mp.messaging.outgoing.tx-producer.enableTransaction=true

Pulsar コネクターは、トランザクション内にレコードを書き込むための PulsarTransactions カスタムエミッターを提供します。

これは、通常のエミッター @Channel として使用できます。

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.OutgoingMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

@ApplicationScoped
public class PulsarTransactionalProducer {

    @Inject
    @Channel("tx-out-example")
    PulsarTransactions<OutgoingMessage<Integer>> txProducer;

    @Inject
    @Channel("other-producer")
    PulsarTransactions<String> producer;

    @Incoming("in")
    public Uni<Void> emitInTransaction(Message<Integer> in) {
        return txProducer.withTransaction(emitter -> {
            emitter.send(OutgoingMessage.of("a", 1));
            emitter.send(OutgoingMessage.of("b", 2));
            emitter.send(OutgoingMessage.of("c", 3));
            producer.send(emitter, "4");
            producer.send(emitter, "5");
            producer.send(emitter, "6");
            return Uni.createFrom().completionStage(in::ack);
        });
    }

}

withTransaction メソッドに渡される関数は、レコードを生成する TransactionalEmitter を受け取り、トランザクションの結果を提供する Uni を返します。 処理が正常に完了すると、プロデューサーがフラッシュされ、トランザクションがコミットされます。 処理中に例外が発生したり、失敗した Uni が返されたり、TransactionalEmitter が中止の対象としてマークされたりすると、トランザクションは中止されます。

複数のトランザクションプロデューサーが 1 つのトランザクションに参加できます。 これにより、すべてのメッセージが開始したトランザクションを使用して確実に送信されます。また、トランザクションがコミットされる前に、参加しているすべてのプロデューサーがフラッシュされます。

このメソッドが Vert.x コンテキストで呼び出されると、処理関数もそのコンテキストで呼び出されます。 それ以外の場合は、プロデューサーの送信スレッドで呼び出されます。

6.1. Exactly-Once 処理

Pulsar Transactions API を使用すると、生成されたメッセージとともに、トランザクション内のコンシューマーオフセットを管理することもできます。 これにより、コンシューマーとトランザクションプロデューサーを consume-transform-produce パターンでカップリングすることができます。これは exactly-once 処理としても知られています。 これは、アプリケーションがメッセージを消費および処理し、結果をトピックに公開して、消費されたメッセージのオフセットをトランザクションでコミットすることを意味します。

PulsarTransactions エミッターは、トランザクション内の着信 Pulsar メッセージに exactly-once 処理を適用する方法も提供します。

次の例には、トランザクション内の Pulsar メッセージのバッチが含まれています。

mp.messaging.outgoing.tx-out-example.enableTransaction=true
# ...
mp.messaging.incoming.in-channel.enableTransaction=true
mp.messaging.incoming.in-channel.batchReceive=true
package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

@ApplicationScoped
public class PulsarExactlyOnceProcessor {

    @Inject
    @Channel("tx-out-example")
    PulsarTransactions<Integer> txProducer;

    @Incoming("in-channel")
    public Uni<Void> emitInTransaction(PulsarIncomingBatchMessage<Integer> batch) {
        return txProducer.withTransactionAndAck(batch, emitter -> {
            for (PulsarMessage<Integer> record : batch) {
                emitter.send(PulsarMessage.of(record.getPayload() + 1, record.getKey()));
            }
            return Uni.createFrom().voidItem();
        });
    }

}

処理が正常に完了すると、トランザクション内でメッセージが確認応答され、トランザクションがコミットされます。

exactly-once を使用する場合、累積的にではなく、個別にのみメッセージを確認応答できます。

処理を中止する必要がある場合、メッセージは否定応答されます。処理を再試行したり、フェイルストップしたりするために、いずれかの失敗ストラテジーを使用することができます。 トランザクションが失敗して中止された場合、withTransaction から返される Uni は失敗することに注意してください。

アプリケーションでエラーケースを処理することもできます。しかし、メッセージの消費を続行するには、@Incoming メソッドから返される Uni が失敗しないようにする必要があります。 PulsarTransactions#withTransactionAndAck メソッドは、メッセージの確認応答と否定応答を行いますが、リアクティブストリームを停止しません。 失敗を無視すると、最後にコミットされたオフセットにコンシューマーがリセットされ、そこから処理が再開されます。

失敗発生時に重複を回避するために、ブローカー側でメッセージの重複排除とバッチインデックスレベルの確認応答を有効にすることを推奨します。

quarkus.pulsar.devservices.broker-config.brokerDeduplicationEnabled=true
quarkus.pulsar.devservices.broker-config.brokerDeduplicationEntriesInterval=1000
quarkus.pulsar.devservices.broker-config.brokerDeduplicationSnapshotIntervalSeconds=3000
quarkus.pulsar.devservices.broker-config.acknowledgmentAtBatchIndexLevelEnabled=3000

mp.messaging.incoming.data.batchIndexAckEnabled=true

7. Pulsar Schema 設定と自動スキーマ検出

Pulsar メッセージは、ペイロードとともに非構造化バイト配列として保存されます。 Pulsar スキーマ は、構造化データを raw メッセージバイトにシリアライズする方法を定義します。 スキーマ は、強制的なデータ構造での書き込みと読み取りを行うために、プロデューサーとコンシューマーに適用されます。 スキーマは、データがトピックに公開される前にデータを raw バイトにシリアライズします。また、raw バイトがコンシューマーに配信される前に raw バイトをデシリアライズします。

Pulsar は、登録されたスキーマ情報を保存する中央リポジトリーとしてスキーマレジストリーを使用します。 これにより、プロデューサー/コンシューマーがブローカーを通じてトピックのメッセージのスキーマを調整できるようになります。 デフォルトでは、スキーマの保存に Apache BookKeeper が使用されます。

Pulsar API は、多数の プリミティブ型 および complex 型 (Key/Value、Avro、Protobuf など) に関する組み込みのスキーマ情報を提供します。

Pulsar コネクターでは、schema プロパティーを使用してスキーマをプリミティブ型として指定できます。

mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32

mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE

schema プロパティーの値が スキーマタイプ と一致する場合、そのタイプを使用した単純なスキーマが作成され、該当するチャネルに使用されます。

Pulsar コネクターを使用すると、@Identifier 修飾子で識別される Schema Bean を CDI を通じて提供することで、複雑なスキーマタイプを設定できます。

たとえば、次の Bean は JSON スキーマとキー/値スキーマを提供します。

package pulsar.configuration;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarSchemaProvider {

    @Produces
    @Identifier("user-schema")
    Schema<User> userSchema = Schema.JSON(User.class);

    @Produces
    @Identifier("a-channel")
    Schema<KeyValue<Integer, User>> keyValueSchema() {
        return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
    }

    public static class User {
        String name;
        int age;

    }
}

定義されたスキーマを使用して着信チャネル users を設定するには、schema プロパティーをスキーマ user-schema の識別子に設定する必要があります。

mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema

schema プロパティーが見つからない場合、コネクターはチャネル名で識別される Schema Bean を検索します。 たとえば、発信チャネル a-channel はキー/値スキーマを使用します。

mp.messaging.outgoing.a-channel.connector=smallrye-pulsar

スキーマ情報が提供されていない場合、着信チャネルは Schema.AUTO_CONSUME() を使用します。一方、発信チャネルは Schema.AUTO_PRODUCE_BYTES() スキーマを使用します。

7.1. 自動スキーマ検出

Quarkus Messaging Pulsar (io.quarkus:quarkus-messaging-pulsar) を使用すると、Quarkus は多くの場合、設定する正しい Pulsar Schema を自動的に検出できます。 この自動検出は、@Incoming メソッドと @Outgoing メソッドの宣言、および注入された @Channel に基づいています。

たとえば、以下のように宣言した場合

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

このとき、設定において generated-price チャネルで smallrye-pulsar コネクターを使用するよう指定されていると、Quarkus は generated-price チャネルの schema 属性を Pulsar Schema INT32 に自動的に設定します。

同様に、以下を宣言した場合

@Incoming("my-pulsar-consumer")
public void consume(org.apache.pulsar.api.client.Message<byte[]> record) {
    ...
}

設定において my-pulsar-consumer チャネルで smallrye-pulsar コネクターを使用するよう指定されていると、Quarkus は schema 属性を Pulsar BYTES Schema に自動的に設定します。

最後に、以下を宣言した場合

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

設定において price-create チャネルが smallrye-pulsar コネクターを使用するよう指定されていると、Quarkus は schema を Pulsar INT64 Schema に自動的に設定します。

Pulsar Schema の自動検出でサポートされているすべてのタイプは、次のとおりです。

  • short および java.lang.Short

  • int および java.lang.Integer

  • long および java.lang.Long

  • float および java.lang.Float

  • double および java.lang.Double

  • byte[]

  • java.time.Instant

  • java.sql.Timestamp

  • java.time.LocalDate

  • java.time.LocalTime

  • java.time.LocalDateTime

  • java.nio.ByteBuffer

  • Avro スキーマから生成されたクラスと Avro GenericRecord は、AVRO スキーマタイプで設定されます。

  • Protobuf スキーマから生成されたクラスは、PROTOBUF スキーマタイプで設定されます。

  • その他のクラスは、自動的に JSON スキーマタイプで設定されます。

JSON スキーマタイプは、スキーマ検証を適用することに注意してください。

Pulsar が提供するこれらのスキーマに加えて、Quarkus は 検証を適用することなく 次のスキーマ実装を提供します。

  • io.vertx.core.buffer.Buffer は、io.quarkus.pulsar.schema.BufferSchema スキーマで設定されます。

  • io.vertx.core.json.JsonObject は、io.quarkus.pulsar.schema.JsonObjectSchema スキーマで設定されます。

  • io.vertx.core.json.JsonArray は、io.quarkus.pulsar.schema.JsonArraySchema スキーマで設定されます。

  • スキーマレス JSON シリアライゼーションでは、schema 設定が ObjectMapper<fully_qualified_name_of_the_bean> に設定されている場合、Jackson ObjectMapper を使用してスキーマが生成されます。このとき、Pulsar Schema 検証は適用されません。 io.quarkus.pulsar.schema.ObjectMapperSchema を使用すると、検証なしで JSON スキーマを明示的に設定できます。

schema が設定されている場合、自動検出によって置き換えられることはありません。

シリアライザーの自動検出に問題がある場合は、quarkus.messaging.pulsar.serializer-autodetection.enabled=false を設定することで、これを完全にオフにすることができます。 オフにする必要があると認識した場合は、Quarkus issue tracker にバグを報告していただければ、どのような問題でも修正することができます。

8. Dev Services for Pulsar

With Quarkus Messaging Pulsar extension (quarkus-messaging-pulsar) Dev Services for Pulsar automatically starts a Pulsar broker in dev mode and when running tests. So, you don’t have to start a broker manually. The application is configured automatically.

8.1. Enabling / Disabling Dev Services for Pulsar

Dev Services for Pulsar is automatically enabled unless:

  • quarkus.pulsar.devservices.enabled is set to false

  • the pulsar.client.serviceUrl is configured

  • all the Reactive Messaging Pulsar channels have the serviceUrl attribute set

Dev Services for Pulsar relies on Docker to start the broker. If your environment does not support Docker, you will need to start the broker manually, or connect to an already running broker. You can configure the broker address using pulsar.client..

8.2. 共有ブローカー

Most of the time you need to share the broker between applications. Dev Services for Pulsar implements a service discovery mechanism for your multiple Quarkus applications running in dev mode to share a single broker.

Dev Services for Pulsar starts the container with the quarkus-dev-service-pulsar label which is used to identify the container.

If you need multiple (shared) brokers, you can configure the quarkus.pulsar.devservices.service-name attribute and indicate the broker name. It looks for a container with the same value, or starts a new one if none can be found. The default service name is pulsar.

Sharing is enabled by default in dev mode, but disabled in test mode. You can disable the sharing with quarkus.pulsar.devservices.shared=false.

8.3. ポートの設定

By default, Dev Services for Pulsar picks a random port and configures the application. You can set the port by configuring the quarkus.pulsar.devservices.port property.

Note that the Pulsar advertised address is automatically configured with the chosen port.

8.4. イメージの設定

Dev Services for Pulsar supports the official Apache Pulsar image.

A custom image name can be configured as such:

quarkus.pulsar.devservices.image-name=datastax/lunastreaming-all:2.10_4.7

8.5. Configuring the Pulsar broker

You can configure the Dev Services for Pulsar with custom broker configuration.

The following example enables transaction support:

quarkus.pulsar.devservices.broker-config.transaction-coordinator-enabled=true
quarkus.pulsar.devservices.broker-config.system-topic-enabled=true

9. Pulsar クライアントの設定

Pulsar のクライアント、コンシューマー、プロデューサーは、Pulsar クライアントアプリケーションの動作を設定するために、細かくカスタマイズすることが可能です。

Pulsar コネクターは、Pulsar クライアントと、チャネルごとのコンシューマーまたはプロデューサーを作成します。それぞれに実用的なデフォルトが設定されているため、設定が容易です。 作成は処理されますが、利用できる設定オプションは、すべて Pulsar チャネルを通じて設定可能です。

通常、PulsarClientPulsarConsumer、`PulsarProducer`を作成するには、ビルダー API を使用します。ビルダー API は、その性質上、実装に渡す設定オブジェクトを毎回ビルドします。 該当する設定オブジェクトは、 ClientConfigurationDataConsumerConfigurationData、 および ProducerConfigurationData です。

Pulsar コネクターを使用すると、これらの設定オブジェクトのプロパティーを直接受信できます。 たとえば、PulsarClient のブローカー認証情報は、authPluginClassName および authParams プロパティーを使用して受信されます。 受信チャネル data の認証を設定するには、以下のように指定します。

mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}

Pulsar コンシューマーのプロパティー subscriptionInitialPosition も、列挙値 SubscriptionInitialPosition.Earliest として表される Earliest 値で設定されていることに注意してください。

この方法は、ほとんどの設定ケースに対応できます。 ただし、CryptoKeyReaderServiceUrlProvider などのシリアライズできないオブジェクトは、この方法では設定できません。 Pulsar コネクターを使用すると、Pulsar 設定データオブジェクト ClientConfigurationDataConsumerConfigurationDataProducerConfigurationData のインスタンスを考慮できます。

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("my-consumer-options")
    public ConsumerConfigurationData<String> getConsumerConfig() {
        ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
        data.setAckReceiptEnabled(true);
        data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
                //...
                .build());
        return data;
    }
}

このインスタンスは、コネクターが使用するクライアントを設定するために取得および使用されます。 client-configurationconsumer-configuration、または producer-configuration 属性を使用してクライアントの名前を指定する必要があります。

mp.messaging.incoming.prices.consumer-configuration=my-consumer-options

[client|consumer|producer]-configuration が設定されていない場合、コネクターはチャネル名で識別されるインスタンスを検索します。

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData getClientConfig() {
        ClientConfigurationData data = new ClientConfigurationData();
        data.setEnableTransaction(true);
        data.setServiceUrlProvider(AutoClusterFailover.builder()
                // ...
                .build());
        return data;
    }
}

また、キーごとの設定値を含む Map<String, Object> を指定することもできます。

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import java.util.Map;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public Map<String, Object> getProducerConfig() {
        return Map.of(
                "batcherBuilder", BatcherBuilder.KEY_BASED,
                "sendTimeoutMs", 3000,
                "customMessageRouter", new PartialRoundRobinMessageRouterImpl(4));
    }
}

さまざまな設定ソースは、重要度の最も低いものから最も高いものの順に、次の優先順序で読み込まれます。

  1. デフォルトの設定識別子 (default-pulsar-clientdefault-pulsar-consumerdefault-pulsar-producer) を使用して生成された Map<String, Object> 設定マップ

  2. 設定内またはチャネル名内の識別子を使用して生成された Map<String, Object> 設定マップ

  3. チャネル設定内またはチャネル名内の識別子を使用して生成された [Client|Producer|Consuemr]ConfigurationData オブジェクト

  4. [Client|Producer|Consuemr]ConfigurationData フィールド名を使用して名前が付けられたチャネル設定プロパティー

設定オプションの完全なリストについては、設定リファレンス を参照してください。

9.1. Pulsar 認証の設定

Pulsar は、プラグ可能な認証フレームワークを提供します。Pulsar ブローカー/プロキシーは、このメカニズムを使用してクライアントを認証します。

クライアントは、authPluginClassName および authParams 属性を使用して application.properties ファイルで設定できます。

pulsar.client.serviceUrl=pulsar://pulsar:6650
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
pulsar.client.authParams={"userId":"superuser","password":"admin"}

または、プログラムで設定することもできます。

import java.util.Map;

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData config() {
        var data = new ClientConfigurationData();
        var auth = new AuthenticationBasic();
        auth.configure(Map.of("userId", "superuser", "password", "admin"));
        data.setAuthentication(auth);
        return data;
    }
}

9.1.1. Datastax Luna Streaming へのアクセスの設定

Luna Streaming は、DataStax のツールとサポートを備えた、Apache Pulsar の製品版ディストリビューションです。 DataStax Luna Pulsar テナントを作成したら、自動生成されたトークンをメモし、トークン認証を設定します。

pulsar.client.serviceUrl=pulsar+ssl://pulsar-aws-eucentral1.streaming.datastax.com:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
pulsar.client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODY4MTc4MzQsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50OzA3NGZhOTI4LThiODktNDBhNC04MDEzLWNlNjVkN2JmZWIwZTtjSEpwWTJWejsyMDI5ODdlOGUyIiwidG9rZW5pZCI6IjIwMjk4N2U4ZTIifQ....

必ず事前にトピックを作成するか、namespace 設定で 自動トピック作成 を有効にしてください。

トピック設定では、トピックの完全な名前を参照する必要があることに注意してください。

mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices

9.1.2. StreamNative Cloud へのアクセスの設定

StreamNative Cloud は、フルマネージドの Pulsar-as-a-Service です。完全ホスト型、パブリッククラウド上で StreamNative によって管理する方式、Kubernetes 上で自己管理する方式など、さまざまなデプロイ方法で利用できます。

StreamNative Pulsar クラスターは Oauth2 認証を使用します。そのため、アプリケーションが使用している Pulsar namespace/トピックへの権限 を持つ サービスアカウント が存在することを確認する必要があります。

次に、サービスアカウントの キーファイル (秘密鍵 として機能します) をダウンロードし、発行者 URL (通常は https://auth.streamnative.cloud/) とクラスターの オーディエンス (urn:sn:pulsar:o-rf3ol:redhat など) をメモする必要があります。 この手順を行う際には、StreamNative Cloud コンソールの Admin セクションにある Pulsar Clients ページが役立ちます。

Pulsar Oauth2 認証を使用してアプリケーションを設定するには、以下のように指定します。

pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}

pulsar.client.authParams 設定には、issuerUrlaudience、および data:application/json;base64,<base64-encoded-key-file> という形式の privateKey を含む Json 文字列が含まれていることに注意してください。

または、プログラムで認証を設定することもできます。

package org.acme.pulsar;

import java.net.MalformedURLException;
import java.net.URL;

import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

@ApplicationScoped
public class PulsarAuth {

    @ConfigProperty(name = "pulsar.issuerUrl")
    String issuerUrl;

    @ConfigProperty(name = "pulsar.credentials")
    String credentials;

    @ConfigProperty(name = "pulsar.audience")
    String audience;

    @Produces
    @Identifier("pulsar-auth")
    public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
        var data = new ClientConfigurationData();
        data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), PulsarAuth.class.getResource(credentials), audience));
        return data;
    }
}

これは、キーファイルがリソースとしてアプリケーションクラスパスに含まれていることを前提としています。この場合、設定は次のようになります。

mp.messaging.incoming.prices.client-configuration=pulsar-auth

pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json

pulsar-auth で識別されるクライアント設定を使用するチャネルには、client-configuration 属性を設定する必要があることに注意してください。

10. ヘルスチェック

Quarkus エクステンションは、Pulsar コネクターによって管理される各チャネルの起動、準備状況、および稼働状況を報告します。 ヘルスチェックは、Pulsar クライアントに依存してブローカーとの接続が確立されていることを確認します。

着信チャネルと発信チャネルの Startup および Readiness プローブは、ブローカーとの接続が確立されると、OK を報告します。

着信チャネルと発信チャネルの Liveness プローブは、ブローカーとの接続が確立され、かつ 失敗が発見されなかった場合に、OK を報告します。

メッセージ処理が失敗すると、メッセージが 否定応答 されることに注意してください。その場合、メッセージは失敗ストラテジーによって処理されます。失敗ストラテジーは、失敗を報告し、liveness チェックの結果に影響を与えます。失敗ストラテジー `fail`は、失敗を報告するため、liveness チェックも失敗を報告します。

11. 設定リファレンス

以下は、Pulsar コネクターのチャネル、コンシューマー、プロデューサー、およびクライアントの設定属性のリストです。 Pulsar クライアントの設定方法の詳細は、Pulsar クライアントの設定 を参照してください。

11.1. 着信チャネル設定 (Pulsar からの受信)

以下の属性は以下のように設定します:

mp.messaging.incoming.your-channel-name.attribute=value
Table 1. Incoming Attributes of the 'smallrye-pulsar' connector
属性 (alias) 説明 タイプ 必須 デフォルト

ack-strategy

Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be ack, cumulative.

string

false

ack

ackTimeout.redeliveryBackoff

Comma separated values for configuring ack timeout MultiplierRedeliveryBackoff, min delay, max delay, multiplier.

string

false

batchReceive

Whether batch receive is used to consume messages

boolean

false

false

client-configuration

Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier.

string

false

consumer-configuration

Identifier of a CDI bean that provides the default Pulsar consumer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier.

string

false

deadLetterPolicy.deadLetterTopic

Name of the dead letter topic where the failing messages will be sent

string

false

deadLetterPolicy.initialSubscriptionName

Name of the initial subscription name of the dead letter topic

string

false

deadLetterPolicy.maxRedeliverCount

Maximum number of times that a message will be redelivered before being sent to the dead letter topic

int

false

deadLetterPolicy.retryLetterTopic

Name of the retry topic where the failing messages will be sent

string

false

failure-strategy

Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be nack (default), fail, ignore or `reconsume-later

string

false

nack

health-enabled

Whether health reporting is enabled (default) or disabled

boolean

false

true

negativeAck.redeliveryBackoff

Comma separated values for configuring negative ack MultiplierRedeliveryBackoff, min delay, max delay, multiplier.

string

false

reconsumeLater.delay

Default delay for reconsume failure-strategy, in seconds

long

false

3

schema

The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed Schema qualified with @Identifier and the channel name. As a fallback AUTO_CONSUME or AUTO_PRODUCE are used.

string

false

serviceUrl

The service URL for the Pulsar service

string

false

pulsar://localhost:6650

topic

The consumed / populated Pulsar topic. If not set, the channel name is used

string

false

tracing-enabled

Whether tracing is enabled (default) or disabled

boolean

false

true

基盤となる Pulsar コンシューマーがサポートするプロパティーを設定することもできます。

これらのプロパティーは、pulsar.consumer 接頭辞を使用してグローバルに設定することもできます。

pulsar.consumer.subscriptionInitialPosition=Earliest
Table 2. Pulsar consumer Attributes
属性 説明 タイプ 設定ファイル デフォルト

topicNames

トピック名

Set

true

[]

topicsPattern

Topic pattern

Pattern

true

subscriptionName

Subscription name

文字列

true

subscriptionType

Subscription type.
Four subscription types are available:
* Exclusive
* Failover
* Shared
* Key_Shared

SubscriptionType

true

Exclusive

subscriptionProperties

Map

true

subscriptionMode

SubscriptionMode

true

Durable

messageListener

MessageListener

false

consumerEventListener

ConsumerEventListener

false

negativeAckRedeliveryBackoff

Interface for custom message is negativeAcked policy. You can specify RedeliveryBackoff for a consumer.

RedeliveryBackoff

false

ackTimeoutRedeliveryBackoff

Interface for custom message is ackTimeout policy. You can specify RedeliveryBackoff for a consumer.

RedeliveryBackoff

false

receiverQueueSize

Size of a consumer’s receiver queue.
For example, the number of messages accumulated by a consumer before an application calls Receive.
A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.

int

true

1000

acknowledgementsGroupTimeMicros

Group a consumer acknowledgment for a specified time.
By default, a consumer uses 100ms grouping time to send out acknowledgments to a broker.
Setting a group time of 0 sends out acknowledgments immediately.
A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.

long

true

100000

maxAcknowledgmentGroupSize

Group a consumer acknowledgment for the number of messages.

int

true

1000

negativeAckRedeliveryDelayMicros

Delay to wait before redelivering messages that failed to be processed.
When an application uses Consumer#negativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.

long

true

60000000

maxTotalReceiverQueueSizeAcrossPartitions

The max total receiver queue size across partitions.
This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.

int

true

50000

consumerName

Consumer name

文字列

true

ackTimeoutMillis

Timeout of unacked messages

long

true

0

tickDurationMillis

Granularity of the ack-timeout redelivery.
Using an higher tickDurationMillis reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour).

long

true

1000

priorityLevel

Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type.
The broker follows descending priorities. For example, 0=max-priority, 1, 2,…​
In Shared subscription type, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers.
Example 1
If a subscription has consumerA with priorityLevel 0 and consumerB with priorityLevel 1, then the broker only dispatches messages to consumerA until it runs out permits and then starts dispatching messages to consumerB.
Example 2
Consumer Priority, Level, Permits
C1, 0, 2
C2, 0, 1
C3, 0, 1
C4, 1, 2
C5, 1, 1

Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.

int

true

0

maxPendingChunkedMessage

The maximum size of a queue holding pending chunked messages. When the threshold is reached, the consumer drops pending messages to optimize memory utilization.

int

true

10

autoAckOldestChunkedMessageOnQueueFull

Whether to automatically acknowledge pending chunked messages when the threshold of maxPendingChunkedMessage is reached. If set to false, these messages will be redelivered by their broker.

boolean

true

false

expireTimeOfIncompleteChunkedMessageMillis

The time interval to expire incomplete chunks if a consumer fails to receive all the chunks in the specified time period. The default value is 1 minute.

long

true

60000

cryptoKeyReader

CryptoKeyReader

false

messageCrypto

MessageCrypto

false

cryptoFailureAction

Consumer should take action when it receives a message that can not be decrypted.
* FAIL: this is the default option to fail messages until crypto succeeds.
* DISCARD:silently acknowledge and not deliver message to an application.
* CONSUME: deliver encrypted messages to applications. It is the application’s responsibility to decrypt the message.

The decompression of message fails.

If messages contain batch messages, a client is not be able to retrieve individual messages in batch.

Delivered encrypted message contains EncryptionContext which contains encryption and compression information in it using which application can decrypt consumed message payload.

ConsumerCryptoFailureAction

true

FAIL

properties

A name or value property of this consumer.

properties is application defined metadata attached to a consumer.

When getting a topic stats, associate this metadata with the consumer stats for easier identification.

SortedMap

true

{}

readCompacted

If enabling readCompacted, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.

A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.

Only enabling readCompacted on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).

Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException.

boolean

true

false

subscriptionInitialPosition

Initial position at which to set cursor when subscribing to a topic at first time.

SubscriptionInitialPosition

true

Latest

patternAutoDiscoveryPeriod

Topic auto discovery period when using a pattern for topic’s consumer.

The default and minimum value is 1 minute.

int

true

60

regexSubscriptionMode

When subscribing to a topic using a regular expression, you can pick a certain type of topics.

* PersistentOnly: only subscribe to persistent topics.
* NonPersistentOnly: only subscribe to non-persistent topics.
* AllTopics: subscribe to both persistent and non-persistent topics.

RegexSubscriptionMode

true

PersistentOnly

deadLetterPolicy

Dead letter policy for consumers.

By default, some messages are probably redelivered many times, even to the extent that it never stops.

By using the dead letter mechanism, messages have the max redelivery count. When exceeding the maximum number of redeliveries, messages are sent to the Dead Letter Topic and acknowledged automatically.

You can enable the dead letter mechanism by setting deadLetterPolicy.

When specifying the dead letter policy while not specifying ackTimeoutMillis, you can set the ack timeout to 30000 millisecond.

DeadLetterPolicy

true

retryEnable

boolean

true

false

batchReceivePolicy

BatchReceivePolicy

false

autoUpdatePartitions

If autoUpdatePartitions is enabled, a consumer subscribes to partition increasement automatically.

Note: this is only for partitioned consumers.

boolean

true

true

autoUpdatePartitionsIntervalSeconds

long

true

60

replicateSubscriptionState

If replicateSubscriptionState is enabled, a subscription state is replicated to geo-replicated clusters.

boolean

true

false

resetIncludeHead

boolean

true

false

keySharedPolicy

KeySharedPolicy

false

batchIndexAckEnabled

boolean

true

false

ackReceiptEnabled

boolean

true

false

poolMessages

boolean

true

false

payloadProcessor

MessagePayloadProcessor

false

startPaused

boolean

true

false

autoScaledReceiverQueueSizeEnabled

boolean

true

false

topicConfigurations

List

true

[]

11.2. 発信チャネルの設定 (Pulsar への公開)

Table 3. Outgoing Attributes of the 'smallrye-pulsar' connector
属性 (alias) 説明 タイプ 必須 デフォルト

client-configuration

Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier.

string

false

health-enabled

Whether health reporting is enabled (default) or disabled

boolean

false

true

maxPendingMessages

The maximum size of a queue holding pending messages, i.e messages waiting to receive an acknowledgment from a broker

int

false

1000

producer-configuration

Identifier of a CDI bean that provides the default Pulsar producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier.

string

false

schema

The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed Schema qualified with @Identifier and the channel name. As a fallback AUTO_CONSUME or AUTO_PRODUCE are used.

string

false

serviceUrl

The service URL for the Pulsar service

string

false

pulsar://localhost:6650

topic

The consumed / populated Pulsar topic. If not set, the channel name is used

string

false

tracing-enabled

Whether tracing is enabled (default) or disabled

boolean

false

true

waitForWriteCompletion

Whether the client waits for the broker to acknowledge the written record before acknowledging the message

boolean

false

true

基盤となる Pulsar プロデューサーがサポートするプロパティーを設定することもできます。

これらのプロパティーは、pulsar.producer 接頭辞を使用してグローバルに設定することもできます。

pulsar.producer.batchingEnabled=false
Table 4. Pulsar プロデューサー属性
属性 説明 タイプ 設定ファイル デフォルト

topicName

トピック名

文字列

true

producerName

Producer name

文字列

true

sendTimeoutMs

Message send timeout in ms.
If a message is not acknowledged by a server before the sendTimeout expires, an error occurs.

long

true

30000

blockIfQueueFull

If it is set to true, when the outgoing message queue is full, the Send and SendAsync methods of producer block, rather than failing and throwing errors.
If it is set to false, when the outgoing message queue is full, the Send and SendAsync methods of producer fail and ProducerQueueIsFullError exceptions occur.

The MaxPendingMessages parameter determines the size of the outgoing message queue.

boolean

true

false

maxPendingMessages

The maximum size of a queue holding pending messages.

For example, a message waiting to receive an acknowledgment from a broker.

By default, when the queue is full, all calls to the Send and SendAsync methods fail unless you set BlockIfQueueFull to true.

int

true

0

maxPendingMessagesAcrossPartitions

The maximum number of pending messages across partitions.

Use the setting to lower the max pending messages for each partition (#setMaxPendingMessages(int)) if the total number exceeds the configured value.

int

true

0

messageRoutingMode

Message routing logic for producers on partitioned topics.
Apply the logic only when setting no key on messages.
Available options are as follows:
* pulsar.RoundRobinDistribution: round robin
* pulsar.UseSinglePartition: publish all messages to a single partition
* pulsar.CustomPartition: a custom partitioning scheme

MessageRoutingMode

true

hashingScheme

Hashing function determining the partition where you publish a particular message (partitioned topics only).
Available options are as follows:
* pulsar.JavastringHash: the equivalent of string.hashCode() in Java
* pulsar.Murmur3_32Hash: applies the Murmur3 hashing function
* pulsar.BoostHash: applies the hashing function from C++'s Boost library

HashingScheme

true

JavaStringHash

cryptoFailureAction

Producer should take action when encryption fails.
* FAIL: if encryption fails, unencrypted messages fail to send.
* SEND: if encryption fails, unencrypted messages are sent.

ProducerCryptoFailureAction

true

FAIL

customMessageRouter

MessageRouter

false

batchingMaxPublishDelayMicros

Batching time period of sending messages.

long

true

1000

batchingPartitionSwitchFrequencyByPublishDelay

int

true

10

batchingMaxMessages

The maximum number of messages permitted in a batch.

int

true

1000

batchingMaxBytes

int

true

131072

batchingEnabled

Enable batching of messages.

boolean

true

true

batcherBuilder

BatcherBuilder

false

chunkingEnabled

Enable chunking of messages.

boolean

true

false

chunkMaxMessageSize

int

true

-1

cryptoKeyReader

CryptoKeyReader

false

messageCrypto

MessageCrypto

false

encryptionKeys

Set

true

[]

compressionType

Message data compression type used by a producer.
Available options:
* LZ4
* ZLIB
* ZSTD
* SNAPPY

CompressionType

true

NONE

initialSequenceId

Long

true

autoUpdatePartitions

boolean

true

true

autoUpdatePartitionsIntervalSeconds

long

true

60

multiSchema

boolean

true

true

accessMode

ProducerAccessMode

true

Shared

lazyStartPartitionedProducers

boolean

true

false

properties

SortedMap

true

{}

initialSubscriptionName

Use this configuration to automatically create an initial subscription when creating a topic. If this field is not set, the initial subscription is not created.

文字列

true

11.3. Pulsar クライアントの設定

以下は、基盤となる PulsarClient の設定リファレンスです。 これらのオプションは、チャネル属性を使用して設定できます。

mp.messaging.incoming.your-channel-name.numIoThreads=4

または、pulsar.client 接頭辞を使用してグローバルに設定します。

pulsar.client.serviceUrl=pulsar://pulsar:6650
Table 5. Pulsar client Attributes
属性 説明 タイプ 設定ファイル デフォルト

serviceUrl

Pulsar cluster HTTP URL to connect to a broker.

文字列

true

serviceUrlProvider

The implementation class of ServiceUrlProvider used to generate ServiceUrl.

ServiceUrlProvider

false

authentication

Authentication settings of the client.

認証

false

authPluginClassName

Class name of authentication plugin of the client.

文字列

true

authParams

Authentication parameter of the client.

文字列

true

authParamMap

Authentication map of the client.

Map

true

operationTimeoutMs

Client operation timeout (in milliseconds).

long

true

30000

lookupTimeoutMs

Client lookup timeout (in milliseconds).

long

true

-1

statsIntervalSeconds

Interval to print client stats (in seconds).

long

true

60

numIoThreads

IOスレッド数。

int

true

10

numListenerThreads

Number of consumer listener threads.

int

true

10

connectionsPerBroker

Number of connections established between the client and each Broker. A value of 0 means to disable connection pooling.

int

true

1

connectionMaxIdleSeconds

Release the connection if it is not used for more than [connectionMaxIdleSeconds] seconds. If [connectionMaxIdleSeconds] < 0, disabled the feature that auto release the idle connections

int

true

180

useTcpNoDelay

Whether to use TCP NoDelay option.

boolean

true

true

useTls

Whether to use TLS.

boolean

true

false

tlsKeyFilePath

Path to the TLS key file.

文字列

true

tlsCertificateFilePath

Path to the TLS certificate file.

文字列

true

tlsTrustCertsFilePath

Path to the trusted TLS certificate file.

文字列

true

tlsAllowInsecureConnection

Whether the client accepts untrusted TLS certificates from the broker.

boolean

true

false

tlsHostnameVerificationEnable

Whether the hostname is validated when the client creates a TLS connection with brokers.

boolean

true

false

concurrentLookupRequest

The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum prevents overloading a broker.

int

true

5000

maxLookupRequest

Maximum number of lookup requests allowed on each broker connection to prevent overloading a broker.

int

true

50000

maxLookupRedirects

Maximum times of redirected lookup requests.

int

true

20

maxNumberOfRejectedRequestPerConnection

Maximum number of rejected requests of a broker in a certain time frame (60 seconds) after the current connection is closed and the client creating a new connection to connect to a different broker.

int

true

50

keepAliveIntervalSeconds

Seconds of keeping alive interval for each client broker connection.

int

true

30

connectionTimeoutMs

Duration of waiting for a connection to a broker to be established.If the duration passes without a response from a broker, the connection attempt is dropped.

int

true

10000

requestTimeoutMs

Maximum duration for completing a request.

int

true

60000

readTimeoutMs

Maximum read time of a request.

int

true

60000

autoCertRefreshSeconds

Seconds of auto refreshing certificate.

int

true

300

initialBackoffIntervalNanos

Initial backoff interval (in nanosecond).

long

true

100000000

maxBackoffIntervalNanos

Max backoff interval (in nanosecond).

long

true

60000000000

enableBusyWait

Whether to enable BusyWait for EpollEventLoopGroup.

boolean

true

false

listenerName

Listener name for lookup. Clients can use listenerName to choose one of the listeners as the service URL to create a connection to the broker as long as the network is accessible."advertisedListeners" must enabled in broker side.

文字列

true

useKeyStoreTls

Set TLS using KeyStore way.

boolean

true

false

sslProvider

The TLS provider used by an internal client to authenticate with other Pulsar brokers.

文字列

true

tlsKeyStoreType

TLS KeyStore type configuration.

文字列

true

JKS

tlsKeyStorePath

Path of TLS KeyStore.

文字列

true

tlsKeyStorePassword

Password of TLS KeyStore.

文字列

true

tlsTrustStoreType

TLS TrustStore type configuration. You need to set this configuration when client authentication is required.

文字列

true

JKS

tlsTrustStorePath

Path of TLS TrustStore.

文字列

true

tlsTrustStorePassword

Password of TLS TrustStore.

文字列

true

tlsCiphers

Set of TLS Ciphers.

Set

true

[]

tlsProtocols

Protocols of TLS.

Set

true

[]

memoryLimitBytes

Limit of client memory usage (in byte). The 64M default can guarantee a high producer throughput.

long

true

67108864

proxyServiceUrl

URL of proxy service. proxyServiceUrl and proxyProtocol must be mutually inclusive.

文字列

true

proxyProtocol

Protocol of proxy service. proxyServiceUrl and proxyProtocol must be mutually inclusive.

ProxyProtocol

true

enableTransaction

Whether to enable transaction.

boolean

true

false

clock

Clock

false

dnsLookupBindAddress

The Pulsar client dns lookup bind address, default behavior is bind on 0.0.0.0

文字列

true

dnsLookupBindPort

The Pulsar client dns lookup bind port, takes effect when dnsLookupBindAddress is configured, default value is 0.

int

true

0

socks5ProxyAddress

Address of SOCKS5 proxy.

InetSocketAddress

true

socks5ProxyUsername

User name of SOCKS5 proxy.

文字列

true

socks5ProxyPassword

Password of SOCKS5 proxy.

文字列

true

description

The extra description of the client version. The length cannot exceed 64.

文字列

true

設定ファイルで設定できない (シリアライズできない) 設定プロパティーは、Config file 列にその旨が記載されています。

12. さらに詳しく

このガイドでは、Quarkus を使用して Pulsar と対話する方法を説明しました。 Quarkus は、Quarkus Messaging を利用してデータストリーミングアプリケーションを構築します。

さらに詳しく知りたい場合は、Quarkusで使用されている実装、 SmallRye Reactive Messaging のドキュメントを確認してください。

関連コンテンツ