The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.
このページを編集

RabbitMQを使用したQuarkusメッセージング入門

このガイドでは、QuarkusアプリケーションがQuarkus Messagingを利用してRabbitMQとやり取りする方法を示します。

この技術は、previewと考えられています。

preview では、下位互換性やエコシステムでの存在は保証されていません。具体的な改善には設定や API の変更が必要になるかもしれませんが、 stable になるための計画は現在進行中です。フィードバックは メーリングリストGitHub の課題管理 で受け付けています。

とりうるステータスの完全なリストについては、 FAQの項目 を参照してください。

前提条件

このガイドを完成させるには、以下が必要です:

  • 約15分

  • IDE

  • JDK 17+がインストールされ、 JAVA_HOME が適切に設定されていること

  • Apache Maven 3.9.9

  • Docker と Docker Compose、または Podman 、および Docker Compose

  • 使用したい場合は、 Quarkus CLI

  • ネイティブ実行可能ファイルをビルドしたい場合、MandrelまたはGraalVM(あるいはネイティブなコンテナビルドを使用する場合はDocker)をインストールし、 適切に設定していること

アーキテクチャ

このガイドでは、RabbitMQ ブローカーと通信する 2 つのアプリケーションを開発します。1つ目のアプリケーションは、RabbitMQの 見積要求 エクスチェンジに 見積要求 を送信し、 見積 キューからメッセージを消費します。2 番目のアプリケーションは、 見積要求 を受信し、 見積 を送り返します。

Architecture

最初のアプリケーションである producer では、ユーザーが HTTP エンドポイントを介していくつかの見積を要求することができます。各見積要求に対して、ランダムな識別子が生成され、ユーザーに返され、見積要求が pending されます。同時に、生成されたリクエストIDは、 quote-requests のエクスチェンジに送信されます。

Producer App UI

2 番目のアプリケーションである processor は、 quote-requests のキューから読み取った見積にランダムな価格を付けて、 quotes という名前のエクスチェンジに送信します。

最後に、producer は見積りを読み取り、サーバーから送信されたイベントを使用してこれらをブラウザーに送信します。したがって、ユーザーには、見積り価格が pending から受信した価格にリアルタイムで更新されていることがわかります。

ソリューション

次の章で紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。

Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.gitアーカイブ をダウンロードします。

ソリューションは rabbitmq-quickstart ディレクトリ にあります。

Mavenプロジェクトの作成

まず、 producerprocessor の2つのプロジェクトを作成する必要があります。

producer プロジェクトを作成するには、ターミナルで次のように実行します:

コマンドラインインタフェース
quarkus create app org.acme:rabbitmq-quickstart-producer \
    --extension='messaging-rabbitmq,rest-jackson' \
    --no-code

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストールと使用方法の詳細については、 Quarkus CLI ガイドを参照してください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.16.3:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=rabbitmq-quickstart-producer \
    -Dextensions='messaging-rabbitmq,rest-jackson' \
    -DnoCode

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

Windowsユーザーの場合:

  • cmdを使用する場合、(バックスラッシュ \ を使用せず、すべてを同じ行に書かないでください)。

  • Powershellを使用する場合は、 -D パラメータを二重引用符で囲んでください。例: "-DprojectArtifactId=rabbitmq-quickstart-producer"

このコマンドは、プロジェクト構造を作成し、使用する2つのQuarkusエクステンションを選択します:

  1. Reactive Messaging RabbitMQコネクター

  2. Quarkus REST(旧RESTEasy Reactive)と、そのJacksonサポートによるJSONペイロードの処理

Quarkusプロジェクトがすでに設定されている場合は、プロジェクトのベースディレクトリで次のコマンドを実行することで、 messaging-rabbitmq エクステンションをプロジェクトに追加できます:

コマンドラインインタフェース
quarkus extension add messaging-rabbitmq
Maven
./mvnw quarkus:add-extension -Dextensions='messaging-rabbitmq'
Gradle
./gradlew addExtension --extensions='messaging-rabbitmq'

これにより、 pom.xml に以下が追加されます:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-rabbitmq</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-rabbitmq")

processor プロジェクトを作成するには、同じディレクトリから、次のように実行します:

コマンドラインインタフェース
quarkus create app org.acme:rabbitmq-quickstart-processor \
    --extension='messaging-rabbitmq' \
    --no-code

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストールと使用方法の詳細については、 Quarkus CLI ガイドを参照してください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.16.3:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=rabbitmq-quickstart-processor \
    -Dextensions='messaging-rabbitmq' \
    -DnoCode

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

Windowsユーザーの場合:

  • cmdを使用する場合、(バックスラッシュ \ を使用せず、すべてを同じ行に書かないでください)。

  • Powershellを使用する場合は、 -D パラメータを二重引用符で囲んでください。例: "-DprojectArtifactId=rabbitmq-quickstart-processor"

この時点では、以下のような構成になっているはずです:

.
├── rabbitmq-quickstart-processor
│  ├── README.md
│  ├── mvnw
│  ├── mvnw.cmd
│  ├── pom.xml
│  └── src
│     └── main
│        ├── docker
│        ├── java
│        └── resources
│           └── application.properties
└── rabbitmq-quickstart-producer
   ├── README.md
   ├── mvnw
   ├── mvnw.cmd
   ├── pom.xml
   └── src
      └── main
         ├── docker
         ├── java
         └── resources
            └── application.properties

2つのプロジェクトをお好みのIDEで開きます。

Quoteオブジェクト

Quote クラスは、 producerprocessor の両方のプロジェクトで使用されます。簡単にするために、このクラスを複製します。両方のプロジェクトで、次のような内容の src/main/java/org/acme/rabbitmq/model/Quote.java ファイルを作成します。

package org.acme.rabbitmq.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Quote {

    public String id;
    public int price;

    /**
    * Default constructor required for Jackson serializer
    */
    public Quote() { }

    public Quote(String id, int price) {
        this.id = id;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Quote{" +
                "id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}

Quote オブジェクトのJSON表現は、RabbitMQキューに送信されるメッセージや、ブラウザクライアントに送信されるserver-sent eventで使用されます。

QuarkusにはJSONのRabbitMQメッセージを処理する機能が組み込まれています。

@RegisterForReflection

The @RegisterForReflection annotation instructs Quarkus to keep the class, its fields, and methods when creating a native executable. This is crucial when we later run our applications as native executables within containers. Without this annotation, the native compilation process would discard the fields and methods during the dead-code elimination phase, which would lead to runtime errors. More details about the @RegisterForReflection annotation can be found on the native application tips page.

見積もり依頼の送信

producer プロジェクト内で、生成された src/main/java/org/acme/rabbitmq/producer/QuotesResource.java ファイルを探し、コンテンツを次のように更新します:

package org.acme.rabbitmq.producer;

import java.util.UUID;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.mutiny.Multi;

@Path("/quotes")
public class QuotesResource {

    @Channel("quote-requests") Emitter<String> quoteRequestEmitter; (1)

    /**
     * Endpoint to generate a new quote request id and send it to "quote-requests" channel (which
     * maps to the "quote-requests" RabbitMQ exchange) using the emitter.
     */
    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
        UUID uuid = UUID.randomUUID();
        quoteRequestEmitter.send(uuid.toString()); (2)
        return uuid.toString();
    }
}
1 Reactive Messaging Emitter を注入して、 quote-requests チャンネルにメッセージを送信します。
2 投稿リクエストで、ランダムなUUIDを生成し、エミッターを使ってRabbitMQのキューに送信します。

このチャンネルは、 application.properties ファイルの設定を使用して、RabbitMQ のエクスチェンジにマッピングされます。 src/main/resource/application.properties ファイルを開き、次のように追加します:

# Configure the outgoing RabbitMQ exchange `quote-requests`
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq
mp.messaging.outgoing.quote-requests.exchange.name=quote-requests

指定する必要があるのは、 smallrye-rabbitmq のコネクタだけです。デフォルトでは、リアクティブ・メッセージングは、チャネル名 quote-requests を同じ RabbitMQ のエクスチェンジ名にマッピングします。

見積依頼の処理

それでは、見積依頼を消費して、価格を出してみましょう。 processor プロジェクト内で、 src/main/java/org/acme/rabbitmq/processor/QuoteProcessor.java ファイルを探し、以下を追加します:

package org.acme.rabbitmq.processor;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Blocking;

/**
 * A bean consuming data from the "quote-requests" RabbitMQ queue and giving out a random quote.
 * The result is pushed to the "quotes" RabbitMQ exchange.
 */
@ApplicationScoped
public class QuoteProcessor {

    private Random random = new Random();

    @Incoming("requests")       (1)
    @Outgoing("quotes")         (2)
    @Blocking                   (3)
    public Quote process(String quoteRequest) throws InterruptedException {
        // simulate some hard-working task
        Thread.sleep(1000);
        return new Quote(quoteRequest, random.nextInt(100));
    }
}
1 メソッドが requests チャンネルのアイテムを消費することを示します
2 メソッドが返すオブジェクトが quotes チャンネルに送信されることを示します
3 処理が blocking であり、呼び出し元のスレッドでは実行できないことを示します。

process メソッドは、 quote-requests キューからの RabbitMQ のメッセージごとに呼び出され、 Quote オブジェクトを quotes エクスチェンジに送信します。

前述の例と同様に、 application.properties ファイルでコネクタを設定する必要があります。 src/main/resources/application.properties ファイルを開き、次のように追加します:

# Configure the incoming RabbitMQ queue `quote-requests`
mp.messaging.incoming.requests.connector=smallrye-rabbitmq
mp.messaging.incoming.requests.queue.name=quote-requests
mp.messaging.incoming.requests.exchange.name=quote-requests

# Configure the outgoing RabbitMQ exchange `quotes`
mp.messaging.outgoing.quotes.connector=smallrye-rabbitmq
mp.messaging.outgoing.quotes.exchange.name=quotes

この場合、着信側と発信側のコネクタ設定が1つずつあり、それぞれに明確な名前が付けられていることに注意してください。 設定プロパティは以下のような構造になっています:

mp.messaging.[outgoing|incoming].{channel-name}.property=value

channel-name セグメントは、 @Incoming および @Outgoing アノテーションで設定された値と一致する必要があります:

  • quote-requests → 見積依頼を読み出すRabbitMQのキュー

  • quotes → 見積を書き込むRabbitMQのエクスチェンジ

見積の受け取り

producer プロジェクトに戻ります。引用符を使用するように QuotesResource を変更し、それを HTTP エンドポイントにバインドして、クライアントにイベントを送信しましょう。

import io.smallrye.mutiny.Multi;
//...

@Channel("quotes") Multi<Quote> quotes;     (1)

/**
 * Endpoint retrieving the "quotes" queue and sending the items to a server sent event.
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
    return quotes; (3)
}
1 @Channel 修飾子を使用して quotes のチャネルを注入します
2 Server Sent Events を使用してコンテンツが送信されたことを示します
3 ストリーム (Reactive Stream) を返します。

再度、 producer プロジェクト内で quotes の受信チャンネルを設定する必要があります。 application.properties ファイル内に以下を追加します:

# Configure the outgoing `quote-requests` queue
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq

# Configure the incoming `quotes` queue
mp.messaging.incoming.quotes.connector=smallrye-rabbitmq

HTMLページ

最後に、SSE を使って変換された価格を読み込む HTML ページです。

producer プロジェクト中の src/main/resources/META-INF/resources/quotes.html に以下の内容のファイルを作成します:

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Quotes</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <div class="card">
        <div class="card-body">
            <h2 class="card-title">Quotes</h2>
            <button class="btn btn-info" id="request-quote">Request Quote</button>
            <div class="quotes"></div>
        </div>
    </div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
    $("#request-quote").click((event) => {
        fetch("/quotes/request", {method: "POST"})
        .then(res => res.text())
        .then(qid => {
            var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
            $(".quotes").append(row);
        });
    });
    var source = new EventSource("/quotes");
    source.onmessage = (event) => {
      var json = JSON.parse(event.data);
      $(`#${json.id}`).html(function(index, html) {
        return html.replace("Pending", `\$\xA0${json.price}`);
      });
    };
</script>
</html>

ここでは、特筆すべきものは何もありません。受け取った見積もりごとに、ページが更新されます。

実行

両方のアプリケーションを実行するためには以下コマンドを打つだけです:

mvn -f rabbitmq-quickstart-producer quarkus:dev

そして、別の端末で以下を実行します:

mvn -f rabbitmq-quickstart-processor quarkus:dev

Quarkusは、RabbitMQブローカーを自動的に起動し、アプリケーションを構成し、異なるアプリケーション間でブローカーインスタンスを共有します。詳細は、 RabbitMQ用のDev Servicesを参照してください。

http://localhost:8080/quotes.html をブラウザで開き、ボタンをクリックして見積を依頼してください。

JVMまたはNativeモードでの実行

開発モードやテストモードで実行していないときは、RabbitMQブローカーを起動する必要があります。 RabbitMQ Dockerウェブサイトの指示に従うか、以下の内容の docker-compose.yaml ファイルを作成します:

version: '2'

services:

  rabbit:
    image: rabbitmq:3.12-management
    ports:
      - "5672:5672"
    networks:
      - rabbitmq-quickstart-network

  producer:
    image: quarkus-quickstarts/rabbitmq-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: rabbitmq-quickstart-producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      RABBITMQ_HOST: rabbit
      RABBITMQ_PORT: 5672
    ports:
      - "8080:8080"
    networks:
      - rabbitmq-quickstart-network

  processor:
    image: quarkus-quickstarts/rabbitmq-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: rabbitmq-quickstart-processor
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      RABBITMQ_HOST: rabbit
      RABBITMQ_PORT: 5672
    networks:
      - rabbitmq-quickstart-network

networks:
  rabbitmq-quickstart-network:
    name: rabbitmq-quickstart

RabbitMQ ブローカーのロケーションがどのように構成されているかに注意してください。 rabbitmq-hostrabbitmq-port プロパティ( 環境変数だと AMQP_HOSTAMQP_PORT ) でロケーションを設定します。

まず、アプリケーションが停止していることを確認し、両方のアプリケーションをJVMモードでビルドします:

mvn -f rabbitmq-quickstart-producer clean package
mvn -f rabbitmq-quickstart-processor clean package

パッケージ化したら、docker compose up --build を実行します。UI は http://localhost:8080/quotes.html で公開されています

アプリケーションをネイティブで動作させるには、まず、ネイティブ実行可能ファイルをビルドする必要があります:

mvn -f rabbitmq-quickstart-producer package -Dnative  -Dquarkus.native.container-build=true
mvn -f rabbitmq-quickstart-processor package -Dnative -Dquarkus.native.container-build=true

-Dquarkus.native.container-build=true は、コンテナー内で実行できる Linux 64 ビットのネイティブ実行可能ファイルをビルドするように Quarkus に指示します。次に、以下を使用してシステムを実行します。

export QUARKUS_MODE=native
docker compose up --build

UIは前と同じように http://localhost:8080/quotes.html で公開されます。

さらに詳しく

このガイドでは、Quarkusを使ってRabbitMQと対話する方法を紹介しました。 SmallRye Reactive Messagingを利用して、データストリーミングアプリケーションを構築しています。

Kafkaをやった方は、同じコードであることに気づいたと思います。唯一の違いは、コネクタの設定とJSONのマッピングです。

関連コンテンツ