Apache Pulsar リファレンスガイド
このリファレンスガイドでは、Quarkus アプリケーションが Quarkus Messaging を利用して Apache Pulsar と対話する仕組みを説明します。
1. はじめに
Apache Pulsar は、クラウド向けに構築されたオープンソースの分散メッセージングおよびストリーミングプラットフォームです。 階層型ストレージ機能により、マルチテナントの高性能ソリューションをサーバーメッセージングに提供します。
Pulsar はパブリッシュ - サブスクライブパターンを実装します。
-
プロデューサーは トピック にメッセージを公開します。
-
コンシューマーは、そのトピックへの サブスクリプション を作成し、着信メッセージを受信して処理します。処理が完了すると、ブローカーに 完了通知 を送信します。
-
サブスクリプションが作成されると、コンシューマーが切断されても、Pulsar はすべてのメッセージを保持します。 保持されたメッセージは、これらのメッセージがすべて正常に処理されたことをコンシューマーが確認した場合にのみ破棄されます。
Pulsar クラスターは、以下の要素で構成されます。
-
ステートレスコンポーネントである 1 つ以上のブローカー。
-
トピックのメタデータ、スキーマ、調整、クラスター設定を維持するための メタデータストア。
-
メッセージの永続的な保存に使用される bookies のセット。
2. Apache Pulsar のための Quarkus エクステンション
Quarkus は、 SmallRye Reactive Messaging フレームワークを通じて Apache Pulsar のサポートを提供します。 Eclipse MicroProfile Reactive Messaging 仕様 3.0 に基づいて、CDI とイベント駆動型を橋渡しする柔軟なプログラミングモデルを提案します。
このガイドでは、Apache Pulsar と SmallRye Reactive Messaging フレームワークについて詳しく説明します。 クイックスタートについては、Apache Pulsar を使用した Quarkus Messaging の概要 を参照してください。 |
プロジェクトのベースディレクトリーで次のコマンドを実行すると、 messaging-pulsar
エクステンションをプロジェクトに追加できます。
quarkus extension add messaging-pulsar
./mvnw quarkus:add-extension -Dextensions='messaging-pulsar'
./gradlew addExtension --extensions='messaging-pulsar'
これにより、ビルドファイルに次の内容が追加されます。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-pulsar</artifactId>
</dependency>
implementation("io.quarkus:quarkus-messaging-pulsar")
このエクステンションには、 |
3. SmallRye Pulsar コネクターの設定
SmallRye Reactive Messaging フレームワークは、Apache Kafka、Apache Pulsar、AMQP、Apache Camel、JMS、MQTT などのさまざまなメッセージングバックエンドをサポートしているため、汎用的な語彙を使用しています。
-
アプリケーションは メッセージ を送受信します。
Message
は、ペイロード をラップし、いくつかの メタデータ で拡張できます。 これは、値とキーで構成される PulsarMessage
と混同しないでください。 Pulsar コネクターでは、Reactive Messaging メッセージ が Pulsar メッセージ に対応します。 -
メッセージは チャネル を通過します。アプリケーションコンポーネントは、チャネルに接続してメッセージを公開および消費します。Pulsar コネクターは、チャネル を Pulsar トピック にマッピングします。
-
チャネルは コネクター を使用してメッセージバックエンドに接続します。 コネクターは、着信メッセージを特定のチャネル (アプリケーションが消費するチャネル) にマッピングし、特定のチャネルに送信された発信メッセージを収集するように設定されています。 各コネクターは、特定のメッセージングテクノロジーに特化しています。 たとえば、Pulsar を処理するコネクターの名前は、
smallrye-pulsar
となっています。
着信チャネルを持つ Pulsar コネクターの最小設定は次のようになります。
%prod.pulsar.client.serviceUrl=pulsar:6650 (1)
mp.messaging.incoming.prices.connector=smallrye-pulsar (2)
1 | プロダクションプロファイルの Pulsar ブローカーサービス URL を設定します。
この URL は、 mp.messaging.incoming.$channel.serviceUrl プロパティーを使用して、グローバルに、またはチャネルごとに設定できます。
開発モードおよびテスト実行時には、Dev Services for Pulsar が Pulsar ブローカーを自動的に起動します。 |
2 | prices チャネルを管理するためのコネクターを設定します。 デフォルトでは、トピック 名はチャネル名と同じです。 |
トピック名は、トピック属性を設定することで上書きできます。
%prod 接頭辞は、アプリケーションが本番モードで実行される場合にのみプロパティーが使用されることを示します (つまり、開発モードまたはテストモードでは使用されません)。詳細は、プロファイルに関するドキュメント を参照してください。
|
コネクターの自動アタッチ
クラスパスに単一のコネクターがある場合は、 この自動アタッチは、以下を使用して無効にできます。
|
設定オプションの詳細は、Pulsar クライアントの設定 を参照してください。
4. Pulsar からのメッセージの受信
Pulsar コネクターは、Pulsar クライアントを使用して Pulsar ブローカーに接続し、コンシューマーを作成して Pulsar ブローカーからメッセージを受信します。また、各 Pulsar Message
を Reactive Messaging Message
にマッピングします。
4.1. 例
Pulsar ブローカーが実行中で、 pulsar:6650
アドレスを使用してアクセスできるとします。
次のようにして、 prices
チャネルで Pulsar メッセージを受信するようにアプリケーションを設定します。
mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest (2)
-
Pulsar ブローカーサービスの URL を設定します。
-
コンシューマーのサブスクリプションが
Earliest
位置からメッセージの受信を開始することを確認します。
Pulsar トピックやコンシューマー名を設定する必要はありません。
デフォルトでは、コネクターはチャネル名 ( |
Pulsar では、コンシューマーがトピックサブスクリプションの |
すると、アプリケーションが double
ペイロードを直接受信できるようになります。
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PriceConsumer {
@Incoming("prices")
public void consume(double price) {
// process your price.
}
}
または、Reactive Messaging タイプ Message<Double>
を取得することもできます。
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
// access record metadata
var metadata = msg.getMetadata(PulsarIncomingMessageMetadata.class).orElseThrow();
// process the message payload.
double price = msg.getPayload();
// Acknowledge the incoming message (acknowledge the Pulsar message back to the broker)
return msg.ack();
}
Reactive Messaging の Message
タイプを使用すると、コンシューマーメソッドが着信メッセージのメタデータにアクセスし、確認応答を手動で処理できます。
Pulsar メッセージオブジェクトに直接アクセスする場合は、次を使用します。
@Incoming("prices")
public void consume(org.apache.pulsar.client.api.Message<Double> msg) {
String key = msg.getKey();
String value = msg.getValue();
String topic = msg.topicName();
// ...
}
org.apache.pulsar.client.api.Message
は、基盤となる Pulsar クライアントによって提供され、コンシューマーメソッドで直接使用できます。
または、次の例のように、アプリケーションで、チャネル名によって識別される Multi
を Bean に注入し、そのイベントをサブスクライブすることもできます。
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;
@Path("/prices")
public class PriceResource {
@Inject
@Channel("prices")
Multi<Double> prices;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<Double> stream() {
return prices;
}
}
|
チャネルとして注入できるのは、以下のタイプです。
@Inject @Channel("prices") Multi<Double> streamOfPayloads;
@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;
@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;
@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;
前出の Message
の例と同様に、注入されたチャネルがペイロード (Multi<T>
) を受信した場合は、そのチャネルが自動的にメッセージを確認応答し、複数のサブスクライバーをサポートします。
注入されたチャネルが Message (Multi<Message<T>>
) を受信した場合は、手動で確認応答とブロードキャストを行う必要があります。
4.2. ブロッキング処理
リアクティブメッセージングは、I/O スレッドでメソッドを呼び出します。このトピックの詳細については、 Quarkus リアクティブアーキテクチャーのドキュメント を参照してください。ただし、多くの場合、リアクティブメッセージングとデータベースインタラクションなどのブロック処理を組み合わせる必要があります。このためには、処理が ブロッキング であり、呼び出し元のスレッドで実行するべきではないことを示す @Blocking
アノテーションを使用する必要があります。
例えば、以下のコードは、Hibernate with Panacheを 使用してデータベースに受信ペイロードを格納する方法を示しています。
import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;
@ApplicationScoped
public class PriceStorage {
@Incoming("prices")
@Transactional
public void store(int priceInUsd) {
Price price = new Price();
price.value = priceInUsd;
price.persist();
}
}
効果はどちらも同じです。したがって、両方を使うことができます。1番目のものは、使用するワーカープールや順序を保持するかどうかなど、より細かい調整が可能です。2 番目のものは、Quarkus の他のリアクティブ機能でも使用され、デフォルトのワーカープールを使用し、順序を保持します。
|
@RunOnVirtualThread
Java 仮想スレッドでの ブロッキング処理の実行については、 Quarkus Virtual Thread support with Reactive Messagingのドキュメント を参照してください。 |
@Transactional
メソッドに |
4.3. Pulsar のサブスクリプションタイプ
Pulsar の subscriptionType コンシューマー設定は、パブリッシュ - サブスクライブやキューイングなど、さまざまなメッセージングシナリオを実現するために柔軟に使用できます。
-
Exclusive サブスクリプションタイプ使用すると、"ファンアウト型の pub/sub メッセージング" 用に 一意のサブスクリプション名 を指定できます。これがデフォルトのサブスクリプションタイプです。
-
Shared、Key_Shared、または Failover サブスクリプションタイプを使用すると、複数のコンシューマーで 同じサブスクリプション名 を共有して、コンシューマー間で "メッセージキューイング" を実現できます。
サブスクリプション名が指定されていない場合、Quarkus は一意の ID を生成します。
4.4. デシリアライゼーションと Pulsar Schema
Pulsar コネクターを使用すると、基盤となる Pulsar コンシューマーのスキーマ設定を設定できます。 詳細は、Pulsar Schema 設定と自動スキーマ検出 を参照してください。
4.5. 確認応答ストラテジー
Pulsar メッセージから生成されたメッセージが 確認応答 されると、コネクターが 確認応答リクエスト を Pulsar ブローカーに送信します。 すべての Reactive Messaging メッセージが 確認応答 される必要があります。ほとんどの場合、これは自動的に処理されます。 確認応答リクエストは、次の 2 つのストラテジーを使用して Pulsar ブローカーに送信できます。
-
個別確認応答 は、デフォルトのストラテジーです。確認応答リクエストが各メッセージに対してブローカーに送信されます。
-
累積確認応答 は、
ack-strategy=cumulative
を使用して設定されるストラテジーです。コンシューマーが受信した最後のメッセージのみを確認します。 提供されたメッセージまでのストリーム内の全メッセージは、そのコンシューマーに再配信されません。
デフォルトでは、Pulsar コンシューマーは、ブローカーからの確認応答の確認を待たずに確認応答を検証します。
|
4.6. 失敗処理ストラテジー
Pulsar メッセージから生成されたメッセージが 否定応答 された場合、失敗ストラテジーが適用されます。 Quarkus Pulsar エクステンションは、4 つのストラテジーをサポートしています。
-
nack
(デフォルト) は、ブローカーに 否定応答 を送信し、ブローカーがこのメッセージをコンシューマーに再配信するようにトリガーします。 否定応答は、negativeAckRedeliveryDelayMicros
プロパティーとnegativeAck.redeliveryBackoff
プロパティーを使用してさらに設定できます。 -
fail
の場合、アプリケーションが失敗します。メッセージはそれ以上処理されません。 -
ignore
の場合、失敗がログに記録されますが、確認応答ストラテジーが適用され、処理が続行されます。 -
continue
の場合、失敗がログに記録されますが、確認応答や否定応答を適用せずに処理が続行されます。このストラテジーは、確認応答タイムアウト 設定とともに使用できます。 -
reconsume-later
は、reconsumeLater
API を使用して、遅延して再消費されるメッセージを 再試行レタートピック に送信します。 遅延はreconsumeLater.delay
プロパティーを使用して設定でき、デフォルトは 3 秒です。 失敗メタデータにio.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata
のインスタンスを追加することで、メッセージごとにカスタムの遅延またはプロパティーを設定できます。
4.6.1. 確認応答タイムアウト
Pulsar クライアントは、否定応答と同様に、 確認応答タイムアウト メカニズムにより、指定された ackTimeout 期間にわたって、確認応答されていないメッセージを追跡し、ブローカーに 確認応答されていないメッセージの再配信リクエスト を送信します。これを受けて、ブローカーは確認応答されていないメッセージをコンシューマーに再送信します。
タイムアウトと再配信バックオフメカニズムを設定するには、 ackTimeoutMillis
プロパティーと ackTimeout.redeliveryBackoff
プロパティーを設定します。
ackTimeout.redeliveryBackoff
には、最小遅延 (ミリ秒単位)、最大遅延 (ミリ秒単位)、および乗数をコンマ区切り値として指定できます。
mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2
4.6.2. 後の再消費と再試行レタートピック
再試行レタートピック は、正常に消費されなかったメッセージをデッドレタートピックにプッシュし、メッセージの消費を継続します。 デッドレタートピックは、確認応答タイムアウト、否定応答、再試行レタートピックなどのさまざまなメッセージ再配信方法で使用できることに注意してください。
mp.messaging.incoming.data.failure-strategy=reconsume-later
mp.messaging.incoming.data.reconsumeLater.delay=5000
mp.messaging.incoming.data.retryEnable=true
mp.messaging.incoming.data.negativeAck.redeliveryBackoff=1000,60000,2
4.6.3. デッドレタートピック
デッドレタートピック は、正常に消費されなかったメッセージをデッドレタートピックにプッシュし、メッセージの消費を継続します。 デッドレタートピックは、確認応答タイムアウト、否定応答、再試行レタートピックなどのさまざまなメッセージ再配信方法で使用できることに注意してください。
mp.messaging.incoming.data.failure-strategy=nack
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic
mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription
mp.messaging.incoming.data.subscriptionType=Shared
否定応答 または 確認応答タイムアウト 再配信方法では、少なくとも未処理のメッセージを含むメッセージのバッチ全体が再配信されます。 詳細は、プロデューサーのバッチ処理 を参照してください。 |
4.7. Pulsar メッセージの一括受信
デフォルトでは、incoming メソッドは各 Pulsar メッセージを個別に受信します。
batchReceive=true
プロパティーを使用するか、コンシューマー設定で batchReceivePolicy
を指定することで、バッチモードを有効にすることができます。
@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<org.apache.pulsar.client.api.Messages<Double>> messages) {
for (org.apache.pulsar.client.api.Message<Double> msg : messages.getPayload()) {
String key = msg.getKey();
String topic = msg.getTopicName();
long timestamp = msg.getEventTime();
//... process messages
}
// ack will commit the latest offsets (per partition) of the batch.
return messages.ack();
}
@Incoming("prices")
public void consumeRecords(org.apache.pulsar.client.api.Messages<Double> messages) {
for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
//... process messages
}
}
または、consume メソッドにペイロードのリストを直接受信することもできます。
@Incoming("prices")
public void consume(List<Double> prices) {
for (double price : prices) {
// process price
}
}
Quarkus は、着信チャネルのバッチタイプを自動検出し、バッチ設定を自動的に設定します。
|
5. Pulsar へのメッセージの送信
Pulsar Connectorはリアクティブ・メッセージング Message
をPulsarメッセージとして書き込むことができます。
5.1. 例
Pulsar ブローカーが実行中で、 pulsar:6650
アドレスを使用してアクセスできるとします。
次のように、 prices
チャネルからのメッセージを Pulsar メッセージに書き込むようにアプリケーションを設定します。
mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 (1)
-
Pulsar ブローカーサービスの URL を設定します。
Pulsar トピックやプロデューサー名を設定する必要はありません。
デフォルトでは、コネクターはチャネル名 ( |
次に、アプリケーションは、 Message<Double>
を prices
チャネルに送信する必要があります。次のスニペットのように、 double
ペイロードを使用できます。
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class PulsarPriceProducer {
private final Random random = new Random();
@Outgoing("prices-out")
public Multi<Double> generate() {
// Build an infinite stream of random prices
// It emits a price every second
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> random.nextDouble());
}
}
generate メソッドは、 Multi<Double>
を返します。これは、 Flow.Publisher
インターフェイスを実装します。
このパブリッシャーは、メッセージを生成し、設定された Pulsar トピックに送信するために、フレームワークによって使用されます。
ペイロードを返す代わりに、 io.smallrye.reactive.messaging.pulsar.OutgoingMessage
を返して、Pulsar メッセージを送信することもできます。
@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}
ペイロードを org.eclipse.microprofile.reactive.messaging.Message
内にラップして、書き込まれたレコードをより詳細に制御できます。
@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> Message.of(random.nextDouble())
.addMetadata(PulsarOutgoingMessageMetadata.builder()
.withKey("my-key")
.withProperties(Map.of("property-key", "value"))
.build()));
}
Messages
を送信するときに io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata
のインスタンスを追加して、メッセージが Pulsar に書き込まれる方法に影響を与えることができます。
発信メソッドは、 Flow.Publisher
を返すメソッドシグネチャーの他に、単一のメッセージを返すこともできます。
この場合、プロデューサーがこのメソッドをジェネレーターとして使用して無限のストリームを作成します。
@Outgoing("prices-out") T generate(); // T excluding void
@Outgoing("prices-out") Message<T> generate();
@Outgoing("prices-out") Uni<T> generate();
@Outgoing("prices-out") Uni<Message<T>> generate();
@Outgoing("prices-out") CompletionStage<T> generate();
@Outgoing("prices-out") CompletionStage<Message<T>> generate();
5.2. シリアライゼーションと Pulsar Schema
Pulsar コネクターを使用すると、基盤となる Pulsar プロデューサーのスキーマ設定を設定できます。 詳細は、Pulsar Schema 設定と自動スキーマ検出 を参照してください。
5.3. キー/値のペアの送信
キー/値のペアを Pulsar に送信するには、Pulsar プロデューサースキーマを KeyValue スキーマを使用して設定できます。
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.common.annotation.Identifier;
@ApplicationScoped
public class PulsarKeyValueExample {
@Identifier("out")
@Produces
Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);
@Incoming("in")
@Outgoing("out")
public KeyValue<String, Long> process(long in) {
return new KeyValue<>("my-key", in);
}
}
書き込まれたレコードをさらに制御する必要がある場合は、 PulsarOutgoingMessageMetadata
を使用します。
5.4. 確認応答
Pulsar ブローカーは、プロデューサーからメッセージを受信すると、メッセージに MessageId
を割り当ててプロデューサーに送り返し、メッセージが公開されたことを確認します。
デフォルトでは、コネクターは Pulsar がレコードを確認応答するのを待ってから、処理 (受信した Message
の確認応答) を続行します。
これを無効にするには、 waitForWriteCompletion
属性を false
に設定します。
レコードを書き込めない場合、メッセージは nacked
になります。
Pulsar クライアントは、失敗した場合に、 送信タイムアウト に達するまで、自動的にメッセージの送信を再試行します。
送信タイムアウト は、 |
5.5. バックプレッシャーとインフライトレコード
Pulsar 発信コネクターは、バックプレッシャーを処理し、Pulsar ブローカーへの書き込みを待機している保留メッセージの数を監視します。
保留メッセージの数は、 maxPendingMessages
属性を使用して設定され、デフォルトは 1000 です。
コネクターは、その数のメッセージのみを同時に送信します。 少なくとも 1 つの保留メッセージがブローカーによって確認応答されるまで、他のメッセージは送信されません。 次に、ブローカーの保留メッセージの 1 つが確認応答されると、コネクターが Pulsar に新しいメッセージを書き込みます。
また、 maxPendingMessages
を 0
に設定することで、保留メッセージの制限を解除することもできます。
Pulsar では、 maxPendingMessagesAcrossPartitions
を使用して、パーティションごとの保留メッセージの数を設定することもできます。
5.6. プロデューサーのバッチ処理
デフォルトでは、Pulsar プロデューサーは個々のメッセージをまとめてバッチ処理し、ブローカーに公開します。
バッチ処理のパラメーターは、 batchingMaxPublishDelayMicros
、 batchingPartitionSwitchFrequencyByPublishDelay
、 batchingMaxMessages
、 batchingMaxBytes
設定プロパティーを使用して設定できます。 batchingEnabled=false
で完全に無効にすることもできます。
Key_Shared
コンシューマーサブスクリプションを使用する場合、 batcherBuilder
を BatcherBuilder.KEY_BASED
に設定できます。
6. Pulsar トランザクションと exactly-once 処理
Pulsar トランザクション を使用すると、イベントストリーミングアプリケーションが 1 つのアトミック操作でメッセージを消費、処理、生成できます。
トランザクションを使用すると、1 つまたは複数のプロデューサーが複数のトピックにメッセージのバッチを送信できるようになります。バッチ内のすべてのメッセージが、最終的にすべてのコンシューマーにみえるか、またはコンシューマーに一切みえなくなります。
使用するには、ブローカー設定で |
クライアント側でも、 PulsarClient
設定でトランザクションのサポートを有効にする必要があります。
mp.messaging.outgoing.tx-producer.enableTransaction=true
Pulsar コネクターは、トランザクション内にレコードを書き込むための PulsarTransactions
カスタムエミッターを提供します。
これは、通常のエミッター @Channel
として使用できます。
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.OutgoingMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;
@ApplicationScoped
public class PulsarTransactionalProducer {
@Inject
@Channel("tx-out-example")
PulsarTransactions<OutgoingMessage<Integer>> txProducer;
@Inject
@Channel("other-producer")
PulsarTransactions<String> producer;
@Incoming("in")
public Uni<Void> emitInTransaction(Message<Integer> in) {
return txProducer.withTransaction(emitter -> {
emitter.send(OutgoingMessage.of("a", 1));
emitter.send(OutgoingMessage.of("b", 2));
emitter.send(OutgoingMessage.of("c", 3));
producer.send(emitter, "4");
producer.send(emitter, "5");
producer.send(emitter, "6");
return Uni.createFrom().completionStage(in::ack);
});
}
}
withTransaction
メソッドに渡される関数は、レコードを生成する TransactionalEmitter
を受け取り、トランザクションの結果を提供する Uni
を返します。
処理が正常に完了すると、プロデューサーがフラッシュされ、トランザクションがコミットされます。
処理中に例外が発生したり、失敗した Uni
が返されたり、 TransactionalEmitter
が中止の対象としてマークされたりすると、トランザクションは中止されます。
複数のトランザクションプロデューサーが 1 つのトランザクションに参加できます。 これにより、すべてのメッセージが開始したトランザクションを使用して確実に送信されます。また、トランザクションがコミットされる前に、参加しているすべてのプロデューサーがフラッシュされます。 |
このメソッドが Vert.x コンテキストで呼び出されると、処理関数もそのコンテキストで呼び出されます。 それ以外の場合は、プロデューサーの送信スレッドで呼び出されます。
6.1. Exactly-Once 処理
Pulsar Transactions API を使用すると、生成されたメッセージとともに、トランザクション内のコンシューマーオフセットを管理することもできます。 これにより、コンシューマーとトランザクションプロデューサーを consume-transform-produce パターンでカップリングすることができます。これは exactly-once 処理としても知られています。 これは、アプリケーションがメッセージを消費および処理し、結果をトピックに公開して、消費されたメッセージのオフセットをトランザクションでコミットすることを意味します。
PulsarTransactions
エミッターは、トランザクション内の着信 Pulsar メッセージに exactly-once 処理を適用する方法も提供します。
次の例には、トランザクション内の Pulsar メッセージのバッチが含まれています。
mp.messaging.outgoing.tx-out-example.enableTransaction=true
# ...
mp.messaging.incoming.in-channel.enableTransaction=true
mp.messaging.incoming.in-channel.batchReceive=true
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.pulsar.client.api.Messages;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;
@ApplicationScoped
public class PulsarExactlyOnceProcessor {
@Inject
@Channel("tx-out-example")
PulsarTransactions<Integer> txProducer;
@Incoming("in-channel")
public Uni<Void> emitInTransaction(Message<Messages<Integer>> batch) {
return txProducer.withTransactionAndAck(batch, emitter -> {
for (org.apache.pulsar.client.api.Message<Integer> record : batch.getPayload()) {
emitter.send(PulsarMessage.of(record.getValue() + 1, record.getKey()));
}
return Uni.createFrom().voidItem();
});
}
}
処理が正常に完了すると、トランザクション内でメッセージが確認応答され、トランザクションがコミットされます。
exactly-once を使用する場合、累積的にではなく、個別にのみメッセージを確認応答できます。 |
処理を中止する必要がある場合、メッセージは否定応答されます。処理を再試行したり、フェイルストップしたりするために、いずれかの失敗ストラテジーを使用することができます。
トランザクションが失敗して中止された場合、 withTransaction
から返される Uni
は失敗することに注意してください。
アプリケーションでエラーケースを処理することもできます。しかし、メッセージの消費を続行するには、 @Incoming
メソッドから返される Uni
が失敗しないようにする必要があります。
PulsarTransactions#withTransactionAndAck
メソッドは、メッセージの確認応答と否定応答を行いますが、リアクティブストリームを停止しません。
失敗を無視すると、最後にコミットされたオフセットにコンシューマーがリセットされ、そこから処理が再開されます。
失敗発生時に重複を回避するために、ブローカー側でメッセージの重複排除とバッチインデックスレベルの確認応答を有効にすることを推奨します。
|
7. Pulsar Schema 設定と自動スキーマ検出
Pulsar メッセージは、ペイロードとともに非構造化バイト配列として保存されます。 Pulsar スキーマ は、構造化データを raw メッセージバイトにシリアライズする方法を定義します。 スキーマ は、強制的なデータ構造での書き込みと読み取りを行うために、プロデューサーとコンシューマーに適用されます。 スキーマは、データがトピックに公開される前にデータを raw バイトにシリアライズします。また、raw バイトがコンシューマーに配信される前に raw バイトをデシリアライズします。
Pulsar は、登録されたスキーマ情報を保存する中央リポジトリーとしてスキーマレジストリーを使用します。 これにより、プロデューサー/コンシューマーがブローカーを通じてトピックのメッセージのスキーマを調整できるようになります。 デフォルトでは、スキーマの保存に Apache BookKeeper が使用されます。
Pulsar コネクターでは、 schema
プロパティーを使用してスキーマをプリミティブ型として指定できます。
mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32
mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE
schema
プロパティーの値が スキーマタイプ と一致する場合、そのタイプを使用した単純なスキーマが作成され、該当するチャネルに使用されます。
Pulsar コネクターを使用すると、 @Identifier
修飾子で識別される Schema
Bean を CDI を通じて提供することで、複雑なスキーマタイプを設定できます。
たとえば、次の Bean は JSON スキーマとキー/値スキーマを提供します。
package pulsar.configuration;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import io.smallrye.common.annotation.Identifier;
@ApplicationScoped
public class PulsarSchemaProvider {
@Produces
@Identifier("user-schema")
Schema<User> userSchema = Schema.JSON(User.class);
@Produces
@Identifier("a-channel")
Schema<KeyValue<Integer, User>> keyValueSchema() {
return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
}
public static class User {
String name;
int age;
}
}
定義されたスキーマを使用して着信チャネル users
を設定するには、 schema
プロパティーをスキーマ user-schema
の識別子に設定する必要があります。
mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema
schema
プロパティーが見つからない場合、コネクターはチャネル名で識別される Schema
Bean を検索します。
たとえば、発信チャネル a-channel
はキー/値スキーマを使用します。
mp.messaging.outgoing.a-channel.connector=smallrye-pulsar
スキーマ情報が提供されていない場合、着信チャネルは Schema.AUTO_CONSUME()
を使用します。一方、発信チャネルは Schema.AUTO_PRODUCE_BYTES()
スキーマを使用します。
7.1. 自動スキーマ検出
Quarkus Messaging Pulsar (io.quarkus:quarkus-messaging-pulsar
) を使用すると、Quarkus は多くの場合、設定する正しい Pulsar Schema を自動的に検出できます。
この自動検出は、 @Incoming
メソッドと @Outgoing
メソッドの宣言、および注入された @Channel
に基づいています。
たとえば、以下のように宣言した場合
@Outgoing("generated-price")
public Multi<Integer> generate() {
...
}
このとき、設定において generated-price
チャネルで smallrye-pulsar
コネクターを使用するよう指定されていると、Quarkus は generated-price
チャネルの schema
属性を Pulsar Schema INT32
に自動的に設定します。
同様に、以下を宣言した場合
@Incoming("my-pulsar-consumer")
public void consume(org.apache.pulsar.api.client.Message<byte[]> record) {
...
}
設定において my-pulsar-consumer
チャネルで smallrye-pulsar
コネクターを使用するよう指定されていると、Quarkus は schema
属性を Pulsar BYTES
Schema に自動的に設定します。
最後に、以下を宣言した場合
@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;
設定において price-create
チャネルが smallrye-pulsar
コネクターを使用するよう指定されていると、Quarkus は schema
を Pulsar INT64
Schema に自動的に設定します。
Pulsar Schema の自動検出でサポートされているすべてのタイプは、次のとおりです。
-
short
およびjava.lang.Short
-
int
およびjava.lang.Integer
-
long
およびjava.lang.Long
-
float
およびjava.lang.Float
-
double
およびjava.lang.Double
-
byte[]
-
java.time.Instant
-
java.sql.Timestamp
-
java.time.LocalDate
-
java.time.LocalTime
-
java.time.LocalDateTime
-
java.nio.ByteBuffer
-
Avro スキーマから生成されたクラスと Avro
GenericRecord
は、AVRO
スキーマタイプで設定されます。 -
Protobuf スキーマから生成されたクラスは、
PROTOBUF
スキーマタイプで設定されます。 -
その他のクラスは、自動的に
JSON
スキーマタイプで設定されます。
|
Pulsar が提供するこれらのスキーマに加えて、Quarkus は 検証を適用することなく 次のスキーマ実装を提供します。
-
io.vertx.core.buffer.Buffer
は、io.quarkus.pulsar.schema.BufferSchema
スキーマで設定されます。 -
io.vertx.core.json.JsonObject
は、io.quarkus.pulsar.schema.JsonObjectSchema
スキーマで設定されます。 -
io.vertx.core.json.JsonArray
は、io.quarkus.pulsar.schema.JsonArraySchema
スキーマで設定されます。 -
スキーマレス JSON シリアライゼーションでは、
schema
設定がObjectMapper<fully_qualified_name_of_the_bean>
に設定されている場合、JacksonObjectMapper
を使用してスキーマが生成されます。このとき、Pulsar Schema 検証は適用されません。io.quarkus.pulsar.schema.ObjectMapperSchema
を使用すると、検証なしで JSON スキーマを明示的に設定できます。
schema
が設定されている場合、自動検出によって置き換えられることはありません。
シリアライザーの自動検出に問題がある場合は、 quarkus.messaging.pulsar.serializer-autodetection.enabled=false
を設定することで、これを完全にオフにすることができます。
オフにする必要があると認識した場合は、Quarkus issue tracker にバグを報告していただければ、どのような問題でも修正することができます。
8. Dev Services for Pulsar
Quarkus Messaging Pulsar エクステンション (quarkus-messaging-pulsar
) を使用すると、Dev Services for Pulsar が開発モードおよびテスト実行時に自動で Pulsar ブローカーを起動します。
そのため、ブローカーを手動で起動する必要はありません。
また、アプリケーションの設定も自動的に行われます。
8.1. Dev Services for Pulsar の有効化/無効化
以下の場合を除き、Dev Services for Pulsar は自動的に有効になります。
-
quarkus.pulsar.devservices.enabled
がfalse
に設定されている -
`pulsar.client.serviceUrl`が設定されている
-
すべての Reactive Messaging Pulsar チャネルには `serviceUrl`属性が設定されている
Dev Services for Pulsar は、ブローカーを起動するために Docker に依存しています。環境が Docker をサポートしていない場合は、ブローカーを手動で起動するか、すでに実行中のブローカーに接続する必要があります。
pulsar.client.
を使用してブローカーアドレスを設定できます。
8.2. 共有ブローカー
ほとんどの場合、アプリケーション間でブローカーを共有する必要があります。Dev Services for Plusar は、 開発 モードで動作する複数の Quarkus アプリケーションが 1 つのブローカーを共有するための サービスディスカバリー メカニズムを実装しています。
Dev Services for Pulsar は、コンテナーを識別するために使用される quarkus-dev-service-pulsar ラベルを使用してコンテナーを起動します。
|
複数の (共有) ブローカーが必要な場合は、quarkus.pulsar.devservices.service-name
属性を設定してブローカー名を指定できます。
値が同じコンテナーを検索し、見つからない場合は新しいコンテナーを開始します。
デフォルトのサービス名は pulsar
です。
開発モードでは共有はデフォルトで有効になっていますが、テストモードでは無効になっています。
quarkus.pulsar.devservices.shared=false
で共有を無効にできます。
8.3. ポートの設定
デフォルトでは、Dev Services for Pulsar はランダムなポートを選択し、アプリケーションを設定します。
quarkus.pulsar.devservices.port
プロパティーを設定することでポートを設定できます。
Pulsar がアドバタイズするアドレスは、選択したポートで自動的に設定されることに注意してください。
8.4. イメージの設定
Dev Services for Pulsar は、https://hub.docker.com/r/apachepulsar/pulsar[公式 Apache Pulsar イメージ] をサポートしています。
カスタムイメージ名は次のように設定できます。
quarkus.pulsar.devservices.image-name=datastax/lunastreaming-all:2.10_4.7
8.5. Pulsar ブローカーの設定
カスタムブローカー設定を使用して、Dev Services for Pulsar を設定できます。
次の例では、トランザクションサポートを有効にします。
quarkus.pulsar.devservices.broker-config.transaction-coordinator-enabled=true
quarkus.pulsar.devservices.broker-config.system-topic-enabled=true
8.6. 設定リファレンス
ビルド時に固定される設定プロパティ - その他の設定プロパティは実行時にオーバーライド可能です。
Configuration property |
タイプ |
デフォルト |
---|---|---|
If Dev Services for Pulsar has been explicitly enabled or disabled. Dev Services are generally enabled by default, unless there is an existing configuration present. For Pulsar, Dev Services starts a broker unless Environment variable: Show more |
ブーリアン |
|
Optional fixed port the dev service will listen to. If not defined, the port will be chosen randomly. Environment variable: Show more |
int |
|
The image to use. Note that only Apache Pulsar images are supported. Specifically, the image repository must end with Environment variable: Show more |
string |
|
Indicates if the Pulsar broker managed by Quarkus Dev Services is shared. When shared, Quarkus looks for running containers using label-based service discovery. If a matching container is found, it is used, and so a second one is not started. Otherwise, Dev Services for Pulsar starts a new container. The discovery uses the Container sharing is only used in dev mode. Environment variable: Show more |
ブーリアン |
|
The value of the This property is used when you need multiple shared Pulsar brokers. Environment variable: Show more |
string |
|
Broker config to set on the Pulsar instance Environment variable: Show more |
Map<String,String> |
9. Pulsar クライアントの設定
Pulsar のクライアント、コンシューマー、プロデューサーは、Pulsar クライアントアプリケーションの動作を設定するために、細かくカスタマイズすることが可能です。
Pulsar コネクターは、Pulsar クライアントと、チャネルごとのコンシューマーまたはプロデューサーを作成します。それぞれに実用的なデフォルトが設定されているため、設定が容易です。 作成は処理されますが、利用できる設定オプションは、すべて Pulsar チャネルを通じて設定可能です。
通常、 PulsarClient
、 PulsarConsumer
、 PulsarProducer
を作成するには、ビルダー API を使用します。ビルダー API は、その性質上、実装に渡す設定オブジェクトを毎回ビルドします。
該当する設定オブジェクトは、
ClientConfigurationData、
ConsumerConfigurationData、
および ProducerConfigurationData です。
Pulsar コネクターを使用すると、これらの設定オブジェクトのプロパティーを直接受信できます。
たとえば、 PulsarClient
のブローカー認証情報は、 authPluginClassName
および authParams
プロパティーを使用して受信されます。
受信チャネル data
の認証を設定するには、以下のように指定します。
mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}
Pulsar コンシューマーのプロパティー subscriptionInitialPosition
も、列挙値 SubscriptionInitialPosition.Earliest
として表される Earliest
値で設定されていることに注意してください。
この方法は、ほとんどの設定ケースに対応できます。
ただし、 CryptoKeyReader
、 ServiceUrlProvider
などのシリアライズできないオブジェクトは、この方法では設定できません。
Pulsar コネクターを使用すると、Pulsar 設定データオブジェクト ClientConfigurationData
、 ConsumerConfigurationData
、 ProducerConfigurationData
のインスタンスを考慮できます。
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
class PulsarConfig {
@Produces
@Identifier("my-consumer-options")
public ConsumerConfigurationData<String> getConsumerConfig() {
ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
data.setAckReceiptEnabled(true);
data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
//...
.build());
return data;
}
}
このインスタンスは、コネクターが使用するクライアントを設定するために取得および使用されます。
client-configuration
、 consumer-configuration
、または producer-configuration
属性を使用してクライアントの名前を指定する必要があります。
mp.messaging.incoming.prices.consumer-configuration=my-consumer-options
[client|consumer|producer]-configuration
が設定されていない場合、コネクターはチャネル名で識別されるインスタンスを検索します。
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
class PulsarConfig {
@Produces
@Identifier("prices")
public ClientConfigurationData getClientConfig() {
ClientConfigurationData data = new ClientConfigurationData();
data.setEnableTransaction(true);
data.setServiceUrlProvider(AutoClusterFailover.builder()
// ...
.build());
return data;
}
}
また、キーごとの設定値を含む Map<String, Object>
を指定することもできます。
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import java.util.Map;
class PulsarConfig {
@Produces
@Identifier("prices")
public Map<String, Object> getProducerConfig() {
return Map.of(
"batcherBuilder", BatcherBuilder.KEY_BASED,
"sendTimeoutMs", 3000,
"customMessageRouter", new PartialRoundRobinMessageRouterImpl(4));
}
}
さまざまな設定ソースは、重要度の最も低いものから最も高いものの順に、次の優先順序で読み込まれます。
-
デフォルトの設定識別子 (
default-pulsar-client
、default-pulsar-consumer
、default-pulsar-producer
) を使用して生成されたMap<String, Object>
設定マップ -
設定内またはチャネル名内の識別子を使用して生成された
Map<String, Object>
設定マップ -
チャネル設定内またはチャネル名内の識別子を使用して生成された
[Client|Producer|Consuemr]ConfigurationData
オブジェクト -
[Client|Producer|Consuemr]ConfigurationData
フィールド名を使用して名前が付けられたチャネル設定プロパティー
設定オプションの完全なリストについては、設定リファレンス を参照してください。
9.1. Pulsar 認証の設定
Pulsar は、プラグ可能な認証フレームワークを提供します。Pulsar ブローカー/プロキシーは、このメカニズムを使用してクライアントを認証します。
クライアントは、 authPluginClassName
および authParams
属性を使用して application.properties
ファイルで設定できます。
pulsar.client.serviceUrl=pulsar://pulsar:6650
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
pulsar.client.authParams={"userId":"superuser","password":"admin"}
または、プログラムで設定することもできます。
import java.util.Map;
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
class PulsarConfig {
@Produces
@Identifier("prices")
public ClientConfigurationData config() {
var data = new ClientConfigurationData();
var auth = new AuthenticationBasic();
auth.configure(Map.of("userId", "superuser", "password", "admin"));
data.setAuthentication(auth);
return data;
}
}
9.1.1. mTLSを使ったPulsarへの認証設定
Pulsar Messagingエクステンションは Quarkus TLSレジストリ と統合し、mTLSを使ってクライアントを認証します。
パルサー・チャンネルにmTLSを設定するには、 application.properties
に名前付きTLS設定を提供する必要があります:
quarkus.tls.my-tls-config.trust-store.p12.path=target/certs/pulsar-client-truststore.p12
quarkus.tls.my-tls-config.trust-store.p12.password=secret
quarkus.tls.my-tls-config.key-store.p12.path=target/certs/pulsar-client-keystore.p12
quarkus.tls.my-tls-config.key-store.p12.password=secret
mp.messaging.incoming.prices.tls-configuration-name=my-tls-config
9.1.2. Datastax Luna Streaming へのアクセスの設定
Luna Streaming は、DataStax のツールとサポートを備えた、Apache Pulsar の製品版ディストリビューションです。 DataStax Luna Pulsar テナントを作成したら、自動生成されたトークンをメモし、トークン認証を設定します。
pulsar.client.serviceUrl=pulsar+ssl://pulsar-aws-eucentral1.streaming.datastax.com:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
pulsar.client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODY4MTc4MzQsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50OzA3NGZhOTI4LThiODktNDBhNC04MDEzLWNlNjVkN2JmZWIwZTtjSEpwWTJWejsyMDI5ODdlOGUyIiwidG9rZW5pZCI6IjIwMjk4N2U4ZTIifQ....
必ず事前にトピックを作成するか、namespace 設定で 自動トピック作成 を有効にしてください。
トピック設定では、トピックの完全な名前を参照する必要があることに注意してください。
mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices
9.1.3. StreamNative Cloud へのアクセスの設定
StreamNative Cloud は、フルマネージドの Pulsar-as-a-Service です。完全ホスト型、パブリッククラウド上で StreamNative によって管理する方式、Kubernetes 上で自己管理する方式など、さまざまなデプロイ方法で利用できます。
StreamNative Pulsar クラスターは Oauth2 認証を使用します。そのため、アプリケーションが使用している Pulsar namespace/トピックへの権限 を持つ サービスアカウント が存在することを確認する必要があります。
次に、サービスアカウントの キーファイル (秘密鍵 として機能します) をダウンロードし、発行者 URL (通常は https://auth.streamnative.cloud/
) とクラスターの オーディエンス (urn:sn:pulsar:o-rf3ol:redhat
など) をメモする必要があります。
この手順を行う際には、StreamNative Cloud コンソールの Admin セクションにある Pulsar Clients ページが役立ちます。
Pulsar Oauth2 認証を使用してアプリケーションを設定するには、以下のように指定します。
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}
pulsar.client.authParams
設定には、 issuerUrl
、 audience
、および data:application/json;base64,<base64-encoded-key-file>
という形式の privateKey
を含む Json 文字列が含まれていることに注意してください。
または、プログラムで認証を設定することもできます。
package org.acme.pulsar;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
@ApplicationScoped
public class PulsarAuth {
@ConfigProperty(name = "pulsar.issuerUrl")
String issuerUrl;
@ConfigProperty(name = "pulsar.credentials")
String credentials;
@ConfigProperty(name = "pulsar.audience")
String audience;
@Produces
@Identifier("pulsar-auth")
public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
var data = new ClientConfigurationData();
data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), PulsarAuth.class.getResource(credentials), audience));
return data;
}
}
これは、キーファイルがリソースとしてアプリケーションクラスパスに含まれていることを前提としています。この場合、設定は次のようになります。
mp.messaging.incoming.prices.client-configuration=pulsar-auth
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json
pulsar-auth
で識別されるクライアント設定を使用するチャネルには、 client-configuration
属性を設定する必要があることに注意してください。
10. ヘルスチェック
Quarkus エクステンションは、Pulsar コネクターによって管理される各チャネルの起動、準備状況、および稼働状況を報告します。 ヘルスチェックは、Pulsar クライアントに依存してブローカーとの接続が確立されていることを確認します。
着信チャネルと発信チャネルの Startup および Readiness プローブは、ブローカーとの接続が確立されると、OK を報告します。
着信チャネルと発信チャネルの Liveness プローブは、ブローカーとの接続が確立され、かつ 失敗が発見されなかった場合に、OK を報告します。
メッセージ処理が失敗すると、メッセージが 否定応答 されることに注意してください。その場合、メッセージは失敗ストラテジーによって処理されます。失敗ストラテジーは、失敗を報告し、liveness チェックの結果に影響を与えます。失敗ストラテジー `fail`は、失敗を報告するため、liveness チェックも失敗を報告します。
11. 設定リファレンス
以下は、Pulsar コネクターのチャネル、コンシューマー、プロデューサー、およびクライアントの設定属性のリストです。 Pulsar クライアントの設定方法の詳細は、Pulsar クライアントの設定 を参照してください。
11.1. 着信チャネル設定 (Pulsar からの受信)
以下の属性は以下のように設定します:
mp.messaging.incoming.your-channel-name.attribute=value
属性 (alias) | 説明 | タイプ | 必須 | デフォルト |
---|---|---|---|---|
ack-strategy |
レコードから生成されたメッセージが確認されたときに適用するコミット戦略を指定します。値は |
string |
false |
|
ackTimeout.redeliveryBackoff |
カンマ区切りの値で ack タイムアウトを設定 MultiplierRedeliveryBackoff、min delay、max delay、multiplier。 |
string |
false |
|
batchReceive |
バッチ受信を使用してメッセージを消費するかどうか |
boolean |
false |
|
client-configuration |
このチャネルのデフォルトPulsarクライアント設定を提供するCDI Beanの識別子。チャネル構成は、任意の属性をオーバーライドすることができます。BeanはMap<String, Object>型を持ち、@io.smallrye.common.annotation.Identifierアノテーションを使用して識別子を設定する必要があります。 |
string |
false |
|
consumer-configuration |
このチャネルのデフォルトPulsarコンシューマ設定を提供するCDI Beanの識別子。チャネル構成は、任意の属性をオーバーライドすることができます。BeanはMap<String, Object>型を持ち、@io.smallrye.common.annotation.Identifierアノテーションを使用して識別子を設定する必要があります。 |
string |
false |
|
deadLetterPolicy.deadLetterTopic |
失敗メッセージが送信されるデッドレタートピックの名前 |
string |
false |
|
deadLetterPolicy.initialSubscriptionName |
デッド・レタートピックの初期サブスクリプション名 |
string |
false |
|
deadLetterPolicy.maxRedeliverCount |
デッドレター・トピックに送信される前にメッセージが再配信される最大回数。 |
int |
false |
|
deadLetterPolicy.retryLetterTopic |
失敗したメッセージが送信される再試行トピックの名前 |
string |
false |
|
failure-strategy |
レコードから生成されたメッセージが否定応答(nack)されたときに適用する失敗ストラテジーを指定します。指定できる値は |
string |
false |
|
health-enabled |
ヘルスレポートが有効(デフォルト)か無効か |
boolean |
false |
|
negativeAck.redeliveryBackoff |
負の ack を設定するためのカンマ区切りの値 MultiplierRedeliveryBackoff、min delay、max delay、multiplier。 |
string |
false |
|
reconsumeLater.delay |
リコンシューム失敗戦略のデフォルト遅延時間(秒 |
long |
false |
|
schema |
このチャネルのPulsarスキーマ・タイプ。構成されると、指定されたSchemaTypeでスキーマが構築され、チャネルに使用されます。指定されていない場合、スキーマは |
string |
false |
|
serviceUrl |
パルサー・サービスのサービスURL |
string |
false |
|
topic |
消費された/入力されたパルサー・トピック。設定されていない場合、チャンネル名が使用されます |
string |
false |
|
tracing-enabled |
トレースが有効(デフォルト)か無効か |
boolean |
false |
|
基盤となる Pulsar コンシューマーがサポートするプロパティーを設定することもできます。
これらのプロパティーは、 pulsar.consumer
接頭辞を使用してグローバルに設定することもできます。
pulsar.consumer.subscriptionInitialPosition=Earliest
属性 | 説明 | タイプ | 設定ファイル | デフォルト |
---|---|---|---|---|
topicNames |
トピック名 |
Set |
true |
[] |
topicsPattern |
トピックパターン |
Pattern |
true |
|
subscriptionName |
サブスクリプション名 |
文字列 |
true |
|
subscriptionType |
サブスクリプションタイプ。
4つのサブスクリプションタイプがあります: |
SubscriptionType |
true |
Exclusive |
subscriptionProperties |
Map |
true |
||
subscriptionMode |
SubscriptionMode |
true |
Durable |
|
messageListener |
MessageListener |
false |
||
consumerEventListener |
ConsumerEventListener |
false |
||
negativeAckRedeliveryBackoff |
カスタム・メッセージのインタフェースは negativeAcked ポリシーです。コンシューマには |
RedeliveryBackoff |
false |
|
ackTimeoutRedeliveryBackoff |
カスタム・メッセージのインターフェースは ackTimeout ポリシーです。コンシューマに対して |
RedeliveryBackoff |
false |
|
receiverQueueSize |
コンシューマの受信キューのサイズ。 例えば、アプリケーションが デフォルト値より大きい値を設定すると、メモリ使用量が増加しますが、コンシューマのスループットが向上します。 |
int |
true |
1000 |
acknowledgementsGroupTimeMicros |
コンシューマの確認応答を指定時間グループ化します。 デフォルトでは、コンシューマはブローカに確認応答を送信するのに100msのグループ化時間を使用します。 グループ化時間を 0 に設定すると、確認応答を直ちに送信します。 グループ化時間を長くすると効率的ですが、その分失敗後のメッセージ再送が若干増えます。 |
long |
true |
100000 |
maxAcknowledgmentGroupSize |
メッセージの数だけコンシューマー確認をグループ化します。 |
int |
true |
1000 |
negativeAckRedeliveryDelayMicros |
処理に失敗したメッセージを再配信するまでの待ち時間。 アプリケーションが |
long |
true |
60000000 |
maxTotalReceiverQueueSizeAcrossPartitions |
パーティション間のレシーバ・キュー・サイズの合計の最大値。 この設定により、合計レシーバ・キュー・サイズがこの値を超えた場合、個々のパーティションのレシーバ・キュー・サイズが減少します。 |
int |
true |
50000 |
consumerName |
コンシューマー名 |
文字列 |
true |
|
ackTimeoutMillis |
非暗号化メッセージのタイムアウト |
long |
true |
0 |
tickDurationMillis |
ack-timeout 再配信の粒度。 |
long |
true |
1000 |
priorityLevel |
共有サブスクリプションタイプでメッセージをディスパッチする際に、ブローカーがより優先順位を与えるコンシューマーの優先順位レベル。 ブローカーは優先順位の降順に従います。例えば、0=最大優先度、1、2、…のように。 共有サブスクリプションタイプでは、ブローカーは まず、最大優先度レベルのコンシューマーが許可を持っていれば、そのコンシューマーにメッセージをディスパッチ します。そうでない場合、ブローカーは次の優先順位のコンシューマーを考慮します。 例1 サブスクリプションに コンシューマの優先度、レベル、パーミット C1, 0, 2 C2, 0, 1 C3, 0, 1 C4, 1, 2 C5, 1, 1 |
int |
true |
0 |
maxPendingChunkedMessage |
保留中のチャンク・メッセージを保持するキューの最大サイズ。この閾値に達すると、コンシューマは保留中のメッセージを削除してメモリ使用率を最適化します。 |
int |
true |
10 |
autoAckOldestChunkedMessageOnQueueFull |
|
boolean |
true |
false |
expireTimeOfIncompleteChunkedMessageMillis |
コンシューマが指定された時間内にすべてのチャンクを受信できなかった場合に、不完全なチャンクを失効させる時間間隔。デフォルト値は 1 分です。 |
long |
true |
60000 |
cryptoKeyReader |
CryptoKeyReader |
false |
||
messageCrypto |
MessageCrypto |
false |
||
cryptoFailureAction |
復号化できないメッセージを受信した場合、コンシューマーは以下のアクションを取る 必要があります。 * FAIL : 暗号化が成功するまでメッセージをフェイルするデフォルトのオプション。 DISCARD* :(メッセージを)黙認し、アプリケーションに配信しません。 CONSUME* :暗号化されたメッセージをアプリケーションに配信します。メッセージを復号化するのはアプリケーションの責任です。 |
ConsumerCryptoFailureAction |
true |
FAIL |
properties |
このコンシューマの名前または値のプロパティ。 |
SortedMap |
true |
{} |
readCompacted |
コンシューマーは、バックログを圧縮するときのトピック メッセージ内のポイントに到達するまで、圧縮されたトピック内の各キーの最新の値のみを参照します。そのポイントを超えると、通常どおりメッセージを送信します。 アクティブなコンシューマーが 1 つある永続トピック (失敗または排他サブスクリプションなど) へのサブスクリプションでのみ、 非永続トピックへのサブスクリプションまたは共有サブスクリプションでこれを有効にしようとすると、サブスクリプション呼び出しで |
boolean |
true |
false |
subscriptionInitialPosition |
最初にトピックを購読するときにカーソルを設定する初期位置。 |
SubscriptionInitialPosition |
true |
Latest |
patternAutoDiscoveryPeriod |
トピックの消費者にパターンを使用する場合のトピック自動検出期間。 |
int |
true |
60 |
regexSubscriptionMode |
正規表現を使ってトピックを購読する場合、特定の種類のトピックを選ぶことができます。 |
RegexSubscriptionMode |
true |
PersistentOnly |
deadLetterPolicy |
コンシューマー向けのデッドレター ポリシー。 デフォルトでは、一部のメッセージは何度も再配信される可能性があり、停止しないこともあります。 デッドレター メカニズムを使用すると、メッセージの再配信回数が最大になります。再配信の最大回数を超えると、メッセージはデッドレター トピックに送信され、自動的に確認応答されます。 デッドレター メカニズムを有効にするには、
|
DeadLetterPolicy |
true |
|
retryEnable |
boolean |
true |
false |
|
batchReceivePolicy |
BatchReceivePolicy |
false |
||
autoUpdatePartitions |
|
boolean |
true |
true |
autoUpdatePartitionsIntervalSeconds |
long |
true |
60 |
|
replicateSubscriptionState |
|
boolean |
true |
false |
resetIncludeHead |
boolean |
true |
false |
|
keySharedPolicy |
KeySharedPolicy |
false |
||
batchIndexAckEnabled |
boolean |
true |
false |
|
ackReceiptEnabled |
boolean |
true |
false |
|
poolMessages |
boolean |
true |
false |
|
payloadProcessor |
MessagePayloadProcessor |
false |
||
startPaused |
boolean |
true |
false |
|
autoScaledReceiverQueueSizeEnabled |
boolean |
true |
false |
|
topicConfigurations |
List |
true |
[] |
11.2. 発信チャネルの設定 (Pulsar への公開)
属性 (alias) | 説明 | タイプ | 必須 | デフォルト |
---|---|---|---|---|
client-configuration |
このチャネルのデフォルトPulsarクライアント設定を提供するCDI Beanの識別子。チャネル設定は、任意の属性をオーバーライドすることができます。BeanはMap<String, Object>型を持ち、@io.smallrye.common.annotation.Identifierアノテーションを使用して識別子を設定する必要があります。 |
string |
false |
|
health-enabled |
ヘルスレポートが有効(デフォルト)か無効か |
boolean |
false |
|
maxPendingMessages |
保留メッセージ、つまりブローカーからの確認応答を待っているメッセージを保持するキューの最大サイズ。 |
int |
false |
|
producer-configuration |
このチャネルのデフォルトPulsarプロデューサー設定を提供するCDI Beanの識別子。チャネル設定は、任意の属性をオーバーライドすることができます。BeanはMap<String, Object>型を持ち、@io.smallrye.common.annotation.Identifierアノテーションを使用して識別子を設定する必要があります。 |
string |
false |
|
schema |
このチャネルのPulsarスキーマ・タイプ。設定されると、指定されたSchemaTypeでスキーマが構築され、チャネルに使用されます。指定されていない場合、スキーマは |
string |
false |
|
serviceUrl |
パルサー・サービスのサービスURL |
string |
false |
|
topic |
消費された/入力されたパルサー・トピック。設定されていない場合、チャンネル名が使用されます |
string |
false |
|
tracing-enabled |
トレースが有効(デフォルト)か無効か |
boolean |
false |
|
waitForWriteCompletion |
顧客がメッセージを確認する前に、ブローカーが書かれたレコードを確認するのを待つかどうか。 |
boolean |
false |
|
基盤となる Pulsar プロデューサーがサポートするプロパティーを設定することもできます。
これらのプロパティーは、 pulsar.producer
接頭辞を使用してグローバルに設定することもできます。
pulsar.producer.batchingEnabled=false
属性 | 説明 | タイプ | 設定ファイル | デフォルト |
---|---|---|---|---|
topicName |
トピック名 |
文字列 |
true |
|
producerName |
プロデューサー名 |
文字列 |
true |
|
sendTimeoutMs |
ミリ秒単位のメッセージ送信のタイムアウト。
|
long |
true |
30000 |
blockIfQueueFull |
|
boolean |
true |
false |
maxPendingMessages |
保留中のメッセージを保持するキューの最大サイズ。 |
int |
true |
0 |
maxPendingMessagesAcrossPartitions |
パーティション間の保留メッセージの最大数。 |
int |
true |
0 |
messageRoutingMode |
パーティショニングされたトピック 上のプロデューサに対するメッセージ・ルーティング・ロジック。 メッセージにキーを設定しない場合にのみ、このロジックを適用します。 使用可能なオプションは以下のとおりです:
* |
MessageRoutingMode |
true |
|
hashingScheme |
特定のメッセージを公開するパーティションを決定するハッシュ関数 (パーティション化されたトピックのみ)。
使用可能なオプションは以下のとおりです:+
* |
HashingScheme |
true |
JavaStringHash |
cryptoFailureAction |
暗号化に失敗した場合、プロデューサはアクションを起こす必要があります。+ * FAIL : 暗号化に失敗した場合、暗号化されていないメッセージは送信されませ ん。+ * SEND : 暗号化に失敗した場合、暗号化されていないメッセージが送信されます。 |
ProducerCryptoFailureAction |
true |
FAIL |
customMessageRouter |
MessageRouter |
false |
||
batchingMaxPublishDelayMicros |
メッセージ送信のバッチ期間。 |
long |
true |
1000 |
batchingPartitionSwitchFrequencyByPublishDelay |
int |
true |
10 |
|
batchingMaxMessages |
バッチで許可されるメッセージの最大数。 |
int |
true |
1000 |
batchingMaxBytes |
int |
true |
131072 |
|
batchingEnabled |
メッセージのバッチ処理を有効にします。 |
boolean |
true |
true |
batcherBuilder |
BatcherBuilder |
false |
||
chunkingEnabled |
メッセージのチャンキングを有効にします。 |
boolean |
true |
false |
chunkMaxMessageSize |
int |
true |
-1 |
|
cryptoKeyReader |
CryptoKeyReader |
false |
||
messageCrypto |
MessageCrypto |
false |
||
encryptionKeys |
Set |
true |
[] |
|
compressionType |
プロデューサーが使用するメッセージデータ圧縮タイプ。+
使用可能なオプション:+
* LZ4 |
CompressionType |
true |
NONE |
initialSequenceId |
Long |
true |
||
autoUpdatePartitions |
boolean |
true |
true |
|
autoUpdatePartitionsIntervalSeconds |
long |
true |
60 |
|
multiSchema |
boolean |
true |
true |
|
accessMode |
ProducerAccessMode |
true |
Shared |
|
lazyStartPartitionedProducers |
boolean |
true |
false |
|
properties |
SortedMap |
true |
{} |
|
initialSubscriptionName |
トピックの作成時に初期サブスクリプションを自動的に作成するには、この設定を使用します。このフィールドが設定されていない場合、初期サブスクリプションは作成されません。 |
文字列 |
true |
11.3. Pulsar クライアントの設定
以下は、基盤となる PulsarClient
の設定リファレンスです。
これらのオプションは、チャネル属性を使用して設定できます。
mp.messaging.incoming.your-channel-name.numIoThreads=4
または、 pulsar.client
接頭辞を使用してグローバルに設定します。
pulsar.client.serviceUrl=pulsar://pulsar:6650
属性 | 説明 | タイプ | 設定ファイル | デフォルト |
---|---|---|---|---|
serviceUrl |
ブローカーに接続するためのPulsarクラスタHTTP URL。 |
文字列 |
true |
|
serviceUrlProvider |
ServiceUrl を生成する ServiceUrlProvider の実装クラスです。 |
ServiceUrlProvider |
false |
|
authentication |
クライアントの認証設定。 |
認証 |
false |
|
authPluginClassName |
クライアントの認証プラグインのクラス名。 |
文字列 |
true |
|
authParams |
クライアントの認証パラメータ。 |
文字列 |
true |
|
authParamMap |
クライアントの認証マップ。 |
Map |
true |
|
operationTimeoutMs |
クライアント操作のタイムアウト(ミリ秒)。 |
long |
true |
30000 |
lookupTimeoutMs |
クライアント・ルックアップ・タイムアウト(ミリ秒)。 |
long |
true |
-1 |
statsIntervalSeconds |
クライアントの統計情報を出力する間隔(秒)。 |
long |
true |
60 |
numIoThreads |
IOスレッド数。 |
int |
true |
10 |
numListenerThreads |
コンシューマー・リスナーのスレッド数。 |
int |
true |
10 |
connectionsPerBroker |
クライアントと各 Broker 間で確立される接続の数。値 0 は、コネクション・プーリングを無効にすることを意味します。 |
int |
true |
1 |
connectionMaxIdleSeconds |
[connectionMaxIdleSeconds] 秒以上接続が使用されない場合、接続を解放します。[connectionMaxIdleSeconds] < 0 の場合、アイドル状態の接続を自動的に解放する機能を無効にします。 |
int |
true |
180 |
useTcpNoDelay |
TCP NoDelayオプションを使用するかどうか。 |
boolean |
true |
true |
useTls |
TLSを使用するかどうか。 |
boolean |
true |
false |
tlsKeyFilePath |
TLSキーファイルへのパス。 |
文字列 |
true |
|
tlsCertificateFilePath |
TLS証明書ファイルへのパス。 |
文字列 |
true |
|
tlsTrustCertsFilePath |
信頼するTLS証明書ファイルへのパス。 |
文字列 |
true |
|
tlsAllowInsecureConnection |
クライアントがブローカーから信頼されていない TLS 証明書を受け入れるかどうか。 |
boolean |
true |
false |
tlsHostnameVerificationEnable |
クライアントがブローカーとTLS接続を作成するときにホスト名を検証するかどうか。 |
boolean |
true |
false |
concurrentLookupRequest |
各ブローカー接続で同時に送信できるルックアップ要求の数。最大値を設定することで、ブローカーに過負荷がかかるのを防ぎます。 |
int |
true |
5000 |
maxLookupRequest |
ブローカーへの過負荷を防ぐため、各ブローカー接続で許可されるルックアップ要求の最大数。 |
int |
true |
50000 |
maxLookupRedirects |
リダイレクトされたルックアップ要求の最大回数。 |
int |
true |
20 |
maxNumberOfRejectedRequestPerConnection |
現在の接続が閉じられ、クライアントが別のブローカーに接続するために新しい接続を作成した後、一定時間内(60秒)にブローカーが拒否したリクエストの最大数。 |
int |
true |
50 |
keepAliveIntervalSeconds |
各クライアント・ブローカー接続のキープアライブインターバルの秒数。 |
int |
true |
30 |
connectionTimeoutMs |
ブローカーからの応答がないまま期間が経過すると、接続の試みは中止されます。 |
int |
true |
10000 |
requestTimeoutMs |
リクエスト完了までの最長時間。 |
int |
true |
60000 |
readTimeoutMs |
リクエストの最大読み取り時間。 |
int |
true |
60000 |
autoCertRefreshSeconds |
証明書の自動更新の秒数。 |
int |
true |
300 |
initialBackoffIntervalNanos |
最初のバックオフ間隔(ナノ秒単位)。 |
long |
true |
100000000 |
maxBackoffIntervalNanos |
最大バックオフ間隔(ナノ秒)。 |
long |
true |
60000000000 |
enableBusyWait |
EpollEventLoopGroupのBusyWaitを有効にするかどうか。 |
boolean |
true |
false |
listenerName |
ルックアップするリスナー名。クライアントは、ネットワークがアクセス可能である限り、 listenerName を使用して、ブローカーへの接続を作成するサービス URL としてリスナーのいずれかを選択できます。"advertisedListeners" はブローカー側で有効になっている必要があります。 |
文字列 |
true |
|
useKeyStoreTls |
KeyStoreの方法でTLSを設定します。 |
boolean |
true |
false |
sslProvider |
内部クライアントが他のPulsarブローカーとの認証に使用するTLSプロバイダー。 |
文字列 |
true |
|
tlsKeyStoreType |
TLS KeyStoreタイプの設定。 |
文字列 |
true |
JKS |
tlsKeyStorePath |
TLS KeyStore のパス。 |
文字列 |
true |
|
tlsKeyStorePassword |
TLS KeyStoreのパスワード。 |
文字列 |
true |
|
tlsTrustStoreType |
TLS TrustStoreタイプの設定。クライアント認証が必要な場合は、この設定を行う必要があります。 |
文字列 |
true |
JKS |
tlsTrustStorePath |
TLS TrustStoreのパス。 |
文字列 |
true |
|
tlsTrustStorePassword |
TLS TrustStoreのパスワード。 |
文字列 |
true |
|
tlsCiphers |
TLS Ciphersのセット。 |
Set |
true |
[] |
tlsProtocols |
TLSのプロトコル。 |
Set |
true |
[] |
memoryLimitBytes |
クライアントのメモリ使用量の上限 (バイト単位)。デフォルトの 64M は高いプロデューサのスループットを保証します。 |
long |
true |
67108864 |
proxyServiceUrl |
proxyServiceUrl と proxyProtocol は相互に含める必要があります。 |
文字列 |
true |
|
proxyProtocol |
プロキシサービスのプロトコル。proxyServiceUrl と proxyProtocol は相互に含める必要があります。 |
ProxyProtocol |
true |
|
enableTransaction |
トランザクションを有効にするかどうか。 |
boolean |
true |
false |
clock |
Clock |
false |
||
dnsLookupBindAddress |
PulsarクライアントのDNSルックアップバインド・アドレス、デフォルトの動作は0.0.0.0にバインドされます。 |
文字列 |
true |
|
dnsLookupBindPort |
dnsLookupBindAddressが構成されている場合に有効となります。デフォルト値は0です。 |
int |
true |
0 |
socks5ProxyAddress |
SOCKS5プロキシのアドレス。 |
InetSocketAddress |
true |
|
socks5ProxyUsername |
SOCKS5プロキシのユーザー名。 |
文字列 |
true |
|
socks5ProxyPassword |
SOCKS5プロキシのパスワード。 |
文字列 |
true |
|
description |
クライアント・バージョンの追加説明。長さは64を超えることはできません。 |
文字列 |
true |
設定ファイルで設定できない (シリアライズできない) 設定プロパティーは、 |
12. さらに詳しく
このガイドでは、Quarkus を使用して Pulsar と対話する方法を説明しました。 Quarkus は、Quarkus Messaging を利用してデータストリーミングアプリケーションを構築します。
さらに詳しく知りたい場合は、Quarkusで使用されている実装、 SmallRye Reactive Messaging のドキュメントを確認してください。