The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Using the event bus

Quarkusでは、異なる Bean が非同期イベントを使用して相互作用することができるため、疎結合化が促進されます。メッセージは 仮想アドレス に送信されます。3種類の配信メカニズムを提供しています。

  • point-to-point - send the message, one consumer receives it. If several consumers listen to the address, a round-robin is applied;

  • publish/subscribe - メッセージをパブリッシュし、アドレスをリスンしているすべてのコンシューマがメッセージを受信します。

  • request/reply - メッセージを送信し、応答を待ちます。受信者は非同期的にメッセージに応答することができます。

All these delivery mechanisms are non-blocking, and are providing one of the fundamental brick to build reactive applications.

非同期メッセージパッシング機能は、Reactive Messagingではサポートされていないメッセージへの返信を可能にします。ただし、シングルイベント動作(ストリームなし)とローカルメッセージに限定されます。

インストール

この仕組みはVert.x EventBusを利用しているので、この機能を利用するには vertx エクステンションを有効にする必要があります。新規プロジェクトを作成する場合は、 extensions パラメーターを以下のように設定します。

CLI
quarkus create app org.acme:vertx-quickstart \
    --extension=vertx,resteasy-reactive \
    --no-code
cd vertx-quickstart

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストール方法については、Quarkus CLIガイドをご参照ください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:2.11.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=vertx-quickstart \
    -Dextensions="vertx,resteasy-reactive" \
    -DnoCode
cd vertx-quickstart

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

すでに作成済みのプロジェクトがある場合は、 vertx エクステンションを既存のQuarkusプロジェクトに add-extension コマンドで追加することができます。

CLI
quarkus extension add 'vertx'
Maven
./mvnw quarkus:add-extension -Dextensions="vertx"
Gradle
./gradlew addExtension --extensions="vertx"

Otherwise, you can manually add this to the dependencies section of your build file:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-vertx</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-vertx")

イベントのコンシューム

イベントをコンシュームするには、 io.quarkus.vertx.ConsumeEvent アノテーションを使用します。

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 設定されていない場合、アドレスは Bean の完全修飾名となり、例えばこのスニペットでは org.acme.vertx.GreetingService となります。
2 メソッドのパラメーターはメッセージ本体です。メソッドが 何か を返す場合は、それがメッセージのレスポンスになります。

デフォルトでは、Vert.x イベントループで呼び出されるため、イベントをコンシュームするコードは ノンブロッキング でなければなりません。処理がブロッキングである場合は、 blocking 属性を使用してください。

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

あるいは、メソッドに @io.smallrye.common.annotation.Blocking とアノテーションを付けることもできます。

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

When using @Blocking, it ignores the value of the blocking attribute of @ConsumeEvent. See the Quarkus Reactive Architecture documentation for further details on this topic.

io.smallrye.mutiny.Uni または java.util.concurrent.CompletionStage のどちらかを返すことで、非同期処理も可能です。

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public CompletionStage<String> consume(String name) {
        // return a CompletionStage completed when the processing is finished.
        // You can also fail the CompletionStage explicitly
    }

    @ConsumeEvent
    public Uni<String> process(String name) {
        // return an Uni completed when the processing is finished.
        // You can also fail the Uni explicitly
    }
}
Mutiny

The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.

アドレスの設定

@ConsumeEvent アノテーションでアドレスを設定することができます。

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 greeting アドレスに送信されたメッセージを受信する

応答

@ConsumeEvent でアノテーションされたメソッドの 戻り値 が、着信メッセージに対するレスポンスとして使用されます。例えば、次のスニペットでは、 String がレスポンスとして返されます。

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

また、 Uni<T>CompletionStage<T> を返すことで、非同期応答を扱うこともできます。。

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

Context Propagation エクステンションを使えば、 executor を注入することができます。

@Inject ManagedExecutor executor;

Alternatively, you can use the default Quarkus worker pool using:

Executor executor = Infrastructure.getDefaultWorkerPool();

fire and forget インタラクションの実装

You don’t have to reply to received messages. Typically, for a fire and forget interaction, the messages are consumed and the sender does not need to know about it. To implement this, your consumer method just returns void

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

Message の取り扱い

上でも述べたように、この仕組みはVert.xイベントバスをベースにしています。なので、直接 Message を使うこともできます。

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

失敗のハンドリング

If a method annotated with @ConsumeEvent throws an exception then:

  • if a reply handler is set then the failure is propagated back to the sender via an io.vertx.core.eventbus.ReplyException with code ConsumeEvent#FAILURE_CODE and the exception message,

  • if no reply handler is set then the exception is rethrown (and wrapped in a RuntimeException if necessary) and can be handled by the default exception handler, i.e. io.vertx.core.Vertx#exceptionHandler().

メッセージの送信

さて、メッセージを受信する方法を見てきましたが、次は送信者 に切り替えましょう。メッセージの送信とパブリッシュは Vert.x イベントバスを使います。

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                       (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
1 イベントバスのインジェクト
2 アドレス greeting にメッセージを送信。メッセージのペイロードは name

EventBus オブジェクトは、以下のメソッドを提供します。

  1. send 特定のアドレスへのメッセージ - 一人のコンシューマーがメッセージを受信する。

  2. publish 特定のアドレスにメッセージを送る - すべてのコンシューマーがメッセージを受け取る。

  3. send メッセージを送って返信を待つ

// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);

物事をまとめる - HTTPとメッセージのブリッジング

greeting HTTP エンドポイントを再訪し、非同期メッセージパッシングを使用して分離された Bean に呼び出しを委譲してみましょう。リクエスト/レスポンスディスパッチの仕組みを利用しています。JAX-RS エンドポイント内にビジネスロジックを実装する代わりに、メッセージを送信しています。このメッセージは別の Bean によってコンシュームされ、応答は 返信 機構を使用して送信されます。

まず、新しいプロジェクトを作成します。

CLI
quarkus create app org.acme:vertx-http-quickstart \
    --extension=vertx,resteasy-reactive \
    --no-code
cd vertx-http-quickstart

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストール方法については、Quarkus CLIガイドをご参照ください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:2.11.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=vertx-http-quickstart \
    -Dextensions="vertx,resteasy-reactive" \
    -DnoCode
cd vertx-http-quickstart

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

You can already start the application in dev mode using:

CLI
quarkus dev
Maven
./mvnw quarkus:dev
Gradle
./gradlew --console=plain quarkusDev

そして、以下の内容の JAX-RS リソースを新規に作成します。

src/main/java/org/acme/vertx/EventResource.java
package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 greeting アドレスに name を送信し、レスポンスを要求します。
2 レスポンスを取得したら、ボディを抽出してユーザーに送信します。

このエンドポイントを呼び出すと、しばらく待ってからタイムアウトになります。実際、誰もリスンしていません。そこで、 greeting アドレスをリスンするコンシューマーが必要です。以下の内容の GreetingService Bean を作成します。

src/main/java/org/acme/vertx/GreetingService.java
package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

この Bean は name を受け取り、 greeting メッセージを返します.

さて、ブラウザで http://localhost:8080/async/Quarkus を開くと、以下が表示されるはずです。

Hello Quarkus

より理解しやすくするために、HTTP リクエスト/レスポンスがどのように処理されたかを詳しく見てみましょう。

  1. リクエストは hello メソッドが受け取ります。

  2. name を含むメッセージがイベントバスに送信されます。

  3. 別の Bean がこのメッセージを受信して、レスポンスを計算します.

  4. このレスポンスは、応答機構を使用して返信されます。

  5. 送信者が応答を受信すると、HTTP レスポンスに内容が書き込まれます。

このアプリケーションは、以下の方法でパッケージングできます。

CLI
quarkus build
Maven
./mvnw clean package
Gradle
./gradlew build

以下でネイティブ実行可能ファイルとしてコンパイルすることもできます。

CLI
quarkus build --native
Maven
./mvnw package -Dnative
Gradle
./gradlew build -Dquarkus.package.type=native

コーデックを使う

Vert.xイベントバス では、コーデックを使用してオブジェクトの シリアライズデシリアライズ を行います。Quarkusでは、ローカル配信用のデフォルトのコーデックを提供しています。そのため、以下のようにオブジェクトを交換することができます。

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}

特定のコーデックを使用したい場合は、両サイドで明示的に設定する必要があります。

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", name,
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 メッセージの送信に使用するコーデックの名前を設定します。
2 メッセージの受信に使用するコーデックを設定します。