The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

リアクティブメッセージング AMQP1.0 コネクターリファレンスドキュメント

このガイドは、AMQP 1.0 スタートガイド のドキュメントです。リアクティブメッセージング用の AMQP コネクターの設定と使用法について詳しく説明します。

このドキュメントは、コネクターのすべての詳細を網羅しているわけではありません。詳細については、 SmallRye Reactive Messaging website サイトを参照してください。

AMQP コネクターを使用すると、Quarkus アプリケーションは AMQP1.0 プロトコルを使用してメッセージを送受信できます。プロトコルの詳細については、 the AMQP 1.0 specification を参照してください。AMQP 1.0 と AMQP 0.9.1 (RabbitMQ によって実装) には互換性がないことに注意することが重要です。詳細は、RabbitMQ の使用 を参照してください。

AMQP コネクターエクステンション

To use the connector, you need to add the quarkus-messaging-amqp extension.

次のコマンドでプロジェクトにエクステンションを追加することができます:

コマンドラインインタフェース
quarkus extension add quarkus-messaging-amqp
Maven
./mvnw quarkus:add-extension -Dextensions='quarkus-messaging-amqp'
Gradle
./gradlew addExtension --extensions='quarkus-messaging-amqp'

あるいは、以下の依存関係をプロジェクトに追加するだけです:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-amqp</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-amqp")

プロジェクトに追加したら、connector 属性を設定することで、channels を AMQP アドレスにマッピングできます。

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-amqp

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-amqp
コネクターの自動アタッチ

クラスパスに単一のコネクターがある場合は、connector 属性の設定を省略できます。Quarkus は、orphan チャネルをクラスパスにある (一意の) コネクターに自動的に関連付けます。Orphans チャネルは、ダウンストリームコンシューマーのない outgoing チャネル、またはアップストリームプロデューサーのない incoming チャネルです。

この自動アタッチは、以下を使用して無効にできます。

quarkus.messaging.auto-connector-attachment=false

AMQP Broker のアクセス設定

AMQP コネクターは、Apache ActiveMQ や Artemis などの AMQP 1.0 ブローカーに接続します。ブローカーの場所と資格情報を設定するには、application.properties に次のプロパティーを追加します。

amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)

mp.messaging.incoming.prices.connector=smallrye-amqp (5)
1 ブローカー/ルーターのホスト名を設定します。チャネルごとに (host 属性を使用して)、または amqp-host を使用してグローバルに実行できます。
2 ブローカー/ルーターポートを設定します。チャネルごとに (port 属性を使用して)、または amqp-port を使用してグローバルに実行できます。デフォルトは 5672 です。
3 必要に応じて、ブローカー/ルーターのユーザー名を設定します。チャネルごとに (username 属性を使用して)、または amqp-username を使用してグローバルに実行できます。
4 必要に応じてブローカー/ルーターのパスワードを設定します。チャネルごとに (password 属性を使用して)、または amqp-password を使用してグローバルに実行できます。
5 価格チャネルに AMQP コネクターで管理するように指示します

開発モードでテストを実行すると、Dev Services for AMQP が自動的に AMQP ブローカーを開始します。

AMQP メッセージの受信

アプリケーションが メッセージ <Double> を受信すると想像してみましょう。ペイロードを直接消費できます。

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class AmqpPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

または、Message<Double> を取得することができます。

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class AmqpPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message, marking the AMQP message as `accepted`.
        return price.ack();
    }

}

インバウンドメタデータ

AMQP からのメッセージには、メタデータに IncomingAmqpMetadata のインスタンスが含まれています。

Optional<IncomingAmqpMetadata> metadata = incoming.getMetadata(IncomingAmqpMetadata.class);
metadata.ifPresent(meta -> {
    String address = meta.getAddress();
    String subject = meta.getSubject();
    boolean durable = meta.isDurable();
    // Use io.vertx.core.json.JsonObject
    JsonObject properties = meta.getProperties();
    // ...
});

デシリアライズ

このコネクターは、受信した AMQP Messages をリアクティブメッセージング Message<T> インスタンスに変換します。T` は受信した AMQP メッセージの body に依存します。

AMQP Type System は、サポートされるタイプを定義します。

AMQP ボディ型 <T>

AMQP Primitive Type を含む AMQP 値

対応する Java の型

Binary タイプを使用した AMQP 値

byte[]

AMQP シーケンス

List

AMQP データ (バイナリーコンテンツ) で、content-typeapplication/json に設定されている場合。

JsonObject

異なる content-type を持つ AMQP データ

byte[]

この AMQP コネクター (アウトバウンドコネクター) を使用してオブジェクトを送信すると、JSON としてエンコードされ、バイナリーとして送信されます。content-typeapplication/json に設定されます。したがって、次のようにオブジェクトを再構築できます。

import io.vertx.core.json.JsonObject;
//
@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-amqp") (1)
    public void consume(JsonObject p) {   (2)
        Price price = p.mapTo(Price.class);  (3)
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}
1 Price インスタンスはコネクターによって自動的に JSON にエンコードされます。
2 それを、JsonObject を使用して受け取ることができます。
3 その後、mapTo メソッドを使用してインスタンスを再構築できます。
mapTo メソッドは Quarkus Jackson マッパーを使用します。マッパーの設定について、詳しくは このガイド を参照してください。

確認

AMQP メッセージに関連付けられたリアクティブメッセージングメッセージが確認されると、そのメッセージが accepted されたことをブローカーに通知します。

失敗の管理

AMQP メッセージから生成されたメッセージが nacked の場合、障害戦略が適用されます。AMQP コネクターは、次の 6 つの戦略をサポートしています。

  • fail - アプリケーションに失敗します。これ以上 AMQP メッセージは処理されません (デフォルト)。AMQP メッセージは拒否済みとしてマークされます。

  • accept - この戦略は、AMQP メッセージを accepted としてマークします。処理は失敗を無視して続行します。https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-accepted[accepted delivery state documentation] を参照してください。

  • release - この戦略は、AMQP メッセージを released としてマークします。処理は次のメッセージに進みます。ブローカーはメッセージを再配信できます。https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-released[released delivery state documentation] を参照してください。

  • reject - この戦略は、AMQP メッセージを拒否としてマークします。処理は次のメッセージに進みます。https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-rejected[rejected delivery state documentation] を参照してください。

  • modified-failed - この戦略は、AMQP メッセージを modified としてマークし、失敗したことを示します (delivery-failed 属性を使用)。処理は次のメッセージから続行されますが、ブローカーはメッセージの再配信を試みる可能性があります。https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-modified[modified delivery state documentation] を参照してください

  • modified-failed-undeliverable-here - この戦略は、AMQP メッセージを modified としてマークし、失敗したことを示します ( delivery-failed 属性を使用)。また、アプリケーションがメッセージを処理できないことも示しています。つまり、ブローカーはメッセージをこのノードに再配信しようとはしません。処理は次のメッセージに進みます。https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-modified[modified delivery state documentation] を参照してください

AMQP メッセージの送信

シリアル化

Message<T> を送信するとき、コネクターはメッセージを AMQP Message に変換します。ペイロードは AMQP Message の body に変換されます。

T AMQP メッセージボディ

プリミティブ型または 文字列

ペイロードを含む AMQP 値

Instant または UUID

対応する AMQP タイプを使用した AMQP 値

JsonObject または JsonArray

バイナリーコンテンツを使用した AMQP データ。content-typeapplication/json に設定されます

io.vertx.mutiny.core.buffer.Buffer

バイナリーコンテンツを使用した AMQP データ。content-type が設定されていません

その他のクラス

ペイロードは (Json Mapper を使用して) JSON に変換されます。結果は、バイナリーコンテンツを使用して AMQP データにラップされます。content-typeapplication/json に設定されます

メッセージペイロードを JSON にシリアライズできない場合、メッセージは nack されます。

アウトバウンドメタデータ

Messages を送信するときに、OutgoingAmqpMetadata のインスタンスを追加して、メッセージが AMQP に送信される方法に影響を与えることができます。たとえば、サブジェクト、プロパティーを設定できます。

 OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withDurable(true)
    .withSubject("my-subject")
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

動的アドレス名

メッセージの宛先を動的に選択することが望ましい場合があります。この場合、アプリケーション設定ファイル内でアドレスを設定するのではなく、アウトバウンドメタデータを使用してアドレスを設定する必要があります。

たとえば、incoming メッセージに基づいて動的アドレスに送信できます。

String addressName = selectAddressFromIncommingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withAddress(addressName)
    .withDurable(true)
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
メッセージごとにアドレスを設定できるようにするために、コネクターは anonymous sender を使用しています。

確認

デフォルトでは、ブローカーがメッセージを確認したときに、リアクティブメッセージングの Message が確認されます。ルーターを使用する場合、この確認応答が有効にならない場合があります。この場合、メッセージがルーターに送信されるとすぐにメッセージを確認するように、auto-acknowledgement 属性を設定します。

AMQP メッセージがブローカーによって拒否/解放/変更された場合 (または正常に送信できない場合)、メッセージはナックされます。

バックプレッシャーとクレジット

バックプレッシャーは AMQP credits によって処理されます。アウトバウンドコネクターは、許可されたクレジットの量のみを要求します。クレジットの量が 0 に達すると、ブローカーが AMQP 送信者にさらにクレジットを付与するまで (非ブロッキング方式で) 待機します。

AMQP アドレスの設定

address 属性を使用して AMQP アドレスを設定できます。

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.address=my-queue

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.outgoing.orders.address=my-order-queue

address 属性が設定されていない場合、コネクターはチャネル名を使用します。

既存のキューを使用するには、addresscontainer-id、およびオプションで link-name 属性を設定する必要があります。たとえば、Apache Artemis ブローカーが次のように設定されている場合:

<queues>
    <queue name="people">
        <address>people</address>
        <durable>true</durable>
        <user>artemis</user>
    </queue>
</queues>

次の設定が必要です。

mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people

キュー名がチャネル名でない場合は、link-name 属性を設定する必要がある場合があります。

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people

MULTICAST キューを使用するには、キューの名前だけでなく、FQQN(完全修飾キュー名) を指定する必要があります。

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo

mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
mp.messaging.incoming.people-out.address=foo::bar # Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people

AMQP アドレスモデルの詳細については、 Artemis documentation を参照してください。

実行モデルとブロッキング処理

リアクティブメッセージングは、I/O スレッドでメソッドを呼び出します。このトピックの詳細については、xQuarkus リアクティブアーキテクチャーのドキュメント を参照してください。ただし、多くの場合、リアクティブメッセージングとデータベースインタラクションなどのブロック処理を組み合わせる必要があります。このためには、処理が ブロッキング であり、呼び出し元のスレッドで実行するべきではないことを示す @Blocking アノテーションを使用する必要があります。

例えば、以下のコードは、Hibernate with Panacheを 使用してデータベースに受信ペイロードを格納する方法を示しています。

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

@Blocking アノテーションは 2 つあります。

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

効果はどちらも同じです。したがって、両方を使うことができます。最初のものは、使用するワーカープールや順序を保持するかどうかなど、より細かい調整が可能です。2 番目のものは、Quarkus の他のリアクティブ機能でも使用され、デフォルトのワーカープールを使用し、順序を保持します。

@RunOnVirtualThread

Java 仮想スレッドでの ブロッキング処理の実行については、 Quarkus Virtual Thread support with Reactive Messagingのドキュメント を参照してください。

@Transactional

メソッドに @Transactional アノテーションが付けられている場合、メソッドに @Blocking アノテーションが付けられていなくても、自動的に blocking と見なされます。

基盤となる AMQP クライアントのカスタマイズ

コネクターは、その下にある Vert.xAMQP クライアントを使用します。このクライアントの詳細については、 Vert.x website を参照してください。

次のように AmqpClientOptions のインスタンスを生成することにより、基盤となるクライアント設定をカスタマイズできます。

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // You can use the produced options to configure the TLS connection
  PemKeyCertOptions keycert = new PemKeyCertOptions()
    .addCertPath("./tls/tls.crt")
    .addKeyPath("./tls/tls.key");
  PemTrustOptions trust = new PemTrustOptions().addCertPath("./tlc/ca.crt");
  return new AmqpClientOptions()
        .setSsl(true)
        .setPemKeyCertOptions(keycert)
        .setPemTrustOptions(trust)
        .addEnabledSaslMechanism("EXTERNAL")
        .setHostnameVerificationAlgorithm("") // Disables the hostname verification. Defaults is "HTTPS"
        .setConnectTimeout(30000)
        .setReconnectInterval(5000)
        .setContainerId("my-container");
}

このインスタンスが取得され、コネクターが使用するクライアントを設定するために使用されます。client-options-name 属性を使用してクライアントの名前を指定する必要があります。

mp.messaging.incoming.prices.client-options-name=my-named-options

ヘルスレポート

quarkus-smallrye-health エクステンションを備えた AMQP コネクターを使用すると、readiness と liveness のプローブに役立ちます。AMQP コネクターは、コネクターによって管理される各チャネルの readiness と liveness を報告します。現時点では、AMQP コネクターは readiness と liveness のチェックに同じロジックを使用しています。

ヘルスレポートを無効にするには、チャネルの health-enabled 属性を false に設定します。インバウンド側 (AMQP からのメッセージの受信) では、チェックは受信者がブローカーに接続されていることを確認します。アウトバウンド側 (AMQP へのレコードの送信) では、チェックにより、送信者がブローカーに接続されていることが確認されます。

メッセージ処理の失敗はメッセージをnackし、failure-strategy で処理されることに注意してください。失敗を報告し、チェックの結果に影響を与えるのは failure-strategy です。失敗戦略の fail は失敗を報告し、チェックも失敗を報告します。

RabbitMQ の使用

このコネクターは AMQP 1.0 用です。RabbitMQ は AMQP 0.9.1 を実装します。RabbitMQ はデフォルトで AMQP 1.0 を提供していませんが、そのためのプラグインがあります。このコネクターで RabbitMQ を使用するには、AMQP 1.0 プラグインを有効にして設定します。

プラグインが存在するにもかかわらず、一部の AMQP 1.0 機能は RabbitMQ では機能しません。したがって、次の設定をお勧めします。

RabbitMQ からのメッセージを受信する場合。

  • 耐久性を false に設定

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.durable=false

RabbitMQ にメッセージを送信する場合。

  • 宛先アドレスの設定 (匿名送信はサポートされていません)

  • use-anonymous-sender を false に設定します

mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false

結果として、RabbitMQ を使用する場合、(メッセージメタデータを使用して) 宛先を動的に変更することはできません。

クラウドイベントの受信

AMQP コネクターは Cloud Events をサポートします。コネクターは、structured または binary Cloud Events を検出すると、IncomingCloudEventMetadata<T>Message のメタデータに追加します。IncomingCloudEventMetadata は必須であり、オプションのクラウドイベントの属性へのアクセサを含みます。

コネクターがクラウドイベントメタデータを抽出できない場合、コネクターはメタデータなしでメッセージを送信します。

クラウドイベントの受信の詳細は、SmallRye リアクティブメッセージングのドキュメントの Receiving Cloud Events を参照してください。

クラウドイベントの送信

AMQP コネクターは Cloud Events をサポートします。コネクターは、次の場合にアウトバウンドレコードをクラウドイベントとして送信します。

  • メッセージメタデータには、io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata インスタンスが含まれています。

  • チャネル設定は、cloud-events-type および cloud-events-source 属性を定義します。

クラウドイベントの送信の詳細については、SmallRye リアクティブメッセージングのドキュメントの Sending Cloud Events を参照してください。

AMQP コネクター設定リファレンス

Quarkus 固有の設定

ビルド時に固定される構成プロパティ - 他のすべての構成プロパティは実行時にオーバーライド可能

Configuration property

デフォルト

If Dev Services for AMQP has been explicitly enabled or disabled. Dev Services are generally enabled by default, unless there is an existing configuration present. For AMQP, Dev Services starts a broker unless amqp-host or amqp-port are set or if all the Reactive Messaging AMQP channel are configured with host or port.

Environment variable: QUARKUS_AMQP_DEVSERVICES_ENABLED

Show more

boolean

Optional fixed port the dev service will listen to.

If not defined, the port will be chosen randomly.

Environment variable: QUARKUS_AMQP_DEVSERVICES_PORT

Show more

int

The image to use. Note that only ActiveMQ Artemis images are supported. Specifically, the image repository must end with artemiscloud/activemq-artemis-broker.

Check the activemq-artemis-broker on Quay page to find the available versions.

Environment variable: QUARKUS_AMQP_DEVSERVICES_IMAGE_NAME

Show more

string

quay.io/artemiscloud/activemq-artemis-broker:1.0.25

The value of the AMQ_EXTRA_ARGS environment variable to pass to the container. For ActiveMQ Artemis Broker ⇐ 1.0.21, set this property to --no-autotune --mapped --no-fsync --relax-jolokia --http-host 0.0.0.0

Environment variable: QUARKUS_AMQP_DEVSERVICES_EXTRA_ARGS

Show more

string

--no-autotune --mapped --no-fsync --relax-jolokia

Indicates if the AMQP broker managed by Quarkus Dev Services is shared. When shared, Quarkus looks for running containers using label-based service discovery. If a matching container is found, it is used, and so a second one is not started. Otherwise, Dev Services for AMQP starts a new container.

The discovery uses the quarkus-dev-service-amqp label. The value is configured using the service-name property.

Container sharing is only used in dev mode.

Environment variable: QUARKUS_AMQP_DEVSERVICES_SHARED

Show more

boolean

true

The value of the quarkus-dev-service-aqmp label attached to the started container. This property is used when shared is set to true. In this case, before starting a container, Dev Services for AMQP looks for a container with the quarkus-dev-service-amqp label set to the configured value. If found, it will use this container instead of starting a new one. Otherwise, it starts a new container with the quarkus-dev-service-amqp label set to the specified value.

This property is used when you need multiple shared AMQP brokers.

Environment variable: QUARKUS_AMQP_DEVSERVICES_SERVICE_NAME

Show more

string

amqp

Environment variables that are passed to the container.

Environment variable: QUARKUS_AMQP_DEVSERVICES_CONTAINER_ENV

Show more

Map<String,String>

受信チャネルの設定

属性 (alias) Description 必須 デフォルト

address

AMQP アドレス。設定されていない場合、チャネル名が使用されます

タイプ: string

false

auto-acknowledgement

受信した AMQP メッセージを受信時に確認する必要があるかどうか

タイプ: boolean

false

false

broadcast

受信した AMQP メッセージを複数の subscribers にディスパッチする必要があるかどうか

タイプ: boolean

false

false

capabilities

送信側または受信側のクライアントが提案する能力のカンマ区切りのリスト。

Type: string

false

client-options-name

(amqp-client-options-name)

AMQP クライアント設定をカスタマイズするために使用される AMQP クライアントオプション Bean の名前

タイプ: string

false

cloud-events

クラウド イベント サポートを有効(デフォルト)または無効にします。 incoming チャネルで有効にすると、コネクタは受信レコードを分析し、Cloud Event メタデータの作成を試みます。 outgoing 側で有効にすると、メッセージに Cloud Event Metadata が含まれている場合、コネクタはoutgoingメッセージを Cloud Event として送信します。

タイプ: boolean

false

true

connect-timeout

(amqp-connect-timeout)

ミリ秒単位の接続タイムアウト

タイプ: int

false

1000

container-id

AMQP コンテナー ID

タイプ: string

false

durable

AMQP サブスクリプションが永続的であるかどうか

タイプ: boolean

false

false

failure-strategy

AMQP メッセージから生成されたメッセージがナックされたときに適用する失敗の戦略を指定します。指定できる値は fail (デフォルト)、acceptreleaserejectmodified-failedmodified-failed-undeliverable-here です。

タイプ: string

false

fail

health-timeout

readiness チェックのためにブローカーとの接続がまだ確立されているかどうかを判別するために待機する最大秒数。そのしきい値を超えると、チェックは失敗したと見なされます。

タイプ: int

false

3

host

(amqp-host)

ブローカーのホスト名

タイプ: string

false

localhost

link-name

リンクの名前。設定されていない場合、チャネル名が使用されます。

タイプ: string

false

password

(amqp-password)

ブローカーへの認証に使用されるパスワード

タイプ: string

false

port

(amqp-port)

ブローカーポート

タイプ: int

false

5672

reconnect-attempts

(amqp-reconnect-attempts)

再接続の試行回数

タイプ: int

false

100

reconnect-interval

(amqp-reconnect-interval)

2 回の再接続試行の間隔 (秒)

タイプ: int

false

10

sni-server-name

(amqp-sni-server-name)

設定されている場合、TLS SNI サーバー名に使用するホスト名を明示的にオーバーライドします

タイプ: string

false

selector

メッセージセレクタを設定する。この属性は、送信元端末に apache.org:selector-filter:string フィルタを定義するために使用されます。SQL ベースの構文を使用して、受信者に配信されるメッセージをサーバフィルタに要求します(該当するサーバでサポートされている場合)。サポートされる機能や必要な構文は、サーバーによって異なる場合があります。

Type: string

false

tracing-enabled

トレースを有効(デフォルト)にするか、無効にするか

タイプ: boolean

false

true

use-ssl

(amqp-use-ssl)

AMQP 接続が SSL/TLS を使用するかどうか

タイプ: boolean

false

false

username

(amqp-username)

ブローカーへの認証に使用されるユーザー名

タイプ: string

false

virtual-host

(amqp-virtual-host)

設定されている場合は、接続に使用されるホスト名の値を設定します AMQP オープンフレームと TLS SNI サーバー名 (TLS が使用されている場合)

タイプ: string

false

発信チャネル設定

属性 (alias) Description 必須 デフォルト

address

AMQP アドレス。設定されていない場合、チャネル名が使用されます

タイプ: string

false

capabilities

送信側または受信側のクライアントが提案する能力のカンマ区切りのリスト。

Type: string

false

client-options-name

(amqp-client-options-name)

AMQP クライアント設定をカスタマイズするために使用される AMQP クライアントオプション Bean の名前

タイプ: string

false

cloud-events

クラウド イベント サポートを有効(デフォルト)または無効にします。 incoming チャネルで有効にすると、コネクタは受信レコードを分析し、Cloud Event メタデータの作成を試みます。 outgoing 側で有効にすると、メッセージに Cloud Event Metadata が含まれている場合、コネクタはoutgoingメッセージを Cloud Event として送信します。

タイプ: boolean

false

true

cloud-events-data-content-type

(cloud-events-default-data-content-type)

outgoing Cloud Eventのデフォルトの datacontenttype 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが datacontenttype 属性を設定していない場合に使用されます。

タイプ: string

false

cloud-events-data-schema

(cloud-events-default-data-schema)

outgoing Cloud Eventのデフォルトの dataschema 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが dataschema 属性を設定していない場合に使用されます。

タイプ: string

false

cloud-events-insert-timestamp

(cloud-events-default-timestamp)

コネクターが、送信するクラウドイベントに自動的に time 属性を挿入するかどうかを指定します。cloud-eventstrue に設定されていることが必要です。この値は、メッセージに time 属性が設定されていない場合に使用されます。

タイプ: boolean

false

true

cloud-events-mode

Cloud Eventのモード( structured または binary (デフォルト))。outgoing レコードにCloud Eventをどのように書き込むかを示します

タイプ: string

false

binary

cloud-events-source

(cloud-events-default-source)

outgoing Cloud Eventのデフォルトの source 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが source 属性を設定していない場合に使用されます。

タイプ: string

false

cloud-events-subject

(cloud-events-default-subject)

outgoing Cloud Eventのデフォルトの subject 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが subject 属性を設定していない場合に使用されます。

タイプ: string

false

cloud-events-type

(cloud-events-default-type)

outgoing Cloud Eventのデフォルトの type 属性を設定します。 cloud-eventstrue を設定する必要があります。この値は、メッセージが type 属性を設定していない場合に使用されます。

タイプ: string

false

connect-timeout

(amqp-connect-timeout)

ミリ秒単位の接続タイムアウト

タイプ: int

false

1000

container-id

AMQP コンテナー ID

タイプ: string

false

credit-retrieval-period

ブローカーによって付与されたクレジットを取得する 2 つの試行の間の期間 (ミリ秒単位)。この時間は、送信者がクレジットを使い果たしたときに使用されます。

タイプ: int

false

2000

durable

送信された AMQP メッセージが耐久性があるとマークされているかどうか

タイプ: boolean

false

false

health-timeout

readiness チェックのためにブローカーとの接続がまだ確立されているかどうかを判別するために待機する最大秒数。そのしきい値を超えると、チェックは失敗したと見なされます。

タイプ: int

false

3

host

(amqp-host)

ブローカーのホスト名

タイプ: string

false

localhost

link-name

リンクの名前。設定されていない場合、チャネル名が使用されます。

タイプ: string

false

merge

コネクタが複数のアップストリームを許可するかどうか

タイプ: boolean

false

false

password

(amqp-password)

ブローカーへの認証に使用されるパスワード

タイプ: string

false

port

(amqp-port)

ブローカーポート

タイプ: int

false

5672

reconnect-attempts

(amqp-reconnect-attempts)

再接続の試行回数

タイプ: int

false

100

reconnect-interval

(amqp-reconnect-interval)

2 回の再接続試行の間隔 (秒)

タイプ: int

false

10

sni-server-name

(amqp-sni-server-name)

設定されている場合、TLS SNI サーバー名に使用するホスト名を明示的にオーバーライドします

タイプ: string

false

tracing-enabled

トレースを有効(デフォルト)にするか、無効にするか

タイプ: boolean

false

true

ttl

送信した AMQP メッセージの TTL (time-to-live)。TTL を無効にする場合は 0

タイプ: long

false

0

use-anonymous-sender

コネクターが匿名送信者を使用するかどうか。デフォルト値は、ブローカーがサポートしている場合は true で、それ以外の場合は false です。サポートしていない場合は、宛先アドレスを動的に変更することができません。 タイプです。

タイプ: boolean

false

use-ssl

(amqp-use-ssl)

AMQP 接続が SSL/TLS を使用するかどうか

タイプ: boolean

false

false

username

(amqp-username)

ブローカーへの認証に使用されるユーザー名

タイプ: string

false

virtual-host

(amqp-virtual-host)

設定されている場合は、接続に使用されるホスト名の値を設定します AMQP オープンフレームと TLS SNI サーバー名 (TLS が使用されている場合)

タイプ: string

false

条件付きでチャンネルを設定する

特定のプロファイルを使用してチャンネルを設定できます。 そのため、指定したプロファイルが有効な場合にのみ、チャンネルが設定されます(アプリケーションに追加されます)。

To achieve this, you need:

  1. Prefix the mp.messaging.[incoming|outgoing].$channel entries with %my-profile such as %my-profile.mp.messaging.[incoming|outgoing].$channel.key=value

  2. Use the @IfBuildProfile("my-profile") on the CDI beans containing @Incoming(channel) and @Outgoing(channel) annotations that need only to be enabled when the profile is enabled.

Note that reactive messaging verifies that the graph is complete. So, when using such a conditional configuration, ensure the application works with and without the profile enabled.

Note that this approach can also be used to change the channel configuration based on a profile.

関連コンテンツ