The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.
このページを編集

Reactive Messaging AMQP 1.0 コネクターリファレンスドキュメント

このガイドは、 AMQP 1.0 入門 の副読本です。Reactive Messaging 用の AMQP コネクターの設定と使用方法について詳しく説明します。

このドキュメントでは、コネクターのすべての詳細を網羅しているわけではありません。詳細については、 SmallRye Reactive Messaging Web サイト を参照してください。

AMQP コネクターを使用すると、 Quarkus アプリケーションは AMQP 1.0 プロトコルを使用してメッセージを送受信できます。プロトコルの詳細は、 AMQP 1.0 仕様 を参照してください。AMQP 1.0 と AMQP 0.9.1 (RabbitMQ で実装) は互換性がないことに注意してください。詳細は RabbitMQ の使用 を確認してください。

AMQP コネクターエクステンション

コネクターを使用するには、 quarkus-messaging-amqp エクステンションを追加する必要があります。

以下の方法でエクステンションをプロジェクトに追加できます。

コマンドラインインタフェース
quarkus extension add quarkus-messaging-amqp
Maven
./mvnw quarkus:add-extension -Dextensions='quarkus-messaging-amqp'
Gradle
./gradlew addExtension --extensions='quarkus-messaging-amqp'

または、以下の依存関係をプロジェクトに追加してください。

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-amqp</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-amqp")

プロジェクトに追加したら、 connector 属性を設定することで チャネル を AMQP アドレスにマッピングできます。

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-amqp

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-amqp
コネクターの自動アタッチ

クラスパス上にコネクターが 1 つしかない場合は、 connector 属性の設定を省略できます。 Quarkus は、クラスパス上で見つかった (唯一の) コネクターに 孤立した (orphan) チャネルを自動的に関連付けます。 孤立した チャネルとは、ダウンストリームのコンシューマーを持たない送信チャネル、またはアップストリームのプロデューサーを持たない受信チャネルのことです。

この自動アタッチは、以下を使用して無効にできます。

quarkus.messaging.auto-connector-attachment=false

AMQP ブローカーアクセスの設定

AMQP コネクターは、 Apache ActiveMQ Classic や ActiveMQ Artemis などの AMQP 1.0 ブローカーに接続します。ブローカーの場所とクレデンシャルを設定するには、 application.properties に以下のプロパティーを追加します。

amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)

mp.messaging.incoming.prices.connector=smallrye-amqp (5)
1 ブローカー/ルーターのホスト名を設定します。チャネルごとに設定する ( host 属性を使用) か、 amqp-host を使用してグローバルに設定できます。
2 ブローカー/ルーターのポートを設定します。チャネルごとに設定する ( port 属性を使用) か、 amqp-port を使用してグローバルに設定できます。デフォルトは 5672 です。
3 必要に応じてブローカー/ルーターのユーザー名を設定します。チャネルごとに設定する ( username 属性を使用) か、 amqp-username を使用してグローバルに設定できます。
4 必要に応じてブローカー/ルーターのパスワードを設定します。チャネルごとに設定する ( password 属性を使用) か、 amqp-password を使用してグローバルに設定できます。
5 prices チャネルが AMQP コネクターによって管理されるように指示します

開発モードおよびテストの実行時には、 Dev Services for AMQP が自動的に AMQP ブローカーを起動します。

AMQP メッセージの受信

アプリケーションが Message<Double> を受信するとします。ペイロードを直接消費できます。

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class AmqpPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

あるいは、 Message<Double> を取得することもできます。

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class AmqpPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message, marking the AMQP message as `accepted`.
        return price.ack();
    }

}

インバウンドメタデータ

AMQP から送られてくるメッセージには、メタデータに IncomingAmqpMetadata のインスタンスが含まれています。

Optional<IncomingAmqpMetadata> metadata = incoming.getMetadata(IncomingAmqpMetadata.class);
metadata.ifPresent(meta -> {
    String address = meta.getAddress();
    String subject = meta.getSubject();
    boolean durable = meta.isDurable();
    // Use io.vertx.core.json.JsonObject
    JsonObject properties = meta.getProperties();
    // ...
});

デシリアライズ

コネクターは、受信した AMQP メッセージを Reactive Messaging の Message<T> インスタンスに変換します。 T は受信した AMQP メッセージの ボディ (body) に依存します。

サポートされている型は AMQP Type System で定義されています。

AMQP ボディ型 <T>

AMQP プリミティブ型 を含む AMQP Value

対応する Java 型

Binary 型を使用した AMQP Value

byte[]

AMQP Sequence

List

AMQP Data (バイナリコンテンツ) かつ content-typeapplication/json に設定されているもの

JsonObject

異なる content-type を持つ AMQP Data

byte[]

この AMQP コネクター (アウトバウンドコネクター) を使用してオブジェクトを送信すると、 JSON としてエンコードされ、バイナリとして送信されます。 content-typeapplication/json に設定されます。そのため、以下のようにオブジェクトを再構築できます。

import io.vertx.core.json.JsonObject;
//
@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-amqp") (1)
    public void consume(JsonObject p) {   (2)
        Price price = p.mapTo(Price.class);  (3)
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}
1 Price インスタンスはコネクターによって自動的に JSON にエンコードされます
2 JsonObject を使用して受信できます
3 次に、 mapTo メソッドを使用してインスタンスを再構築できます
mapTo メソッドは Quarkus の Jackson マッパーを使用します。マッパーの設定については、 このガイド を参照してください。

確認応答 (Acknowledgement)

AMQP メッセージに関連付けられた Reactive Messaging の Message が確認応答 (ack) されると、メッセージが 受理 (accepted) されたことをブローカーに通知します。

失敗管理

AMQP メッセージから生成されたメッセージが nacked された場合、失敗戦略が適用されます。 AMQP コネクターは 6 つの戦略をサポートしています。

  • fail - アプリケーションを失敗させます。それ以降、 AMQP メッセージは処理されません (デフォルト)。 AMQP メッセージは拒否 (rejected) としてマークされます。

  • accept - この戦略は AMQP メッセージを 受理 (accepted) としてマークします。失敗を無視して処理を続行します。 accepted 配信状態のドキュメント を参照してください。

  • release - この戦略は AMQP メッセージを 解放 (released) としてマークします。次のメッセージの処理を続行します。ブローカーはメッセージを再配信できます。 released 配信状態のドキュメント を参照してください。

  • reject - この戦略は AMQP メッセージを拒否 (rejected) としてマークします。次のメッセージの処理を続行します。 rejected 配信状態のドキュメント を参照してください。

  • modified-failed - この戦略は AMQP メッセージを 変更 (modified) としてマークし、失敗したこと ( delivery-failed 属性を使用) を示します。次のメッセージの処理を続行しますが、ブローカーはメッセージの再配信を試みる場合があります。 modified 配信状態のドキュメント を参照してください。

  • modified-failed-undeliverable-here - この戦略は AMQP メッセージを 変更 (modified) としてマークし、失敗したこと ( delivery-failed 属性を使用) を示します。また、アプリケーションがメッセージを処理できないことも示すため、ブローカーはこのノードへのメッセージの再配信を試みません。次のメッセージの処理を続行します。 modified 配信状態のドキュメント を参照してください。

AMQP メッセージの送信

シリアライズ

Message<T> を送信するとき、コネクターはメッセージを AMQP メッセージに変換します。ペイロードは AMQP メッセージの ボディ に変換されます。

T AMQP メッセージボディ

プリミティブ型または String

ペイロードを含む AMQP Value

Instant または UUID

対応する AMQP 型を使用した AMQP Value

JsonObject または JsonArray

バイナリコンテンツを使用した AMQP Data。 content-typeapplication/json に設定されます

io.vertx.mutiny.core.buffer.Buffer

バイナリコンテンツを使用した AMQP Data。 content-type は設定されません

その他のクラス

ペイロードは (Json マッパーを使用して) JSON に変換されます。結果は バイナリ コンテンツを使用した AMQP Data にラップされます。 content-typeapplication/json に設定されます

メッセージのペイロードを JSON にシリアライズできない場合、メッセージは nacked されます。

アウトバウンドメタデータ

Messages を送信するときに、 OutgoingAmqpMetadata のインスタンスを追加して、 AMQP へのメッセージ送信方法に影響を与えることができます。例えば、サブジェクトやプロパティーを設定できます。

 OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withDurable(true)
    .withSubject("my-subject")
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

動的なアドレス名

メッセージの宛先を動的に選択したい場合があります。この場合、アプリケーションの設定ファイル内にアドレスを設定するのではなく、アウトバウンドメタデータを使用してアドレスを設定する必要があります。

例えば、受信したメッセージに基づいて動的なアドレスに送信できます。

String addressName = selectAddressFromIncomingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withAddress(addressName)
    .withDurable(true)
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
メッセージごとにアドレスを設定できるようにするために、コネクターは 匿名送信者 (anonymous sender) を使用しています。

確認応答 (Acknowledgement)

デフォルトでは、 Reactive Messaging の Message は、ブローカーがメッセージを確認したときに確認応答 (ack) されます。ルーターを使用している場合、この確認応答が有効にならないことがあります。その場合は、 auto-acknowledgement 属性を設定して、ルーターに送信された直後にメッセージを確認応答するようにしてください。

AMQP メッセージがブローカーによって拒否/解放/変更された場合 (あるいは正常に送信できない場合)、メッセージは否定応答 (nack) されます。

バックプレッシャーとクレジット

バックプレッシャーは AMQP クレジット (credits) によって処理されます。アウトバウンドコネクターは、許可されたクレジット量のみを要求します。クレジット量が 0 になると、ブローカーが AMQP 送信者にさらにクレジットを付与するまで (非ブロッキング方式で) 待機します。

AMQP アドレスの設定

address 属性を使用して AMQP アドレスを設定できます。

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.address=my-queue

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.outgoing.orders.address=my-order-queue

address 属性が設定されていない場合、コネクターはチャネル名を使用します。

既存のキューを使用するには、 addresscontainer-id 、およびオプションで link-name 属性を設定する必要があります。例えば、以下のように設定された Apache ActiveMQ Artemis ブローカーがある場合:

<addresses>
    <address name="people">
        <anycast>
            <queue name="people">
                <user>artemis</user>
            </queue>
        </anycast>
    </address>
</addresses>

以下の設定が必要になります。

mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people

キュー名がチャネル名でない場合は、 link-name 属性を設定する必要があるかもしれません。

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people

MULTICAST キューを使用するには、キュー名だけでなく FQQN (完全修飾キュー名) を指定する必要があります。

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo

mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
mp.messaging.incoming.people-out.address=foo::bar # Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people

AMQP アドレスモデルの詳細については、 ActiveMQ Artemis ドキュメント を参照してください。

実行モデルとブロッキング処理

Reactive Messaging は、 I/O スレッド上でメソッドを呼び出します。このトピックの詳細については、 Quarkus Reactive Architecture ドキュメント を参照してください。しかし、データベースのやり取りなど、 Reactive Messaging とブロッキング処理を組み合わせる必要があることがよくあります。このためには、処理が ブロッキング であり、呼び出し元スレッドで実行すべきではないことを示す @Blocking アノテーションを使用する必要があります。

例えば、次のコードは、 Hibernate with Panache を使用して受信ペイロードをデータベースに保存する方法を示しています。

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

@Blocking アノテーションには 2 つあります。

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

これらは同じ効果があります。したがって、両方を使用できます。最初のものは、使用するワーカープールや順序を維持するかどうかなど、よりきめ細かな調整を提供します。 2 番目のものは、 Quarkus の他のリアクティブ機能でも使用されており、デフォルトのワーカープールを使用し、順序を維持します。

@RunOnVirtualThread

Java 仮想スレッド 上でブロッキング処理を実行する方法については、 Reactive Messaging を使用した Quarkus 仮想スレッドのサポートに関するドキュメント を参照してください。

@Transactional

メソッドに @Transactional がアノテートされている場合、メソッドに @Blocking がアノテートされていなくても、自動的に ブロッキング と見なされます。

基礎となる AMQP クライアントのカスタマイズ

コネクターは内部で Vert.x AMQP クライアントを使用しています。このクライアントの詳細については、 Vert.x Web サイト を参照してください。

以下のように AmqpClientOptions のインスタンスを作成することで、基礎となるクライアント設定をカスタマイズできます。

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // You can use the produced options to configure the TLS connection
  PemKeyCertOptions keycert = new PemKeyCertOptions()
    .addCertPath("./tls/tls.crt")
    .addKeyPath("./tls/tls.key");
  PemTrustOptions trust = new PemTrustOptions().addCertPath("./tls/ca.crt");
  return new AmqpClientOptions()
        .setSsl(true)
        .setPemKeyCertOptions(keycert)
        .setPemTrustOptions(trust)
        .addEnabledSaslMechanism("EXTERNAL")
        .setHostnameVerificationAlgorithm("") // Disables the hostname verification. Defaults is "HTTPS"
        .setConnectTimeout(30000)
        .setReconnectInterval(5000)
        .setContainerId("my-container");
}

このインスタンスが取得され、コネクターで使用されるクライアントの設定に使用されます。 client-options-name 属性を使用してクライアントの名前を指定する必要があります。

mp.messaging.incoming.prices.client-options-name=my-named-options

ブローカーからの切断が頻繁に発生し、 AMQP 接続を永続的に維持する必要がある場合は、 AmqpClientOptions を使用してハートビートを設定することもできます。ブローカーによっては、一定のアイドルタイムアウト後に AMQP 接続を終了することがあります。ハートビート値を指定すると、 Vert.x proton クライアントがリモートピアへのトランスポートを開く際に、アイドルタイムアウトを通知するために使用されます。

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // set a heartbeat of 30s (in milliseconds)
  return new AmqpClientOptions()
        .setHeartbeat(30000);
}

TLS 設定

AMQP 1.0 Messaging エクステンションは Quarkus TLS レジストリ と統合され、 Vert.x AMQP クライアントを設定します。

AMQP 1.0 チャネルの TLS を設定するには、 application.properties で名前付き TLS 設定を指定する必要があります。

quarkus.tls.your-tls-config.trust-store.pem.certs=ca.crt,ca2.pem
# ...
mp.messaging.incoming.prices.tls-configuration-name=your-tls-config

ヘルスレポート

quarkus-smallrye-health エクステンションとともに AMQP コネクターを使用すると、 readiness probe および liveness probe に貢献します。 AMQP コネクターは、コネクターによって管理される各チャネルの readiness と liveness をレポートします。現時点では、 AMQP コネクターは readiness チェックと liveness チェックに同じロジックを使用しています。

ヘルスレポートを無効にするには、チャネルの health-enabled 属性を false に設定します。インバウンド側 (AMQP からメッセージを受信) では、レシーバーがブローカーにアタッチされていることをチェックで確認します。アウトバウンド側 (AMQP にレコードを送信) では、センダーがブローカーにアタッチされていることをチェックで確認します。

メッセージ処理の失敗はメッセージを否定応答 (nack) し、その後 failure-strategy によって処理されることに注意してください。失敗をレポートし、チェックの結果に影響を与えるのは failure-strategy の責任です。 fail 失敗戦略は失敗をレポートするため、チェックは失敗をレポートします。

RabbitMQ の使用

このコネクターは AMQP 1.0 用です。 RabbitMQ は AMQP 0.9.1 を実装しています。 RabbitMQ はデフォルトで AMQP 1.0 を提供していませんが、そのためのプラグインがあります。このコネクターで RabbitMQ を使用するには、 AMQP 1.0 プラグインを有効にして設定してください。

プラグインが存在するにもかかわらず、一部の AMQP 1.0 機能は RabbitMQ では動作しません。そのため、以下の設定をお勧めします。

RabbitMQ からメッセージを受信するには:

  • durable を false に設定する

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.durable=false

RabbitMQ にメッセージを送信するには:

  • 宛先アドレスを設定する (匿名送信者はサポートされていません)

  • use-anonymous-sender を false に設定する

mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false

その結果、 RabbitMQ を使用する場合、 (メッセージメタデータを使用して) 宛先を動的に変更することはできません。

Cloud Events の受信

AMQP コネクターは Cloud Events をサポートしています。コネクターが 構造化 (structured) または バイナリ (binary) Cloud Events を検出すると、 Message のメタデータに IncomingCloudEventMetadata<T> を追加します。 IncomingCloudEventMetadata には、必須およびオプションの Cloud Event 属性へのアクセサーが含まれています。

コネクターが Cloud Event メタデータを抽出できない場合、メタデータなしで Message を送信します。

Cloud Events の受信に関する詳細については、 SmallRye Reactive Messaging ドキュメントの Receiving Cloud Events を参照してください。

Cloud Events の送信

AMQP コネクターは Cloud Events をサポートしています。以下の場合、コネクターは送信レコードを Cloud Events として送信します。

  • メッセージメタデータに io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata インスタンスが含まれている場合、

  • チャネル設定で cloud-events-type および cloud-events-source 属性が定義されている場合。

Cloud Events の送信に関する詳細については、 SmallRye Reactive Messaging ドキュメントの Sending Cloud Events を参照してください。

AMQP コネクター設定リファレンス

Quarkus 固有の設定

ビルド時に固定される設定プロパティー - 他のすべての設定プロパティーは実行時に上書き可能です

Configuration property

タイプ

デフォルト

Dev Services

タイプ

デフォルト

If Dev Services for AMQP has been explicitly enabled or disabled. Dev Services are generally enabled by default, unless there is an existing configuration present. For AMQP, Dev Services starts a broker unless amqp-host or amqp-port are set or if all the Reactive Messaging AMQP channel are configured with host or port.

Environment variable: QUARKUS_AMQP_DEVSERVICES_ENABLED

Show more

ブーリアン

Optional fixed port the dev service will listen to.

If not defined, the port will be chosen randomly.

Environment variable: QUARKUS_AMQP_DEVSERVICES_PORT

Show more

int

The image to use. Note that only ActiveMQ Artemis images are supported. Specifically, the image repository must end with artemiscloud/activemq-artemis-broker.

Check the activemq-artemis-broker on Quay page to find the available versions.

Environment variable: QUARKUS_AMQP_DEVSERVICES_IMAGE_NAME

Show more

string

quay.io/artemiscloud/activemq-artemis-broker:1.0.25

The value of the AMQ_EXTRA_ARGS environment variable to pass to the container. For ActiveMQ Artemis Broker ⇐ 1.0.21, set this property to --no-autotune --mapped --no-fsync --relax-jolokia --http-host 0.0.0.0

Environment variable: QUARKUS_AMQP_DEVSERVICES_EXTRA_ARGS

Show more

string

--no-autotune --mapped --no-fsync --relax-jolokia

Indicates if the AMQP broker managed by Quarkus Dev Services is shared. When shared, Quarkus looks for running containers using label-based service discovery. If a matching container is found, it is used, and so a second one is not started. Otherwise, Dev Services for AMQP starts a new container.

The discovery uses the quarkus-dev-service-amqp label. The value is configured using the service-name property.

Container sharing is only used in dev mode.

Environment variable: QUARKUS_AMQP_DEVSERVICES_SHARED

Show more

ブーリアン

true

The value of the quarkus-dev-service-aqmp label attached to the started container. This property is used when shared is set to true. In this case, before starting a container, Dev Services for AMQP looks for a container with the quarkus-dev-service-amqp label set to the configured value. If found, it will use this container instead of starting a new one. Otherwise, it starts a new container with the quarkus-dev-service-amqp label set to the specified value.

This property is used when you need multiple shared AMQP brokers.

Environment variable: QUARKUS_AMQP_DEVSERVICES_SERVICE_NAME

Show more

string

amqp

Environment variables that are passed to the container.

Environment variable: QUARKUS_AMQP_DEVSERVICES_CONTAINER_ENV__ENVIRONMENT_VARIABLE_NAME_

Show more

Map<String,String>

受信チャネルの設定

属性 (エイリアス) 説明 必須 デフォルト値

address

AMQP アドレス。設定されていない場合は、チャネル名が使用されます

型: string

false

auto-acknowledgement

受信した AMQP メッセージを受信時に確認応答 (ack) するかどうか

型: boolean

false

false

broadcast

受信した AMQP メッセージを複数の サブスクライバー にディスパッチする必要があるかどうか

型: boolean

false

false

capabilities

送信者または受信者クライアントによって提案されたケーパビリティのカンマ区切りリスト。

型: string

false

client-options-name

(amqp-client-options-name)

AMQP クライアント設定をカスタマイズするために使用される AMQP Client Option Bean の名前

型: string

false

cloud-events

Cloud Event サポートを有効 (デフォルト) または無効にします。 受信 チャネルで有効な場合、コネクターは受信レコードを分析し、 Cloud Event メタデータの作成を試みます。 送信 チャネルで有効な場合、メッセージに Cloud Event メタデータが含まれていれば、コネクターは送信メッセージを Cloud Event として送信します。

型: boolean

false

true

connect-timeout

(amqp-connect-timeout)

接続タイムアウト (ミリ秒)

型: int

false

1000

container-id

AMQP コンテナー ID

型: string

false

durable

AMQP サブスクリプションが永続的かどうか

型: boolean

false

false

failure-strategy

AMQP メッセージから生成されたメッセージが否定応答 (nack) されたときに適用する失敗戦略を指定します。使用可能な値は fail (デフォルト)、 acceptreleaserejectmodified-failedmodified-failed-undeliverable-here です

型: string

false

fail

health-timeout

readiness チェックのためにブローカーとの接続が維持されているかどうかを判断するまでの最大待機秒数。このしきい値を超えると、チェックは失敗したと見なされます。

型: int

false

3

host

(amqp-host)

ブローカーのホスト名

型: string

false

localhost

link-name

リンクの名前。設定されていない場合は、チャネル名が使用されます。

型: string

false

password

(amqp-password)

ブローカーへの認証に使用されるパスワード

型: string

false

port

(amqp-port)

ブローカーのポート

型: int

false

5672

reconnect-attempts

(amqp-reconnect-attempts)

再接続の試行回数

型: int

false

100

reconnect-interval

(amqp-reconnect-interval)

再接続試行間の間隔 (秒)

型: int

false

10

sni-server-name

(amqp-sni-server-name)

設定されている場合、 TLS SNI サーバー名に使用するホスト名を明示的に上書きします

型: string

false

selector

メッセージセレクターを設定します。この属性は、ソース終端で apache.org:selector-filter:string フィルターを定義するために使用されます。 SQL ベースの構文を使用して、サーバーがレシーバーに配信するメッセージをフィルター処理するように要求します (対象のサーバーでサポートされている場合)。サポートされる正確な機能と必要な構文は、サーバーによって異なる場合があります。

型: string

false

tracing-enabled

トレースを有効 (デフォルト) にするか無効にするか

型: boolean

false

true

use-ssl

(amqp-use-ssl)

AMQP 接続で SSL/TLS を使用するかどうか

型: boolean

false

false

username

(amqp-username)

ブローカーへの認証に使用されるユーザー名

型: string

false

virtual-host

(amqp-virtual-host)

設定されている場合、接続の AMQP Open フレームおよび TLS SNI サーバー名 (TLS 使用時) に使用されるホスト名値を設定します

型: string

false

送信チャネルの設定

属性 (エイリアス) 説明 必須 デフォルト値

address

AMQP アドレス。設定されていない場合は、チャネル名が使用されます

型: string

false

capabilities

送信者または受信者クライアントによって提案されたケーパビリティのカンマ区切りリスト。

型: string

false

client-options-name

(amqp-client-options-name)

AMQP クライアント設定をカスタマイズするために使用される AMQP Client Option Bean の名前

型: string

false

cloud-events

Cloud Event サポートを有効 (デフォルト) または無効にします。 受信 チャネルで有効な場合、コネクターは受信レコードを分析し、 Cloud Event メタデータの作成を試みます。 送信 チャネルで有効な場合、メッセージに Cloud Event メタデータが含まれていれば、コネクターは送信メッセージを Cloud Event として送信します。

型: boolean

false

true

cloud-events-data-content-type

(cloud-events-default-data-content-type)

送信 Cloud Event のデフォルトの datacontenttype 属性を設定します。 cloud-eventstrue に設定されている必要があります。この値は、メッセージ自体に datacontenttype 属性が設定されていない場合に使用されます。

型: string

false

cloud-events-data-schema

(cloud-events-default-data-schema)

送信 Cloud Event のデフォルトの dataschema 属性を設定します。 cloud-eventstrue に設定されている必要があります。この値は、メッセージ自体に dataschema 属性が設定されていない場合に使用されます。

型: string

false

cloud-events-insert-timestamp

(cloud-events-default-timestamp)

コネクターが送信 Cloud Event に time 属性を自動的に挿入するかどうか。 cloud-eventstrue に設定する必要があります。この値は、メッセージ自体に time 属性が設定されていない場合に使用されます。

Type: boolean

false

true

cloud-events-mode

Cloud Event モード (structured または binary (デフォルト))。送信レコードにおける Cloud Event の書き込み方法を指定します。

Type: string

false

binary

cloud-events-source

(cloud-events-default-source)

送信 Cloud Event のデフォルトの source 属性を設定します。 cloud-eventstrue に設定する必要があります。この値は、メッセージ自体に source 属性が設定されていない場合に使用されます。

Type: string

false

cloud-events-subject

(cloud-events-default-subject)

送信 Cloud Event のデフォルトの subject 属性を設定します。 cloud-eventstrue に設定する必要があります。この値は、メッセージ自体に subject 属性が設定されていない場合に使用されます。

Type: string

false

cloud-events-type

(cloud-events-default-type)

送信 Cloud Event のデフォルトの type 属性を設定します。 cloud-eventstrue に設定する必要があります。この値は、メッセージ自体に type 属性が設定されていない場合に使用されます。

Type: string

false

connect-timeout

(amqp-connect-timeout)

接続タイムアウト (ミリ秒)

型: int

false

1000

container-id

AMQP コンテナー ID

型: string

false

credit-retrieval-period

ブローカーによって付与されたクレジットを取得するための 2 回の試行間の間隔 (ミリ秒単位)。この時間は、送信者のクレジットがなくなったときに使用されます。

Type: int

false

2000

durable

送信された AMQP メッセージを耐久性 (durable) ありとしてマークするかどうか

Type: boolean

false

false

health-timeout

readiness チェックのためにブローカーとの接続が維持されているかどうかを判断するまでの最大待機秒数。このしきい値を超えると、チェックは失敗したと見なされます。

型: int

false

3

host

(amqp-host)

ブローカーのホスト名

型: string

false

localhost

link-name

リンクの名前。設定されていない場合は、チャネル名が使用されます。

型: string

false

merge

コネクターが複数のアップストリームを許可するかどうか

Type: boolean

false

false

password

(amqp-password)

ブローカーへの認証に使用されるパスワード

型: string

false

port

(amqp-port)

ブローカーのポート

型: int

false

5672

reconnect-attempts

(amqp-reconnect-attempts)

再接続の試行回数

型: int

false

100

reconnect-interval

(amqp-reconnect-interval)

再接続試行間の間隔 (秒)

型: int

false

10

sni-server-name

(amqp-sni-server-name)

設定されている場合、 TLS SNI サーバー名に使用するホスト名を明示的に上書きします

型: string

false

tracing-enabled

トレースを有効 (デフォルト) にするか無効にするか

型: boolean

false

true

ttl

送信された AMQP メッセージの生存時間 (TTL)。0 は TTL を無効にします。

Type: long

false

0

use-anonymous-sender

コネクターが匿名送信者を使用するかどうか。デフォルト値は、ブローカーがサポートしている場合は true 、そうでない場合は false です。サポートされていない場合、送信先アドレスを動的に変更することはできません。

Type: boolean

false

use-ssl

(amqp-use-ssl)

AMQP 接続で SSL/TLS を使用するかどうか

型: boolean

false

false

username

(amqp-username)

ブローカーへの認証に使用されるユーザー名

型: string

false

virtual-host

(amqp-virtual-host)

設定されている場合、接続の AMQP Open フレームおよび TLS SNI サーバー名 (TLS 使用時) に使用されるホスト名値を設定します

型: string

false

チャネルの条件付き設定

特定のプロファイルを使用してチャネルを設定できます。これにより、指定したプロファイルが有効な場合にのみチャネルが設定され (アプリケーションに追加され) ます。

これを実現するには、以下が必要になります。

  1. mp.messaging.[incoming|outgoing].$channel エントリーの前に %my-profile を付けます (例: %my-profile.mp.messaging.[incoming|outgoing].$channel.key=value)。

  2. プロファイルが有効な場合にのみ有効にする必要がある @Incoming(channel) および @Outgoing(channel) アノテーションを含む CDI Bean で @IfBuildProfile("my-profile") を使用します。

Reactive Messaging はグラフが完全であることを検証することに注意してください。したがって、このような条件付き設定を使用する場合は、プロファイルが有効な場合と無効な場合の両方でアプリケーションが動作することを確認してください。

このアプローチは、プロファイルに基づいてチャネル設定を変更するためにも使用できることに注意してください。

関連コンテンツ