The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

QuarkusアプリケーションでのAMQP入門

AMQP 1.0 は、アプリケーションや組織間でメッセージを渡すための標準仕様です。AMQP はシステムを接続し、ビジネスプロセスに必要な情報を供給し、システム間の通信を確実に処理します。AMQP は、イベント駆動型のアプリケーションで広く使用されている堅牢で成熟したプロトコルです。

この投稿は、 Kafkaの入門記事 と同等の内容を説明しますが、AMQPの使用方法に焦点を当てています。QuarkusアプリケーションでAMQPを使い始める方法を10ステップ以内で学ぶことができます。イベント駆動型マイクロサービスを構築するための宣言的アプローチである SmallRye Reactive Messaging を使用します。

完全なコードは GitHub から入手できます。

ステップ 1 - プロジェクトの生成

最初の一歩として、正しい依存関係を持つ新しいプロジェクト構造を取得してみましょう。 https://code.quarkus.io にアクセスして、グループIDとアーティファクトIDを入力します。そして、エクステンションのリストで以下を選択します:

  • SmallRye Reactive Messaging - AMQP コネクタ

  • RESTEasy Jackson

getting started amqp code

"Example Code" を無効にすることで、生成されたプロジェクトにサンプルコードが含まれないようにすることができます。

そして、 Generate your application をクリックして、プロジェクトをzipファイルとしてダウンロードし、解凍して、お気に入りのIDEにロードします。

生成された pom.xml を開くと、 quarkus-smallrye-reactive-messaging-amqpquarkus-resteasy-jackson の依存関係が宣言されていることがわかります。なのでコードを書く準備が出来ています。

ステップ2 - 何を交換するか?

交換するものが必要です。あまりオリジナリティを持たずに、 Movie オブジェクトを送受信するとしましょう。プロジェクトの中で、以下の内容の org.acme.Movie クラスを作成します:

package org.acme;

public class Movie {

    public String title;
    public int year;

}

AMQPでは、複数の データ セクション(または複数のAMQPシーケンス、または単一のAMQP値セクション)を持つことができる メッセージ を交換します。私たちのアプリケーションでは、 Movie オブジェクトを交換しているので、インスタンスを JSON としてエンコードし、単一の データ セクションで転送しています。 content-type ヘッダは application/json です。

AMQPメッセージを送信先に送信します。簡単にするために、 movies という名前にしてみましょう。

ステップ3 - アプリケーションの設定

上記の通り、Reactive Messagingを使用します。リアクティブメッセージングを使用する場合、あるチャンネルにメッセージを送信し、別のチャンネルからメッセージを受信します。これらのチャンネルは、設定によって基礎となるメッセージング技術にマッピングされます。受信および公開チャンネルがアプリケーションで movies アドレスを使用することを示す必要があります。 src/main/resources/application.properties で、次の内容を追加します:

# The AMQP broker location and credentials
amqp-host=localhost
amqp-port=5672
amqp-username=quarkus
amqp-password=quarkus

# Configuring the incoming channel (reading from AMQP)
mp.messaging.incoming.movies-in.connector=smallrye-amqp
mp.messaging.incoming.movies-in.address=movies

# Configuring the outgoing channel (writing to AMQP)
mp.messaging.outgoing.movies-out.connector=smallrye-amqp
mp.messaging.outgoing.movies-out.address=movies

ブローカーの場所と資格情報( amqp- プロパティー)を設定した後、 movies-in (レコードを受信)と movies-out (レコードを公開)の2つのチャンネルを設定します。

チャンネルの設定には、 mp.messaging.incoming.movies-in というプレフィックスを使用します。 connector 属性は、このチャネルの責任者(ここでは AMQP コネクタ)を示します。また、 address 属性を使用して、消費先を指定する必要があります。

アウトバウンド movies-out チャネルを設定するには、 mp.messaging.outgoing.movies-out プレフィックスを使用します。そのチャネルを担当するコネクタを示すだけでなく、アドレスも設定する必要があります。

ステップ4 - AMQPへのムービーの公開

さて、いよいよメッセージを送信する時が来ました。以下の内容で org.acme.MovieProducer クラスを作成します:

package org.acme;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped
public class MovieProducer {

    @Inject
    @Channel("movies-out")
    Emitter<Movie> emitter;

    public void send(Movie movie) {
        emitter.send(movie);
    }
}

このクラスでは、 Emitter, 、つまりチャンネルへのメッセージ送信を担当するオブジェクトを注入します。このエミッタは movies-out チャンネルにアタッチされています (AMQP にメッセージを送信します)。コネクタは自動的にコンテンツを JSON としてエンコードし、 content-type ヘッダを設定します。

ペイロードが JSON にエンコードできることを確認する必要があります。

そこで、私たちのアプリケーションの残りの部分は、 send メソッドを使用して、AMQP の宛先にムービーを送信することができます。

ステップ5 - 映画を消費する

今度は反対側に目を向けて、AMQPからムービーを取得してみましょう。

package org.acme;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MovieConsumer {

    private final Logger logger = Logger.getLogger(MovieConsumer.class);

    @Incoming("movies-in")
    public void receive(Movie movie) {
        logger.infof("Got a movie: %d - %s", movie.year, movie.title);
    }
}

ここでは、 @Incoming アノテーションを使用して、受信したレコードごとに receive メソッドを呼び出すようにQuarkusに指示しています。

ムービーは JSON にエンコードされていることを思い出してください。コネクタが受信した JSON から Movie を生成するのを支援する必要があります。

以下の内容の org.acme.JsonToObjectConverter クラスを作成します:

package org.acme;

import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.amqp.IncomingAmqpMetadata;
import io.vertx.core.json.JsonObject;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.lang.reflect.Type;

@ApplicationScoped
public class JsonToObjectConverter implements MessageConverter {

    @Override
    public boolean canConvert(Message<?> in, Type target) {
        return in.getMetadata(IncomingAmqpMetadata.class)
                .map(meta -> meta.getContentType().equals("application/json")  && target instanceof Class)
                .orElse(false);

    }

    @Override
    public Message<?> convert(Message<?> in, Type target) {
        return in.withPayload(((JsonObject) in.getPayload()).mapTo((Class<?>) target));
    }
}

このクラスは converter です。 Message の内容を別の型にマッピングします。 canConvert メソッドでは、受信メッセージが AMQP からのものであること (つまり IncomingAmqpMetadata メタデータを含むこと) と、content-type が application/json に設定されていることを確認します。 convert メソッドは、受信した JsonObject をターゲット型 (私たちの場合は Movie ) にマッピングします。

このコンバータにより、 consume メソッドは Movie オブジェクトを受け取ります。

ステップ6 - RESTエンドポイントからのムービーの送信

RESTエンドポイントからAMQPにメッセージを送信することはよくあることです。これを行うには、以下の内容で org.acme.MovieResource クラスを作成します:

package org.acme;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class MovieResource {

    @Inject
    MovieProducer producer;

    @POST
    public Response send(Movie movie) {
        producer.send(movie);
        // Return an 202 - Accepted response.
        return Response.accepted().build();
    }
}

このクラスは、上で実装した MovieProducer を使用して movies を送信します。 Emitter を直接使用することもできます。

ステップ7 - これを実行してみましょう!

まず、AMQPブローカー、例えば Apache ActiveMQ Artemis が必要です。 Getting Started with Artemis のドキュメントに従うか、以下の docker-compose.yaml ファイルを使用してください:

version: '2'

services:

  artemis:
    image: vromero/activemq-artemis:2-alpine-latest
    ports:
      - "5672:5672"
      - "8161:8161"
      - "61616:61616"
    environment:
      ARTEMIS_USERNAME: quarkus
      ARTEMIS_PASSWORD: quarkus

プロジェクト内に docker-compose.yaml ファイルをコピーし、ターミナルから、ブローカーを起動します: docker-compose up -d

そして、アプリケーションを次のように実行します:

./mvnw quarkus:dev

アプリケーションは開発モードで動作します。つまり、コードをまだ変更することが出来、変更した場合、自動的にリロードされます。

別の端末で、次のようなHTTP POSTリクエストをいくつか送信します:

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"The Shawshank Redemption"}' \
http://localhost:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1972, "title":"The Godfather"}' \
http://localhost:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2008, "title":"The Dark Knight"}' \
http://localhost:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"Pulp Fiction"}' \
http://localhost:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2010, "title":"Inception"}' \
http://localhost:8080/

アプリケーションを実行しているターミナルでは、以下のように表示されます:

...
2021-01-27 09:29:41,087 INFO  [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 1994 - Pulp Fiction
2021-01-27 09:29:41,114 INFO  [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 2010 - Inception
...

動いています!

ステップ8 - ネイティブパッケージ

GraalVMが正しくインストールされ、設定されている場合 、このアプリケーションをネイティブ実行可能ファイルとしてパッケージ化することができます:

./mvnw package -Pnative

そして、 ./target/getting-started-amqp-1.0.0-SNAPSHOT-runner でネイティブ実行可能ファイルを実行すると、AMQPを使用したQuarkusアプリケーションが数ミリ秒で起動し、とんでもない量:100レコードを取り込んでもたったの33MB!になります。

$ rss getting-started-amqp-1.0.0-SNAPSHOT-runner
PID 0M COMMAND
54986 33M ./target/getting-started-amqp-1.0.0-SNAPSHOT-runner

まとめ

10分もしないうちに、AMQPを使った新しいQuarkusアプリケーションができました。もっと詳しく知りたい方は、 AMQPガイド をチェックしてみてください。