The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.
このページを編集

イベントバスの利用

Quarkusでは、異なる Bean が非同期イベントを使用して相互作用することができるため、疎結合化が促進されます。メッセージは 仮想アドレス に送信されます。3種類の配信メカニズムを提供しています。

  • point-to-point - メッセージを送信し、1 つのコンシューマーがそのメッセージを受信します。複数のコンシューマーがアドレスで待ち受けている場合、ラウンドロビンが適用されます。

  • publish/subscribe - メッセージを発行し、アドレスで待ち受けているすべてのコンシューマーがメッセージを受信します。

  • request/reply - メッセージを送信し、応答を待ちます。受信者は非同期的にメッセージに応答することができます。

これらの配信メカニズムはすべてノンブロッキングであり、リアクティブアプリケーションを構築するための基本的な要素の1つを提供しています。

非同期メッセージパッシング機能は、Reactive Messagingではサポートされていないメッセージへの返信を可能にします。ただし、シングルイベント動作(ストリームなし)とローカルメッセージに限定されます。

インストール

このメカニズムは Vert.x EventBus を使用するため、この機能を使用するには vertx エクステンションを有効にする必要があります。 新しいプロジェクトを作成する場合は、 extensions パラメータを次のように設定します:

コマンドラインインタフェース
quarkus create app org.acme:vertx-quickstart \
    --extension='vertx,rest' \
    --no-code
cd vertx-quickstart

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストールと使用方法の詳細については、 Quarkus CLI ガイドを参照してください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.17.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=vertx-quickstart \
    -Dextensions='vertx,rest' \
    -DnoCode
cd vertx-quickstart

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

Windowsユーザーの場合:

  • cmdを使用する場合、(バックスラッシュ \ を使用せず、すべてを同じ行に書かないでください)。

  • Powershellを使用する場合は、 -D パラメータを二重引用符で囲んでください。例: "-DprojectArtifactId=vertx-quickstart"

すでに作成済みのプロジェクトがある場合は、 vertx エクステンションを既存のQuarkusプロジェクトに add-extension コマンドで追加することができます。

コマンドラインインタフェース
quarkus extension add vertx
Maven
./mvnw quarkus:add-extension -Dextensions='vertx'
Gradle
./gradlew addExtension --extensions='vertx'

そうでない場合は、 pom.xml ファイルの依存関係セクションに手動で追加することができます。

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-vertx</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-vertx")

イベントの消費

イベントを消費するには、 io.quarkus.vertx.ConsumeEvent アノテーションを使用します。

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 設定されていない場合、アドレスは Bean の完全修飾名となり、例えばこのスニペットでは org.acme.vertx.GreetingService となります。
2 メソッドのパラメーターはメッセージボディです。メソッドが 何か を返す場合は、それがメッセージのレスポンスになります。

デフォルトでは、Vert.x イベントループで呼び出されるため、イベントをコンシュームするコードは ノンブロッキング でなければなりません。処理がブロッキングである場合は、 blocking 属性を使用してください。

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

あるいは、メソッドに @io.smallrye.common.annotation.Blocking アノテーションを付けることもできます。

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

@Blocking を使用する場合は、 @ConsumeEventblocking 属性の値は無視されます。このトピックの詳細については、 Quarkus リアクティブアーキテクチャのドキュメント を参照してください。

io.smallrye.mutiny.Uni または java.util.concurrent.CompletionStage のどちらかを返すことで、非同期処理を行うことも可能です。

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public CompletionStage<String> consume(String name) {
        // return a CompletionStage completed when the processing is finished.
        // You can also fail the CompletionStage explicitly
    }

    @ConsumeEvent
    public Uni<String> process(String name) {
        // return an Uni completed when the processing is finished.
        // You can also fail the Uni explicitly
    }
}
Mutiny

前の例はMutinyリアクティブ型を使用しています。Mutinyに慣れていない方は、 Mutiny - 直感的なリアクティブプログラミングライブラリをご覧ください。

アドレスの設定

@ConsumeEvent アノテーションでアドレスを設定することができます。

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 greeting アドレスに送信されたメッセージを受信

応答

@ConsumeEvent でアノテーションされたメソッドの 戻り値 が、着信メッセージに対するレスポンスとして使用されます。例えば、次のスニペットでは、 String がレスポンスとして返されます。

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

また、 Uni<T>CompletionStage<T> を返すことで、非同期レスポンスを扱うことも可能です。

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

Context Propagation エクステンションを利用することで、 executor を注入することができます。

@Inject ManagedExecutor executor;

あるいは、デフォルトの Quarkus ワーカープールを使用することもできます。

Executor executor = Infrastructure.getDefaultWorkerPool();

fire and forget (撃ち放し) のやりとりの実装

受信したメッセージに返信する必要はありません。通常、 fire and forget(撃ち放し) のやりとりでは、送信者はメッセージが消費されたことを知る必要はありません。これを実装するためには、コンシューマーメソッドは void を返します。

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

メッセージの取り扱い

先述のように、この仕組みは Vert.x イベントバスをベースにしているため、直接 Message を使うこともできます。

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

失敗時の対応

@ConsumeEvent がアノテートされたメソッドが例外を発生させた場合、以下のようになります。

  • 返信ハンドラーが設定されている場合、失敗はコード ConsumeEvent#FAILURE_CODE と例外メッセージを含んだ io.vertx.core.eventbus.ReplyException を通して送信者に伝搬されます。

  • 返答ハンドラーが設定されていない場合、例外は必要に応じて RuntimeException でラップされて再スローされ、デフォルトの例外ハンドラー、すなわち io.vertx.core.Vertx#exceptionHandler() で処理されます。

メッセージの送信

さて、メッセージを受信する方法を見てきましたが、次は送信者 に切り替えましょう。メッセージの送信とパブリッシュは Vert.x イベントバスを使います。

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                       (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
1 イベントバスの注入
2 アドレス greeting に、ペイロード name を持つメッセージを送信します。

EventBus オブジェクトは、以下のメソッドを提供します。

  1. 特定のアドレスにメッセージを send (送信) - 一つのコンシューマーがメッセージを受信する。

  2. 特定のアドレスにメッセージを publish (発行) - すべてのコンシューマーがメッセージを受信する。

  3. メッセージを send (送信)し、返信を非同期に待つ

  4. メッセージを send (送信)し、返信を同期に待つ

// Case 1
bus.<String>requestAndForget("greeting", name);
// Case 2
bus.publish("greeting", name);
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);
// Case 4
String response = bus.<String>requestAndAwait("greeting", name).body();

登場した要素の組み立て - HTTP とメッセージの橋渡し

グリーティングHTTPエンドポイントをもう一度修正し,非同期メッセージパッシングを使用して,呼び出しを別のBeanに委譲してみよう。これは、request/replyディスパッチ機構を使用しています。Jakarta RESTエンドポイント内でビジネスロジックを実装する代わりに、メッセージを送信しています。このメッセージは別の Bean によって消費され、応答は reply メカニズムを使用して送信されます。

最初に、以下を使用して新しいプロジェクトを作成します。

コマンドラインインタフェース
quarkus create app org.acme:vertx-http-quickstart \
    --extension='vertx,rest' \
    --no-code
cd vertx-http-quickstart

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストールと使用方法の詳細については、 Quarkus CLI ガイドを参照してください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.17.4:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=vertx-http-quickstart \
    -Dextensions='vertx,rest' \
    -DnoCode
cd vertx-http-quickstart

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

Windowsユーザーの場合:

  • cmdを使用する場合、(バックスラッシュ \ を使用せず、すべてを同じ行に書かないでください)。

  • Powershellを使用する場合は、 -D パラメータを二重引用符で囲んでください。例: "-DprojectArtifactId=vertx-http-quickstart"

./mvnw compile quarkus:dev を実行すると、自動的に 開発者モード でアプリケーションを起動することができます。

コマンドラインインタフェース
quarkus dev
Maven
./mvnw quarkus:dev
Gradle
./gradlew --console=plain quarkusDev

次に、以下の内容で新しいJakarta RESTリソースを作成します:

src/main/java/org/acme/vertx/EventResource.java
package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 greeting アドレスに name を送信し、レスポンスを要求します。
2 レスポンスを取得したら、ボディを抽出してユーザーに送信します。

このエンドポイントを呼び出すと、しばらく待ってからタイムアウトになります。実際、誰もリスンしていません。そこで、 greeting アドレスをリスンするコンシューマーが必要です。以下の内容の GreetingService Bean を作成します。

src/main/java/org/acme/vertx/GreetingService.java
package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

この Bean は name を受け取り、 greeting メッセージを返します.

ブラウザで http://localhost:8080/async/Quarkus を開くと、以下が表示されます。

Hello Quarkus

より理解しやすくするために、HTTP リクエスト/レスポンスがどのように処理されたかを詳しく見てみます。

  1. リクエストは hello メソッドが受け取ります。

  2. name を含むメッセージがイベントバスに送信されます。

  3. 別の Bean がこのメッセージを受信して、レスポンスを計算します。

  4. このレスポンスは、応答メカニズムによって返信されます。

  5. 送信者が応答を受信すると、HTTP レスポンスに内容が書き込まれます。

このアプリケーションは、以下の方法でパッケージングすることができます。

コマンドラインインタフェース
quarkus build
Maven
./mvnw install
Gradle
./gradlew build

以下でネイティブ実行可能ファイルとしてコンパイルすることもできます。

コマンドラインインタフェース
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

コーデックの利用

Vert.xイベントバス では、コーデックを使用してオブジェクトの シリアライズデシリアライズ を行います。Quarkusでは、ローカル配信用のデフォルトのコーデックを提供しています。そのため、以下のようにオブジェクトを交換することができます。

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}

特定のコーデックを使用したい場合は、両側で明示的に設定する必要があります。

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", name,
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 メッセージの送信に使用するコーデックの名前を設定します。
2 メッセージの受信に使用するコーデックを設定します。

関連コンテンツ