The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

リアクティブメッセージング AMQP1.0 コネクターリファレンスドキュメント

このガイドは、AMQP 1.0 スタートガイド のドキュメントです。リアクティブメッセージング用の AMQP コネクターの設定と使用法について詳しく説明します。

このドキュメントは、コネクターのすべての詳細を網羅しているわけではありません。詳細については、 SmallRye Reactive Messaging website サイトを参照してください。

The AMQP connector allows Quarkus applications to send and receive messages using the AMQP 1.0 protocol. More details about the protocol can be found in the AMQP 1.0 specification. It’s important to note that AMQP 1.0 and AMQP 0.9.1 (implemented by RabbitMQ) are incompatible. Check Using RabbitMQ to get more details.

AMQP コネクターエクステンション

コネクターを使用するには、quarkus-smallrye-reactive-messaging-amqp エクステンションを追加する必要があります。

次のコマンドでプロジェクトにエクステンションを追加することができます。

CLI
quarkus extension add 'quarkus-smallrye-reactive-messaging-amqp'
Maven
./mvnw quarkus:add-extension -Dextensions="quarkus-smallrye-reactive-messaging-amqp"
Gradle
./gradlew addExtension --extensions="quarkus-smallrye-reactive-messaging-amqp"

あるいは、以下の依存関係をプロジェクトに追加するだけです。

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-smallrye-reactive-messaging-amqp")

プロジェクトに追加したら、connector 属性を設定することで、channels を AMQP アドレスにマッピングできます。

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-amqp

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-amqp
コネクターの自動取り付け

クラスパスに単一のコネクターがある場合は、connector 属性の設定を省略できます。Quarkus は、orphan チャネルをクラスパスにある (一意の) コネクターに自動的に関連付けます。Orphans チャネルは、ダウンストリームコンシューマーのない outgoing チャネル、またはアップストリームプロデューサーのない incoming チャネルです。

この自動取り付けは、以下を使用して無効にできます。

quarkus.reactive-messaging.auto-connector-attachment=false

AMQP ブローカーアクセスの設定

AMQP コネクターは、Apache ActiveMQ や Artemis などの AMQP 1.0 ブローカーに接続します。ブローカーの場所と資格情報を設定するには、application.properties に次のプロパティーを追加します。

amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)

mp.messaging.incoming.prices.connector=smallrye-amqp (5)
1 ブローカー/ルーターのホスト名を設定します。チャネルごとに (host 属性を使用して)、または amqp-host を使用してグローバルに実行できます。
2 ブローカー/ルーターポートを設定します。チャネルごとに (port 属性を使用して)、または amqp-port を使用してグローバルに実行できます。デフォルトは 5672 です。
3 必要に応じて、ブローカー/ルーターのユーザー名を設定します。チャネルごとに (username 属性を使用して)、または amqp-username を使用してグローバルに実行できます。
4 必要に応じてブローカー/ルーターのパスワードを設定します。チャネルごとに (password 属性を使用して)、または amqp-password を使用してグローバルに実行できます。
5 価格チャネルに AMQP コネクターで管理するように指示します

開発モードでテストを実行すると、Dev Services for AMQP が自動的に AMQP ブローカーを開始します。

AMQP メッセージの受信

アプリケーションが メッセージ <Double> を受信すると想像してみましょう。ペイロードを直接消費できます。

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class AmqpPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

または、メッセージ <Double> を取得できます:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class AmqpPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message, marking the AMQP message as `accepted`.
        return price.ack();
    }

}

インバウンドメタデータ

AMQP からのメッセージには、メタデータに IncomingAmqpMetadata のインスタンスが含まれています。

Optional<IncomingAmqpMetadata> metadata = incoming.getMetadata(IncomingAmqpMetadata.class);
metadata.ifPresent(meta -> {
    String address = meta.getAddress();
    String subject = meta.getSubject();
    boolean durable = meta.isDurable();
    // Use io.vertx.core.json.JsonObject
    JsonObject properties = meta.getProperties();
    // ...
});

デシリアライズ

このコネクターは、受信した AMQP Messages をリアクティブメッセージング Message<T> インスタンスに変換します。T` は受信した AMQP メッセージの body に依存します。

The AMQP Type System defines the supported types.

AMQP ボディタイプ <T>

AMQP Value containing a AMQP Primitive Type

対応する Java タイプ

Binary タイプを使用した AMQP 値

byte[]

AMQP シーケンス

List

AMQP データ (バイナリーコンテンツを含む) および content-typeapplication/json に設定されます

JsonObject

異なる content-type を持つ AMQP データ

byte[]

この AMQP コネクター (アウトバウンドコネクター) を使用してオブジェクトを送信すると、JSON としてエンコードされ、バイナリーとして送信されます。content-typeapplication/json に設定されます。したがって、次のようにオブジェクトを再構築できます。

import io.vertx.core.json.JsonObject;
//
@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-amqp") (1)
    public void consume(JsonObject p) {   (2)
        Price price = p.mapTo(Price.class);  (3)
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}
1 Price インスタンスはコネクターによって自動的に JSON にエンコードされます
2 JsonObject を使用して受け取ることができます
3 次に、mapTo メソッドを使用してインスタンスを再構築できます
mapTo メソッドは Quarkus Jackson マッパーを使用します。マッパーの設定の詳細については、このガイド を確認してください。

謝辞

When a Reactive Messaging Message associated with an AMQP Message is acknowledged, it informs the broker that the message has been accepted.

障害管理

AMQP メッセージから生成されたメッセージが nacked の場合、障害戦略が適用されます。AMQP コネクターは、次の 6 つの戦略をサポートしています。

  • fail - アプリケーションに失敗します。これ以上 AMQP メッセージは処理されません (デフォルト)。AMQP メッセージは拒否済みとしてマークされます。

  • accept - this strategy marks the AMQP message as accepted. The processing continues ignoring the failure. Refer to the accepted delivery state documentation.

  • release - this strategy marks the AMQP message as released. The processing continues with the next message. The broker can redeliver the message. Refer to the released delivery state documentation.

  • reject - this strategy marks the AMQP message as rejected. The processing continues with the next message. Refer to the rejected delivery state documentation.

  • modified-failed - this strategy marks the AMQP message as modified and indicates that it failed (with the delivery-failed attribute). The processing continues with the next message, but the broker may attempt to redeliver the message. Refer to the modified delivery state documentation

  • modified-failed-undeliverable-here - this strategy marks the AMQP message as modified and indicates that it failed (with the delivery-failed attribute). It also indicates that the application cannot process the message, meaning that the broker will not attempt to redeliver the message to this node. The processing continues with the next message. Refer to the modified delivery state documentation

AMQP メッセージの送信

シリアル化

Message<T> を送信するとき、コネクターはメッセージを AMQP Message に変換します。ペイロードは AMQP Message の body に変換されます。

T AMQP メッセージ本文

プリミティブ型または 文字列

ペイロードを含む AMQP 値

Instant または UUID

対応する AMQP タイプを使用した AMQP 値

JsonObject または JsonArray

バイナリーコンテンツを使用した AMQP データ。content-typeapplication/json に設定されます

io.vertx.mutiny.core.buffer.Buffer

バイナリーコンテンツを使用した AMQP データ。content-type が設定されていません

その他のクラス

ペイロードは (Json Mapper を使用して) JSON に変換されます。結果は、バイナリーコンテンツを使用して AMQP データにラップされます。content-typeapplication/json に設定されます

メッセージペイロードを JSON にシリアル化できない場合、メッセージは nacked されます。

アウトバウンドメタデータ

Messages を送信するときに、OutgoingAmqpMetadata のインスタンスを追加して、メッセージが AMQP に送信される方法に影響を与えることができます。たとえば、サブジェクト、プロパティーを設定できます。

 OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withDurable(true)
    .withSubject("my-subject")
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

動的アドレス名

メッセージの宛先を動的に選択することが望ましい場合があります。この場合、アプリケーション設定ファイル内でアドレスを設定するのではなく、アウトバウンドメタデータを使用してアドレスを設定する必要があります。

たとえば、incoming メッセージに基づいて動的アドレスに送信できます。

String addressName = selectAddressFromIncommingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withAddress(addressName)
    .withDurable(true)
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
メッセージごとにアドレスを設定できるようにするために、コネクターは anonymous sender を使用しています。

謝辞

デフォルトでは、ブローカーがメッセージを確認したときに、リアクティブメッセージングの Message が確認されます。ルーターを使用する場合、この確認応答が有効にならない場合があります。この場合、メッセージがルーターに送信されるとすぐにメッセージを確認するように、auto-acknowledgement 属性を設定します。

AMQP メッセージがブローカーによって拒否/解放/変更された場合 (または正常に送信できない場合)、メッセージはナックされます。

バックプレッシャーとクレジット

バックプレッシャーは AMQP credits によって処理されます。アウトバウンドコネクターは、許可されたクレジットの量のみを要求します。クレジットの量が 0 に達すると、ブローカーが AMQP 送信者にさらにクレジットを付与するまで (非ブロッキング方式で) 待機します。

AMQP アドレスの設定

address 属性を使用して AMQP アドレスを設定できます。

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.address=my-queue

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.outgoing.orders.address=my-order-queue

address 属性が設定されていない場合、コネクターはチャネル名を使用します。

既存のキューを使用するには、addresscontainer-id、およびオプションで link-name 属性を設定する必要があります。たとえば、Apache Artemis ブローカーが次のように設定されている場合:

<queues>
    <queue name="people">
        <address>people</address>
        <durable>true</durable>
        <user>artemis</user>
    </queue>
</queues>

次の設定が必要です。

mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people

キュー名がチャネル名でない場合は、link-name 属性を設定する必要がある場合があります。

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people

MULTICAST キューを使用するには、キューの名前だけでなく、FQQN(完全修飾キュー名) を指定する必要があります。

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo

mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
mp.messaging.incoming.people-out.address=foo::bar # Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people

AMQP アドレスモデルの詳細については、 Artemis documentation を参照してください。

実行モデルとブロッキング処理

リアクティブメッセージングは、I/O スレッドでメソッドを呼び出します。このトピックの詳細については、Quarkus Reactive Architecture documentation を参照してください。ただし、多くの場合、リアクティブメッセージングとデータベースインタラクションなどのブロック処理を組み合わせる必要があります。このためには、処理が blocking であり、呼び出し元のスレッドで実行しないことを示す @Blocking アノテーションを使用する必要があります。

例えば、以下のコードは、Hibernate with Panacheを 使用してデータベースに受信ペイロードを格納する方法を示しています。

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

@Blocking アノテーションは 2 つあります。

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

効果はどちらも同じです。したがって、両方を使うことができます。最初のものは、使用するワーカープールや順序を保持するかどうかなど、より細かい調整が可能です。2 番目のものは、Quarkus の他のリアクティブ機能でも使用され、デフォルトのワーカープールを使用し、順序を保持します。

@Transactional

メソッドに @Transactional アノテーションが付けられている場合、メソッドに @Blocking アノテーションが付けられていなくても、自動的に blocking と見なされます。

基盤となる AMQP クライアントのカスタマイズ

コネクターは、その下にある Vert.xAMQP クライアントを使用します。このクライアントの詳細については、 Vert.x website を参照してください。

次のように AmqpClientOptions のインスタンスを生成することにより、基盤となるクライアント設定をカスタマイズできます。

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // You can use the produced options to configure the TLS connection
  PemKeyCertOptions keycert = new PemKeyCertOptions()
    .addCertPath("./tls/tls.crt")
    .addKeyPath("./tls/tls.key");
  PemTrustOptions trust = new PemTrustOptions().addCertPath("./tlc/ca.crt");
  return new AmqpClientOptions()
        .setSsl(true)
        .setPemKeyCertOptions(keycert)
        .setPemTrustOptions(trust)
        .addEnabledSaslMechanism("EXTERNAL")
        .setHostnameVerificationAlgorithm("")
        .setConnectTimeout(30000)
        .setReconnectInterval(5000)
        .setContainerId("my-container");
}

このインスタンスが取得され、コネクターが使用するクライアントを設定するために使用されます。client-options-name 属性を使用してクライアントの名前を指定する必要があります。

mp.messaging.incoming.prices.client-options-name=my-named-options

ヘルスレポート

quarkus-smallrye-health エクステンションを備えた AMQP コネクターを使用すると、readiness と liveness のプローブに役立ちます。AMQP コネクターは、コネクターによって管理される各チャネルの readiness と liveness を報告します。現時点では、AMQP コネクターは readiness と liveness のチェックに同じロジックを使用しています。

ヘルスレポートを無効にするには、チャネルの health-enabled 属性を false に設定します。インバウンド側 (AMQP からのメッセージの受信) では、チェックは受信者がブローカーに接続されていることを確認します。アウトバウンド側 (AMQP へのレコードの送信) では、チェックにより、送信者がブローカーに接続されていることが確認されます。

Note that a message processing failures nacks the message, which is then handled by the failure-strategy. It is the responsibility of the failure-strategy to report the failure and influence the outcome of the checks. The fail failure strategy reports the failure, and so the check will report the fault.

RabbitMQ の使用

このコネクターは AMQP 1.0 用です。RabbitMQ は AMQP 0.9.1 を実装します。RabbitMQ はデフォルトで AMQP 1.0 を提供していませんが、そのためのプラグインがあります。このコネクターで RabbitMQ を使用するには、AMQP 1.0 プラグインを有効にして設定します。

プラグインが存在するにもかかわらず、一部の AMQP 1.0 機能は RabbitMQ では機能しません。したがって、次の設定をお勧めします。

RabbitMQ からメッセージを受信するには:

  • 耐久性を false に設定

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.durable=false

RabbitMQ にメッセージを送信するには:

  • 宛先アドレスを設定します (匿名の送信者はサポートされていません)

  • use-anonymous-sender を false に設定します

mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false

結果として、RabbitMQ を使用する場合、(メッセージメタデータを使用して) 宛先を動的に変更することはできません。

クラウドイベントの受信

AMQP コネクターは Cloud Events をサポートします。コネクターは、structured または binary Cloud Events を検出すると、IncomingCloudEventMetadata<T>Message のメタデータに追加します。IncomingCloudEventMetadata は必須であり、オプションのクラウドイベントの属性へのアクセサを含みます。

コネクターがクラウドイベントメタデータを抽出できない場合、コネクターはメタデータなしでメッセージを送信します。

クラウドイベントの受信の詳細は、SmallRye リアクティブメッセージングのドキュメントの Receiving Cloud Events を参照してください。

クラウドイベントの送信

AMQP コネクターは Cloud Events をサポートします。コネクターは、次の場合にアウトバウンドレコードをクラウドイベントとして送信します。

  • メッセージメタデータには、io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata インスタンスが含まれています。

  • チャネル設定は、cloud-events-type および cloud-events-source 属性を定義します。

クラウドイベントの送信の詳細については、SmallRye リアクティブメッセージングのドキュメントの Sending Cloud Events を参照してください。

AMQP コネクター設定リファレンス

Quarkus 固有の設定

ビルド時に固定される設定プロパティ - それ以外の設定プロパティは実行時に上書き可能

Configuration property

タイプ

デフォルト

If Dev Services for AMQP has been explicitly enabled or disabled. Dev Services are generally enabled by default, unless there is an existing configuration present. For AMQP, Dev Services starts a broker unless amqp-host or amqp-port are set or if all the Reactive Messaging AMQP channel are configured with host or port.

Environment variable: QUARKUS_AMQP_DEVSERVICES_ENABLED

boolean

Optional fixed port the dev service will listen to. If not defined, the port will be chosen randomly.

Environment variable: QUARKUS_AMQP_DEVSERVICES_PORT

int

The image to use. Note that only ActiveMQ Artemis images are supported. Specifically, the image repository must end with artemiscloud/activemq-artemis-broker. Check https://quay.io/repository/artemiscloud/activemq-artemis-broker to find the available versions.

Environment variable: QUARKUS_AMQP_DEVSERVICES_IMAGE_NAME

string

quay.io/artemiscloud/activemq-artemis-broker:0.1.2

The value of the AMQ_EXTRA_ARGS environment variable to pass to the container.

Environment variable: QUARKUS_AMQP_DEVSERVICES_EXTRA_ARGS

string

--no-autotune --mapped --no-fsync

Indicates if the AMQP broker managed by Quarkus Dev Services is shared. When shared, Quarkus looks for running containers using label-based service discovery. If a matching container is found, it is used, and so a second one is not started. Otherwise, Dev Services for AMQP starts a new container. The discovery uses the quarkus-dev-service-amqp label. The value is configured using the service-name property. Container sharing is only used in dev mode.

Environment variable: QUARKUS_AMQP_DEVSERVICES_SHARED

boolean

true

The value of the quarkus-dev-service-aqmp label attached to the started container. This property is used when shared is set to true. In this case, before starting a container, Dev Services for AMQP looks for a container with the quarkus-dev-service-amqp label set to the configured value. If found, it will use this container instead of starting a new one. Otherwise, it starts a new container with the quarkus-dev-service-amqp label set to the specified value. This property is used when you need multiple shared AMQP brokers.

Environment variable: QUARKUS_AMQP_DEVSERVICES_SERVICE_NAME

string

amqp

受信チャネルの設定

属性 (alias) 説明 必須 デフォルト

address

AMQP アドレス。設定されていない場合、チャネル名が使用されます

タイプ: string

false

auto-acknowledgement

受信した AMQP メッセージを受信時に確認する必要があるかどうか

タイプ: boolean

false

false

broadcast

受信した AMQP メッセージを複数の subscribers にディスパッチする必要があるかどうか

タイプ: boolean

false

false

client-options-name

(amqp-client-options-name)

AMQP クライアント設定をカスタマイズするために使用される AMQP クライアントオプション Bean の名前

タイプ: string

false

cloud-events

クラウド イベント サポートを有効(デフォルト)または無効にします。 incoming チャネルで有効にすると、コネクタは受信レコードを分析し、Cloud Event メタデータの作成を試みます。 outgoing 側で有効にすると、メッセージに Cloud Event Metadata が含まれている場合、コネクタはoutgoingメッセージを Cloud Event として送信します。

Type: boolean

false

true

connect-timeout

(amqp-connect-timeout)

ミリ秒単位の接続タイムアウト

タイプ: int

false

1000

container-id

AMQP コンテナー ID

タイプ: string

false

durable

AMQP サブスクリプションが永続的かどうか

タイプ: boolean

false

false

failure-strategy

AMQP メッセージから生成されたメッセージがナックされたときに適用する失敗戦略を指定します。受け入れられる値は、fail(デフォルト)、acceptreleaserejectmodified-failedmodified-failed-undeliverable-here です。

タイプ: string

false

fail

health-timeout

readiness チェックのためにブローカーとの接続がまだ確立されているかどうかを判別するために待機する最大秒数。そのしきい値を超えると、チェックは失敗したと見なされます。

タイプ: int

false

3

host

(amqp-host)

ブローカーのホスト名

タイプ: string

false

localhost

link-name

リンクの名前。設定されていない場合、チャネル名が使用されます。

タイプ: string

false

password

(amqp-password)

ブローカーへの認証に使用されるパスワード

タイプ: string

false

port

(amqp-port)

ブローカーポート

タイプ: int

false

5672

reconnect-attempts

(amqp-reconnect-attempts)

再接続の試行回数

タイプ: int

false

100

reconnect-interval

(amqp-reconnect-interval)

2 回の再接続試行間の秒間隔

タイプ: int

false

10

sni-server-name

(amqp-sni-server-name)

設定されている場合、TLS SNI サーバー名に使用するホスト名を明示的にオーバーライドします

タイプ: string

false

tracing-enabled

トレースを有効(デフォルト)にするか、無効にするか

Type: boolean

false

true

use-ssl

(amqp-use-ssl)

AMQP 接続が SSL/TLS を使用するかどうか

タイプ: boolean

false

false

username

(amqp-username)

ブローカーへの認証に使用されるユーザー名

タイプ: string

false

virtual-host

(amqp-virtual-host)

設定されている場合は、接続に使用されるホスト名の値を設定します AMQP オープンフレームと TLS SNI サーバー名 (TLS が使用されている場合)

タイプ: string

false

発信チャネル設定

属性 (alias) 説明 必須 デフォルト

address

AMQP アドレス。設定されていない場合、チャネル名が使用されます

タイプ: string

false

client-options-name

(amqp-client-options-name)

AMQP クライアント設定をカスタマイズするために使用される AMQP クライアントオプション Bean の名前

タイプ: string

false

cloud-events

クラウド イベント サポートを有効(デフォルト)または無効にします。 incoming チャネルで有効にすると、コネクタは受信レコードを分析し、Cloud Event メタデータの作成を試みます。 outgoing 側で有効にすると、メッセージに Cloud Event Metadata が含まれている場合、コネクタはoutgoingメッセージを Cloud Event として送信します。

Type: boolean

false

true

cloud-events-data-content-type

(cloud-events-default-data-content-type)

outgoing Cloud Eventのデフォルトの datacontenttype 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが datacontenttype 属性を設定していない場合に使用されます。

Type: string

false

cloud-events-data-schema

(cloud-events-default-data-schema)

outgoing Cloud Eventのデフォルトの dataschema 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが dataschema 属性を設定していない場合に使用されます。

Type: string

false

cloud-events-insert-timestamp

(cloud-events-default-timestamp)

Whether the connector should insert automatically the time attribute into the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the time attribute itself

Type: boolean

false

true

cloud-events-mode

Cloud Eventのモード( structured または binary (デフォルト))。outgoing レコードにCloud Eventをどのように書き込むかを示します

Type: string

false

binary

cloud-events-source

(cloud-events-default-source)

outgoing Cloud Eventのデフォルトの source 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが source 属性を設定していない場合に使用されます。

Type: string

false

cloud-events-subject

(cloud-events-default-subject)

outgoing Cloud Eventのデフォルトの subject 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが subject 属性を設定していない場合に使用されます。

Type: string

false

cloud-events-type

(cloud-events-default-type)

outgoing Cloud Eventのデフォルトの type 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが type 属性を設定していない場合に使用されます。

Type: string

false

connect-timeout

(amqp-connect-timeout)

ミリ秒単位の接続タイムアウト

タイプ: int

false

1000

container-id

AMQP コンテナー ID

タイプ: string

false

credit-retrieval-period

ブローカーによって付与されたクレジットを取得する 2 つの試行の間の期間 (ミリ秒単位)。この時間は、送信者がクレジットを使い果たしたときに使用されます。

タイプ: int

false

2000

durable

送信された AMQP メッセージが耐久性があるとマークされているかどうか

タイプ: boolean

false

false

health-timeout

readiness チェックのためにブローカーとの接続がまだ確立されているかどうかを判別するために待機する最大秒数。そのしきい値を超えると、チェックは失敗したと見なされます。

タイプ: int

false

3

host

(amqp-host)

ブローカーのホスト名

タイプ: string

false

localhost

link-name

リンクの名前。設定されていない場合、チャネル名が使用されます。

タイプ: string

false

merge

コネクタが複数のアップストリームを許可するかどうか

Type: boolean

false

false

password

(amqp-password)

ブローカーへの認証に使用されるパスワード

タイプ: string

false

port

(amqp-port)

ブローカーポート

タイプ: int

false

5672

reconnect-attempts

(amqp-reconnect-attempts)

再接続の試行回数

タイプ: int

false

100

reconnect-interval

(amqp-reconnect-interval)

2 回の再接続試行間の秒間隔

タイプ: int

false

10

sni-server-name

(amqp-sni-server-name)

設定されている場合、TLS SNI サーバー名に使用するホスト名を明示的にオーバーライドします

タイプ: string

false

tracing-enabled

トレースを有効(デフォルト)にするか、無効にするか

Type: boolean

false

true

ttl

The time-to-live of the sent AMQP messages. 0 to disable the TTL

Type: long

false

0

use-anonymous-sender

Whether the connector should use an anonymous sender. Default value is true if the broker supports it, false otherwise. If not supported, it is not possible to dynamically change the destination address.

Type: boolean

false

use-ssl

(amqp-use-ssl)

AMQP 接続が SSL/TLS を使用するかどうか

タイプ: boolean

false

false

username

(amqp-username)

ブローカーへの認証に使用されるユーザー名

タイプ: string

false

virtual-host

(amqp-virtual-host)

設定されている場合は、接続に使用されるホスト名の値を設定します AMQP オープンフレームと TLS SNI サーバー名 (TLS が使用されている場合)

タイプ: string

false