Apache Pulsar リファレンスガイド
このリファレンスガイドでは、Quarkus アプリケーションが Quarkus Messaging を利用して Apache Pulsar と対話する仕組みを説明します。
1. はじめに
Apache Pulsar は、クラウド向けに構築されたオープンソースの分散メッセージングおよびストリーミングプラットフォームです。 階層型ストレージ機能により、マルチテナントの高性能ソリューションをサーバーメッセージングに提供します。
Pulsar はパブリッシュ - サブスクライブパターンを実装します。
-
プロデューサーは トピック にメッセージを公開します。
-
コンシューマーは、そのトピックへの サブスクリプション を作成し、着信メッセージを受信して処理します。処理が完了すると、ブローカーに 完了通知 を送信します。
-
サブスクリプションが作成されると、コンシューマーが切断されても、Pulsar はすべてのメッセージを保持します。 保持されたメッセージは、これらのメッセージがすべて正常に処理されたことをコンシューマーが確認した場合にのみ破棄されます。
Pulsar クラスターは、以下の要素で構成されます。
-
ステートレスコンポーネントである 1 つ以上のブローカー。
-
トピックのメタデータ、スキーマ、調整、クラスター設定を維持するための メタデータストア。
-
メッセージの永続的な保存に使用される bookies のセット。
2. Apache Pulsar のための Quarkus エクステンション
Quarkus は、 SmallRye Reactive Messaging フレームワークを通じて Apache Pulsar のサポートを提供します。 Eclipse MicroProfile Reactive Messaging 仕様 3.0 に基づいて、CDI とイベント駆動型を橋渡しする柔軟なプログラミングモデルを提案します。
このガイドでは、Apache Pulsar と SmallRye Reactive Messaging フレームワークについて詳しく説明します。 クイックスタートについては、Apache Pulsar を使用した Quarkus Messaging の概要 を参照してください。 |
プロジェクトのベースディレクトリーで次のコマンドを実行すると、messaging-pulsar
エクステンションをプロジェクトに追加できます。
quarkus extension add messaging-pulsar
./mvnw quarkus:add-extension -Dextensions='messaging-pulsar'
./gradlew addExtension --extensions='messaging-pulsar'
これにより、ビルドファイルに次の内容が追加されます。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-pulsar</artifactId>
</dependency>
implementation("io.quarkus:quarkus-messaging-pulsar")
このエクステンションには、 |
3. SmallRye Pulsar コネクターの設定
SmallRye Reactive Messaging フレームワークは、Apache Kafka、Apache Pulsar、AMQP、Apache Camel、JMS、MQTT などのさまざまなメッセージングバックエンドをサポートしているため、汎用的な語彙を使用しています。
-
アプリケーションは メッセージ を送受信します。
Message
は、ペイロード をラップし、いくつかの メタデータ で拡張できます。 これは、値とキーで構成される PulsarMessage
と混同しないでください。 Pulsar コネクターでは、Reactive Messaging メッセージ が Pulsar メッセージ に対応します。 -
メッセージは チャネル を通過します。アプリケーションコンポーネントは、チャネルに接続してメッセージを公開および消費します。Pulsar コネクターは、チャネル を Pulsar トピック にマッピングします。
-
チャネルは コネクター を使用してメッセージバックエンドに接続します。 コネクターは、着信メッセージを特定のチャネル (アプリケーションが消費するチャネル) にマッピングし、特定のチャネルに送信された発信メッセージを収集するように設定されています。 各コネクターは、特定のメッセージングテクノロジーに特化しています。 たとえば、Pulsar を処理するコネクターの名前は、
smallrye-pulsar
となっています。
着信チャネルを持つ Pulsar コネクターの最小設定は次のようになります。
%prod.pulsar.client.serviceUrl=pulsar:6650 (1)
mp.messaging.incoming.prices.connector=smallrye-pulsar (2)
1 | プロダクションプロファイルの Pulsar ブローカーサービス URL を設定します。
この URL は、mp.messaging.incoming.$channel.serviceUrl プロパティーを使用して、グローバルに、またはチャネルごとに設定できます。
開発モードおよびテスト実行時には、Dev Services for Pulsar が Pulsar ブローカーを自動的に起動します。 |
2 | prices チャネルを管理するためのコネクターを設定します。 デフォルトでは、トピック 名はチャネル名と同じです。 |
トピック名は、トピック属性を設定することで上書きできます。
%prod 接頭辞は、アプリケーションが本番モードで実行される場合にのみプロパティーが使用されることを示します (つまり、開発モードまたはテストモードでは使用されません)。詳細は、プロファイルに関するドキュメント を参照してください。
|
コネクターの自動アタッチ
クラスパスに単一のコネクターがある場合は、 この自動アタッチは、以下を使用して無効にできます。
|
設定オプションの詳細は、Pulsar クライアントの設定 を参照してください。
4. Pulsar からのメッセージの受信
Pulsar コネクターは、Pulsar クライアントを使用して Pulsar ブローカーに接続し、コンシューマーを作成して Pulsar ブローカーからメッセージを受信します。また、各 Pulsar Message
を Reactive Messaging Message
にマッピングします。
4.1. 例
Pulsar ブローカーが実行中で、pulsar:6650
アドレスを使用してアクセスできるとします。
次のようにして、prices
チャネルで Pulsar メッセージを受信するようにアプリケーションを設定します。
mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest (2)
-
Pulsar ブローカーサービスの URL を設定します。
-
コンシューマーのサブスクリプションが
Earliest
位置からメッセージの受信を開始することを確認します。
Pulsar トピックやコンシューマー名を設定する必要はありません。
デフォルトでは、コネクターはチャネル名 ( |
Pulsar では、コンシューマーがトピックサブスクリプションの |
すると、アプリケーションが double
ペイロードを直接受信できるようになります。
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PriceConsumer {
@Incoming("prices")
public void consume(double price) {
// process your price.
}
}
または、Reactive Messaging タイプ Message<Double>
を取得することもできます。
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
// access record metadata
var metadata = msg.getMetadata(PulsarIncomingMessageMetadata.class).orElseThrow();
// process the message payload.
double price = msg.getPayload();
// Acknowledge the incoming message (acknowledge the Pulsar message back to the broker)
return msg.ack();
}
Reactive Messaging の Message
タイプを使用すると、コンシューマーメソッドが着信メッセージのメタデータにアクセスし、確認応答を手動で処理できます。
Pulsar メッセージオブジェクトに直接アクセスする場合は、次を使用します。
@Incoming("prices")
public void consume(org.apache.pulsar.client.api.Message<Double> msg) {
String key = msg.getKey();
String value = msg.getValue();
String topic = msg.topicName();
// ...
}
org.apache.pulsar.client.api.Message
は、基盤となる Pulsar クライアントによって提供され、コンシューマーメソッドで直接使用できます。
または、次の例のように、アプリケーションで、チャネル名によって識別される Multi
を Bean に注入し、そのイベントをサブスクライブすることもできます。
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;
@Path("/prices")
public class PriceResource {
@Inject
@Channel("prices")
Multi<Double> prices;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<Double> stream() {
return prices;
}
}
|
チャネルとして注入できるのは、以下のタイプです。
@Inject @Channel("prices") Multi<Double> streamOfPayloads;
@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;
@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;
@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;
前出の Message
の例と同様に、注入されたチャネルがペイロード (Multi<T>
) を受信した場合は、そのチャネルが自動的にメッセージを確認応答し、複数のサブスクライバーをサポートします。
注入されたチャネルが Message (Multi<Message<T>>
) を受信した場合は、手動で確認応答とブロードキャストを行う必要があります。
4.2. ブロッキング処理
リアクティブメッセージングは、I/O スレッドでメソッドを呼び出します。このトピックの詳細については、 Quarkus リアクティブアーキテクチャーのドキュメント を参照してください。ただし、多くの場合、リアクティブメッセージングとデータベースインタラクションなどのブロック処理を組み合わせる必要があります。このためには、処理が ブロッキング であり、呼び出し元のスレッドで実行するべきではないことを示す @Blocking
アノテーションを使用する必要があります。
例えば、以下のコードは、Hibernate with Panacheを 使用してデータベースに受信ペイロードを格納する方法を示しています。
import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;
@ApplicationScoped
public class PriceStorage {
@Incoming("prices")
@Transactional
public void store(int priceInUsd) {
Price price = new Price();
price.value = priceInUsd;
price.persist();
}
}
効果はどちらも同じです。したがって、両方を使うことができます。1番目のものは、使用するワーカープールや順序を保持するかどうかなど、より細かい調整が可能です。2 番目のものは、Quarkus の他のリアクティブ機能でも使用され、デフォルトのワーカープールを使用し、順序を保持します。
|
@RunOnVirtualThread
Java 仮想スレッドでの ブロッキング処理の実行については、 Quarkus Virtual Thread support with Reactive Messagingのドキュメント を参照してください。 |
@Transactional
メソッドに |
4.3. Pulsar のサブスクリプションタイプ
Pulsar の subscriptionType コンシューマー設定は、パブリッシュ - サブスクライブやキューイングなど、さまざまなメッセージングシナリオを実現するために柔軟に使用できます。
-
Exclusive サブスクリプションタイプ使用すると、"ファンアウト型の pub/sub メッセージング" 用に 一意のサブスクリプション名 を指定できます。これがデフォルトのサブスクリプションタイプです。
-
Shared、Key_Shared、または Failover サブスクリプションタイプを使用すると、複数のコンシューマーで 同じサブスクリプション名 を共有して、コンシューマー間で "メッセージキューイング" を実現できます。
サブスクリプション名が指定されていない場合、Quarkus は一意の ID を生成します。
4.4. デシリアライゼーションと Pulsar Schema
Pulsar コネクターを使用すると、基盤となる Pulsar コンシューマーのスキーマ設定を設定できます。 詳細は、Pulsar Schema 設定と自動スキーマ検出 を参照してください。
4.5. 確認応答ストラテジー
Pulsar メッセージから生成されたメッセージが 確認応答 されると、コネクターが 確認応答リクエスト を Pulsar ブローカーに送信します。 すべての Reactive Messaging メッセージが 確認応答 される必要があります。ほとんどの場合、これは自動的に処理されます。 確認応答リクエストは、次の 2 つのストラテジーを使用して Pulsar ブローカーに送信できます。
-
個別確認応答 は、デフォルトのストラテジーです。確認応答リクエストが各メッセージに対してブローカーに送信されます。
-
累積確認応答 は、
ack-strategy=cumulative
を使用して設定されるストラテジーです。コンシューマーが受信した最後のメッセージのみを確認します。 提供されたメッセージまでのストリーム内の全メッセージは、そのコンシューマーに再配信されません。
デフォルトでは、Pulsar コンシューマーは、ブローカーからの確認応答の確認を待たずに確認応答を検証します。
|
4.6. 失敗処理ストラテジー
Pulsar メッセージから生成されたメッセージが 否定応答 された場合、失敗ストラテジーが適用されます。 Quarkus Pulsar エクステンションは、4 つのストラテジーをサポートしています。
-
nack
(デフォルト) は、ブローカーに 否定応答 を送信し、ブローカーがこのメッセージをコンシューマーに再配信するようにトリガーします。 否定応答は、negativeAckRedeliveryDelayMicros
プロパティーとnegativeAck.redeliveryBackoff
プロパティーを使用してさらに設定できます。 -
fail
の場合、アプリケーションが失敗します。メッセージはそれ以上処理されません。 -
ignore
の場合、失敗がログに記録されますが、確認応答ストラテジーが適用され、処理が続行されます。 -
continue
の場合、失敗がログに記録されますが、確認応答や否定応答を適用せずに処理が続行されます。このストラテジーは、確認応答タイムアウト 設定とともに使用できます。 -
reconsume-later
は、reconsumeLater
API を使用して、遅延して再消費されるメッセージを 再試行レタートピック に送信します。 遅延はreconsumeLater.delay
プロパティーを使用して設定でき、デフォルトは 3 秒です。 失敗メタデータにio.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata
のインスタンスを追加することで、メッセージごとにカスタムの遅延またはプロパティーを設定できます。
4.6.1. 確認応答タイムアウト
Pulsar クライアントは、否定応答と同様に、 確認応答タイムアウト メカニズムにより、指定された ackTimeout 期間にわたって、確認応答されていないメッセージを追跡し、ブローカーに 確認応答されていないメッセージの再配信リクエスト を送信します。これを受けて、ブローカーは確認応答されていないメッセージをコンシューマーに再送信します。
タイムアウトと再配信バックオフメカニズムを設定するには、ackTimeoutMillis
プロパティーと ackTimeout.redeliveryBackoff
プロパティーを設定します。
ackTimeout.redeliveryBackoff
には、最小遅延 (ミリ秒単位)、最大遅延 (ミリ秒単位)、および乗数をコンマ区切り値として指定できます。
mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2
4.6.2. 後の再消費と再試行レタートピック
再試行レタートピック は、正常に消費されなかったメッセージをデッドレタートピックにプッシュし、メッセージの消費を継続します。 デッドレタートピックは、確認応答タイムアウト、否定応答、再試行レタートピックなどのさまざまなメッセージ再配信方法で使用できることに注意してください。
mp.messaging.incoming.data.failure-strategy=reconsume-later
mp.messaging.incoming.data.reconsumeLater.delay=5000
mp.messaging.incoming.data.retryEnable=true
mp.messaging.incoming.data.negativeAck.redeliveryBackoff=1000,60000,2
4.6.3. デッドレタートピック
デッドレタートピック は、正常に消費されなかったメッセージをデッドレタートピックにプッシュし、メッセージの消費を継続します。 デッドレタートピックは、確認応答タイムアウト、否定応答、再試行レタートピックなどのさまざまなメッセージ再配信方法で使用できることに注意してください。
mp.messaging.incoming.data.failure-strategy=nack
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic
mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription
mp.messaging.incoming.data.subscriptionType=Shared
否定応答 または 確認応答タイムアウト 再配信方法では、少なくとも未処理のメッセージを含むメッセージのバッチ全体が再配信されます。 詳細は、プロデューサーのバッチ処理 を参照してください。 |
4.7. Pulsar メッセージの一括受信
デフォルトでは、incoming メソッドは各 Pulsar メッセージを個別に受信します。
batchReceive=true
プロパティーを使用するか、コンシューマー設定で batchReceivePolicy
を指定することで、バッチモードを有効にすることができます。
@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<org.apache.pulsar.client.api.Messages<Double>> messages) {
for (org.apache.pulsar.client.api.Message<Double> msg : messages.getPayload()) {
String key = msg.getKey();
String topic = msg.getTopicName();
long timestamp = msg.getEventTime();
//... process messages
}
// ack will commit the latest offsets (per partition) of the batch.
return messages.ack();
}
@Incoming("prices")
public void consumeRecords(org.apache.pulsar.client.api.Messages<Double> messages) {
for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
//... process messages
}
}
または、consume メソッドにペイロードのリストを直接受信することもできます。
@Incoming("prices")
public void consume(List<Double> prices) {
for (double price : prices) {
// process price
}
}
Quarkus は、着信チャネルのバッチタイプを自動検出し、バッチ設定を自動的に設定します。
|
5. Pulsar へのメッセージの送信
The Pulsar Connector can write Reactive Messaging `Message`s as Pulsar Message.
5.1. 例
Pulsar ブローカーが実行中で、pulsar:6650
アドレスを使用してアクセスできるとします。
次のように、prices
チャネルからのメッセージを Pulsar メッセージに書き込むようにアプリケーションを設定します。
mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 (1)
-
Pulsar ブローカーサービスの URL を設定します。
Pulsar トピックやプロデューサー名を設定する必要はありません。
デフォルトでは、コネクターはチャネル名 ( |
次に、アプリケーションは、Message<Double>
を prices
チャネルに送信する必要があります。次のスニペットのように、double
ペイロードを使用できます。
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class PulsarPriceProducer {
private final Random random = new Random();
@Outgoing("prices-out")
public Multi<Double> generate() {
// Build an infinite stream of random prices
// It emits a price every second
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> random.nextDouble());
}
}
generate メソッドは、Multi<Double>
を返します。これは、Flow.Publisher
インターフェイスを実装します。
このパブリッシャーは、メッセージを生成し、設定された Pulsar トピックに送信するために、フレームワークによって使用されます。
ペイロードを返す代わりに、io.smallrye.reactive.messaging.pulsar.OutgoingMessage
を返して、Pulsar メッセージを送信することもできます。
@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}
ペイロードを org.eclipse.microprofile.reactive.messaging.Message
内にラップして、書き込まれたレコードをより詳細に制御できます。
@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> Message.of(random.nextDouble())
.addMetadata(PulsarOutgoingMessageMetadata.builder()
.withKey("my-key")
.withProperties(Map.of("property-key", "value"))
.build()));
}
Messages
を送信するときに io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata
のインスタンスを追加して、メッセージが Pulsar に書き込まれる方法に影響を与えることができます。
発信メソッドは、Flow.Publisher
を返すメソッドシグネチャーの他に、単一のメッセージを返すこともできます。
この場合、プロデューサーがこのメソッドをジェネレーターとして使用して無限のストリームを作成します。
@Outgoing("prices-out") T generate(); // T excluding void
@Outgoing("prices-out") Message<T> generate();
@Outgoing("prices-out") Uni<T> generate();
@Outgoing("prices-out") Uni<Message<T>> generate();
@Outgoing("prices-out") CompletionStage<T> generate();
@Outgoing("prices-out") CompletionStage<Message<T>> generate();
5.2. シリアライゼーションと Pulsar Schema
Pulsar コネクターを使用すると、基盤となる Pulsar プロデューサーのスキーマ設定を設定できます。 詳細は、Pulsar Schema 設定と自動スキーマ検出 を参照してください。
5.3. キー/値のペアの送信
キー/値のペアを Pulsar に送信するには、Pulsar プロデューサースキーマを KeyValue スキーマを使用して設定できます。
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.common.annotation.Identifier;
@ApplicationScoped
public class PulsarKeyValueExample {
@Identifier("out")
@Produces
Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);
@Incoming("in")
@Outgoing("out")
public KeyValue<String, Long> process(long in) {
return new KeyValue<>("my-key", in);
}
}
書き込まれたレコードをさらに制御する必要がある場合は、PulsarOutgoingMessageMetadata
を使用します。
5.4. 確認応答
Pulsar ブローカーは、プロデューサーからメッセージを受信すると、メッセージに MessageId
を割り当ててプロデューサーに送り返し、メッセージが公開されたことを確認します。
デフォルトでは、コネクターは Pulsar がレコードを確認応答するのを待ってから、処理 (受信した Message
の確認応答) を続行します。
これを無効にするには、waitForWriteCompletion
属性を false
に設定します。
レコードを書き込めない場合、メッセージは nacked
になります。
Pulsar クライアントは、失敗した場合に、 送信タイムアウト に達するまで、自動的にメッセージの送信を再試行します。
送信タイムアウト は、 |
5.5. バックプレッシャーとインフライトレコード
Pulsar 発信コネクターは、バックプレッシャーを処理し、Pulsar ブローカーへの書き込みを待機している保留メッセージの数を監視します。
保留メッセージの数は、maxPendingMessages
属性を使用して設定され、デフォルトは 1000 です。
コネクターは、その数のメッセージのみを同時に送信します。 少なくとも 1 つの保留メッセージがブローカーによって確認応答されるまで、他のメッセージは送信されません。 次に、ブローカーの保留メッセージの 1 つが確認応答されると、コネクターが Pulsar に新しいメッセージを書き込みます。
また、maxPendingMessages
を 0
に設定することで、保留メッセージの制限を解除することもできます。
Pulsar では、maxPendingMessagesAcrossPartitions
を使用して、パーティションごとの保留メッセージの数を設定することもできます。
5.6. プロデューサーのバッチ処理
デフォルトでは、Pulsar プロデューサーは個々のメッセージをまとめてバッチ処理し、ブローカーに公開します。
バッチ処理のパラメーターは、batchingMaxPublishDelayMicros
、batchingPartitionSwitchFrequencyByPublishDelay
、batchingMaxMessages
、batchingMaxBytes
設定プロパティーを使用して設定できます。batchingEnabled=false
で完全に無効にすることもできます。
Key_Shared
コンシューマーサブスクリプションを使用する場合、batcherBuilder
を BatcherBuilder.KEY_BASED
に設定できます。
6. Pulsar トランザクションと exactly-once 処理
Pulsar トランザクション を使用すると、イベントストリーミングアプリケーションが 1 つのアトミック操作でメッセージを消費、処理、生成できます。
トランザクションを使用すると、1 つまたは複数のプロデューサーが複数のトピックにメッセージのバッチを送信できるようになります。バッチ内のすべてのメッセージが、最終的にすべてのコンシューマーにみえるか、またはコンシューマーに一切みえなくなります。
使用するには、ブローカー設定で |
クライアント側でも、PulsarClient
設定でトランザクションのサポートを有効にする必要があります。
mp.messaging.outgoing.tx-producer.enableTransaction=true
Pulsar コネクターは、トランザクション内にレコードを書き込むための PulsarTransactions
カスタムエミッターを提供します。
これは、通常のエミッター @Channel
として使用できます。
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.OutgoingMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;
@ApplicationScoped
public class PulsarTransactionalProducer {
@Inject
@Channel("tx-out-example")
PulsarTransactions<OutgoingMessage<Integer>> txProducer;
@Inject
@Channel("other-producer")
PulsarTransactions<String> producer;
@Incoming("in")
public Uni<Void> emitInTransaction(Message<Integer> in) {
return txProducer.withTransaction(emitter -> {
emitter.send(OutgoingMessage.of("a", 1));
emitter.send(OutgoingMessage.of("b", 2));
emitter.send(OutgoingMessage.of("c", 3));
producer.send(emitter, "4");
producer.send(emitter, "5");
producer.send(emitter, "6");
return Uni.createFrom().completionStage(in::ack);
});
}
}
withTransaction
メソッドに渡される関数は、レコードを生成する TransactionalEmitter
を受け取り、トランザクションの結果を提供する Uni
を返します。
処理が正常に完了すると、プロデューサーがフラッシュされ、トランザクションがコミットされます。
処理中に例外が発生したり、失敗した Uni
が返されたり、TransactionalEmitter
が中止の対象としてマークされたりすると、トランザクションは中止されます。
複数のトランザクションプロデューサーが 1 つのトランザクションに参加できます。 これにより、すべてのメッセージが開始したトランザクションを使用して確実に送信されます。また、トランザクションがコミットされる前に、参加しているすべてのプロデューサーがフラッシュされます。 |
このメソッドが Vert.x コンテキストで呼び出されると、処理関数もそのコンテキストで呼び出されます。 それ以外の場合は、プロデューサーの送信スレッドで呼び出されます。
6.1. Exactly-Once 処理
Pulsar Transactions API を使用すると、生成されたメッセージとともに、トランザクション内のコンシューマーオフセットを管理することもできます。 これにより、コンシューマーとトランザクションプロデューサーを consume-transform-produce パターンでカップリングすることができます。これは exactly-once 処理としても知られています。 これは、アプリケーションがメッセージを消費および処理し、結果をトピックに公開して、消費されたメッセージのオフセットをトランザクションでコミットすることを意味します。
PulsarTransactions
エミッターは、トランザクション内の着信 Pulsar メッセージに exactly-once 処理を適用する方法も提供します。
次の例には、トランザクション内の Pulsar メッセージのバッチが含まれています。
mp.messaging.outgoing.tx-out-example.enableTransaction=true
# ...
mp.messaging.incoming.in-channel.enableTransaction=true
mp.messaging.incoming.in-channel.batchReceive=true
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.pulsar.client.api.Messages;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;
@ApplicationScoped
public class PulsarExactlyOnceProcessor {
@Inject
@Channel("tx-out-example")
PulsarTransactions<Integer> txProducer;
@Incoming("in-channel")
public Uni<Void> emitInTransaction(Message<Messages<Integer>> batch) {
return txProducer.withTransactionAndAck(batch, emitter -> {
for (org.apache.pulsar.client.api.Message<Integer> record : batch.getPayload()) {
emitter.send(PulsarMessage.of(record.getValue() + 1, record.getKey()));
}
return Uni.createFrom().voidItem();
});
}
}
処理が正常に完了すると、トランザクション内でメッセージが確認応答され、トランザクションがコミットされます。
exactly-once を使用する場合、累積的にではなく、個別にのみメッセージを確認応答できます。 |
処理を中止する必要がある場合、メッセージは否定応答されます。処理を再試行したり、フェイルストップしたりするために、いずれかの失敗ストラテジーを使用することができます。
トランザクションが失敗して中止された場合、withTransaction
から返される Uni
は失敗することに注意してください。
アプリケーションでエラーケースを処理することもできます。しかし、メッセージの消費を続行するには、@Incoming
メソッドから返される Uni
が失敗しないようにする必要があります。
PulsarTransactions#withTransactionAndAck
メソッドは、メッセージの確認応答と否定応答を行いますが、リアクティブストリームを停止しません。
失敗を無視すると、最後にコミットされたオフセットにコンシューマーがリセットされ、そこから処理が再開されます。
失敗発生時に重複を回避するために、ブローカー側でメッセージの重複排除とバッチインデックスレベルの確認応答を有効にすることを推奨します。
|
7. Pulsar Schema 設定と自動スキーマ検出
Pulsar メッセージは、ペイロードとともに非構造化バイト配列として保存されます。 Pulsar スキーマ は、構造化データを raw メッセージバイトにシリアライズする方法を定義します。 スキーマ は、強制的なデータ構造での書き込みと読み取りを行うために、プロデューサーとコンシューマーに適用されます。 スキーマは、データがトピックに公開される前にデータを raw バイトにシリアライズします。また、raw バイトがコンシューマーに配信される前に raw バイトをデシリアライズします。
Pulsar は、登録されたスキーマ情報を保存する中央リポジトリーとしてスキーマレジストリーを使用します。 これにより、プロデューサー/コンシューマーがブローカーを通じてトピックのメッセージのスキーマを調整できるようになります。 デフォルトでは、スキーマの保存に Apache BookKeeper が使用されます。
Pulsar コネクターでは、schema
プロパティーを使用してスキーマをプリミティブ型として指定できます。
mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32
mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE
schema
プロパティーの値が スキーマタイプ と一致する場合、そのタイプを使用した単純なスキーマが作成され、該当するチャネルに使用されます。
Pulsar コネクターを使用すると、@Identifier
修飾子で識別される Schema
Bean を CDI を通じて提供することで、複雑なスキーマタイプを設定できます。
たとえば、次の Bean は JSON スキーマとキー/値スキーマを提供します。
package pulsar.configuration;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import io.smallrye.common.annotation.Identifier;
@ApplicationScoped
public class PulsarSchemaProvider {
@Produces
@Identifier("user-schema")
Schema<User> userSchema = Schema.JSON(User.class);
@Produces
@Identifier("a-channel")
Schema<KeyValue<Integer, User>> keyValueSchema() {
return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
}
public static class User {
String name;
int age;
}
}
定義されたスキーマを使用して着信チャネル users
を設定するには、schema
プロパティーをスキーマ user-schema
の識別子に設定する必要があります。
mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema
schema
プロパティーが見つからない場合、コネクターはチャネル名で識別される Schema
Bean を検索します。
たとえば、発信チャネル a-channel
はキー/値スキーマを使用します。
mp.messaging.outgoing.a-channel.connector=smallrye-pulsar
スキーマ情報が提供されていない場合、着信チャネルは Schema.AUTO_CONSUME()
を使用します。一方、発信チャネルは Schema.AUTO_PRODUCE_BYTES()
スキーマを使用します。
7.1. 自動スキーマ検出
Quarkus Messaging Pulsar (io.quarkus:quarkus-messaging-pulsar
) を使用すると、Quarkus は多くの場合、設定する正しい Pulsar Schema を自動的に検出できます。
この自動検出は、@Incoming
メソッドと @Outgoing
メソッドの宣言、および注入された @Channel
に基づいています。
たとえば、以下のように宣言した場合
@Outgoing("generated-price")
public Multi<Integer> generate() {
...
}
このとき、設定において generated-price
チャネルで smallrye-pulsar
コネクターを使用するよう指定されていると、Quarkus は generated-price
チャネルの schema
属性を Pulsar Schema INT32
に自動的に設定します。
同様に、以下を宣言した場合
@Incoming("my-pulsar-consumer")
public void consume(org.apache.pulsar.api.client.Message<byte[]> record) {
...
}
設定において my-pulsar-consumer
チャネルで smallrye-pulsar
コネクターを使用するよう指定されていると、Quarkus は schema
属性を Pulsar BYTES
Schema に自動的に設定します。
最後に、以下を宣言した場合
@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;
設定において price-create
チャネルが smallrye-pulsar
コネクターを使用するよう指定されていると、Quarkus は schema
を Pulsar INT64
Schema に自動的に設定します。
Pulsar Schema の自動検出でサポートされているすべてのタイプは、次のとおりです。
-
short
およびjava.lang.Short
-
int
およびjava.lang.Integer
-
long
およびjava.lang.Long
-
float
およびjava.lang.Float
-
double
およびjava.lang.Double
-
byte[]
-
java.time.Instant
-
java.sql.Timestamp
-
java.time.LocalDate
-
java.time.LocalTime
-
java.time.LocalDateTime
-
java.nio.ByteBuffer
-
Avro スキーマから生成されたクラスと Avro
GenericRecord
は、AVRO
スキーマタイプで設定されます。 -
Protobuf スキーマから生成されたクラスは、
PROTOBUF
スキーマタイプで設定されます。 -
その他のクラスは、自動的に
JSON
スキーマタイプで設定されます。
|
Pulsar が提供するこれらのスキーマに加えて、Quarkus は 検証を適用することなく 次のスキーマ実装を提供します。
-
io.vertx.core.buffer.Buffer
は、io.quarkus.pulsar.schema.BufferSchema
スキーマで設定されます。 -
io.vertx.core.json.JsonObject
は、io.quarkus.pulsar.schema.JsonObjectSchema
スキーマで設定されます。 -
io.vertx.core.json.JsonArray
は、io.quarkus.pulsar.schema.JsonArraySchema
スキーマで設定されます。 -
スキーマレス JSON シリアライゼーションでは、
schema
設定がObjectMapper<fully_qualified_name_of_the_bean>
に設定されている場合、JacksonObjectMapper
を使用してスキーマが生成されます。このとき、Pulsar Schema 検証は適用されません。io.quarkus.pulsar.schema.ObjectMapperSchema
を使用すると、検証なしで JSON スキーマを明示的に設定できます。
schema
が設定されている場合、自動検出によって置き換えられることはありません。
シリアライザーの自動検出に問題がある場合は、quarkus.messaging.pulsar.serializer-autodetection.enabled=false
を設定することで、これを完全にオフにすることができます。
オフにする必要があると認識した場合は、Quarkus issue tracker にバグを報告していただければ、どのような問題でも修正することができます。
8. Dev Services for Pulsar
With Quarkus Messaging Pulsar extension (quarkus-messaging-pulsar
)
Dev Services for Pulsar automatically starts a Pulsar broker in dev mode and when running tests.
So, you don’t have to start a broker manually.
The application is configured automatically.
8.1. Enabling / Disabling Dev Services for Pulsar
Dev Services for Pulsar is automatically enabled unless:
-
quarkus.pulsar.devservices.enabled
is set tofalse
-
the
pulsar.client.serviceUrl
is configured -
all the Reactive Messaging Pulsar channels have the
serviceUrl
attribute set
Dev Services for Pulsar relies on Docker to start the broker.
If your environment does not support Docker, you will need to start the broker manually, or connect to an already running broker.
You can configure the broker address using pulsar.client.
.
8.2. 共有ブローカー
Most of the time you need to share the broker between applications. Dev Services for Pulsar implements a service discovery mechanism for your multiple Quarkus applications running in dev mode to share a single broker.
Dev Services for Pulsar starts the container with the quarkus-dev-service-pulsar label which is used to identify the container.
|
If you need multiple (shared) brokers, you can configure the quarkus.pulsar.devservices.service-name
attribute and indicate the broker name.
It looks for a container with the same value, or starts a new one if none can be found.
The default service name is pulsar
.
Sharing is enabled by default in dev mode, but disabled in test mode.
You can disable the sharing with quarkus.pulsar.devservices.shared=false
.
8.3. ポートの設定
By default, Dev Services for Pulsar picks a random port and configures the application.
You can set the port by configuring the quarkus.pulsar.devservices.port
property.
Note that the Pulsar advertised address is automatically configured with the chosen port.
8.4. イメージの設定
Dev Services for Pulsar supports the official Apache Pulsar image.
A custom image name can be configured as such:
quarkus.pulsar.devservices.image-name=datastax/lunastreaming-all:2.10_4.7
8.5. Configuring the Pulsar broker
You can configure the Dev Services for Pulsar with custom broker configuration.
The following example enables transaction support:
quarkus.pulsar.devservices.broker-config.transaction-coordinator-enabled=true
quarkus.pulsar.devservices.broker-config.system-topic-enabled=true
9. Pulsar クライアントの設定
Pulsar のクライアント、コンシューマー、プロデューサーは、Pulsar クライアントアプリケーションの動作を設定するために、細かくカスタマイズすることが可能です。
Pulsar コネクターは、Pulsar クライアントと、チャネルごとのコンシューマーまたはプロデューサーを作成します。それぞれに実用的なデフォルトが設定されているため、設定が容易です。 作成は処理されますが、利用できる設定オプションは、すべて Pulsar チャネルを通じて設定可能です。
通常、PulsarClient
、PulsarConsumer
、`PulsarProducer`を作成するには、ビルダー API を使用します。ビルダー API は、その性質上、実装に渡す設定オブジェクトを毎回ビルドします。
該当する設定オブジェクトは、
ClientConfigurationData、
ConsumerConfigurationData、
および ProducerConfigurationData です。
Pulsar コネクターを使用すると、これらの設定オブジェクトのプロパティーを直接受信できます。
たとえば、PulsarClient
のブローカー認証情報は、authPluginClassName
および authParams
プロパティーを使用して受信されます。
受信チャネル data
の認証を設定するには、以下のように指定します。
mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}
Pulsar コンシューマーのプロパティー subscriptionInitialPosition
も、列挙値 SubscriptionInitialPosition.Earliest
として表される Earliest
値で設定されていることに注意してください。
この方法は、ほとんどの設定ケースに対応できます。
ただし、CryptoKeyReader
、ServiceUrlProvider
などのシリアライズできないオブジェクトは、この方法では設定できません。
Pulsar コネクターを使用すると、Pulsar 設定データオブジェクト ClientConfigurationData
、ConsumerConfigurationData
、ProducerConfigurationData
のインスタンスを考慮できます。
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
class PulsarConfig {
@Produces
@Identifier("my-consumer-options")
public ConsumerConfigurationData<String> getConsumerConfig() {
ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
data.setAckReceiptEnabled(true);
data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
//...
.build());
return data;
}
}
このインスタンスは、コネクターが使用するクライアントを設定するために取得および使用されます。
client-configuration
、consumer-configuration
、または producer-configuration
属性を使用してクライアントの名前を指定する必要があります。
mp.messaging.incoming.prices.consumer-configuration=my-consumer-options
[client|consumer|producer]-configuration
が設定されていない場合、コネクターはチャネル名で識別されるインスタンスを検索します。
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
class PulsarConfig {
@Produces
@Identifier("prices")
public ClientConfigurationData getClientConfig() {
ClientConfigurationData data = new ClientConfigurationData();
data.setEnableTransaction(true);
data.setServiceUrlProvider(AutoClusterFailover.builder()
// ...
.build());
return data;
}
}
また、キーごとの設定値を含む Map<String, Object>
を指定することもできます。
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import java.util.Map;
class PulsarConfig {
@Produces
@Identifier("prices")
public Map<String, Object> getProducerConfig() {
return Map.of(
"batcherBuilder", BatcherBuilder.KEY_BASED,
"sendTimeoutMs", 3000,
"customMessageRouter", new PartialRoundRobinMessageRouterImpl(4));
}
}
さまざまな設定ソースは、重要度の最も低いものから最も高いものの順に、次の優先順序で読み込まれます。
-
デフォルトの設定識別子 (
default-pulsar-client
、default-pulsar-consumer
、default-pulsar-producer
) を使用して生成されたMap<String, Object>
設定マップ -
設定内またはチャネル名内の識別子を使用して生成された
Map<String, Object>
設定マップ -
チャネル設定内またはチャネル名内の識別子を使用して生成された
[Client|Producer|Consuemr]ConfigurationData
オブジェクト -
[Client|Producer|Consuemr]ConfigurationData
フィールド名を使用して名前が付けられたチャネル設定プロパティー
設定オプションの完全なリストについては、設定リファレンス を参照してください。
9.1. Pulsar 認証の設定
Pulsar は、プラグ可能な認証フレームワークを提供します。Pulsar ブローカー/プロキシーは、このメカニズムを使用してクライアントを認証します。
クライアントは、authPluginClassName
および authParams
属性を使用して application.properties
ファイルで設定できます。
pulsar.client.serviceUrl=pulsar://pulsar:6650
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
pulsar.client.authParams={"userId":"superuser","password":"admin"}
または、プログラムで設定することもできます。
import java.util.Map;
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
class PulsarConfig {
@Produces
@Identifier("prices")
public ClientConfigurationData config() {
var data = new ClientConfigurationData();
var auth = new AuthenticationBasic();
auth.configure(Map.of("userId", "superuser", "password", "admin"));
data.setAuthentication(auth);
return data;
}
}
9.1.1. Configuring authentication to Pulsar using mTLS
Pulsar Messaging extension integrates with the Quarkus TLS registry to authenticate clients using mTLS.
To configure the mTLS for a Pulsar channel, you need to provide a named TLS configuration in the application.properties
:
quarkus.tls.my-tls-config.trust-store.p12.path=target/certs/pulsar-client-truststore.p12
quarkus.tls.my-tls-config.trust-store.p12.password=secret
quarkus.tls.my-tls-config.key-store.p12.path=target/certs/pulsar-client-keystore.p12
quarkus.tls.my-tls-config.key-store.p12.password=secret
mp.messaging.incoming.prices.tls-configuration-name=my-tls-config
9.1.2. Datastax Luna Streaming へのアクセスの設定
Luna Streaming は、DataStax のツールとサポートを備えた、Apache Pulsar の製品版ディストリビューションです。 DataStax Luna Pulsar テナントを作成したら、自動生成されたトークンをメモし、トークン認証を設定します。
pulsar.client.serviceUrl=pulsar+ssl://pulsar-aws-eucentral1.streaming.datastax.com:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
pulsar.client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODY4MTc4MzQsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50OzA3NGZhOTI4LThiODktNDBhNC04MDEzLWNlNjVkN2JmZWIwZTtjSEpwWTJWejsyMDI5ODdlOGUyIiwidG9rZW5pZCI6IjIwMjk4N2U4ZTIifQ....
必ず事前にトピックを作成するか、namespace 設定で 自動トピック作成 を有効にしてください。
トピック設定では、トピックの完全な名前を参照する必要があることに注意してください。
mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices
9.1.3. StreamNative Cloud へのアクセスの設定
StreamNative Cloud は、フルマネージドの Pulsar-as-a-Service です。完全ホスト型、パブリッククラウド上で StreamNative によって管理する方式、Kubernetes 上で自己管理する方式など、さまざまなデプロイ方法で利用できます。
StreamNative Pulsar クラスターは Oauth2 認証を使用します。そのため、アプリケーションが使用している Pulsar namespace/トピックへの権限 を持つ サービスアカウント が存在することを確認する必要があります。
次に、サービスアカウントの キーファイル (秘密鍵 として機能します) をダウンロードし、発行者 URL (通常は https://auth.streamnative.cloud/
) とクラスターの オーディエンス (urn:sn:pulsar:o-rf3ol:redhat
など) をメモする必要があります。
この手順を行う際には、StreamNative Cloud コンソールの Admin セクションにある Pulsar Clients ページが役立ちます。
Pulsar Oauth2 認証を使用してアプリケーションを設定するには、以下のように指定します。
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}
pulsar.client.authParams
設定には、issuerUrl
、audience
、および data:application/json;base64,<base64-encoded-key-file>
という形式の privateKey
を含む Json 文字列が含まれていることに注意してください。
または、プログラムで認証を設定することもできます。
package org.acme.pulsar;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
@ApplicationScoped
public class PulsarAuth {
@ConfigProperty(name = "pulsar.issuerUrl")
String issuerUrl;
@ConfigProperty(name = "pulsar.credentials")
String credentials;
@ConfigProperty(name = "pulsar.audience")
String audience;
@Produces
@Identifier("pulsar-auth")
public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
var data = new ClientConfigurationData();
data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), PulsarAuth.class.getResource(credentials), audience));
return data;
}
}
これは、キーファイルがリソースとしてアプリケーションクラスパスに含まれていることを前提としています。この場合、設定は次のようになります。
mp.messaging.incoming.prices.client-configuration=pulsar-auth
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json
pulsar-auth
で識別されるクライアント設定を使用するチャネルには、client-configuration
属性を設定する必要があることに注意してください。
10. ヘルスチェック
Quarkus エクステンションは、Pulsar コネクターによって管理される各チャネルの起動、準備状況、および稼働状況を報告します。 ヘルスチェックは、Pulsar クライアントに依存してブローカーとの接続が確立されていることを確認します。
着信チャネルと発信チャネルの Startup および Readiness プローブは、ブローカーとの接続が確立されると、OK を報告します。
着信チャネルと発信チャネルの Liveness プローブは、ブローカーとの接続が確立され、かつ 失敗が発見されなかった場合に、OK を報告します。
メッセージ処理が失敗すると、メッセージが 否定応答 されることに注意してください。その場合、メッセージは失敗ストラテジーによって処理されます。失敗ストラテジーは、失敗を報告し、liveness チェックの結果に影響を与えます。失敗ストラテジー `fail`は、失敗を報告するため、liveness チェックも失敗を報告します。
11. 設定リファレンス
以下は、Pulsar コネクターのチャネル、コンシューマー、プロデューサー、およびクライアントの設定属性のリストです。 Pulsar クライアントの設定方法の詳細は、Pulsar クライアントの設定 を参照してください。
11.1. 着信チャネル設定 (Pulsar からの受信)
以下の属性は以下のように設定します:
mp.messaging.incoming.your-channel-name.attribute=value
属性 (alias) | 説明 | タイプ | 必須 | デフォルト |
---|---|---|---|---|
ack-strategy |
Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be |
string |
false |
|
ackTimeout.redeliveryBackoff |
Comma separated values for configuring ack timeout MultiplierRedeliveryBackoff, min delay, max delay, multiplier. |
string |
false |
|
batchReceive |
Whether batch receive is used to consume messages |
boolean |
false |
|
client-configuration |
Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. |
string |
false |
|
consumer-configuration |
Identifier of a CDI bean that provides the default Pulsar consumer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. |
string |
false |
|
deadLetterPolicy.deadLetterTopic |
Name of the dead letter topic where the failing messages will be sent |
string |
false |
|
deadLetterPolicy.initialSubscriptionName |
Name of the initial subscription name of the dead letter topic |
string |
false |
|
deadLetterPolicy.maxRedeliverCount |
Maximum number of times that a message will be redelivered before being sent to the dead letter topic |
int |
false |
|
deadLetterPolicy.retryLetterTopic |
Name of the retry topic where the failing messages will be sent |
string |
false |
|
failure-strategy |
Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be |
string |
false |
|
health-enabled |
Whether health reporting is enabled (default) or disabled |
boolean |
false |
|
negativeAck.redeliveryBackoff |
Comma separated values for configuring negative ack MultiplierRedeliveryBackoff, min delay, max delay, multiplier. |
string |
false |
|
reconsumeLater.delay |
Default delay for reconsume failure-strategy, in seconds |
long |
false |
|
schema |
The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed |
string |
false |
|
serviceUrl |
The service URL for the Pulsar service |
string |
false |
|
topic |
The consumed / populated Pulsar topic. If not set, the channel name is used |
string |
false |
|
tracing-enabled |
Whether tracing is enabled (default) or disabled |
boolean |
false |
|
基盤となる Pulsar コンシューマーがサポートするプロパティーを設定することもできます。
これらのプロパティーは、pulsar.consumer
接頭辞を使用してグローバルに設定することもできます。
pulsar.consumer.subscriptionInitialPosition=Earliest
属性 | 説明 | タイプ | 設定ファイル | デフォルト |
---|---|---|---|---|
topicNames |
トピック名 |
Set |
true |
[] |
topicsPattern |
Topic pattern |
Pattern |
true |
|
subscriptionName |
Subscription name |
文字列 |
true |
|
subscriptionType |
Subscription type. |
SubscriptionType |
true |
Exclusive |
subscriptionProperties |
Map |
true |
||
subscriptionMode |
SubscriptionMode |
true |
Durable |
|
messageListener |
MessageListener |
false |
||
consumerEventListener |
ConsumerEventListener |
false |
||
negativeAckRedeliveryBackoff |
Interface for custom message is negativeAcked policy. You can specify |
RedeliveryBackoff |
false |
|
ackTimeoutRedeliveryBackoff |
Interface for custom message is ackTimeout policy. You can specify |
RedeliveryBackoff |
false |
|
receiverQueueSize |
Size of a consumer’s receiver queue. |
int |
true |
1000 |
acknowledgementsGroupTimeMicros |
Group a consumer acknowledgment for a specified time. |
long |
true |
100000 |
maxAcknowledgmentGroupSize |
Group a consumer acknowledgment for the number of messages. |
int |
true |
1000 |
negativeAckRedeliveryDelayMicros |
Delay to wait before redelivering messages that failed to be processed. |
long |
true |
60000000 |
maxTotalReceiverQueueSizeAcrossPartitions |
The max total receiver queue size across partitions. |
int |
true |
50000 |
consumerName |
Consumer name |
文字列 |
true |
|
ackTimeoutMillis |
Timeout of unacked messages |
long |
true |
0 |
tickDurationMillis |
Granularity of the ack-timeout redelivery. |
long |
true |
1000 |
priorityLevel |
Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type. Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4. |
int |
true |
0 |
maxPendingChunkedMessage |
The maximum size of a queue holding pending chunked messages. When the threshold is reached, the consumer drops pending messages to optimize memory utilization. |
int |
true |
10 |
autoAckOldestChunkedMessageOnQueueFull |
Whether to automatically acknowledge pending chunked messages when the threshold of |
boolean |
true |
false |
expireTimeOfIncompleteChunkedMessageMillis |
The time interval to expire incomplete chunks if a consumer fails to receive all the chunks in the specified time period. The default value is 1 minute. |
long |
true |
60000 |
cryptoKeyReader |
CryptoKeyReader |
false |
||
messageCrypto |
MessageCrypto |
false |
||
cryptoFailureAction |
Consumer should take action when it receives a message that can not be decrypted. The decompression of message fails. If messages contain batch messages, a client is not be able to retrieve individual messages in batch. Delivered encrypted message contains |
ConsumerCryptoFailureAction |
true |
FAIL |
properties |
A name or value property of this consumer.
When getting a topic stats, associate this metadata with the consumer stats for easier identification. |
SortedMap |
true |
{} |
readCompacted |
If enabling A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal. Only enabling Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a |
boolean |
true |
false |
subscriptionInitialPosition |
Initial position at which to set cursor when subscribing to a topic at first time. |
SubscriptionInitialPosition |
true |
Latest |
patternAutoDiscoveryPeriod |
Topic auto discovery period when using a pattern for topic’s consumer. The default and minimum value is 1 minute. |
int |
true |
60 |
regexSubscriptionMode |
When subscribing to a topic using a regular expression, you can pick a certain type of topics. * PersistentOnly: only subscribe to persistent topics. |
RegexSubscriptionMode |
true |
PersistentOnly |
deadLetterPolicy |
Dead letter policy for consumers. By default, some messages are probably redelivered many times, even to the extent that it never stops. By using the dead letter mechanism, messages have the max redelivery count. When exceeding the maximum number of redeliveries, messages are sent to the Dead Letter Topic and acknowledged automatically. You can enable the dead letter mechanism by setting When specifying the dead letter policy while not specifying |
DeadLetterPolicy |
true |
|
retryEnable |
boolean |
true |
false |
|
batchReceivePolicy |
BatchReceivePolicy |
false |
||
autoUpdatePartitions |
If Note: this is only for partitioned consumers. |
boolean |
true |
true |
autoUpdatePartitionsIntervalSeconds |
long |
true |
60 |
|
replicateSubscriptionState |
If |
boolean |
true |
false |
resetIncludeHead |
boolean |
true |
false |
|
keySharedPolicy |
KeySharedPolicy |
false |
||
batchIndexAckEnabled |
boolean |
true |
false |
|
ackReceiptEnabled |
boolean |
true |
false |
|
poolMessages |
boolean |
true |
false |
|
payloadProcessor |
MessagePayloadProcessor |
false |
||
startPaused |
boolean |
true |
false |
|
autoScaledReceiverQueueSizeEnabled |
boolean |
true |
false |
|
topicConfigurations |
List |
true |
[] |
11.2. 発信チャネルの設定 (Pulsar への公開)
属性 (alias) | 説明 | タイプ | 必須 | デフォルト |
---|---|---|---|---|
client-configuration |
Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. |
string |
false |
|
health-enabled |
Whether health reporting is enabled (default) or disabled |
boolean |
false |
|
maxPendingMessages |
The maximum size of a queue holding pending messages, i.e messages waiting to receive an acknowledgment from a broker |
int |
false |
|
producer-configuration |
Identifier of a CDI bean that provides the default Pulsar producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. |
string |
false |
|
schema |
The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed |
string |
false |
|
serviceUrl |
The service URL for the Pulsar service |
string |
false |
|
topic |
The consumed / populated Pulsar topic. If not set, the channel name is used |
string |
false |
|
tracing-enabled |
Whether tracing is enabled (default) or disabled |
boolean |
false |
|
waitForWriteCompletion |
Whether the client waits for the broker to acknowledge the written record before acknowledging the message |
boolean |
false |
|
基盤となる Pulsar プロデューサーがサポートするプロパティーを設定することもできます。
これらのプロパティーは、pulsar.producer
接頭辞を使用してグローバルに設定することもできます。
pulsar.producer.batchingEnabled=false
属性 | 説明 | タイプ | 設定ファイル | デフォルト |
---|---|---|---|---|
topicName |
トピック名 |
文字列 |
true |
|
producerName |
Producer name |
文字列 |
true |
|
sendTimeoutMs |
Message send timeout in ms. |
long |
true |
30000 |
blockIfQueueFull |
If it is set to The |
boolean |
true |
false |
maxPendingMessages |
The maximum size of a queue holding pending messages. For example, a message waiting to receive an acknowledgment from a broker. By default, when the queue is full, all calls to the |
int |
true |
0 |
maxPendingMessagesAcrossPartitions |
The maximum number of pending messages across partitions. Use the setting to lower the max pending messages for each partition ( |
int |
true |
0 |
messageRoutingMode |
Message routing logic for producers on partitioned topics. |
MessageRoutingMode |
true |
|
hashingScheme |
Hashing function determining the partition where you publish a particular message (partitioned topics only). |
HashingScheme |
true |
JavaStringHash |
cryptoFailureAction |
Producer should take action when encryption fails. |
ProducerCryptoFailureAction |
true |
FAIL |
customMessageRouter |
MessageRouter |
false |
||
batchingMaxPublishDelayMicros |
Batching time period of sending messages. |
long |
true |
1000 |
batchingPartitionSwitchFrequencyByPublishDelay |
int |
true |
10 |
|
batchingMaxMessages |
The maximum number of messages permitted in a batch. |
int |
true |
1000 |
batchingMaxBytes |
int |
true |
131072 |
|
batchingEnabled |
Enable batching of messages. |
boolean |
true |
true |
batcherBuilder |
BatcherBuilder |
false |
||
chunkingEnabled |
Enable chunking of messages. |
boolean |
true |
false |
chunkMaxMessageSize |
int |
true |
-1 |
|
cryptoKeyReader |
CryptoKeyReader |
false |
||
messageCrypto |
MessageCrypto |
false |
||
encryptionKeys |
Set |
true |
[] |
|
compressionType |
Message data compression type used by a producer. |
CompressionType |
true |
NONE |
initialSequenceId |
Long |
true |
||
autoUpdatePartitions |
boolean |
true |
true |
|
autoUpdatePartitionsIntervalSeconds |
long |
true |
60 |
|
multiSchema |
boolean |
true |
true |
|
accessMode |
ProducerAccessMode |
true |
Shared |
|
lazyStartPartitionedProducers |
boolean |
true |
false |
|
properties |
SortedMap |
true |
{} |
|
initialSubscriptionName |
Use this configuration to automatically create an initial subscription when creating a topic. If this field is not set, the initial subscription is not created. |
文字列 |
true |
11.3. Pulsar クライアントの設定
以下は、基盤となる PulsarClient
の設定リファレンスです。
これらのオプションは、チャネル属性を使用して設定できます。
mp.messaging.incoming.your-channel-name.numIoThreads=4
または、pulsar.client
接頭辞を使用してグローバルに設定します。
pulsar.client.serviceUrl=pulsar://pulsar:6650
属性 | 説明 | タイプ | 設定ファイル | デフォルト |
---|---|---|---|---|
serviceUrl |
Pulsar cluster HTTP URL to connect to a broker. |
文字列 |
true |
|
serviceUrlProvider |
The implementation class of ServiceUrlProvider used to generate ServiceUrl. |
ServiceUrlProvider |
false |
|
authentication |
Authentication settings of the client. |
認証 |
false |
|
authPluginClassName |
Class name of authentication plugin of the client. |
文字列 |
true |
|
authParams |
Authentication parameter of the client. |
文字列 |
true |
|
authParamMap |
Authentication map of the client. |
Map |
true |
|
operationTimeoutMs |
Client operation timeout (in milliseconds). |
long |
true |
30000 |
lookupTimeoutMs |
Client lookup timeout (in milliseconds). |
long |
true |
-1 |
statsIntervalSeconds |
Interval to print client stats (in seconds). |
long |
true |
60 |
numIoThreads |
IOスレッド数。 |
int |
true |
10 |
numListenerThreads |
Number of consumer listener threads. |
int |
true |
10 |
connectionsPerBroker |
Number of connections established between the client and each Broker. A value of 0 means to disable connection pooling. |
int |
true |
1 |
connectionMaxIdleSeconds |
Release the connection if it is not used for more than [connectionMaxIdleSeconds] seconds. If [connectionMaxIdleSeconds] < 0, disabled the feature that auto release the idle connections |
int |
true |
180 |
useTcpNoDelay |
Whether to use TCP NoDelay option. |
boolean |
true |
true |
useTls |
Whether to use TLS. |
boolean |
true |
false |
tlsKeyFilePath |
Path to the TLS key file. |
文字列 |
true |
|
tlsCertificateFilePath |
Path to the TLS certificate file. |
文字列 |
true |
|
tlsTrustCertsFilePath |
Path to the trusted TLS certificate file. |
文字列 |
true |
|
tlsAllowInsecureConnection |
Whether the client accepts untrusted TLS certificates from the broker. |
boolean |
true |
false |
tlsHostnameVerificationEnable |
Whether the hostname is validated when the client creates a TLS connection with brokers. |
boolean |
true |
false |
concurrentLookupRequest |
The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum prevents overloading a broker. |
int |
true |
5000 |
maxLookupRequest |
Maximum number of lookup requests allowed on each broker connection to prevent overloading a broker. |
int |
true |
50000 |
maxLookupRedirects |
Maximum times of redirected lookup requests. |
int |
true |
20 |
maxNumberOfRejectedRequestPerConnection |
Maximum number of rejected requests of a broker in a certain time frame (60 seconds) after the current connection is closed and the client creating a new connection to connect to a different broker. |
int |
true |
50 |
keepAliveIntervalSeconds |
Seconds of keeping alive interval for each client broker connection. |
int |
true |
30 |
connectionTimeoutMs |
Duration of waiting for a connection to a broker to be established.If the duration passes without a response from a broker, the connection attempt is dropped. |
int |
true |
10000 |
requestTimeoutMs |
Maximum duration for completing a request. |
int |
true |
60000 |
readTimeoutMs |
Maximum read time of a request. |
int |
true |
60000 |
autoCertRefreshSeconds |
Seconds of auto refreshing certificate. |
int |
true |
300 |
initialBackoffIntervalNanos |
Initial backoff interval (in nanosecond). |
long |
true |
100000000 |
maxBackoffIntervalNanos |
Max backoff interval (in nanosecond). |
long |
true |
60000000000 |
enableBusyWait |
Whether to enable BusyWait for EpollEventLoopGroup. |
boolean |
true |
false |
listenerName |
Listener name for lookup. Clients can use listenerName to choose one of the listeners as the service URL to create a connection to the broker as long as the network is accessible."advertisedListeners" must enabled in broker side. |
文字列 |
true |
|
useKeyStoreTls |
Set TLS using KeyStore way. |
boolean |
true |
false |
sslProvider |
The TLS provider used by an internal client to authenticate with other Pulsar brokers. |
文字列 |
true |
|
tlsKeyStoreType |
TLS KeyStore type configuration. |
文字列 |
true |
JKS |
tlsKeyStorePath |
Path of TLS KeyStore. |
文字列 |
true |
|
tlsKeyStorePassword |
Password of TLS KeyStore. |
文字列 |
true |
|
tlsTrustStoreType |
TLS TrustStore type configuration. You need to set this configuration when client authentication is required. |
文字列 |
true |
JKS |
tlsTrustStorePath |
Path of TLS TrustStore. |
文字列 |
true |
|
tlsTrustStorePassword |
Password of TLS TrustStore. |
文字列 |
true |
|
tlsCiphers |
Set of TLS Ciphers. |
Set |
true |
[] |
tlsProtocols |
Protocols of TLS. |
Set |
true |
[] |
memoryLimitBytes |
Limit of client memory usage (in byte). The 64M default can guarantee a high producer throughput. |
long |
true |
67108864 |
proxyServiceUrl |
URL of proxy service. proxyServiceUrl and proxyProtocol must be mutually inclusive. |
文字列 |
true |
|
proxyProtocol |
Protocol of proxy service. proxyServiceUrl and proxyProtocol must be mutually inclusive. |
ProxyProtocol |
true |
|
enableTransaction |
Whether to enable transaction. |
boolean |
true |
false |
clock |
Clock |
false |
||
dnsLookupBindAddress |
The Pulsar client dns lookup bind address, default behavior is bind on 0.0.0.0 |
文字列 |
true |
|
dnsLookupBindPort |
The Pulsar client dns lookup bind port, takes effect when dnsLookupBindAddress is configured, default value is 0. |
int |
true |
0 |
socks5ProxyAddress |
Address of SOCKS5 proxy. |
InetSocketAddress |
true |
|
socks5ProxyUsername |
User name of SOCKS5 proxy. |
文字列 |
true |
|
socks5ProxyPassword |
Password of SOCKS5 proxy. |
文字列 |
true |
|
description |
The extra description of the client version. The length cannot exceed 64. |
文字列 |
true |
設定ファイルで設定できない (シリアライズできない) 設定プロパティーは、 |
12. さらに詳しく
このガイドでは、Quarkus を使用して Pulsar と対話する方法を説明しました。 Quarkus は、Quarkus Messaging を利用してデータストリーミングアプリケーションを構築します。
さらに詳しく知りたい場合は、Quarkusで使用されている実装、 SmallRye Reactive Messaging のドキュメントを確認してください。