The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.
このページを編集

Apache Pulsar リファレンスガイド

このリファレンスガイドでは、Quarkus アプリケーションが Quarkus Messaging を利用して Apache Pulsar と対話する仕組みを説明します。

1. はじめに

Apache Pulsar は、クラウド向けに構築されたオープンソースの分散メッセージングおよびストリーミングプラットフォームです。 階層型ストレージ機能により、マルチテナントの高性能ソリューションをサーバーメッセージングに提供します。

Pulsar はパブリッシュ - サブスクライブパターンを実装します。

  • プロデューサーは トピック にメッセージを公開します。

  • コンシューマーは、そのトピックへの サブスクリプション を作成し、着信メッセージを受信して処理します。処理が完了すると、ブローカーに 完了通知 を送信します。

  • サブスクリプションが作成されると、コンシューマーが切断されても、Pulsar はすべてのメッセージを保持します。 保持されたメッセージは、これらのメッセージがすべて正常に処理されたことをコンシューマーが確認した場合にのみ破棄されます。

Pulsar クラスターは、以下の要素で構成されます。

  • ステートレスコンポーネントである 1 つ以上のブローカー。

  • トピックのメタデータ、スキーマ、調整、クラスター設定を維持するための メタデータストア

  • メッセージの永続的な保存に使用される bookies のセット。

2. Apache Pulsar のための Quarkus エクステンション

Quarkus は、 SmallRye Reactive Messaging フレームワークを通じて Apache Pulsar のサポートを提供します。 Eclipse MicroProfile Reactive Messaging 仕様 3.0 に基づいて、CDI とイベント駆動型を橋渡しする柔軟なプログラミングモデルを提案します。

このガイドでは、Apache Pulsar と SmallRye Reactive Messaging フレームワークについて詳しく説明します。 クイックスタートについては、Apache Pulsar を使用した Quarkus Messaging の概要 を参照してください。

プロジェクトのベースディレクトリーで次のコマンドを実行すると、 messaging-pulsar エクステンションをプロジェクトに追加できます。

コマンドラインインタフェース
quarkus extension add messaging-pulsar
Maven
./mvnw quarkus:add-extension -Dextensions='messaging-pulsar'
Gradle
./gradlew addExtension --extensions='messaging-pulsar'

これにより、ビルドファイルに次の内容が追加されます。

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-pulsar</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-pulsar")

このエクステンションには、 pulsar-clients-original バージョン 3.0.0 が推移的依存関係として含まれており、Pulsar ブローカーバージョン 2.10.x との互換性があります。

3. SmallRye Pulsar コネクターの設定

SmallRye Reactive Messaging フレームワークは、Apache Kafka、Apache Pulsar、AMQP、Apache Camel、JMS、MQTT などのさまざまなメッセージングバックエンドをサポートしているため、汎用的な語彙を使用しています。

  • アプリケーションは メッセージ を送受信します。 Message は、ペイロード をラップし、いくつかの メタデータ で拡張できます。 これは、値とキーで構成される Pulsar Message と混同しないでください。 Pulsar コネクターでは、Reactive Messaging メッセージ が Pulsar メッセージ に対応します。

  • メッセージは チャネル を通過します。アプリケーションコンポーネントは、チャネルに接続してメッセージを公開および消費します。Pulsar コネクターは、チャネル を Pulsar トピック にマッピングします。

  • チャネルは コネクター を使用してメッセージバックエンドに接続します。 コネクターは、着信メッセージを特定のチャネル (アプリケーションが消費するチャネル) にマッピングし、特定のチャネルに送信された発信メッセージを収集するように設定されています。 各コネクターは、特定のメッセージングテクノロジーに特化しています。 たとえば、Pulsar を処理するコネクターの名前は、 smallrye-pulsar となっています。

着信チャネルを持つ Pulsar コネクターの最小設定は次のようになります。

%prod.pulsar.client.serviceUrl=pulsar:6650 (1)
mp.messaging.incoming.prices.connector=smallrye-pulsar (2)
1 プロダクションプロファイルの Pulsar ブローカーサービス URL を設定します。 この URL は、 mp.messaging.incoming.$channel.serviceUrl プロパティーを使用して、グローバルに、またはチャネルごとに設定できます。 開発モードおよびテスト実行時には、Dev Services for Pulsar が Pulsar ブローカーを自動的に起動します。
2 prices チャネルを管理するためのコネクターを設定します。 デフォルトでは、トピック 名はチャネル名と同じです。

トピック名は、トピック属性を設定することで上書きできます。

%prod 接頭辞は、アプリケーションが本番モードで実行される場合にのみプロパティーが使用されることを示します (つまり、開発モードまたはテストモードでは使用されません)。詳細は、プロファイルに関するドキュメント を参照してください。
コネクターの自動アタッチ

クラスパスに単一のコネクターがある場合は、 connector 属性の設定を省略できます。 Quarkus は、孤立した チャネルをクラスパスにある (一意の) コネクターに自動的に関連付けます。 孤立した チャネルとは、ダウンストリームコンシューマーのない発信チャネル、またはアップストリームプロデューサーのない着信チャネルです。

この自動アタッチは、以下を使用して無効にできます。

quarkus.messaging.auto-connector-attachment=false

設定オプションの詳細は、Pulsar クライアントの設定 を参照してください。

4. Pulsar からのメッセージの受信

Pulsar コネクターは、Pulsar クライアントを使用して Pulsar ブローカーに接続し、コンシューマーを作成して Pulsar ブローカーからメッセージを受信します。また、各 Pulsar Message を Reactive Messaging Message にマッピングします。

4.1. 例

Pulsar ブローカーが実行中で、 pulsar:6650 アドレスを使用してアクセスできるとします。 次のようにして、 prices チャネルで Pulsar メッセージを受信するようにアプリケーションを設定します。

mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest (2)
  1. Pulsar ブローカーサービスの URL を設定します。

  2. コンシューマーのサブスクリプションが Earliest 位置からメッセージの受信を開始することを確認します。

Pulsar トピックやコンシューマー名を設定する必要はありません。 デフォルトでは、コネクターはチャネル名 (prices) を使用します。 トピックとコンシューマー名は、 topic 属性と consumerName 属性を設定してオーバーライドすることができます。

Pulsar では、コンシューマーがトピックサブスクリプションの subscriptionName を指定する必要があります。 指定されていない場合、コネクターが一意の サブスクリプション名 を生成します。

すると、アプリケーションが double ペイロードを直接受信できるようになります。

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

または、Reactive Messaging タイプ Message<Double> を取得することもできます。

@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(PulsarIncomingMessageMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (acknowledge the Pulsar message back to the broker)
    return msg.ack();
}

Reactive Messaging の Message タイプを使用すると、コンシューマーメソッドが着信メッセージのメタデータにアクセスし、確認応答を手動で処理できます。

Pulsar メッセージオブジェクトに直接アクセスする場合は、次を使用します。

@Incoming("prices")
public void consume(org.apache.pulsar.client.api.Message<Double> msg) {
    String key = msg.getKey();
    String value = msg.getValue();
    String topic = msg.topicName();
    // ...
}

org.apache.pulsar.client.api.Message は、基盤となる Pulsar クライアントによって提供され、コンシューマーメソッドで直接使用できます。

または、次の例のように、アプリケーションで、チャネル名によって識別される Multi を Bean に注入し、そのイベントをサブスクライブすることもできます。

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

@Channel でメッセージを消費する場合、アプリケーションコードがサブスクリプションを行います。 上記の例では、Quarkus REST (旧称 RESTEasy Reactive) エンドポイントがサブスクリプションを処理します。

チャネルとして注入できるのは、以下のタイプです。

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

前出の Message の例と同様に、注入されたチャネルがペイロード (Multi<T>) を受信した場合は、そのチャネルが自動的にメッセージを確認応答し、複数のサブスクライバーをサポートします。 注入されたチャネルが Message (Multi<Message<T>>) を受信した場合は、手動で確認応答とブロードキャストを行う必要があります。

4.2. ブロッキング処理

リアクティブメッセージングは、I/O スレッドでメソッドを呼び出します。このトピックの詳細については、 Quarkus リアクティブアーキテクチャーのドキュメント を参照してください。ただし、多くの場合、リアクティブメッセージングとデータベースインタラクションなどのブロック処理を組み合わせる必要があります。このためには、処理が ブロッキング であり、呼び出し元のスレッドで実行するべきではないことを示す @Blocking アノテーションを使用する必要があります。

例えば、以下のコードは、Hibernate with Panacheを 使用してデータベースに受信ペイロードを格納する方法を示しています。

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

@Blocking アノテーションは 2 つあります。

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

効果はどちらも同じです。したがって、両方を使うことができます。1番目のものは、使用するワーカープールや順序を保持するかどうかなど、より細かい調整が可能です。2 番目のものは、Quarkus の他のリアクティブ機能でも使用され、デフォルトのワーカープールを使用し、順序を保持します。

@Blocking アノテーションの使用法の詳細については、 SmallRye Reactive Messaging – Handling blocking execution を参照してください。

@RunOnVirtualThread

Java 仮想スレッドでの ブロッキング処理の実行については、 Quarkus Virtual Thread support with Reactive Messagingのドキュメント を参照してください。

@Transactional

メソッドに @Transactional アノテーションが付けられている場合、メソッドに @Blocking アノテーションが付けられていなくても、自動的に blocking と見なされます。

4.3. Pulsar のサブスクリプションタイプ

Pulsar の subscriptionType コンシューマー設定は、パブリッシュ - サブスクライブやキューイングなど、さまざまなメッセージングシナリオを実現するために柔軟に使用できます。

  • Exclusive サブスクリプションタイプ使用すると、"ファンアウト型の pub/sub メッセージング" 用に 一意のサブスクリプション名 を指定できます。これがデフォルトのサブスクリプションタイプです。

  • SharedKey_Shared、または Failover サブスクリプションタイプを使用すると、複数のコンシューマーで 同じサブスクリプション名 を共有して、コンシューマー間で "メッセージキューイング" を実現できます。

サブスクリプション名が指定されていない場合、Quarkus は一意の ID を生成します。

4.4. デシリアライゼーションと Pulsar Schema

Pulsar コネクターを使用すると、基盤となる Pulsar コンシューマーのスキーマ設定を設定できます。 詳細は、Pulsar Schema 設定と自動スキーマ検出 を参照してください。

4.5. 確認応答ストラテジー

Pulsar メッセージから生成されたメッセージが 確認応答 されると、コネクターが 確認応答リクエスト を Pulsar ブローカーに送信します。 すべての Reactive Messaging メッセージが 確認応答 される必要があります。ほとんどの場合、これは自動的に処理されます。 確認応答リクエストは、次の 2 つのストラテジーを使用して Pulsar ブローカーに送信できます。

  • 個別確認応答 は、デフォルトのストラテジーです。確認応答リクエストが各メッセージに対してブローカーに送信されます。

  • 累積確認応答 は、 ack-strategy=cumulative を使用して設定されるストラテジーです。コンシューマーが受信した最後のメッセージのみを確認します。 提供されたメッセージまでのストリーム内の全メッセージは、そのコンシューマーに再配信されません。

デフォルトでは、Pulsar コンシューマーは、ブローカーからの確認応答の確認を待たずに確認応答を検証します。 ackReceiptEnabled=true を使用してこれを有効にできます。

4.6. 失敗処理ストラテジー

Pulsar メッセージから生成されたメッセージが 否定応答 された場合、失敗ストラテジーが適用されます。 Quarkus Pulsar エクステンションは、4 つのストラテジーをサポートしています。

  • nack (デフォルト) は、ブローカーに 否定応答 を送信し、ブローカーがこのメッセージをコンシューマーに再配信するようにトリガーします。 否定応答は、 negativeAckRedeliveryDelayMicros プロパティーと negativeAck.redeliveryBackoff プロパティーを使用してさらに設定できます。

  • fail の場合、アプリケーションが失敗します。メッセージはそれ以上処理されません。

  • ignore の場合、失敗がログに記録されますが、確認応答ストラテジーが適用され、処理が続行されます。

  • continue の場合、失敗がログに記録されますが、確認応答や否定応答を適用せずに処理が続行されます。このストラテジーは、確認応答タイムアウト 設定とともに使用できます。

  • reconsume-later は、 reconsumeLater API を使用して、遅延して再消費されるメッセージを 再試行レタートピック に送信します。 遅延は reconsumeLater.delay プロパティーを使用して設定でき、デフォルトは 3 秒です。 失敗メタデータに io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata のインスタンスを追加することで、メッセージごとにカスタムの遅延またはプロパティーを設定できます。

4.6.1. 確認応答タイムアウト

Pulsar クライアントは、否定応答と同様に、 確認応答タイムアウト メカニズムにより、指定された ackTimeout 期間にわたって、確認応答されていないメッセージを追跡し、ブローカーに 確認応答されていないメッセージの再配信リクエスト を送信します。これを受けて、ブローカーは確認応答されていないメッセージをコンシューマーに再送信します。

タイムアウトと再配信バックオフメカニズムを設定するには、 ackTimeoutMillis プロパティーと ackTimeout.redeliveryBackoff プロパティーを設定します。 ackTimeout.redeliveryBackoff には、最小遅延 (ミリ秒単位)、最大遅延 (ミリ秒単位)、および乗数をコンマ区切り値として指定できます。

mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2

4.6.2. 後の再消費と再試行レタートピック

再試行レタートピック は、正常に消費されなかったメッセージをデッドレタートピックにプッシュし、メッセージの消費を継続します。 デッドレタートピックは、確認応答タイムアウト、否定応答、再試行レタートピックなどのさまざまなメッセージ再配信方法で使用できることに注意してください。

mp.messaging.incoming.data.failure-strategy=reconsume-later
mp.messaging.incoming.data.reconsumeLater.delay=5000
mp.messaging.incoming.data.retryEnable=true
mp.messaging.incoming.data.negativeAck.redeliveryBackoff=1000,60000,2

4.6.3. デッドレタートピック

デッドレタートピック は、正常に消費されなかったメッセージをデッドレタートピックにプッシュし、メッセージの消費を継続します。 デッドレタートピックは、確認応答タイムアウト、否定応答、再試行レタートピックなどのさまざまなメッセージ再配信方法で使用できることに注意してください。

mp.messaging.incoming.data.failure-strategy=nack
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic
mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription
mp.messaging.incoming.data.subscriptionType=Shared

否定応答 または 確認応答タイムアウト 再配信方法では、少なくとも未処理のメッセージを含むメッセージのバッチ全体が再配信されます。 詳細は、プロデューサーのバッチ処理 を参照してください。

4.7. Pulsar メッセージの一括受信

デフォルトでは、incoming メソッドは各 Pulsar メッセージを個別に受信します。 batchReceive=true プロパティーを使用するか、コンシューマー設定で batchReceivePolicy を指定することで、バッチモードを有効にすることができます。

@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<org.apache.pulsar.client.api.Messages<Double>> messages) {
    for (org.apache.pulsar.client.api.Message<Double> msg : messages.getPayload()) {
        String key = msg.getKey();
        String topic = msg.getTopicName();
        long timestamp = msg.getEventTime();
        //... process messages
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return messages.ack();
}

@Incoming("prices")
public void consumeRecords(org.apache.pulsar.client.api.Messages<Double> messages) {
    for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
        //... process messages
    }
}

または、consume メソッドにペイロードのリストを直接受信することもできます。

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

Quarkus は、着信チャネルのバッチタイプを自動検出し、バッチ設定を自動的に設定します。 mp.messaging.incoming.$channel.batchReceive プロパティーを使用して、バッチモードを明示的に設定できます。

5. Pulsar へのメッセージの送信

Pulsar Connectorはリアクティブ・メッセージング Message をPulsarメッセージとして書き込むことができます。

5.1. 例

Pulsar ブローカーが実行中で、 pulsar:6650 アドレスを使用してアクセスできるとします。 次のように、 prices チャネルからのメッセージを Pulsar メッセージに書き込むようにアプリケーションを設定します。

mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 (1)
  1. Pulsar ブローカーサービスの URL を設定します。

Pulsar トピックやプロデューサー名を設定する必要はありません。 デフォルトでは、コネクターはチャネル名 (prices) を使用します。 トピックとコンシューマー名は、 topic 属性と producerName 属性を設定してオーバーライドすることができます。

次に、アプリケーションは、 Message<Double>prices チャネルに送信する必要があります。次のスニペットのように、 double ペイロードを使用できます。

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class PulsarPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

generate メソッドは、 Multi<Double> を返します。これは、 Flow.Publisher インターフェイスを実装します。 このパブリッシャーは、メッセージを生成し、設定された Pulsar トピックに送信するために、フレームワークによって使用されます。

ペイロードを返す代わりに、 io.smallrye.reactive.messaging.pulsar.OutgoingMessage を返して、Pulsar メッセージを送信することもできます。

@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}

ペイロードを org.eclipse.microprofile.reactive.messaging.Message 内にラップして、書き込まれたレコードをより詳細に制御できます。

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(PulsarOutgoingMessageMetadata.builder()
                            .withKey("my-key")
                            .withProperties(Map.of("property-key", "value"))
                            .build()));
}

Messages を送信するときに io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata のインスタンスを追加して、メッセージが Pulsar に書き込まれる方法に影響を与えることができます。

発信メソッドは、 Flow.Publisher を返すメソッドシグネチャーの他に、単一のメッセージを返すこともできます。 この場合、プロデューサーがこのメソッドをジェネレーターとして使用して無限のストリームを作成します。

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

5.2. シリアライゼーションと Pulsar Schema

Pulsar コネクターを使用すると、基盤となる Pulsar プロデューサーのスキーマ設定を設定できます。 詳細は、Pulsar Schema 設定と自動スキーマ検出 を参照してください。

5.3. キー/値のペアの送信

キー/値のペアを Pulsar に送信するには、Pulsar プロデューサースキーマを KeyValue スキーマを使用して設定できます。

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarKeyValueExample {

    @Identifier("out")
    @Produces
    Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);

    @Incoming("in")
    @Outgoing("out")
    public KeyValue<String, Long> process(long in) {
        return new KeyValue<>("my-key", in);
    }

}

書き込まれたレコードをさらに制御する必要がある場合は、 PulsarOutgoingMessageMetadata を使用します。

5.4. 確認応答

Pulsar ブローカーは、プロデューサーからメッセージを受信すると、メッセージに MessageId を割り当ててプロデューサーに送り返し、メッセージが公開されたことを確認します。

デフォルトでは、コネクターは Pulsar がレコードを確認応答するのを待ってから、処理 (受信した Message の確認応答) を続行します。 これを無効にするには、 waitForWriteCompletion 属性を false に設定します。

レコードを書き込めない場合、メッセージは nacked になります。

Pulsar クライアントは、失敗した場合に、 送信タイムアウト に達するまで、自動的にメッセージの送信を再試行します。 送信タイムアウト は、 sendTimeoutMs 属性で設定可能で、デフォルトでは 30 秒です。

5.5. バックプレッシャーとインフライトレコード

Pulsar 発信コネクターは、バックプレッシャーを処理し、Pulsar ブローカーへの書き込みを待機している保留メッセージの数を監視します。 保留メッセージの数は、 maxPendingMessages 属性を使用して設定され、デフォルトは 1000 です。

コネクターは、その数のメッセージのみを同時に送信します。 少なくとも 1 つの保留メッセージがブローカーによって確認応答されるまで、他のメッセージは送信されません。 次に、ブローカーの保留メッセージの 1 つが確認応答されると、コネクターが Pulsar に新しいメッセージを書き込みます。

また、 maxPendingMessages0 に設定することで、保留メッセージの制限を解除することもできます。 Pulsar では、 maxPendingMessagesAcrossPartitions を使用して、パーティションごとの保留メッセージの数を設定することもできます。

5.6. プロデューサーのバッチ処理

デフォルトでは、Pulsar プロデューサーは個々のメッセージをまとめてバッチ処理し、ブローカーに公開します。 バッチ処理のパラメーターは、 batchingMaxPublishDelayMicrosbatchingPartitionSwitchFrequencyByPublishDelaybatchingMaxMessagesbatchingMaxBytes 設定プロパティーを使用して設定できます。 batchingEnabled=false で完全に無効にすることもできます。

Key_Shared コンシューマーサブスクリプションを使用する場合、 batcherBuilderBatcherBuilder.KEY_BASED に設定できます。

6. Pulsar トランザクションと exactly-once 処理

Pulsar トランザクション を使用すると、イベントストリーミングアプリケーションが 1 つのアトミック操作でメッセージを消費、処理、生成できます。

トランザクションを使用すると、1 つまたは複数のプロデューサーが複数のトピックにメッセージのバッチを送信できるようになります。バッチ内のすべてのメッセージが、最終的にすべてのコンシューマーにみえるか、またはコンシューマーに一切みえなくなります。

使用するには、ブローカー設定で transactionCoordinatorEnabled=true および systemTopicEnabled=true を使用して、トランザクションのサポートを有効にする必要があります。

クライアント側でも、 PulsarClient 設定でトランザクションのサポートを有効にする必要があります。

mp.messaging.outgoing.tx-producer.enableTransaction=true

Pulsar コネクターは、トランザクション内にレコードを書き込むための PulsarTransactions カスタムエミッターを提供します。

これは、通常のエミッター @Channel として使用できます。

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.OutgoingMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

@ApplicationScoped
public class PulsarTransactionalProducer {

    @Inject
    @Channel("tx-out-example")
    PulsarTransactions<OutgoingMessage<Integer>> txProducer;

    @Inject
    @Channel("other-producer")
    PulsarTransactions<String> producer;

    @Incoming("in")
    public Uni<Void> emitInTransaction(Message<Integer> in) {
        return txProducer.withTransaction(emitter -> {
            emitter.send(OutgoingMessage.of("a", 1));
            emitter.send(OutgoingMessage.of("b", 2));
            emitter.send(OutgoingMessage.of("c", 3));
            producer.send(emitter, "4");
            producer.send(emitter, "5");
            producer.send(emitter, "6");
            return Uni.createFrom().completionStage(in::ack);
        });
    }

}

withTransaction メソッドに渡される関数は、レコードを生成する TransactionalEmitter を受け取り、トランザクションの結果を提供する Uni を返します。 処理が正常に完了すると、プロデューサーがフラッシュされ、トランザクションがコミットされます。 処理中に例外が発生したり、失敗した Uni が返されたり、 TransactionalEmitter が中止の対象としてマークされたりすると、トランザクションは中止されます。

複数のトランザクションプロデューサーが 1 つのトランザクションに参加できます。 これにより、すべてのメッセージが開始したトランザクションを使用して確実に送信されます。また、トランザクションがコミットされる前に、参加しているすべてのプロデューサーがフラッシュされます。

このメソッドが Vert.x コンテキストで呼び出されると、処理関数もそのコンテキストで呼び出されます。 それ以外の場合は、プロデューサーの送信スレッドで呼び出されます。

6.1. Exactly-Once 処理

Pulsar Transactions API を使用すると、生成されたメッセージとともに、トランザクション内のコンシューマーオフセットを管理することもできます。 これにより、コンシューマーとトランザクションプロデューサーを consume-transform-produce パターンでカップリングすることができます。これは exactly-once 処理としても知られています。 これは、アプリケーションがメッセージを消費および処理し、結果をトピックに公開して、消費されたメッセージのオフセットをトランザクションでコミットすることを意味します。

PulsarTransactions エミッターは、トランザクション内の着信 Pulsar メッセージに exactly-once 処理を適用する方法も提供します。

次の例には、トランザクション内の Pulsar メッセージのバッチが含まれています。

mp.messaging.outgoing.tx-out-example.enableTransaction=true
# ...
mp.messaging.incoming.in-channel.enableTransaction=true
mp.messaging.incoming.in-channel.batchReceive=true
package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.apache.pulsar.client.api.Messages;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

    @ApplicationScoped
    public class PulsarExactlyOnceProcessor {

        @Inject
        @Channel("tx-out-example")
        PulsarTransactions<Integer> txProducer;

        @Incoming("in-channel")
        public Uni<Void> emitInTransaction(Message<Messages<Integer>> batch) {
            return txProducer.withTransactionAndAck(batch, emitter -> {
                for (org.apache.pulsar.client.api.Message<Integer> record : batch.getPayload()) {
                    emitter.send(PulsarMessage.of(record.getValue() + 1, record.getKey()));
                }
                return Uni.createFrom().voidItem();
            });
        }

    }

処理が正常に完了すると、トランザクション内でメッセージが確認応答され、トランザクションがコミットされます。

exactly-once を使用する場合、累積的にではなく、個別にのみメッセージを確認応答できます。

処理を中止する必要がある場合、メッセージは否定応答されます。処理を再試行したり、フェイルストップしたりするために、いずれかの失敗ストラテジーを使用することができます。 トランザクションが失敗して中止された場合、 withTransaction から返される Uni は失敗することに注意してください。

アプリケーションでエラーケースを処理することもできます。しかし、メッセージの消費を続行するには、 @Incoming メソッドから返される Uni が失敗しないようにする必要があります。 PulsarTransactions#withTransactionAndAck メソッドは、メッセージの確認応答と否定応答を行いますが、リアクティブストリームを停止しません。 失敗を無視すると、最後にコミットされたオフセットにコンシューマーがリセットされ、そこから処理が再開されます。

失敗発生時に重複を回避するために、ブローカー側でメッセージの重複排除とバッチインデックスレベルの確認応答を有効にすることを推奨します。

quarkus.pulsar.devservices.broker-config.brokerDeduplicationEnabled=true
quarkus.pulsar.devservices.broker-config.brokerDeduplicationEntriesInterval=1000
quarkus.pulsar.devservices.broker-config.brokerDeduplicationSnapshotIntervalSeconds=3000
quarkus.pulsar.devservices.broker-config.acknowledgmentAtBatchIndexLevelEnabled=3000

mp.messaging.incoming.data.batchIndexAckEnabled=true

7. Pulsar Schema 設定と自動スキーマ検出

Pulsar メッセージは、ペイロードとともに非構造化バイト配列として保存されます。 Pulsar スキーマ は、構造化データを raw メッセージバイトにシリアライズする方法を定義します。 スキーマ は、強制的なデータ構造での書き込みと読み取りを行うために、プロデューサーとコンシューマーに適用されます。 スキーマは、データがトピックに公開される前にデータを raw バイトにシリアライズします。また、raw バイトがコンシューマーに配信される前に raw バイトをデシリアライズします。

Pulsar は、登録されたスキーマ情報を保存する中央リポジトリーとしてスキーマレジストリーを使用します。 これにより、プロデューサー/コンシューマーがブローカーを通じてトピックのメッセージのスキーマを調整できるようになります。 デフォルトでは、スキーマの保存に Apache BookKeeper が使用されます。

Pulsar API は、多数の プリミティブ型 および complex 型 (Key/Value、Avro、Protobuf など) に関する組み込みのスキーマ情報を提供します。

Pulsar コネクターでは、 schema プロパティーを使用してスキーマをプリミティブ型として指定できます。

mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32

mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE

schema プロパティーの値が スキーマタイプ と一致する場合、そのタイプを使用した単純なスキーマが作成され、該当するチャネルに使用されます。

Pulsar コネクターを使用すると、 @Identifier 修飾子で識別される Schema Bean を CDI を通じて提供することで、複雑なスキーマタイプを設定できます。

たとえば、次の Bean は JSON スキーマとキー/値スキーマを提供します。

package pulsar.configuration;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarSchemaProvider {

    @Produces
    @Identifier("user-schema")
    Schema<User> userSchema = Schema.JSON(User.class);

    @Produces
    @Identifier("a-channel")
    Schema<KeyValue<Integer, User>> keyValueSchema() {
        return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
    }

    public static class User {
        String name;
        int age;

    }
}

定義されたスキーマを使用して着信チャネル users を設定するには、 schema プロパティーをスキーマ user-schema の識別子に設定する必要があります。

mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema

schema プロパティーが見つからない場合、コネクターはチャネル名で識別される Schema Bean を検索します。 たとえば、発信チャネル a-channel はキー/値スキーマを使用します。

mp.messaging.outgoing.a-channel.connector=smallrye-pulsar

スキーマ情報が提供されていない場合、着信チャネルは Schema.AUTO_CONSUME() を使用します。一方、発信チャネルは Schema.AUTO_PRODUCE_BYTES() スキーマを使用します。

7.1. 自動スキーマ検出

Quarkus Messaging Pulsar (io.quarkus:quarkus-messaging-pulsar) を使用すると、Quarkus は多くの場合、設定する正しい Pulsar Schema を自動的に検出できます。 この自動検出は、 @Incoming メソッドと @Outgoing メソッドの宣言、および注入された @Channel に基づいています。

たとえば、以下のように宣言した場合

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

このとき、設定において generated-price チャネルで smallrye-pulsar コネクターを使用するよう指定されていると、Quarkus は generated-price チャネルの schema 属性を Pulsar Schema INT32 に自動的に設定します。

同様に、以下を宣言した場合

@Incoming("my-pulsar-consumer")
public void consume(org.apache.pulsar.api.client.Message<byte[]> record) {
    ...
}

設定において my-pulsar-consumer チャネルで smallrye-pulsar コネクターを使用するよう指定されていると、Quarkus は schema 属性を Pulsar BYTES Schema に自動的に設定します。

最後に、以下を宣言した場合

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

設定において price-create チャネルが smallrye-pulsar コネクターを使用するよう指定されていると、Quarkus は schema を Pulsar INT64 Schema に自動的に設定します。

Pulsar Schema の自動検出でサポートされているすべてのタイプは、次のとおりです。

  • short および java.lang.Short

  • int および java.lang.Integer

  • long および java.lang.Long

  • float および java.lang.Float

  • double および java.lang.Double

  • byte[]

  • java.time.Instant

  • java.sql.Timestamp

  • java.time.LocalDate

  • java.time.LocalTime

  • java.time.LocalDateTime

  • java.nio.ByteBuffer

  • Avro スキーマから生成されたクラスと Avro GenericRecord は、 AVRO スキーマタイプで設定されます。

  • Protobuf スキーマから生成されたクラスは、 PROTOBUF スキーマタイプで設定されます。

  • その他のクラスは、自動的に JSON スキーマタイプで設定されます。

JSON スキーマタイプは、スキーマ検証を適用することに注意してください。

Pulsar が提供するこれらのスキーマに加えて、Quarkus は 検証を適用することなく 次のスキーマ実装を提供します。

  • io.vertx.core.buffer.Buffer は、 io.quarkus.pulsar.schema.BufferSchema スキーマで設定されます。

  • io.vertx.core.json.JsonObject は、 io.quarkus.pulsar.schema.JsonObjectSchema スキーマで設定されます。

  • io.vertx.core.json.JsonArray は、 io.quarkus.pulsar.schema.JsonArraySchema スキーマで設定されます。

  • スキーマレス JSON シリアライゼーションでは、 schema 設定が ObjectMapper<fully_qualified_name_of_the_bean> に設定されている場合、Jackson ObjectMapper を使用してスキーマが生成されます。このとき、Pulsar Schema 検証は適用されません。 io.quarkus.pulsar.schema.ObjectMapperSchema を使用すると、検証なしで JSON スキーマを明示的に設定できます。

schema が設定されている場合、自動検出によって置き換えられることはありません。

シリアライザーの自動検出に問題がある場合は、 quarkus.messaging.pulsar.serializer-autodetection.enabled=false を設定することで、これを完全にオフにすることができます。 オフにする必要があると認識した場合は、Quarkus issue tracker にバグを報告していただければ、どのような問題でも修正することができます。

8. Dev Services for Pulsar

Quarkus Messaging Pulsar エクステンション (quarkus-messaging-pulsar) を使用すると、Dev Services for Pulsar が開発モードおよびテスト実行時に自動で Pulsar ブローカーを起動します。 そのため、ブローカーを手動で起動する必要はありません。 また、アプリケーションの設定も自動的に行われます。

8.1. Dev Services for Pulsar の有効化/無効化

以下の場合を除き、Dev Services for Pulsar は自動的に有効になります。

  • quarkus.pulsar.devservices.enabledfalse に設定されている

  • `pulsar.client.serviceUrl`が設定されている

  • すべての Reactive Messaging Pulsar チャネルには `serviceUrl`属性が設定されている

Dev Services for Pulsar は、ブローカーを起動するために Docker に依存しています。環境が Docker をサポートしていない場合は、ブローカーを手動で起動するか、すでに実行中のブローカーに接続する必要があります。 pulsar.client. を使用してブローカーアドレスを設定できます。

8.2. 共有ブローカー

ほとんどの場合、アプリケーション間でブローカーを共有する必要があります。Dev Services for Plusar は、 開発 モードで動作する複数の Quarkus アプリケーションが 1 つのブローカーを共有するための サービスディスカバリー メカニズムを実装しています。

Dev Services for Pulsar は、コンテナーを識別するために使用される quarkus-dev-service-pulsar ラベルを使用してコンテナーを起動します。

複数の (共有) ブローカーが必要な場合は、quarkus.pulsar.devservices.service-name 属性を設定してブローカー名を指定できます。 値が同じコンテナーを検索し、見つからない場合は新しいコンテナーを開始します。 デフォルトのサービス名は pulsar です。

開発モードでは共有はデフォルトで有効になっていますが、テストモードでは無効になっています。 quarkus.pulsar.devservices.shared=false で共有を無効にできます。

8.3. ポートの設定

デフォルトでは、Dev Services for Pulsar はランダムなポートを選択し、アプリケーションを設定します。 quarkus.pulsar.devservices.port プロパティーを設定することでポートを設定できます。

Pulsar がアドバタイズするアドレスは、選択したポートで自動的に設定されることに注意してください。

8.4. イメージの設定

Dev Services for Pulsar は、https://hub.docker.com/r/apachepulsar/pulsar[公式 Apache Pulsar イメージ] をサポートしています。

カスタムイメージ名は次のように設定できます。

quarkus.pulsar.devservices.image-name=datastax/lunastreaming-all:2.10_4.7

8.5. Pulsar ブローカーの設定

カスタムブローカー設定を使用して、Dev Services for Pulsar を設定できます。

次の例では、トランザクションサポートを有効にします。

quarkus.pulsar.devservices.broker-config.transaction-coordinator-enabled=true
quarkus.pulsar.devservices.broker-config.system-topic-enabled=true

8.6. 設定リファレンス

ビルド時に固定される設定プロパティ - その他の設定プロパティは実行時にオーバーライド可能です。

Configuration property

タイプ

デフォルト

If Dev Services for Pulsar has been explicitly enabled or disabled. Dev Services are generally enabled by default, unless there is an existing configuration present. For Pulsar, Dev Services starts a broker unless pulsar.client.serviceUrl is set or if all the Reactive Messaging Pulsar channel are configured with serviceUrl.

Environment variable: QUARKUS_PULSAR_DEVSERVICES_ENABLED

Show more

ブーリアン

Optional fixed port the dev service will listen to.

If not defined, the port will be chosen randomly.

Environment variable: QUARKUS_PULSAR_DEVSERVICES_PORT

Show more

int

The image to use. Note that only Apache Pulsar images are supported. Specifically, the image repository must end with apachepulsar/pulsar. Check https://hub.docker.com/r/apachepulsar/pulsar to find the available versions.

Environment variable: QUARKUS_PULSAR_DEVSERVICES_IMAGE_NAME

Show more

string

apachepulsar/pulsar:3.2.4

Indicates if the Pulsar broker managed by Quarkus Dev Services is shared. When shared, Quarkus looks for running containers using label-based service discovery. If a matching container is found, it is used, and so a second one is not started. Otherwise, Dev Services for Pulsar starts a new container.

The discovery uses the quarkus-dev-service-pulsar label. The value is configured using the service-name property.

Container sharing is only used in dev mode.

Environment variable: QUARKUS_PULSAR_DEVSERVICES_SHARED

Show more

ブーリアン

true

The value of the quarkus-dev-service-pulsar label attached to the started container. This property is used when shared is set to true. In this case, before starting a container, Dev Services for Pulsar looks for a container with the quarkus-dev-service-pulsar label set to the configured value. If found, it will use this container instead of starting a new one. Otherwise, it starts a new container with the quarkus-dev-service-pulsar label set to the specified value.

This property is used when you need multiple shared Pulsar brokers.

Environment variable: QUARKUS_PULSAR_DEVSERVICES_SERVICE_NAME

Show more

string

pulsar

Broker config to set on the Pulsar instance

Environment variable: QUARKUS_PULSAR_DEVSERVICES_BROKER_CONFIG__ENVIRONMENT_VARIABLE_NAME_

Show more

Map<String,String>

9. Pulsar クライアントの設定

Pulsar のクライアント、コンシューマー、プロデューサーは、Pulsar クライアントアプリケーションの動作を設定するために、細かくカスタマイズすることが可能です。

Pulsar コネクターは、Pulsar クライアントと、チャネルごとのコンシューマーまたはプロデューサーを作成します。それぞれに実用的なデフォルトが設定されているため、設定が容易です。 作成は処理されますが、利用できる設定オプションは、すべて Pulsar チャネルを通じて設定可能です。

通常、 PulsarClientPulsarConsumerPulsarProducer を作成するには、ビルダー API を使用します。ビルダー API は、その性質上、実装に渡す設定オブジェクトを毎回ビルドします。 該当する設定オブジェクトは、 ClientConfigurationDataConsumerConfigurationData、 および ProducerConfigurationData です。

Pulsar コネクターを使用すると、これらの設定オブジェクトのプロパティーを直接受信できます。 たとえば、 PulsarClient のブローカー認証情報は、 authPluginClassName および authParams プロパティーを使用して受信されます。 受信チャネル data の認証を設定するには、以下のように指定します。

mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}

Pulsar コンシューマーのプロパティー subscriptionInitialPosition も、列挙値 SubscriptionInitialPosition.Earliest として表される Earliest 値で設定されていることに注意してください。

この方法は、ほとんどの設定ケースに対応できます。 ただし、 CryptoKeyReaderServiceUrlProvider などのシリアライズできないオブジェクトは、この方法では設定できません。 Pulsar コネクターを使用すると、Pulsar 設定データオブジェクト ClientConfigurationDataConsumerConfigurationDataProducerConfigurationData のインスタンスを考慮できます。

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("my-consumer-options")
    public ConsumerConfigurationData<String> getConsumerConfig() {
        ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
        data.setAckReceiptEnabled(true);
        data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
                //...
                .build());
        return data;
    }
}

このインスタンスは、コネクターが使用するクライアントを設定するために取得および使用されます。 client-configurationconsumer-configuration、または producer-configuration 属性を使用してクライアントの名前を指定する必要があります。

mp.messaging.incoming.prices.consumer-configuration=my-consumer-options

[client|consumer|producer]-configuration が設定されていない場合、コネクターはチャネル名で識別されるインスタンスを検索します。

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData getClientConfig() {
        ClientConfigurationData data = new ClientConfigurationData();
        data.setEnableTransaction(true);
        data.setServiceUrlProvider(AutoClusterFailover.builder()
                // ...
                .build());
        return data;
    }
}

また、キーごとの設定値を含む Map<String, Object> を指定することもできます。

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import java.util.Map;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public Map<String, Object> getProducerConfig() {
        return Map.of(
                "batcherBuilder", BatcherBuilder.KEY_BASED,
                "sendTimeoutMs", 3000,
                "customMessageRouter", new PartialRoundRobinMessageRouterImpl(4));
    }
}

さまざまな設定ソースは、重要度の最も低いものから最も高いものの順に、次の優先順序で読み込まれます。

  1. デフォルトの設定識別子 (default-pulsar-clientdefault-pulsar-consumerdefault-pulsar-producer) を使用して生成された Map<String, Object> 設定マップ

  2. 設定内またはチャネル名内の識別子を使用して生成された Map<String, Object> 設定マップ

  3. チャネル設定内またはチャネル名内の識別子を使用して生成された [Client|Producer|Consuemr]ConfigurationData オブジェクト

  4. [Client|Producer|Consuemr]ConfigurationData フィールド名を使用して名前が付けられたチャネル設定プロパティー

設定オプションの完全なリストについては、設定リファレンス を参照してください。

9.1. Pulsar 認証の設定

Pulsar は、プラグ可能な認証フレームワークを提供します。Pulsar ブローカー/プロキシーは、このメカニズムを使用してクライアントを認証します。

クライアントは、 authPluginClassName および authParams 属性を使用して application.properties ファイルで設定できます。

pulsar.client.serviceUrl=pulsar://pulsar:6650
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
pulsar.client.authParams={"userId":"superuser","password":"admin"}

または、プログラムで設定することもできます。

import java.util.Map;

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData config() {
        var data = new ClientConfigurationData();
        var auth = new AuthenticationBasic();
        auth.configure(Map.of("userId", "superuser", "password", "admin"));
        data.setAuthentication(auth);
        return data;
    }
}

9.1.1. mTLSを使ったPulsarへの認証設定

Pulsar Messagingエクステンションは Quarkus TLSレジストリ と統合し、mTLSを使ってクライアントを認証します。

パルサー・チャンネルにmTLSを設定するには、 application.properties に名前付きTLS設定を提供する必要があります:

quarkus.tls.my-tls-config.trust-store.p12.path=target/certs/pulsar-client-truststore.p12
quarkus.tls.my-tls-config.trust-store.p12.password=secret
quarkus.tls.my-tls-config.key-store.p12.path=target/certs/pulsar-client-keystore.p12
quarkus.tls.my-tls-config.key-store.p12.password=secret

mp.messaging.incoming.prices.tls-configuration-name=my-tls-config

9.1.2. Datastax Luna Streaming へのアクセスの設定

Luna Streaming は、DataStax のツールとサポートを備えた、Apache Pulsar の製品版ディストリビューションです。 DataStax Luna Pulsar テナントを作成したら、自動生成されたトークンをメモし、トークン認証を設定します。

pulsar.client.serviceUrl=pulsar+ssl://pulsar-aws-eucentral1.streaming.datastax.com:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
pulsar.client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODY4MTc4MzQsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50OzA3NGZhOTI4LThiODktNDBhNC04MDEzLWNlNjVkN2JmZWIwZTtjSEpwWTJWejsyMDI5ODdlOGUyIiwidG9rZW5pZCI6IjIwMjk4N2U4ZTIifQ....

必ず事前にトピックを作成するか、namespace 設定で 自動トピック作成 を有効にしてください。

トピック設定では、トピックの完全な名前を参照する必要があることに注意してください。

mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices

9.1.3. StreamNative Cloud へのアクセスの設定

StreamNative Cloud は、フルマネージドの Pulsar-as-a-Service です。完全ホスト型、パブリッククラウド上で StreamNative によって管理する方式、Kubernetes 上で自己管理する方式など、さまざまなデプロイ方法で利用できます。

StreamNative Pulsar クラスターは Oauth2 認証を使用します。そのため、アプリケーションが使用している Pulsar namespace/トピックへの権限 を持つ サービスアカウント が存在することを確認する必要があります。

次に、サービスアカウントの キーファイル (秘密鍵 として機能します) をダウンロードし、発行者 URL (通常は https://auth.streamnative.cloud/) とクラスターの オーディエンス (urn:sn:pulsar:o-rf3ol:redhat など) をメモする必要があります。 この手順を行う際には、StreamNative Cloud コンソールの Admin セクションにある Pulsar Clients ページが役立ちます。

Pulsar Oauth2 認証を使用してアプリケーションを設定するには、以下のように指定します。

pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}

pulsar.client.authParams 設定には、 issuerUrlaudience、および data:application/json;base64,<base64-encoded-key-file> という形式の privateKey を含む Json 文字列が含まれていることに注意してください。

または、プログラムで認証を設定することもできます。

package org.acme.pulsar;

import java.net.MalformedURLException;
import java.net.URL;

import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

@ApplicationScoped
public class PulsarAuth {

    @ConfigProperty(name = "pulsar.issuerUrl")
    String issuerUrl;

    @ConfigProperty(name = "pulsar.credentials")
    String credentials;

    @ConfigProperty(name = "pulsar.audience")
    String audience;

    @Produces
    @Identifier("pulsar-auth")
    public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
        var data = new ClientConfigurationData();
        data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), PulsarAuth.class.getResource(credentials), audience));
        return data;
    }
}

これは、キーファイルがリソースとしてアプリケーションクラスパスに含まれていることを前提としています。この場合、設定は次のようになります。

mp.messaging.incoming.prices.client-configuration=pulsar-auth

pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json

pulsar-auth で識別されるクライアント設定を使用するチャネルには、 client-configuration 属性を設定する必要があることに注意してください。

10. ヘルスチェック

Quarkus エクステンションは、Pulsar コネクターによって管理される各チャネルの起動、準備状況、および稼働状況を報告します。 ヘルスチェックは、Pulsar クライアントに依存してブローカーとの接続が確立されていることを確認します。

着信チャネルと発信チャネルの Startup および Readiness プローブは、ブローカーとの接続が確立されると、OK を報告します。

着信チャネルと発信チャネルの Liveness プローブは、ブローカーとの接続が確立され、かつ 失敗が発見されなかった場合に、OK を報告します。

メッセージ処理が失敗すると、メッセージが 否定応答 されることに注意してください。その場合、メッセージは失敗ストラテジーによって処理されます。失敗ストラテジーは、失敗を報告し、liveness チェックの結果に影響を与えます。失敗ストラテジー `fail`は、失敗を報告するため、liveness チェックも失敗を報告します。

11. 設定リファレンス

以下は、Pulsar コネクターのチャネル、コンシューマー、プロデューサー、およびクライアントの設定属性のリストです。 Pulsar クライアントの設定方法の詳細は、Pulsar クライアントの設定 を参照してください。

11.1. 着信チャネル設定 (Pulsar からの受信)

以下の属性は以下のように設定します:

mp.messaging.incoming.your-channel-name.attribute=value
Table 1. 'smallrye-pulsar' コネクターの受信属性
属性 (alias) 説明 タイプ 必須 デフォルト

ack-strategy

レコードから生成されたメッセージが確認されたときに適用するコミット戦略を指定します。値は ack , cumulative です。

string

false

ack

ackTimeout.redeliveryBackoff

カンマ区切りの値で ack タイムアウトを設定 MultiplierRedeliveryBackoff、min delay、max delay、multiplier。

string

false

batchReceive

バッチ受信を使用してメッセージを消費するかどうか

boolean

false

false

client-configuration

このチャネルのデフォルトPulsarクライアント設定を提供するCDI Beanの識別子。チャネル構成は、任意の属性をオーバーライドすることができます。BeanはMap<String, Object>型を持ち、@io.smallrye.common.annotation.Identifierアノテーションを使用して識別子を設定する必要があります。

string

false

consumer-configuration

このチャネルのデフォルトPulsarコンシューマ設定を提供するCDI Beanの識別子。チャネル構成は、任意の属性をオーバーライドすることができます。BeanはMap<String, Object>型を持ち、@io.smallrye.common.annotation.Identifierアノテーションを使用して識別子を設定する必要があります。

string

false

deadLetterPolicy.deadLetterTopic

失敗メッセージが送信されるデッドレタートピックの名前

string

false

deadLetterPolicy.initialSubscriptionName

デッド・レタートピックの初期サブスクリプション名

string

false

deadLetterPolicy.maxRedeliverCount

デッドレター・トピックに送信される前にメッセージが再配信される最大回数。

int

false

deadLetterPolicy.retryLetterTopic

失敗したメッセージが送信される再試行トピックの名前

string

false

failure-strategy

レコードから生成されたメッセージが否定応答(nack)されたときに適用する失敗ストラテジーを指定します。指定できる値は nack (デフォルト)、 failignore あるいは `reconsume-later' です。

string

false

nack

health-enabled

ヘルスレポートが有効(デフォルト)か無効か

boolean

false

true

negativeAck.redeliveryBackoff

負の ack を設定するためのカンマ区切りの値 MultiplierRedeliveryBackoff、min delay、max delay、multiplier。

string

false

reconsumeLater.delay

リコンシューム失敗戦略のデフォルト遅延時間(秒

long

false

3

schema

このチャネルのPulsarスキーマ・タイプ。構成されると、指定されたSchemaTypeでスキーマが構築され、チャネルに使用されます。指定されていない場合、スキーマは @Identifier とチャネル名で修飾されたCDI Bean型 Schema を検索して解決されます。フォールバックとして AUTO_CONSUME または AUTO_PRODUCE が使用されます。

string

false

serviceUrl

パルサー・サービスのサービスURL

string

false

pulsar://localhost:6650

topic

消費された/入力されたパルサー・トピック。設定されていない場合、チャンネル名が使用されます

string

false

tracing-enabled

トレースが有効(デフォルト)か無効か

boolean

false

true

基盤となる Pulsar コンシューマーがサポートするプロパティーを設定することもできます。

これらのプロパティーは、 pulsar.consumer 接頭辞を使用してグローバルに設定することもできます。

pulsar.consumer.subscriptionInitialPosition=Earliest
Table 2. Pulsar コンシューマー属性
属性 説明 タイプ 設定ファイル デフォルト

topicNames

トピック名

Set

true

[]

topicsPattern

トピックパターン

Pattern

true

subscriptionName

サブスクリプション名

文字列

true

subscriptionType

サブスクリプションタイプ。 4つのサブスクリプションタイプがあります:
* Exclusive
* Failover
* Shared
* Key_Shared

SubscriptionType

true

Exclusive

subscriptionProperties

Map

true

subscriptionMode

SubscriptionMode

true

Durable

messageListener

MessageListener

false

consumerEventListener

ConsumerEventListener

false

negativeAckRedeliveryBackoff

カスタム・メッセージのインタフェースは negativeAcked ポリシーです。コンシューマには RedeliveryBackoff を指定できます。

RedeliveryBackoff

false

ackTimeoutRedeliveryBackoff

カスタム・メッセージのインターフェースは ackTimeout ポリシーです。コンシューマに対して RedeliveryBackoff を指定できます。

RedeliveryBackoff

false

receiverQueueSize

コンシューマの受信キューのサイズ。

例えば、アプリケーションが Receive を呼び出す前に、コンシューマに蓄積されるメッセージ数。

デフォルト値より大きい値を設定すると、メモリ使用量が増加しますが、コンシューマのスループットが向上します。

int

true

1000

acknowledgementsGroupTimeMicros

コンシューマの確認応答を指定時間グループ化します。

デフォルトでは、コンシューマはブローカに確認応答を送信するのに100msのグループ化時間を使用します。

グループ化時間を 0 に設定すると、確認応答を直ちに送信します。

グループ化時間を長くすると効率的ですが、その分失敗後のメッセージ再送が若干増えます。

long

true

100000

maxAcknowledgmentGroupSize

メッセージの数だけコンシューマー確認をグループ化します。

int

true

1000

negativeAckRedeliveryDelayMicros

処理に失敗したメッセージを再配信するまでの待ち時間。

アプリケーションが Consumer#negativeAcknowledge(Message) を使用する場合、処理に失敗したメッセージは一定のタイムアウト後に再配信されます。

long

true

60000000

maxTotalReceiverQueueSizeAcrossPartitions

パーティション間のレシーバ・キュー・サイズの合計の最大値。

この設定により、合計レシーバ・キュー・サイズがこの値を超えた場合、個々のパーティションのレシーバ・キュー・サイズが減少します。

int

true

50000

consumerName

コンシューマー名

文字列

true

ackTimeoutMillis

非暗号化メッセージのタイムアウト

long

true

0

tickDurationMillis

ack-timeout 再配信の粒度。

tickDurationMillis を大きくすることで、ack-timeout を大きな値 (たとえば 1 時間) に設定したときにメッセージを追跡するためのメモリ・オーバーヘッドを減らすことができます。

long

true

1000

priorityLevel

共有サブスクリプションタイプでメッセージをディスパッチする際に、ブローカーがより優先順位を与えるコンシューマーの優先順位レベル。

ブローカーは優先順位の降順に従います。例えば、0=最大優先度、1、2、…​のように。

共有サブスクリプションタイプでは、ブローカーは まず、最大優先度レベルのコンシューマーが許可を持っていれば、そのコンシューマーにメッセージをディスパッチ します。そうでない場合、ブローカーは次の優先順位のコンシューマーを考慮します。 例1

サブスクリプションに priorityLevel 0 の consumerA と priorityLevel 1 の consumerB がある場合、ブローカはパーミットを 使い果たすまで consumerA にのみメッセージをディスパッチ し、その後 consumerB にメッセージをディスパッチし始めます。 例2

コンシューマの優先度、レベル、パーミット

C1, 0, 2

C2, 0, 1

C3, 0, 1

C4, 1, 2

C5, 1, 1

int

true

0

maxPendingChunkedMessage

保留中のチャンク・メッセージを保持するキューの最大サイズ。この閾値に達すると、コンシューマは保留中のメッセージを削除してメモリ使用率を最適化します。

int

true

10

autoAckOldestChunkedMessageOnQueueFull

maxPendingChunkedMessage のしきい値に達したときに、保留中のチャンク・メッセージを自動的に承認するかどうか。 false に設定すると、これらのメッセージはブローカによって再配信されます。

boolean

true

false

expireTimeOfIncompleteChunkedMessageMillis

コンシューマが指定された時間内にすべてのチャンクを受信できなかった場合に、不完全なチャンクを失効させる時間間隔。デフォルト値は 1 分です。

long

true

60000

cryptoKeyReader

CryptoKeyReader

false

messageCrypto

MessageCrypto

false

cryptoFailureAction

復号化できないメッセージを受信した場合、コンシューマーは以下のアクションを取る 必要があります。

* FAIL : 暗号化が成功するまでメッセージをフェイルするデフォルトのオプション。

DISCARD* :(メッセージを)黙認し、アプリケーションに配信しません。

CONSUME* :暗号化されたメッセージをアプリケーションに配信します。メッセージを復号化するのはアプリケーションの責任です。

ConsumerCryptoFailureAction

true

FAIL

properties

このコンシューマの名前または値のプロパティ。

SortedMap

true

{}

readCompacted

readCompacted を有効にすると、コンシューマーはトピックの完全なメッセージ バックログを読み取るのではなく、圧縮されたトピックからメッセージを読み取ります。

コンシューマーは、バックログを圧縮するときのトピック メッセージ内のポイントに到達するまで、圧縮されたトピック内の各キーの最新の値のみを参照します。そのポイントを超えると、通常どおりメッセージを送信します。

アクティブなコンシューマーが 1 つある永続トピック (失敗または排他サブスクリプションなど) へのサブスクリプションでのみ、 readCompacted を有効にします。

非永続トピックへのサブスクリプションまたは共有サブスクリプションでこれを有効にしようとすると、サブスクリプション呼び出しで PulsarClientException がスローされます。

boolean

true

false

subscriptionInitialPosition

最初にトピックを購読するときにカーソルを設定する初期位置。

SubscriptionInitialPosition

true

Latest

patternAutoDiscoveryPeriod

トピックの消費者にパターンを使用する場合のトピック自動検出期間。

int

true

60

regexSubscriptionMode

正規表現を使ってトピックを購読する場合、特定の種類のトピックを選ぶことができます。

RegexSubscriptionMode

true

PersistentOnly

deadLetterPolicy

コンシューマー向けのデッドレター ポリシー。

デフォルトでは、一部のメッセージは何度も再配信される可能性があり、停止しないこともあります。

デッドレター メカニズムを使用すると、メッセージの再配信回数が最大になります。再配信の最大回数を超えると、メッセージはデッドレター トピックに送信され、自動的に確認応答されます

デッドレター メカニズムを有効にするには、 deadLetterPolicy を設定します。

ackTimeoutMillis を指定せずにデッドレター ポリシーを指定すると、ack タイムアウトを 30000 ミリ秒に設定できます。

DeadLetterPolicy

true

retryEnable

boolean

true

false

batchReceivePolicy

BatchReceivePolicy

false

autoUpdatePartitions

autoUpdatePartitions が有効な場合、コンシューマーは自動的にパーティションの増加に加入します。

boolean

true

true

autoUpdatePartitionsIntervalSeconds

long

true

60

replicateSubscriptionState

replicateSubscriptionState が有効な場合、サブスクリプションの状態は地理的に複製されたクラスタに複製されます。

boolean

true

false

resetIncludeHead

boolean

true

false

keySharedPolicy

KeySharedPolicy

false

batchIndexAckEnabled

boolean

true

false

ackReceiptEnabled

boolean

true

false

poolMessages

boolean

true

false

payloadProcessor

MessagePayloadProcessor

false

startPaused

boolean

true

false

autoScaledReceiverQueueSizeEnabled

boolean

true

false

topicConfigurations

List

true

[]

11.2. 発信チャネルの設定 (Pulsar への公開)

Table 3. 'smallrye-pulsar' コネクターの発信属性
属性 (alias) 説明 タイプ 必須 デフォルト

client-configuration

このチャネルのデフォルトPulsarクライアント設定を提供するCDI Beanの識別子。チャネル設定は、任意の属性をオーバーライドすることができます。BeanはMap<String, Object>型を持ち、@io.smallrye.common.annotation.Identifierアノテーションを使用して識別子を設定する必要があります。

string

false

health-enabled

ヘルスレポートが有効(デフォルト)か無効か

boolean

false

true

maxPendingMessages

保留メッセージ、つまりブローカーからの確認応答を待っているメッセージを保持するキューの最大サイズ。

int

false

1000

producer-configuration

このチャネルのデフォルトPulsarプロデューサー設定を提供するCDI Beanの識別子。チャネル設定は、任意の属性をオーバーライドすることができます。BeanはMap<String, Object>型を持ち、@io.smallrye.common.annotation.Identifierアノテーションを使用して識別子を設定する必要があります。

string

false

schema

このチャネルのPulsarスキーマ・タイプ。設定されると、指定されたSchemaTypeでスキーマが構築され、チャネルに使用されます。指定されていない場合、スキーマは @Identifier とチャネル名で修飾されたCDI Bean型 Schema を検索して解決されます。フォールバックとして AUTO_CONSUME または AUTO_PRODUCE が使用されます。

string

false

serviceUrl

パルサー・サービスのサービスURL

string

false

pulsar://localhost:6650

topic

消費された/入力されたパルサー・トピック。設定されていない場合、チャンネル名が使用されます

string

false

tracing-enabled

トレースが有効(デフォルト)か無効か

boolean

false

true

waitForWriteCompletion

顧客がメッセージを確認する前に、ブローカーが書かれたレコードを確認するのを待つかどうか。

boolean

false

true

基盤となる Pulsar プロデューサーがサポートするプロパティーを設定することもできます。

これらのプロパティーは、 pulsar.producer 接頭辞を使用してグローバルに設定することもできます。

pulsar.producer.batchingEnabled=false
Table 4. Pulsar プロデューサー属性
属性 説明 タイプ 設定ファイル デフォルト

topicName

トピック名

文字列

true

producerName

プロデューサー名

文字列

true

sendTimeoutMs

ミリ秒単位のメッセージ送信のタイムアウト。 sendTimeout が切れる前にメッセージがサーバによって確認されないと、エラーが発生します。

long

true

30000

blockIfQueueFull

true に設定した場合、送信メッセージ キューがいっぱいになったときに、プロデューサーの Send メソッドと SendAsync メソッドは失敗してエラーをスローするのではなく、ブロックされます。+ false に設定した場合、送信メッセージ キューがいっぱいになったときに、プロデューサーの Send メソッドと SendAsync メソッドは失敗し、 ProducerQueueIsFullError 例外が発生します。

MaxPendingMessages パラメーターは、送信メッセージ キューのサイズを決定します。

boolean

true

false

maxPendingMessages

保留中のメッセージを保持するキューの最大サイズ。

int

true

0

maxPendingMessagesAcrossPartitions

パーティション間の保留メッセージの最大数。

int

true

0

messageRoutingMode

パーティショニングされたトピック 上のプロデューサに対するメッセージ・ルーティング・ロジック。

メッセージにキーを設定しない場合にのみ、このロジックを適用します。

使用可能なオプションは以下のとおりです: * pulsar.RoundRobinDistribution : ラウンドロビン+ * pulsar.UseSinglePartition : すべてのメッセージを単一のパーティションにパブリッシュします+ * pulsar.CustomPartition : カスタムパーティショニングスキーム

MessageRoutingMode

true

hashingScheme

特定のメッセージを公開するパーティションを決定するハッシュ関数 (パーティション化されたトピックのみ)。 使用可能なオプションは以下のとおりです:+ * pulsar.JavastringHash : Java の string.hashCode() に相当します。+ * pulsar.Murmur3_32Hash : Murmur3 のハッシュ関数を適用します。+ * pulsar.BoostHash : C++ の Boost ライブラリのハッシュ関数を適用します。

HashingScheme

true

JavaStringHash

cryptoFailureAction

暗号化に失敗した場合、プロデューサはアクションを起こす必要があります。+ * FAIL : 暗号化に失敗した場合、暗号化されていないメッセージは送信されませ ん。+ * SEND : 暗号化に失敗した場合、暗号化されていないメッセージが送信されます。

ProducerCryptoFailureAction

true

FAIL

customMessageRouter

MessageRouter

false

batchingMaxPublishDelayMicros

メッセージ送信のバッチ期間。

long

true

1000

batchingPartitionSwitchFrequencyByPublishDelay

int

true

10

batchingMaxMessages

バッチで許可されるメッセージの最大数。

int

true

1000

batchingMaxBytes

int

true

131072

batchingEnabled

メッセージのバッチ処理を有効にします。

boolean

true

true

batcherBuilder

BatcherBuilder

false

chunkingEnabled

メッセージのチャンキングを有効にします。

boolean

true

false

chunkMaxMessageSize

int

true

-1

cryptoKeyReader

CryptoKeyReader

false

messageCrypto

MessageCrypto

false

encryptionKeys

Set

true

[]

compressionType

プロデューサーが使用するメッセージデータ圧縮タイプ。+ 使用可能なオプション:+ * LZ4
* ZLIB
* ZSTD
* SNAPPY

CompressionType

true

NONE

initialSequenceId

Long

true

autoUpdatePartitions

boolean

true

true

autoUpdatePartitionsIntervalSeconds

long

true

60

multiSchema

boolean

true

true

accessMode

ProducerAccessMode

true

Shared

lazyStartPartitionedProducers

boolean

true

false

properties

SortedMap

true

{}

initialSubscriptionName

トピックの作成時に初期サブスクリプションを自動的に作成するには、この設定を使用します。このフィールドが設定されていない場合、初期サブスクリプションは作成されません。

文字列

true

11.3. Pulsar クライアントの設定

以下は、基盤となる PulsarClient の設定リファレンスです。 これらのオプションは、チャネル属性を使用して設定できます。

mp.messaging.incoming.your-channel-name.numIoThreads=4

または、 pulsar.client 接頭辞を使用してグローバルに設定します。

pulsar.client.serviceUrl=pulsar://pulsar:6650
Table 5. Pulsar クライアント属性
属性 説明 タイプ 設定ファイル デフォルト

serviceUrl

ブローカーに接続するためのPulsarクラスタHTTP URL。

文字列

true

serviceUrlProvider

ServiceUrl を生成する ServiceUrlProvider の実装クラスです。

ServiceUrlProvider

false

authentication

クライアントの認証設定。

認証

false

authPluginClassName

クライアントの認証プラグインのクラス名。

文字列

true

authParams

クライアントの認証パラメータ。

文字列

true

authParamMap

クライアントの認証マップ。

Map

true

operationTimeoutMs

クライアント操作のタイムアウト(ミリ秒)。

long

true

30000

lookupTimeoutMs

クライアント・ルックアップ・タイムアウト(ミリ秒)。

long

true

-1

statsIntervalSeconds

クライアントの統計情報を出力する間隔(秒)。

long

true

60

numIoThreads

IOスレッド数。

int

true

10

numListenerThreads

コンシューマー・リスナーのスレッド数。

int

true

10

connectionsPerBroker

クライアントと各 Broker 間で確立される接続の数。値 0 は、コネクション・プーリングを無効にすることを意味します。

int

true

1

connectionMaxIdleSeconds

[connectionMaxIdleSeconds] 秒以上接続が使用されない場合、接続を解放します。[connectionMaxIdleSeconds] < 0 の場合、アイドル状態の接続を自動的に解放する機能を無効にします。

int

true

180

useTcpNoDelay

TCP NoDelayオプションを使用するかどうか。

boolean

true

true

useTls

TLSを使用するかどうか。

boolean

true

false

tlsKeyFilePath

TLSキーファイルへのパス。

文字列

true

tlsCertificateFilePath

TLS証明書ファイルへのパス。

文字列

true

tlsTrustCertsFilePath

信頼するTLS証明書ファイルへのパス。

文字列

true

tlsAllowInsecureConnection

クライアントがブローカーから信頼されていない TLS 証明書を受け入れるかどうか。

boolean

true

false

tlsHostnameVerificationEnable

クライアントがブローカーとTLS接続を作成するときにホスト名を検証するかどうか。

boolean

true

false

concurrentLookupRequest

各ブローカー接続で同時に送信できるルックアップ要求の数。最大値を設定することで、ブローカーに過負荷がかかるのを防ぎます。

int

true

5000

maxLookupRequest

ブローカーへの過負荷を防ぐため、各ブローカー接続で許可されるルックアップ要求の最大数。

int

true

50000

maxLookupRedirects

リダイレクトされたルックアップ要求の最大回数。

int

true

20

maxNumberOfRejectedRequestPerConnection

現在の接続が閉じられ、クライアントが別のブローカーに接続するために新しい接続を作成した後、一定時間内(60秒)にブローカーが拒否したリクエストの最大数。

int

true

50

keepAliveIntervalSeconds

各クライアント・ブローカー接続のキープアライブインターバルの秒数。

int

true

30

connectionTimeoutMs

ブローカーからの応答がないまま期間が経過すると、接続の試みは中止されます。

int

true

10000

requestTimeoutMs

リクエスト完了までの最長時間。

int

true

60000

readTimeoutMs

リクエストの最大読み取り時間。

int

true

60000

autoCertRefreshSeconds

証明書の自動更新の秒数。

int

true

300

initialBackoffIntervalNanos

最初のバックオフ間隔(ナノ秒単位)。

long

true

100000000

maxBackoffIntervalNanos

最大バックオフ間隔(ナノ秒)。

long

true

60000000000

enableBusyWait

EpollEventLoopGroupのBusyWaitを有効にするかどうか。

boolean

true

false

listenerName

ルックアップするリスナー名。クライアントは、ネットワークがアクセス可能である限り、 listenerName を使用して、ブローカーへの接続を作成するサービス URL としてリスナーのいずれかを選択できます。"advertisedListeners" はブローカー側で有効になっている必要があります。

文字列

true

useKeyStoreTls

KeyStoreの方法でTLSを設定します。

boolean

true

false

sslProvider

内部クライアントが他のPulsarブローカーとの認証に使用するTLSプロバイダー。

文字列

true

tlsKeyStoreType

TLS KeyStoreタイプの設定。

文字列

true

JKS

tlsKeyStorePath

TLS KeyStore のパス。

文字列

true

tlsKeyStorePassword

TLS KeyStoreのパスワード。

文字列

true

tlsTrustStoreType

TLS TrustStoreタイプの設定。クライアント認証が必要な場合は、この設定を行う必要があります。

文字列

true

JKS

tlsTrustStorePath

TLS TrustStoreのパス。

文字列

true

tlsTrustStorePassword

TLS TrustStoreのパスワード。

文字列

true

tlsCiphers

TLS Ciphersのセット。

Set

true

[]

tlsProtocols

TLSのプロトコル。

Set

true

[]

memoryLimitBytes

クライアントのメモリ使用量の上限 (バイト単位)。デフォルトの 64M は高いプロデューサのスループットを保証します。

long

true

67108864

proxyServiceUrl

proxyServiceUrl と proxyProtocol は相互に含める必要があります。

文字列

true

proxyProtocol

プロキシサービスのプロトコル。proxyServiceUrl と proxyProtocol は相互に含める必要があります。

ProxyProtocol

true

enableTransaction

トランザクションを有効にするかどうか。

boolean

true

false

clock

Clock

false

dnsLookupBindAddress

PulsarクライアントのDNSルックアップバインド・アドレス、デフォルトの動作は0.0.0.0にバインドされます。

文字列

true

dnsLookupBindPort

dnsLookupBindAddressが構成されている場合に有効となります。デフォルト値は0です。

int

true

0

socks5ProxyAddress

SOCKS5プロキシのアドレス。

InetSocketAddress

true

socks5ProxyUsername

SOCKS5プロキシのユーザー名。

文字列

true

socks5ProxyPassword

SOCKS5プロキシのパスワード。

文字列

true

description

クライアント・バージョンの追加説明。長さは64を超えることはできません。

文字列

true

設定ファイルで設定できない (シリアライズできない) 設定プロパティーは、 Config file 列にその旨が記載されています。

12. さらに詳しく

このガイドでは、Quarkus を使用して Pulsar と対話する方法を説明しました。 Quarkus は、Quarkus Messaging を利用してデータストリーミングアプリケーションを構築します。

さらに詳しく知りたい場合は、Quarkusで使用されている実装、 SmallRye Reactive Messaging のドキュメントを確認してください。

関連コンテンツ