QuarkusアプリケーションでのAMQP入門
AMQP 1.0 は、アプリケーションや組織間でメッセージを渡すための標準仕様です。AMQP はシステムを接続し、ビジネスプロセスに必要な情報を供給し、システム間の通信を確実に処理します。AMQP は、イベント駆動型のアプリケーションで広く使用されている堅牢で成熟したプロトコルです。
この投稿は、 Kafkaの入門記事 と同等の内容を説明しますが、AMQPの使用方法に焦点を当てています。QuarkusアプリケーションでAMQPを使い始める方法を10ステップ以内で学ぶことができます。イベント駆動型マイクロサービスを構築するための宣言的アプローチである SmallRye Reactive Messaging を使用します。
完全なコードは GitHub から入手できます。 |
ステップ 1 - プロジェクトの生成
最初の一歩として、正しい依存関係を持つ新しいプロジェクト構造を取得してみましょう。 https://code.quarkus.io にアクセスして、グループIDとアーティファクトIDを入力します。そして、エクステンションのリストで以下を選択します:
-
SmallRye Reactive Messaging - AMQP コネクタ
-
RESTEasy Jackson
"Example Code" を無効にすることで、生成されたプロジェクトにサンプルコードが含まれないようにすることができます。 |
そして、 Generate your application をクリックして、プロジェクトをzipファイルとしてダウンロードし、解凍して、お気に入りのIDEにロードします。
生成された pom.xml
を開くと、 quarkus-smallrye-reactive-messaging-amqp
と quarkus-resteasy-jackson
の依存関係が宣言されていることがわかります。なのでコードを書く準備が出来ています。
ステップ2 - 何を交換するか?
交換するものが必要です。あまりオリジナリティを持たずに、 Movie
オブジェクトを送受信するとしましょう。プロジェクトの中で、以下の内容の org.acme.Movie
クラスを作成します:
package org.acme;
public class Movie {
public String title;
public int year;
}
AMQPでは、複数の データ セクション(または複数のAMQPシーケンス、または単一のAMQP値セクション)を持つことができる メッセージ を交換します。私たちのアプリケーションでは、 Movie
オブジェクトを交換しているので、インスタンスを JSON としてエンコードし、単一の データ セクションで転送しています。 content-type
ヘッダは application/json
です。
AMQPメッセージを送信先に送信します。簡単にするために、 movies という名前にしてみましょう。
ステップ3 - アプリケーションの設定
上記の通り、Reactive Messagingを使用します。リアクティブメッセージングを使用する場合、あるチャンネルにメッセージを送信し、別のチャンネルからメッセージを受信します。これらのチャンネルは、設定によって基礎となるメッセージング技術にマッピングされます。受信および公開チャンネルがアプリケーションで movies アドレスを使用することを示す必要があります。 src/main/resources/application.properties
で、次の内容を追加します:
# The AMQP broker location and credentials
amqp-host=localhost
amqp-port=5672
amqp-username=quarkus
amqp-password=quarkus
# Configuring the incoming channel (reading from AMQP)
mp.messaging.incoming.movies-in.connector=smallrye-amqp
mp.messaging.incoming.movies-in.address=movies
# Configuring the outgoing channel (writing to AMQP)
mp.messaging.outgoing.movies-out.connector=smallrye-amqp
mp.messaging.outgoing.movies-out.address=movies
ブローカーの場所と資格情報( amqp-
プロパティー)を設定した後、 movies-in
(レコードを受信)と movies-out
(レコードを公開)の2つのチャンネルを設定します。
チャンネルの設定には、 mp.messaging.incoming.movies-in
というプレフィックスを使用します。 connector
属性は、このチャネルの責任者(ここでは AMQP コネクタ)を示します。また、 address
属性を使用して、消費先を指定する必要があります。
アウトバウンド movies-out
チャネルを設定するには、 mp.messaging.outgoing.movies-out
プレフィックスを使用します。そのチャネルを担当するコネクタを示すだけでなく、アドレスも設定する必要があります。
ステップ4 - AMQPへのムービーの公開
さて、いよいよメッセージを送信する時が来ました。以下の内容で org.acme.MovieProducer
クラスを作成します:
package org.acme;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@ApplicationScoped
public class MovieProducer {
@Inject
@Channel("movies-out")
Emitter<Movie> emitter;
public void send(Movie movie) {
emitter.send(movie);
}
}
このクラスでは、 Emitter,
、つまりチャンネルへのメッセージ送信を担当するオブジェクトを注入します。このエミッタは movies-out
チャンネルにアタッチされています (AMQP にメッセージを送信します)。コネクタは自動的にコンテンツを JSON としてエンコードし、 content-type
ヘッダを設定します。
ペイロードが JSON にエンコードできることを確認する必要があります。 |
そこで、私たちのアプリケーションの残りの部分は、 send
メソッドを使用して、AMQP の宛先にムービーを送信することができます。
ステップ5 - 映画を消費する
今度は反対側に目を向けて、AMQPからムービーを取得してみましょう。
package org.acme;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MovieConsumer {
private final Logger logger = Logger.getLogger(MovieConsumer.class);
@Incoming("movies-in")
public void receive(Movie movie) {
logger.infof("Got a movie: %d - %s", movie.year, movie.title);
}
}
ここでは、 @Incoming
アノテーションを使用して、受信したレコードごとに receive
メソッドを呼び出すようにQuarkusに指示しています。
ムービーは JSON にエンコードされていることを思い出してください。コネクタが受信した JSON から Movie
を生成するのを支援する必要があります。
以下の内容の org.acme.JsonToObjectConverter
クラスを作成します:
package org.acme;
import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.amqp.IncomingAmqpMetadata;
import io.vertx.core.json.JsonObject;
import org.eclipse.microprofile.reactive.messaging.Message;
import javax.enterprise.context.ApplicationScoped;
import java.lang.reflect.Type;
@ApplicationScoped
public class JsonToObjectConverter implements MessageConverter {
@Override
public boolean canConvert(Message<?> in, Type target) {
return in.getMetadata(IncomingAmqpMetadata.class)
.map(meta -> meta.getContentType().equals("application/json") && target instanceof Class)
.orElse(false);
}
@Override
public Message<?> convert(Message<?> in, Type target) {
return in.withPayload(((JsonObject) in.getPayload()).mapTo((Class<?>) target));
}
}
このクラスは converter です。 Message
の内容を別の型にマッピングします。 canConvert
メソッドでは、受信メッセージが AMQP からのものであること (つまり IncomingAmqpMetadata
メタデータを含むこと) と、content-type が application/json
に設定されていることを確認します。 convert
メソッドは、受信した JsonObject
をターゲット型 (私たちの場合は Movie
) にマッピングします。
このコンバータにより、 consume
メソッドは Movie
オブジェクトを受け取ります。
ステップ6 - RESTエンドポイントからのムービーの送信
RESTエンドポイントからAMQPにメッセージを送信することはよくあることです。これを行うには、以下の内容で org.acme.MovieResource
クラスを作成します:
package org.acme;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class MovieResource {
@Inject
MovieProducer producer;
@POST
public Response send(Movie movie) {
producer.send(movie);
// Return an 202 - Accepted response.
return Response.accepted().build();
}
}
このクラスは、上で実装した MovieProducer
を使用して movies
を送信します。 Emitter
を直接使用することもできます。
ステップ7 - これを実行してみましょう!
まず、AMQPブローカー、例えば Apache ActiveMQ Artemis が必要です。 Getting Started with Artemis のドキュメントに従うか、以下の docker-compose.yaml
ファイルを使用してください:
version: '2'
services:
artemis:
image: vromero/activemq-artemis:2-alpine-latest
ports:
- "5672:5672"
- "8161:8161"
- "61616:61616"
environment:
ARTEMIS_USERNAME: quarkus
ARTEMIS_PASSWORD: quarkus
プロジェクト内に docker-compose.yaml
ファイルをコピーし、ターミナルから、ブローカーを起動します: docker-compose up -d
そして、アプリケーションを次のように実行します:
./mvnw quarkus:dev
アプリケーションは開発モードで動作します。つまり、コードをまだ変更することが出来、変更した場合、自動的にリロードされます。
別の端末で、次のようなHTTP POSTリクエストをいくつか送信します:
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"The Shawshank Redemption"}' \
http://localhost:8080/
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1972, "title":"The Godfather"}' \
http://localhost:8080/
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2008, "title":"The Dark Knight"}' \
http://localhost:8080/
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"Pulp Fiction"}' \
http://localhost:8080/
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2010, "title":"Inception"}' \
http://localhost:8080/
アプリケーションを実行しているターミナルでは、以下のように表示されます:
...
2021-01-27 09:29:41,087 INFO [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 1994 - Pulp Fiction
2021-01-27 09:29:41,114 INFO [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 2010 - Inception
...
動いています!
ステップ8 - ネイティブパッケージ
GraalVMが正しくインストールされ、設定されている場合 、このアプリケーションをネイティブ実行可能ファイルとしてパッケージ化することができます:
./mvnw package -Pnative
そして、 ./target/getting-started-amqp-1.0.0-SNAPSHOT-runner
でネイティブ実行可能ファイルを実行すると、AMQPを使用したQuarkusアプリケーションが数ミリ秒で起動し、とんでもない量:100レコードを取り込んでもたったの33MB!になります。
$ rss getting-started-amqp-1.0.0-SNAPSHOT-runner
PID 0M COMMAND
54986 33M ./target/getting-started-amqp-1.0.0-SNAPSHOT-runner
まとめ
10分もしないうちに、AMQPを使った新しいQuarkusアプリケーションができました。もっと詳しく知りたい方は、 AMQPガイド をチェックしてみてください。