The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Vert.x リファレンスガイド

Vert.xは、リアクティブなアプリケーションを構築するためのツールキットです。 Quarkus Reactive Architectureに記載されているように、QuarkusではVert.xを下地に使用しています。

このガイドは、 QuarkusアプリケーションからEclipse Vert.x APIの使用 ガイドの姉妹編です。Quarkusで使用されるVert.xインスタンスの使用方法や設定について、より詳細に説明しています。

Vert.xインスタンスへのアクセス

マネージドVert.xインスタンスにアクセスするには、 quarkus-vertx エクステンションをプロジェクトに追加してください。この依存関係がすでにインストールされている可能性があることに注意してください(推移的依存関係として)。

このエクステンションでは、フィールドまたはコンストラクタ・インジェクションのいずれかを使用して、Vert.xのマネージドインスタンスを取得できます。

@ApplicationScoped
public class MyBean {
// Field injection
@Inject Vertx vertx;

// Constructor injection
MyBean(Vertx vertx) {
    // ...
}

}

以下のどちらかを注入することができます。

  • 裸の Vert.x APIを公開する io.vertx.core.Vertx インスタンス

  • Mutiny APIを公開する io.vertx.mutiny.core.Vertx インスタンス

Mutiny版は、Quarkusが提供する他のリアクティブAPIと統合されているため、Mutiny版の使用をお勧めします。

Mutiny

Mutinyに慣れていない方は、 Mutiny - 直感的なリアクティブプログラミングライブラリをご覧ください。

Vert.x Mutiny版に関するドキュメントは https://smallrye.io/smallrye-mutiny-vertx-bindings にあります。

Vert.xインスタンスの設定

application.properties ファイルから Vert.x インスタンスを設定することができます。次の表は、サポートされているプロパティの一覧です。

ビルド時に固定される設定プロパティ - それ以外の設定プロパティは実行時に上書き可能

Configuration property

タイプ

デフォルト

Enables or disables the Vert.x cache.

Environment variable: QUARKUS_VERTX_CACHING

boolean

true

Enables or disabled the Vert.x classpath resource resolver.

Environment variable: QUARKUS_VERTX_CLASSPATH_RESOLVING

boolean

true

The number of event loops. 2 x the number of core by default.

Environment variable: QUARKUS_VERTX_EVENT_LOOPS_POOL_SIZE

int

The maximum amount of time the event loop can be blocked.

Environment variable: QUARKUS_VERTX_MAX_EVENT_LOOP_EXECUTE_TIME

Duration

2S

The amount of time before a warning is displayed if the event loop is blocked.

Environment variable: QUARKUS_VERTX_WARNING_EXCEPTION_TIME

Duration

2S

The size of the worker thread pool.

Environment variable: QUARKUS_VERTX_WORKER_POOL_SIZE

int

20

The maximum amount of time the worker thread can be blocked.

Environment variable: QUARKUS_VERTX_MAX_WORKER_EXECUTE_TIME

Duration

60S

The size of the internal thread pool (used for the file system).

Environment variable: QUARKUS_VERTX_INTERNAL_BLOCKING_POOL_SIZE

int

20

The queue size. For most applications this should be unbounded

Environment variable: QUARKUS_VERTX_QUEUE_SIZE

int

The executor growth resistance. A resistance factor applied after the core pool is full; values applied here will cause that fraction of submissions to create new threads when no idle thread is available. A value of 0.0f implies that threads beyond the core size should be created as aggressively as threads within it; a value of 1.0f implies that threads beyond the core size should never be created.

Environment variable: QUARKUS_VERTX_GROWTH_RESISTANCE

float

0f

The amount of time a thread will stay alive with no work.

Environment variable: QUARKUS_VERTX_KEEP_ALIVE_TIME

Duration

30S

Prefill thread pool when creating a new Executor. When io.vertx.core.spi.ExecutorServiceFactory.createExecutor is called, initialise with the number of defined threads at startup

Environment variable: QUARKUS_VERTX_PREFILL

boolean

false

Enables the async DNS resolver.

Environment variable: QUARKUS_VERTX_USE_ASYNC_DNS

boolean

false

PEM Key/cert config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM

boolean

false

Comma-separated list of the path to the key files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM_KEYS

list of string

Comma-separated list of the path to the certificate files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM_CERTS

list of string

JKS config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS

boolean

false

Path of the key file (JKS format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS_PATH

string

Password of the key file.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS_PASSWORD

string

PFX config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX

boolean

false

Path to the key file (PFX format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX_PATH

string

Password of the key.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX_PASSWORD

string

PEM Trust config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PEM

boolean

false

Comma-separated list of the trust certificate files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PEM_CERTS

list of string

JKS config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS

boolean

false

Path of the key file (JKS format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS_PATH

string

Password of the key file.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS_PASSWORD

string

PFX config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX

boolean

false

Path to the key file (PFX format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX_PATH

string

Password of the key.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX_PASSWORD

string

The accept backlog.

Environment variable: QUARKUS_VERTX_EVENTBUS_ACCEPT_BACKLOG

int

The client authentication.

Environment variable: QUARKUS_VERTX_EVENTBUS_CLIENT_AUTH

string

NONE

The connect timeout.

Environment variable: QUARKUS_VERTX_EVENTBUS_CONNECT_TIMEOUT

Duration

60S

The idle timeout in milliseconds.

Environment variable: QUARKUS_VERTX_EVENTBUS_IDLE_TIMEOUT

Duration

The receive buffer size.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECEIVE_BUFFER_SIZE

int

The number of reconnection attempts.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECONNECT_ATTEMPTS

int

0

The reconnection interval in milliseconds.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECONNECT_INTERVAL

Duration

1S

Whether to reuse the address.

Environment variable: QUARKUS_VERTX_EVENTBUS_REUSE_ADDRESS

boolean

true

Whether to reuse the port.

Environment variable: QUARKUS_VERTX_EVENTBUS_REUSE_PORT

boolean

false

The send buffer size.

Environment variable: QUARKUS_VERTX_EVENTBUS_SEND_BUFFER_SIZE

int

The so linger.

Environment variable: QUARKUS_VERTX_EVENTBUS_SOLINGER

int

Enables or Disabled SSL.

Environment variable: QUARKUS_VERTX_EVENTBUS_SSL

boolean

false

Whether to keep the TCP connection opened (keep-alive).

Environment variable: QUARKUS_VERTX_EVENTBUS_TCP_KEEP_ALIVE

boolean

false

Configure the TCP no delay.

Environment variable: QUARKUS_VERTX_EVENTBUS_TCP_NO_DELAY

boolean

true

Configure the traffic class.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRAFFIC_CLASS

int

Enables or disables the trust all parameter.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_ALL

boolean

false

The host name.

Environment variable: QUARKUS_VERTX_CLUSTER_HOST

string

localhost

The port.

Environment variable: QUARKUS_VERTX_CLUSTER_PORT

int

The public host name.

Environment variable: QUARKUS_VERTX_CLUSTER_PUBLIC_HOST

string

The public port.

Environment variable: QUARKUS_VERTX_CLUSTER_PUBLIC_PORT

int

Enables or disables the clustering.

Environment variable: QUARKUS_VERTX_CLUSTER_CLUSTERED

boolean

false

The ping interval.

Environment variable: QUARKUS_VERTX_CLUSTER_PING_INTERVAL

Duration

20S

The ping reply interval.

Environment variable: QUARKUS_VERTX_CLUSTER_PING_REPLY_INTERVAL

Duration

20S

The maximum amount of time in seconds that a successfully resolved address will be cached. If not set explicitly, resolved addresses may be cached forever.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_MAX_TIME_TO_LIVE

int

2147483647

The minimum amount of time in seconds that a successfully resolved address will be cached.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_MIN_TIME_TO_LIVE

int

0

The amount of time in seconds that an unsuccessful attempt to resolve an address will be cached.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_NEGATIVE_TIME_TO_LIVE

int

0

The maximum number of queries to be sent during a resolution.

Environment variable: QUARKUS_VERTX_RESOLVER_MAX_QUERIES

int

4

The duration after which a DNS query is considered to be failed.

Environment variable: QUARKUS_VERTX_RESOLVER_QUERY_TIMEOUT

Duration

5S

Enable or disable native transport

Environment variable: QUARKUS_VERTX_PREFER_NATIVE_TRANSPORT

boolean

false

期間フォーマットについて

期間のフォーマットは標準の java.time.Duration フォーマットを使用します。詳細は Duration#parse() javadoc を参照してください。

数値で始まる期間の値を指定することもできます。この場合、値が数値のみで構成されている場合、コンバーターは値を秒として扱います。そうでない場合は、 PT が暗黙的に値の前に付加され、標準の java.time.Duration 形式が得られます。

Vert.xクライアントの使用

Vert.xコアに加えて、ほとんどのVert.xエコシステムライブラリを使用することができます。いくつかのQuarkusエクステンションは、すでにVert.xライブラリをラップしています。

利用可能なAPI

次の表は、Vert.xエコシステムで最も使用されているライブラリの一覧です。これらのAPIにアクセスするには、指定されたエクステンションまたは依存関係をプロジェクトに追加します。使用方法については、関連ドキュメントを参照してください。

API

エクステンションか依存関係

ドキュメント

AMQPクライアント

io.quarkus:quarkus-smallrye-reactive-messaging-amqp (extension)

https://quarkus.io/guides/amqp

サーキットブレーカー

io.smallrye.reactive:smallrye-mutiny-vertx-circuit-breaker (external dependency)

https://vertx.io/docs/vertx-circuit-breaker/java/

Consul クライアント

io.smallrye.reactive:smallrye-mutiny-vertx-consul-client (external dependency)

https://vertx.io/docs/vertx-consul-client/java/

DB2クライアント

io.quarkus:quarkus-reactive-db2-client (extension)

https://quarkus.io/guides/reactive-sql-clients

Kafkaクライアント

io.quarkus:quarkus-smallrye-reactive-messaging-kafka (extension)

https://quarkus.io/guides/kafka

メールクライアント

io.quarkus:quarkus-mailer (extension)

https://quarkus.io/guides/mailer

MQTTクライアント

io.quarkus:quarkus-smallrye-reactive-messaging-mqtt (extension)

No guide yet

MS SQLクライアント

io.quarkus:quarkus-reactive-mssql-client (extension)

https://quarkus.io/guides/reactive-sql-clients

MySQLクライアント

io.quarkus:quarkus-reactive-mysql-client (extension)

https://quarkus.io/guides/reactive-sql-clients

Oracleクライアント

io.quarkus:quarkus-reactive-oracle-client (extension)

https://quarkus.io/guides/reactive-sql-clients

PostgreSQLクライアント

io.quarkus:quarkus-reactive-pg-client (extension)

https://quarkus.io/guides/reactive-sql-clients

RabbitMQクライアント

io.smallrye.reactive:smallrye-mutiny-vertx-rabbitmq-client (external dependency)

https://vertx.io/docs/vertx-rabbitmq-client/java

Redisクライアント

io.quarkus:quarkus-redis-client (extension)

https://quarkus.io/guides/redis

ウェブクライアント

io.smallrye.reactive:smallrye-mutiny-vertx-web-client (external dependency)

https://vertx.io/docs/vertx-web-client/java/

Vert.x Mutiny APIの使用方法について詳しくは、 https://smallrye.io/smallrye-mutiny-vertx-bindings を参照してください。

使用例

このセクションでは、RESTEasy Reactiveアプリケーションの文脈でVert.x WebClient を使った例を紹介します。上の表にあるように、プロジェクトに以下の依存関係を追加します。

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
build.gradle
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")

さて、コードの中で、 WebClient のインスタンスを作成することが可能です。

package org.acme.vertx;


import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.smallrye.mutiny.Uni;

import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;

@Path("/fruit-data")
public class ResourceUsingWebClient {

    private final WebClient client;

    @Inject
    VertxResource(Vertx vertx) {
        this.client = WebClient.create(vertx);
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("/{name}")
    public Uni<JsonObject> getFruitData(String name) {
        return client.getAbs("https://.../api/fruit/" + name)
                .send()
                .onItem().transform(resp -> {
                    if (resp.statusCode() == 200) {
                        return resp.bodyAsJsonObject();
                    } else {
                        return new JsonObject()
                                .put("code", resp.statusCode())
                                .put("message", resp.bodyAsString());
                    }
                });
    }

}

このリソースは、 WebClient を作成し、リクエストに応じて、このクライアントを使用してリモート HTTP API を呼び出します。結果に応じて、レスポンスは受信したまま転送されるか、エラーをラップしたJSONオブジェクトが作成されます。 WebClient は非同期(かつノンブロッキング)で、エンドポイントからは Uni が返されます。

このアプリケーションは、ネイティブ実行可能ファイルとしても実行できます。しかし、その前に、Quarkusに ssl を有効にするよう指示する必要があります(リモートAPIがHTTPSを使用している場合)。 src/main/resources/application.properties を開き、以下を追加します。

quarkus.ssl.native=true

そして、ネイティブ実行可能ファイルを作成します。

CLI
quarkus build --native
Maven
./mvnw package -Dnative
Gradle
./gradlew build -Dquarkus.package.type=native

Vert.x JSONの使用

Vert.xのAPIはしばしばJSONに依存しています。Vert.xは、JSONドキュメントを操作する2つの便利なクラスを提供しています: io.vertx.core.json.JsonObject および io.vertx.core.json.JsonArray

JsonObject は、オブジェクトをJSON表現にマッピングしたり、JSONドキュメントからオブジェクトを構築するために使用できます。

// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);

// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);

なお、これらの機能は、 quarkus-jackson エクステンションで管理されているマッパーを使用しています。マッピングをカスタマイズするには Jacksonの設定 を参照してください。

JSON ObjectとJSON Arrayは、QuarkusのHTTPエンドポイントのリクエストとレスポンスボディとしてサポートされています(従来のRESTEasyとRESTEasy Reactiveを使用)。これらのエンドポイントについて考えてみましょう。

package org.acme.vertx;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {

    @GET
    @Path("{name}/object")
    public JsonObject jsonObject(String name) {
        return new JsonObject().put("Hello", name);
    }

    @GET
    @Path("{name}/array")
    public JsonArray jsonArray(String name) {
        return new JsonArray().add("Hello").add(name);
    }
}
{"Hello":"Quarkus"}
["Hello","Quarkus"]

これは、JSONコンテンツがリクエストボディである場合や、 Uni, Multi, CompletionStage, Publisher で包まれている場合にも同様に機能します。

verticleの使用

Verticles は、 _Vert.x が提供する「シンプルでスケーラブルな、アクターのようなデプロイ・並行性モデル」です。このモデルは、厳密なアクターモデルの実装を主張するものではありませんが、特に並行性、スケーリング、デプロイに関する類似性を共有しています。このモデルを使用するには、バーティクルを書いて デプロイ し、イベントバスにメッセージを送信することで通信します。

Quarkusで verticles をデプロイすることができます。次をサポートしています:

  • 生の verticle - io.vertx.core.AbstractVerticle を拡張するJavaクラス

  • Mutiny verticle - io.smallrye.mutiny.vertx.core.AbstractVerticle を拡張するJavaクラス

verticleのデプロイ

verticleをデプロイするには、deployVerticle メソッドを使用します。

@Inject Vertx vertx;

// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });

Vert.x の Mutiny 版を使用する場合、 deployVerticle メソッドは Uni を返すため、実際のデプロイメントを行うにはサブスクリプションをトリガーする必要があることに注意してください。

アプリケーションの初期化時にVerticleをデプロイする方法については、次の例で説明します。

Verticleとしての@ApplicationScoped Beansの使用

一般的に、Vert.xのバーティクルはCDI Beanではありません。そのため、依存性注入は使用できません。しかし、QuarkusではVerticleをBeanとしてデプロイすることができます。この場合、CDI(QuarkusではArc)がインスタンスの作成を担当することに注意してください。

次のスニペットはその例です。

package io.quarkus.vertx.verticles;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {

    @ConfigProperty(name = "address") String address;

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer(address)
                .handler(m -> m.replyAndForget("hello"))
                .completionHandler();
    }
}

vertx のインスタンスを注入する必要はなく、代わりに AbstractVerticle の protected フィールドを利用します。

そして、Verticleインスタンスをデプロイします。

package io.quarkus.vertx.verticles;

import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;

@ApplicationScoped
public class VerticleDeployer {

    public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
         vertx.deployVerticle(verticle).await().indefinitely();
    }
}

露出しているすべての AbstractVerticle をデプロイしたい場合は、次のようにします。

public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
    for (AbstractVerticle verticle : verticles) {
        vertx.deployVerticle(verticle).await().indefinitely();
    }
}

そして、Verticleインスタンスをデプロイします。

@ApplicationScoped を使用する場合、Verticleのインスタンスは1つになります。複数のVerticleのインスタンスを持つことは、それらの間で負荷を共有するのに役立ちます。各々のインスタンスは、異なるI/Oスレッド(Vert.xイベントループ)に関連付けられます。

Verticleの複数のインスタンスをデプロイするには、 @ApplicationScoped の代わりに @Dependent スコープを使用します。

package org.acme.verticle;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;

import javax.enterprise.context.Dependent;
import javax.inject.Inject;

@Dependent
public class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer("address")
                .handler(m -> m.reply("Hello from " + this))
                .completionHandler();
    }
}

そして、verticleをデプロイします。

package org.acme.verticle;

import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;

@ApplicationScoped
public class MyApp {

    void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
        vertx
                .deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
                .await().indefinitely();
    }
}

init メソッドは、 Instance<MyVerticle> を受け取ります。そして、 deployVerticle メソッドにサプライヤーを渡します。サプライヤーは get() メソッドを呼び出しているだけです。 @Dependent スコープのおかげで、呼び出すたびに新しいインスタンスが返されます。最後に、希望する数のインスタンスを DeploymentOptions に渡します。例えば、前の例では2つです。サプライヤーを2回呼び出し、Verticleの2つのインスタンスを作成します。

イベントバスの利用

Vert.xには、Quarkusアプリケーションから使用できる イベントバスが組み込まれています。そのため、アプリケーションコンポーネント(CDI Bean、リソース…​)は、非同期イベントを使用して相互に作用することができ、疎結合を促進します。

イベントバスでは、 仮想アドレスメッセージ を送信します。イベントバスには、3種類の配送メカニズムが用意されています。

  • point-to-point - メッセージを送信すると、1人の消費者がそれを受信します。複数の消費者がそのアドレスを聞く場合は、ラウンドロビンが適用されます。

  • publish/subscribe - メッセージを発行し、そのアドレスを聞いているすべてのコンシューマーがメッセージを受信します。

  • request/reply - メッセージを送信し、応答を期待します。受信者は非同期的にメッセージに応答することができます。

これらの配信メカニズムはすべてノンブロッキングであり、リアクティブなアプリケーションを構築するための基本的な要素の一つとなっています。

イベントの消費

Vert.x APIを使用してコンシューマーを登録することができますが、Quarkusには宣言型のサポートがあります。イベントを消費するには、 io.quarkus.vertx.ConsumeEvent アノテーションを使用します。

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 設定されていない場合、アドレスはBeanの完全修飾名となります。例えば、このスニペットでは、 org.acme.vertx.GreetingService となります。
2 メソッドのパラメータはメッセージボディです。メソッドが 何か を返す場合は、それがメッセージのレスポンスとなります。

アドレスを設定

@ConsumeEvent アノテーションでアドレスを設定することができます。

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 greeting アドレスに送信されたメッセージを受信します。

非同期処理

これまでの例では、同期処理を行っています。 io.smallrye.mutiny.Uni または java.util.concurrent.CompletionStage を返却することで、非同期処理も可能です。

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public CompletionStage<String> consume(String name) {
        // return a CompletionStage completed when the processing is finished.
        // You can also fail the CompletionStage explicitly
    }

    @ConsumeEvent
    public Uni<String> process(String name) {
        // return an Uni completed when the processing is finished.
        // You can also fail the Uni explicitly
    }
}
Mutiny

前の例はMutinyリアクティブ型を使用しています。Mutinyに慣れていない方は、 Mutiny - 直感的なリアクティブプログラミングライブラリをご覧ください。

ブロッキング処理

デフォルトでは、イベントを消費するコードは、I/Oスレッドで呼び出されるため、 ノンブロッキング でなければなりません。処理がブロッキングの場合は、 @io.smallrye.common.annotation.Blocking アノテーションを使用してください。

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

あるいは、 @ConsumeEvent アノテーションの blocking 属性を使用することもできます。

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

@Blocking を使用する場合、 @ConsumeEventblocking 属性の値は無視されます。

メッセージへの返信

@ConsumeEvent でアノテーションされたメソッドの 戻り 値は、受信したメッセージへの応答に使用されます。たとえば、次のスニペットでは、返された String が応答となります。

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

また、 Uni<T>CompletionStage<T> を返すことで、非同期応答を扱うこともできます:

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

コンテキスト伝搬エクステンションを使えば、 executor を注入することができます。

@Inject Executor executor;

fire and forget インタラクションの実装

受信したメッセージに返信する必要はありません。通常、 fire and forget のインタラクションでは、メッセージは消費され、送信者はそのことを知る必要はありません。このパターンを実装するために、コンシューマー・メソッドは void を返します。

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

メッセージの処理

ペイロード を直接使用する前の例とは異なり、 Message を直接使用することもできます。

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

失敗のハンドリング

@ConsumeEvent でアノテーションされたメソッドが例外を発生させた場合、

  • 返信ハンドラが設定されている場合、失敗はコード ConsumeEvent#FAILURE_CODE と例外メッセージを含む io.vertx.core.eventbus.ReplyException を通じて送信者に伝えられます。

  • 応答ハンドラが設定されていない場合、例外は再スローされ(必要に応じて RuntimeException でラップされる)、デフォルトの例外ハンドラ、すなわち io.vertx.core.Vertx#exceptionHandler() で処理することができます。

メッセージの送信

メッセージの送信と公開にはVert.x Event busを使用します。

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                            (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
1 Event busの注入
2 アドレス greeting にメッセージを送信します。メッセージのペイロードは name です

EventBus オブジェクトは、以下のメソッドを提供します。

  1. send 特定のアドレスへのメッセージ - 一人のコンシューマーがメッセージを受信する。

  2. publish 特定のアドレスにメッセージを送る - すべてのコンシューマーがメッセージを受け取る。

  3. request メッセージを送って返信を期待する

// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);

コーデックの使用

Vert.xイベントバス では、コーデックを使用してオブジェクトの シリアライズデシリアライズ を行います。Quarkusでは、ローカル配信用のデフォルトのコーデックを提供しています。そのため、以下のようにオブジェクトを交換することができます。

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}

特定のコーデックを使用したい場合は、両サイドで明示的に設定する必要があります。

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", name,
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 メッセージの送信に使用するコーデックの名前を設定
2 メッセージの受信に使用するコーデックの設定

HTTPとイベントバスの融合

greeting HTTPエンドポイントに戻り、非同期メッセージパッシングを使用して呼び出しを別のBeanに委譲しましょう。 要求/応答ディスパッチメカニズムを使用します。 JAX-RSエンドポイント内にビジネスロジックを実装する代わりに、メッセージを送信しています。 別のBeanがこのメッセージを消費し、応答 メカニズムを使用して応答が送信されます。

HTTPエンドポイントクラスでは、イベントバスを注入し、 request メソッドを使用して、イベントバスにメッセージを送信し、応答を期待します。

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/bus")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 greeting アドレスに name を送信し、レスポンスを要求します。
2 レスポンスを受け取ったら、ボディを抽出してユーザーに送信
HTTP メソッドは Uni を返します。RESTEasy Reactive を使用している場合は、 Uni のサポートが組み込まれています。 従来の RESTEasyを使用している場合は、 quarkus resteasy-mutiny 拡張機能をプロジェクトに追加する必要があります。

greeting のアドレスをリッスンするコンシューマーが必要です。このコンシューマーは、同じクラスでも、次のような別のBeanでも構いません。

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

このBeanは、名前を受け取り、グリーティングメッセージを返します。

これにより、 /bus/quarkus のすべてのHTTPリクエストは、イベントバスにメッセージを送信し、応答を待ち、その応答が到着すると、HTTPレスポンスを書き込みます。

Hello Quarkus

より理解しやすくするために、HTTP リクエスト/レスポンスがどのように処理されたかを詳しく見てみましょう。

  1. リクエストは、 greeting メソッドで受信されます。

  2. name を含むメッセージがイベントバスに送信される

  3. 別のBeanがこのメッセージを受信し、応答を計算します

  4. この応答は、返信メカニズムを使って送り返されます

  5. 送信者が返信を受信すると、その内容がHTTPレスポンスに書き込まれます

Bidirectional communication with browsers using SockJS

Vert.xが提供するSockJSブリッジは、ブラウザアプリケーションとQuarkusアプリケーションがイベントバスを使って通信できるようにします。双方を接続します。そのため、双方が相手側で受信したメッセージを送信することができます。3つの配信メカニズムをサポートしています。

SockJSは、Quarkusアプリケーションとブラウザの間の通信チャネルをネゴシエートします。WebSocketがサポートされている場合はそれを使用し、そうでない場合はSSEや長いポーリングなどにダウングレードします。

SockJSを使用するためには、ブリッジの設定、特に通信に使用されるアドレスの設定が必要です。

package org.acme.vertx;

import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;

@ApplicationScoped
public class SockJsExample {

    @Inject
    Vertx vertx;

    public void init(@Observes Router router) {
        SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
        sockJSHandler.bridge(new SockJSBridgeOptions()
                .addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
        router.route("/eventbus/*").handler(sockJSHandler);
    }

}

This code configures the SockJS bridge to send all the messages targeting the ticks address to the connected browsers. More detailed explanations about the configuration can be found on the Vert.x SockJS Bridge documentation.

ブラウザは、メッセージを消費するために、 vertx-eventbus JavaScriptライブラリを使用する必要があります。

<!doctype html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>SockJS example - Quarkus</title>
    <script src="https://code.jquery.com/jquery-3.3.1.min.js"
            integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
    <script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>

<h1>SockJS Examples</h1>

<p><strong>Last Tick:</strong> <span id="tick"></span></p>

</body>
<script>
    var eb = new EventBus('/eventbus');

    eb.onopen = function () {

        eb.registerHandler('ticks', function (error, message) {
            $("#tick").html(message.body);
        });
    }

</script>
</html>

ネイティブ・トランスポート

GraalVMで作られたバイナリでは、ネイティブトランスポートはサポートされていません。

Vert.x is capable of using Netty’s native transports, which offers performance improvements on specific platforms.To enable them, you must include the appropriate dependency for your platform. It’s usually a good idea to have both to keep your application platform-agnostic. Netty is smart enough to use the correct one, that includes none at all on unsupported platforms:

pom.xml
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-epoll</artifactId>
  <classifier>linux-x86_64</classifier>
</dependency>

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-kqueue</artifactId>
  <classifier>osx-x86_64</classifier>
</dependency>
build.gradle
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")

implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")

また、Vert.xでネイティブトランスポートを使用するように明示的に設定する必要があります。 application.properties に次を追加します。

quarkus.vertx.prefer-native-transport=true

あるいは、 application.yml で次のように設定します。

quarkus:
  vertx:
    prefer-native-transport: true

順調にいけばquarkusは以下のようにログ出力します。

[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true

ネイティブLinuxトランスポート

Linuxでは、以下のソケットオプションを有効にすることができます。

  • SOREUSEPORT

quarkus.http.so-reuse-port=true
  • TCP_QUICKACK

quarkus.http.tcp-quick-ack=true
  • TCP_CORK

quarkus.http.tcp-cork=true
  • TCP_FASTOPEN

quarkus.http.tcp-fast-open=true

Native macOS Transport

On macOS Sierra and above you can enable the following socket options:

  • SOREUSEPORT

quarkus.http.so-reuse-port=true

Unixドメインソケットのリスニング

Listening on a Unix domain socket allows us to dispense with the overhead of TCP if the connection to the quarkus service is established from the same host. This can happen if access to the service goes through a proxy which is often the case if you’re setting up a service mesh with a proxy like Envoy.

これは、 ネイティブ・トランスポート をサポートしているプラットフォームでのみ動作します。

適切な ネイティブ・トランスポート を有効にし、以下の環境プロパティを設定します。

quarkus.http.domain-socket=/var/run/io.quarkus.app.socket
quarkus.http.domain-socket-enabled=true

quarkus.vertx.prefer-native-transport=true

これだけでは、デフォルトで 0.0.0.0:8080 で開かれる tcp ソケットを無効にすることはできません。これは次のように明示的に無効にすることができます。

quarkus.http.host-enabled=false

これらのプロパティは、Javaの -D コマンドライン・パラメータまたは application.properties で設定できます。

Do not forget to add the native transport dependency. See ネイティブ・トランスポート for details.
Make sure your application has the right permissions to write to the socket.

読み取り専用のデプロイメント環境

ファイルシステムが読み取り専用の環境では、次のようなエラーが発生することがあります。

java.lang.IllegalStateException: Failed to create cache dir

/tmp/ が書き込み可能である場合、これは vertx.cacheDirBase プロパティが /tmp/ のディレクトリを指すように設定することで修正できます。例えば、OpenShift では環境変数 JAVA_OPTS を作成して -Dvertx.cacheDirBase=/tmp/vertx という値を設定します。