The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

RabbitMQによるSmallRyeリアクティブメッセージング入門

このガイドでは、QuarkusアプリケーションがSmallRye Reactive Messagingを利用してRabbitMQと対話する方法を説明します。

この技術は、previewと考えられています。

preview では、下位互換性やエコシステムでの存在は保証されていません。具体的な改善には設定や API の変更が必要になるかもしれませんが、 stable になるための計画は現在進行中です。フィードバックは メーリングリストGitHub の課題管理 で受け付けています。

とりうるステータスの完全なリストについては、 FAQの項目 を参照してください。

前提条件

このガイドを完成させるには、以下が必要です:

  • 約15分

  • IDE

  • JDK 11+ がインストールされ、 JAVA_HOME が適切に設定されていること

  • Apache Maven 3.9.1

  • Docker と Docker Compose、または Podman 、および Docker Compose

  • 使用したい場合は、 Quarkus CLI

  • ネイティブ実行可能ファイルをビルドしたい場合、MandrelまたはGraalVM(あるいはネイティブなコンテナビルドを使用する場合はDocker)をインストールし、 適切に設定していること

アーキテクチャ

このガイドでは、RabbitMQ ブローカーと通信する 2 つのアプリケーションを開発します。1つ目のアプリケーションは、RabbitMQの 見積要求 エクスチェンジに 見積要求 を送信し、 見積 キューからメッセージを消費します。2 番目のアプリケーションは、 見積要求 を受信し、 見積 を送り返します。

アーキテクチャ,

最初のアプリケーションである producer では、ユーザーが HTTP エンドポイントを介していくつかの見積を要求することができます。各見積要求に対して、ランダムな識別子が生成され、ユーザーに返され、見積要求が pending されます。同時に、生成されたリクエストIDは、 quote-requests のエクスチェンジに送信されます。

プロデューサー

2 番目のアプリケーションである processor は、 quote-requests のキューから読み取った見積にランダムな価格を付けて、 quotes という名前のエクスチェンジに送信します。

最後に、 producer が見積を読み取り、server-sent event を使ってブラウザに送信します。そのため、ユーザーは見積が pending から受信した価格にリアルタイムで更新されるのを見ることができます。

ソリューション

次のセクションの説明に沿って、段階的にアプリケーションを作成することをお勧めします。しかし、完成した例をすぐに見ることもできます。

Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.gitアーカイブ をダウンロードします。

このソリューションは rabbitmq-quickstart ディレクトリにあります。

Mavenプロジェクトの作成

まず、 producerprocessor の2つのプロジェクトを作成する必要があります。

producer プロジェクトを作成するには、ターミナルで次のように実行します:

コマンドラインインタフェース
quarkus create app org.acme:rabbitmq-quickstart-producer \
    --extension='smallrye-reactive-messaging-rabbitmq,resteasy-reactive-jackson' \
    --no-code

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストール方法や使用方法については、 Quarkus CLIガイド を参照してください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.1.0.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=rabbitmq-quickstart-producer \
    -Dextensions='smallrye-reactive-messaging-rabbitmq,resteasy-reactive-jackson' \
    -DnoCode

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

このコマンドは、プロジェクト構造を作成し、使用する2つのQuarkusエクステンションを選択します:

  1. Reactive Messaging RabbitMQコネクター

  2. RESTEasy ReactiveとJSONペイロードを扱うためのJacksonサポート

processor プロジェクトを作成するには、同じディレクトリから、次のように実行します:

コマンドラインインタフェース
quarkus create app org.acme:rabbitmq-quickstart-processor \
    --extension='smallrye-reactive-messaging-rabbitmq' \
    --no-code

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストール方法や使用方法については、 Quarkus CLIガイド を参照してください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.1.0.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=rabbitmq-quickstart-processor \
    -Dextensions='smallrye-reactive-messaging-rabbitmq' \
    -DnoCode

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

この時点では、以下のような構成になっているはずです:

.
├── rabbitmq-quickstart-processor
│  ├── README.md
│  ├── mvnw
│  ├── mvnw.cmd
│  ├── pom.xml
│  └── src
│     └── main
│        ├── docker
│        ├── java
│        └── resources
│           └── application.properties
└── rabbitmq-quickstart-producer
   ├── README.md
   ├── mvnw
   ├── mvnw.cmd
   ├── pom.xml
   └── src
      └── main
         ├── docker
         ├── java
         └── resources
            └── application.properties

2つのプロジェクトをお好みのIDEで開きます。

Quoteオブジェクト

Quote クラスは、 producerprocessor の両方のプロジェクトで使用されます。楽をするためにこのクラスを複製します。両方のプロジェクトで、次のような内容の src/main/java/org/acme/rabbitmq/model/Quote.java ファイルを作成します:

package org.acme.rabbitmq.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Quote {

    public String id;
    public int price;

    /**
    * Default constructor required for Jackson serializer
    */
    public Quote() { }

    public Quote(String id, int price) {
        this.id = id;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Quote{" +
                "id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}

Quote オブジェクトのJSON表現は、RabbitMQキューに送信されるメッセージや、ブラウザクライアントに送信されるserver-sent eventで使用されます。

QuarkusにはJSONのRabbitMQメッセージを処理する機能が組み込まれています。

@RegisterForReflection

@RegisterForReflection アノテーションは、ネイティブ実行可能ファイルをビルドする際に、クラス(フィールドとメソッドを含む)を含めるようにQuarkusに指示します。これは、後でアプリケーションをコンテナ内のネイティブ実行可能ファイルとして実行する際に役立ちます。これがないと、ネイティブのコンパイル時にデッドコードの除去段階でフィールドやメソッドが削除されてしまいます。

見積もり依頼の送信

producer プロジェクト内で、生成された src/main/java/org/acme/rabbitmq/producer/QuotesResource.java ファイルを探し、コンテンツを次のように更新します:

package org.acme.rabbitmq.producer;

import java.util.UUID;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.mutiny.Multi;

@Path("/quotes")
public class QuotesResource {

    @Channel("quote-requests") Emitter<String> quoteRequestEmitter; (1)

    /**
     * Endpoint to generate a new quote request id and send it to "quote-requests" channel (which
     * maps to the "quote-requests" RabbitMQ exchange) using the emitter.
     */
    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
        UUID uuid = UUID.randomUUID();
        quoteRequestEmitter.send(uuid.toString()); (2)
        return uuid.toString();
    }
}
1 Reactive Messaging Emitter を注入して、 quote-requests チャンネルにメッセージを送信します。
2 投稿リクエストで、ランダムなUUIDを生成し、エミッターを使ってRabbitMQのキューに送信します。

このチャンネルは、 application.properties ファイルの設定を使用して、RabbitMQ のエクスチェンジにマッピングされます。 src/main/resource/application.properties ファイルを開き、次のように追加します:

# Configure the outgoing RabbitMQ exchange `quote-requests`
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq
mp.messaging.outgoing.quote-requests.exchange.name=quote-requests

指定する必要があるのは、 smallrye-rabbitmq のコネクタだけです。デフォルトでは、リアクティブ・メッセージングは、チャネル名 quote-requests を同じ RabbitMQ のエクスチェンジ名にマッピングします。

見積依頼の処理

それでは、見積依頼を消費して、価格を出してみましょう。 processor プロジェクト内で、 src/main/java/org/acme/rabbitmq/processor/QuoteProcessor.java ファイルを探し、以下を追加します:

package org.acme.rabbitmq.processor;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Blocking;

/**
 * A bean consuming data from the "quote-requests" RabbitMQ queue and giving out a random quote.
 * The result is pushed to the "quotes" RabbitMQ exchange.
 */
@ApplicationScoped
public class QuoteProcessor {

    private Random random = new Random();

    @Incoming("requests")       (1)
    @Outgoing("quotes")         (2)
    @Blocking                   (3)
    public Quote process(String quoteRequest) throws InterruptedException {
        // simulate some hard-working task
        Thread.sleep(1000);
        return new Quote(quoteRequest, random.nextInt(100));
    }
}
1 メソッドが requests チャンネルのアイテムを消費することを示します
2 メソッドが返すオブジェクトが quotes チャンネルに送信されることを示します
3 処理が blocking であり、呼び出し元のスレッドでは実行できないことを示します。

process メソッドは、 quote-requests キューからの RabbitMQ のメッセージごとに呼び出され、 Quote オブジェクトを quotes エクスチェンジに送信します。

前述の例と同様に、 application.properties ファイルでコネクタを設定する必要があります。 src/main/resources/application.properties ファイルを開き、次のように追加します:

# Configure the incoming RabbitMQ queue `quote-requests`
mp.messaging.incoming.requests.connector=smallrye-rabbitmq
mp.messaging.incoming.requests.queue.name=quote-requests
mp.messaging.incoming.requests.exchange.name=quote-requests

# Configure the outgoing RabbitMQ exchange `quotes`
mp.messaging.outgoing.quotes.connector=smallrye-rabbitmq
mp.messaging.outgoing.quotes.exchange.name=quotes

この場合、着信用と発信用のコネクタ構成がそれぞれ1つずつあり、それぞれに明確な名前がついていることに注意してください。設定のキーは以下のように構成されています:

mp.messaging.[outgoing|incoming].{channel-name}.property=value

channel-name セグメントは、 @Incoming および @Outgoing アノテーションで設定された値と一致する必要があります:

  • quote-requests → 見積依頼を読み出すRabbitMQのキュー

  • quotes → 見積を書き込むRabbitMQのエクスチェンジ

見積の受け取り

producer プロジェクトに戻ります。見積を消費するために QuotesResource を変更して、イベントをクライアントに送信するためにHTTPエンドポイントにバインドしてみましょう:

import io.smallrye.mutiny.Multi;
//...

@Channel("quotes") Multi<Quote> quotes;     (1)

/**
 * Endpoint retrieving the "quotes" queue and sending the items to a server sent event.
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
    return quotes; (3)
}
1 @Channel 修飾子を使用して quotes のチャネルを注入します
2 Server Sent Events を使用してコンテンツが送信されたことを示します
3 ストリーム (Reactive Stream) を返します。

再度、 producer プロジェクト内で quotes の受信チャンネルを設定する必要があります。 application.properties ファイル内に以下を追加します:

# Configure the outgoing `quote-requests` queue
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq

# Configure the incoming `quotes` queue
mp.messaging.incoming.quotes.connector=smallrye-rabbitmq

HTMLページ

最後に、SSE を使って変換された価格を読み込む HTML ページです。

producer プロジェクト中の src/main/resources/META-INF/resources/quotes.html に以下の内容のファイルを作成します:

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Quotes</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <div class="card">
        <div class="card-body">
            <h2 class="card-title">Quotes</h2>
            <button class="btn btn-info" id="request-quote">Request Quote</button>
            <div class="quotes"></div>
        </div>
    </div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
    $("#request-quote").click((event) => {
        fetch("/quotes/request", {method: "POST"})
        .then(res => res.text())
        .then(qid => {
            var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
            $(".quotes").append(row);
        });
    });
    var source = new EventSource("/quotes");
    source.onmessage = (event) => {
      var json = JSON.parse(event.data);
      $(`#${json.id}`).html(function(index, html) {
        return html.replace("Pending", `\$\xA0${json.price}`);
      });
    };
</script>
</html>

目を見張るものはありません。受け取った見積の内容が更新されていきます。

実行

両方のアプリケーションを実行するためには以下コマンドを打つだけです:

> mvn -f rabbitmq-quickstart-producer quarkus:dev

そして、別の端末で以下を実行します:

> mvn -f rabbitmq-quickstart-processor quarkus:dev

Quarkusは、RabbitMQブローカーを自動的に起動し、アプリケーションを構成し、異なるアプリケーション間でブローカーインスタンスを共有します。詳細は、 RabbitMQ用のDev Servicesを参照してください。

http://localhost:8080/quotes.html をブラウザで開き、ボタンをクリックして見積を依頼してください。

JVMまたはNativeモードでの実行

開発モードやテストモードで実行していないときは、RabbitMQブローカーを起動する必要があります。 RabbitMQ Dockerウェブサイトの指示に従うか、以下の内容の docker-compose.yaml ファイルを作成します:

version: '2'

services:

  rabbit:
    image: rabbitmq:3.9-management
    ports:
      - "5672:5672"
    networks:
      - rabbitmq-quickstart-network

  producer:
    image: quarkus-quickstarts/rabbitmq-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: rabbitmq-quickstart-producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      RABBITMQ_HOST: rabbit
      RABBITMQ_PORT: 5672
    ports:
      - "8080:8080"
    networks:
      - rabbitmq-quickstart-network

  processor:
    image: quarkus-quickstarts/rabbitmq-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: rabbitmq-quickstart-processor
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      RABBITMQ_HOST: rabbit
      RABBITMQ_PORT: 5672
    networks:
      - rabbitmq-quickstart-network

networks:
  rabbitmq-quickstart-network:
    name: rabbitmq-quickstart

RabbitMQ ブローカーのロケーションがどのように構成されているかに注意してください。 rabbitmq-hostrabbitmq-port プロパティ( 環境変数だと AMQP_HOSTAMQP_PORT ) でロケーションを設定します。

まず、アプリケーションが停止していることを確認し、両方のアプリケーションをJVMモードでビルドします:

> mvn -f rabbitmq-quickstart-producer clean package
> mvn -f rabbitmq-quickstart-processor clean package

パッケージングされたら、 docker compose up --build を実行します。UI は http://localhost:8080/quotes.html で公開されます。

アプリケーションをネイティブで動作させるには、まず、ネイティブ実行可能ファイルをビルドする必要があります:

> mvn -f rabbitmq-quickstart-producer package -Pnative  -Dquarkus.native.container-build=true
> mvn -f rabbitmq-quickstart-processor package -Pnative -Dquarkus.native.container-build=true

-Dquarkus.native.container-build=true は、Quarkusにコンテナ内で実行可能なLinux 64ビットのネイティブ実行可能ファイルをビルドするよう指示しています。その後、次を使用してシステムを実行します:

> export QUARKUS_MODE=native
> docker compose up --build

UIは前と同じように http://localhost:8080/quotes.html で公開されます。

さらに詳しく

このガイドではQuarkusを使ってRabbitMQと対話する方法を紹介しました。 SmallRye Reactive Messagingを利用して、データストリーミングアプリケーションを構築しています。

Kafkaを経験したことがある方は同じコードであることに気がついたと思います。違いはコネクタの設定とJSONのマッピングのみです。