The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Vert.x リファレンスガイド

Vert.xは、リアクティブなアプリケーションを構築するためのツールキットです。 Quarkus Reactive Architectureに記載されているように、QuarkusではVert.xを下地に使用しています。

このガイドは、 QuarkusアプリケーションからEclipse Vert.x APIの使用 ガイドの姉妹編です。Quarkusで使用されるVert.xインスタンスの使用方法や設定について、より詳細に説明しています。

Vert.xインスタンスへのアクセス

マネージド Vert.x インスタンスにアクセスするには、 quarkus-vertx エクステンションをプロジェクトに追加します。 この依存関係は、プロジェクトで既に利用可能な場合があります(推移的依存関係として)。

このエクステンションでは、フィールドまたはコンストラクタ・インジェクションのいずれかを使用して、Vert.xのマネージドインスタンスを取得できます。

@ApplicationScoped
public class MyBean {

    // Field injection
    @Inject Vertx vertx;

    // Constructor injection
    MyBean(Vertx vertx) {
        // ...
    }

}

以下のどちらかを注入することができます。

  • 素の Vert.x APIを公開する io.vertx.core.Vertx インスタンス

  • Mutiny APIを公開する io.vertx.mutiny.core.Vertx インスタンス

Mutiny版は、Quarkusが提供する他のリアクティブAPIと統合されているため、Mutiny版の使用をお勧めします。

Mutiny

Mutinyに慣れていない方は、 Mutiny - 直感的なリアクティブプログラミングライブラリをご覧ください。

Vert.x Mutiny版に関するドキュメントは https://smallrye.io/smallrye-mutiny-vertx-bindings にあります。

Vert.xインスタンスの設定

application.properties ファイルから Vert.x インスタンスを設定することができます。次の表は、サポートされているプロパティの一覧です。

ビルド時に固定される構成プロパティ - 他のすべての構成プロパティは実行時にオーバーライド可能

Configuration property

デフォルト

Enables or disables the Vert.x cache.

Environment variable: QUARKUS_VERTX_CACHING

Show more

boolean

true

Enables or disabled the Vert.x classpath resource resolver.

Environment variable: QUARKUS_VERTX_CLASSPATH_RESOLVING

Show more

boolean

true

The number of event loops. By default, it matches the number of CPUs detected on the system.

Environment variable: QUARKUS_VERTX_EVENT_LOOPS_POOL_SIZE

Show more

int

The maximum amount of time the event loop can be blocked.

Environment variable: QUARKUS_VERTX_MAX_EVENT_LOOP_EXECUTE_TIME

Show more

Duration

2S

The amount of time before a warning is displayed if the event loop is blocked.

Environment variable: QUARKUS_VERTX_WARNING_EXCEPTION_TIME

Show more

Duration

2S

The maximum amount of time the worker thread can be blocked.

Environment variable: QUARKUS_VERTX_MAX_WORKER_EXECUTE_TIME

Show more

Duration

60S

The size of the internal thread pool (used for the file system).

Environment variable: QUARKUS_VERTX_INTERNAL_BLOCKING_POOL_SIZE

Show more

int

20

The queue size. For most applications this should be unbounded

Environment variable: QUARKUS_VERTX_QUEUE_SIZE

Show more

int

The executor growth resistance. A resistance factor applied after the core pool is full; values applied here will cause that fraction of submissions to create new threads when no idle thread is available. A value of 0.0f implies that threads beyond the core size should be created as aggressively as threads within it; a value of 1.0f implies that threads beyond the core size should never be created.

Environment variable: QUARKUS_VERTX_GROWTH_RESISTANCE

Show more

float

0

The amount of time a thread will stay alive with no work.

Environment variable: QUARKUS_VERTX_KEEP_ALIVE_TIME

Show more

Duration

30S

Prefill thread pool when creating a new Executor. When io.vertx.core.spi.ExecutorServiceFactory#createExecutor is called, initialise with the number of defined threads at startup

Environment variable: QUARKUS_VERTX_PREFILL

Show more

boolean

false

Enables the async DNS resolver.

Environment variable: QUARKUS_VERTX_USE_ASYNC_DNS

Show more

boolean

false

PEM Key/cert config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM

Show more

boolean

false

Comma-separated list of the path to the key files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM_KEYS

Show more

list of string

Comma-separated list of the path to the certificate files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM_CERTS

Show more

list of string

JKS config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS

Show more

boolean

false

Path of the key file (JKS format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS_PATH

Show more

string

Password of the key file.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS_PASSWORD

Show more

string

PFX config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX

Show more

boolean

false

Path to the key file (PFX format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX_PATH

Show more

string

Password of the key.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX_PASSWORD

Show more

string

PEM Trust config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PEM

Show more

boolean

false

Comma-separated list of the trust certificate files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PEM_CERTS

Show more

list of string

JKS config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS

Show more

boolean

false

Path of the key file (JKS format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS_PATH

Show more

string

Password of the key file.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS_PASSWORD

Show more

string

PFX config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX

Show more

boolean

false

Path to the key file (PFX format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX_PATH

Show more

string

Password of the key.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX_PASSWORD

Show more

string

The accept backlog.

Environment variable: QUARKUS_VERTX_EVENTBUS_ACCEPT_BACKLOG

Show more

int

The client authentication.

Environment variable: QUARKUS_VERTX_EVENTBUS_CLIENT_AUTH

Show more

string

NONE

The connect timeout.

Environment variable: QUARKUS_VERTX_EVENTBUS_CONNECT_TIMEOUT

Show more

Duration

60S

The idle timeout in milliseconds.

Environment variable: QUARKUS_VERTX_EVENTBUS_IDLE_TIMEOUT

Show more

Duration

The receive buffer size.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECEIVE_BUFFER_SIZE

Show more

int

The number of reconnection attempts.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECONNECT_ATTEMPTS

Show more

int

0

The reconnection interval in milliseconds.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECONNECT_INTERVAL

Show more

Duration

1S

Whether to reuse the address.

Environment variable: QUARKUS_VERTX_EVENTBUS_REUSE_ADDRESS

Show more

boolean

true

Whether to reuse the port.

Environment variable: QUARKUS_VERTX_EVENTBUS_REUSE_PORT

Show more

boolean

false

The send buffer size.

Environment variable: QUARKUS_VERTX_EVENTBUS_SEND_BUFFER_SIZE

Show more

int

The so linger.

Environment variable: QUARKUS_VERTX_EVENTBUS_SO_LINGER

Show more

int

Enables or Disabled SSL.

Environment variable: QUARKUS_VERTX_EVENTBUS_SSL

Show more

boolean

false

Whether to keep the TCP connection opened (keep-alive).

Environment variable: QUARKUS_VERTX_EVENTBUS_TCP_KEEP_ALIVE

Show more

boolean

false

Configure the TCP no delay.

Environment variable: QUARKUS_VERTX_EVENTBUS_TCP_NO_DELAY

Show more

boolean

true

Configure the traffic class.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRAFFIC_CLASS

Show more

int

Enables or disables the trust all parameter.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_ALL

Show more

boolean

false

The host name.

Environment variable: QUARKUS_VERTX_CLUSTER_HOST

Show more

string

localhost

The port.

Environment variable: QUARKUS_VERTX_CLUSTER_PORT

Show more

int

The public host name.

Environment variable: QUARKUS_VERTX_CLUSTER_PUBLIC_HOST

Show more

string

The public port.

Environment variable: QUARKUS_VERTX_CLUSTER_PUBLIC_PORT

Show more

int

Enables or disables the clustering.

Environment variable: QUARKUS_VERTX_CLUSTER_CLUSTERED

Show more

boolean

false

The ping interval.

Environment variable: QUARKUS_VERTX_CLUSTER_PING_INTERVAL

Show more

Duration

20S

The ping reply interval.

Environment variable: QUARKUS_VERTX_CLUSTER_PING_REPLY_INTERVAL

Show more

Duration

20S

The maximum amount of time in seconds that a successfully resolved address will be cached.

If not set explicitly, resolved addresses may be cached forever.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_MAX_TIME_TO_LIVE

Show more

int

2147483647

The minimum amount of time in seconds that a successfully resolved address will be cached.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_MIN_TIME_TO_LIVE

Show more

int

0

The amount of time in seconds that an unsuccessful attempt to resolve an address will be cached.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_NEGATIVE_TIME_TO_LIVE

Show more

int

0

The maximum number of queries to be sent during a resolution.

Environment variable: QUARKUS_VERTX_RESOLVER_MAX_QUERIES

Show more

int

4

The duration after which a DNS query is considered to be failed.

Environment variable: QUARKUS_VERTX_RESOLVER_QUERY_TIMEOUT

Show more

Duration

5S

Enable or disable native transport

Environment variable: QUARKUS_VERTX_PREFER_NATIVE_TRANSPORT

Show more

boolean

false

期間フォーマットについて

To write duration values, use the standard java.time.Duration format. See the Duration#parse() Java API documentation for more information.

数字で始まる簡略化した書式を使うこともできます:

  • 数値のみの場合は、秒単位の時間を表します。

  • 数値の後に ms が続く場合は、ミリ秒単位の時間を表します。

その他の場合は、簡略化されたフォーマットが解析のために java.time.Duration フォーマットに変換されます:

  • 数値の後に hms が続く場合は、その前に PT が付けられます。

  • 数値の後に d が続く場合は、その前に P が付けられます。

See Vert.x設定のカスタマイズ to configure the Vert.x instance using a programmatic approach.

vert.xクライアントを使用

Vert.xコアに加えて、ほとんどのVert.xエコシステムライブラリを使用することができます。いくつかのQuarkusエクステンションは、すでにVert.xライブラリをラップしています。

利用可能なAPI

次の表は、Vert.xエコシステムで 最も 使用されているライブラリの一覧です。 これらのAPIにアクセスするには、指定されたエクステンションまたは依存関係をプロジェクトに追加します。 使用方法については、関連ドキュメントを参照してください。

API

エクステンションか依存関係

ドキュメント

AMQPクライアント

io.quarkus:quarkus-smallrye-reactive-messaging-amqp (extension)

Getting Started to SmallRye Reactive Messaging with AMQP

サーキットブレーカー

io.smallrye.reactive:smallrye-mutiny-vertx-circuit-breaker (external dependency)

https://vertx.io/docs/vertx-circuit-breaker/java/

Consul クライアント

io.smallrye.reactive:smallrye-mutiny-vertx-consul-client (external dependency)

https://vertx.io/docs/vertx-consul-client/java/

DB2クライアント

io.quarkus:quarkus-reactive-db2-client (extension)

Reactive SQL Clients

Kafkaクライアント

io.quarkus:quarkus-smallrye-reactive-messaging-kafka (extension)

Apache Kafka Reference Guide

メールクライアント

io.quarkus:quarkus-mailer (extension)

Sending emails using SMTP

MQTTクライアント

io.quarkus:quarkus-smallrye-reactive-messaging-mqtt (extension)

ガイドはまだありません

MS SQLクライアント

io.quarkus:quarkus-reactive-mssql-client (extension)

Reactive SQL Clients

MySQLクライアント

io.quarkus:quarkus-reactive-mysql-client (extension)

Reactive SQL Clients

Oracleクライアント

io.quarkus:quarkus-reactive-oracle-client (extension)

Reactive SQL Clients

PostgreSQLクライアント

io.quarkus:quarkus-reactive-pg-client (extension)

Reactive SQL Clients

RabbitMQクライアント

io.smallrye.reactive:smallrye-mutiny-vertx-rabbitmq-client (external dependency)

https://vertx.io/docs/vertx-rabbitmq-client/java

Redisクライアント

io.quarkus:quarkus-redis-client (extension)

Using the Redis Client

ウェブクライアント

io.smallrye.reactive:smallrye-mutiny-vertx-web-client (external dependency)

https://vertx.io/docs/vertx-web-client/java/

Vert.x Mutiny APIの使用方法について詳しくは、 https://smallrye.io/smallrye-mutiny-vertx-bindings を参照してください。

Vert.xウェブクライアントの使用

このセクションでは、RESTEasy Reactiveアプリケーションの文脈でVert.x WebClient を使った例を紹介します。上の表にあるように、プロジェクトに以下の依存関係を追加します。

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
build.gradle
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")

これで、コードの中で、 WebClient のインスタンスを作成することが可能です。

package org.acme.vertx;


import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import io.smallrye.mutiny.Uni;

import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;

@Path("/fruit-data")
public class ResourceUsingWebClient {

    private final WebClient client;

    @Inject
    VertxResource(Vertx vertx) {
        this.client = WebClient.create(vertx);
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("/{name}")
    public Uni<JsonObject> getFruitData(String name) {
        return client.getAbs("https://.../api/fruit/" + name)
                .send()
                .onItem().transform(resp -> {
                    if (resp.statusCode() == 200) {
                        return resp.bodyAsJsonObject();
                    } else {
                        return new JsonObject()
                                .put("code", resp.statusCode())
                                .put("message", resp.bodyAsString());
                    }
                });
    }

}

このリソースは、 WebClient を作成し、リクエストに応じて、このクライアントを使用してリモート HTTP API を呼び出します。結果に応じて、レスポンスは受信したまま転送されるか、エラーをラップしたJSONオブジェクトが作成されます。 WebClient は非同期(かつノンブロッキング)で、エンドポイントからは Uni が返されます。

このアプリケーションは、ネイティブ実行可能ファイルとしても実行できます。しかし、その前に、Quarkusに ssl を有効にするよう指示する必要があります(リモートAPIがHTTPSを使用している場合)。 src/main/resources/application.properties を開き、以下を追加します。

quarkus.ssl.native=true

そして、ネイティブ実行可能ファイルを作成します。

コマンドラインインタフェース
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.package.type=native

Vert.x JSONの使用

Vert.xのAPIはしばしばJSONに依存しています。Vert.xは、JSONドキュメントを操作する2つの便利なクラスを提供しています: io.vertx.core.json.JsonObject および io.vertx.core.json.JsonArray

JsonObject は、オブジェクトをJSON表現にマッピングしたり、JSONドキュメントからオブジェクトを構築するために使用できます。

// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);

// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);

なお、これらの機能は、 quarkus-jackson エクステンションで管理されているマッパーを使用しています。マッピングをカスタマイズするには Jacksonの設定 を参照してください。

JSON ObjectとJSON Arrayは、QuarkusのHTTPエンドポイントのリクエストとレスポンスボディとしてサポートされています(従来のRESTEasyとRESTEasy Reactiveを使用)。これらのエンドポイントについて考えてみましょう。

package org.acme.vertx;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {

    @GET
    @Path("{name}/object")
    public JsonObject jsonObject(String name) {
        return new JsonObject().put("Hello", name);
    }

    @GET
    @Path("{name}/array")
    public JsonArray jsonArray(String name) {
        return new JsonArray().add("Hello").add(name);
    }
}
{"Hello":"Quarkus"}
["Hello","Quarkus"]

これは、JSONコンテンツがリクエストボディである場合や、 Uni, Multi, CompletionStage, Publisher で包まれている場合にも同様に機能します。

Verticlesの使用

Verticles は、 _Vert.x が提供する「シンプルでスケーラブルな、アクターのようなデプロイ・並行性モデル」です。このモデルは、厳密なアクターモデルの実装を主張するものではありませんが、特に並行性、スケーリング、デプロイに関する類似性を共有しています。このモデルを使用するには、バーティクルを書いて デプロイ し、イベントバスにメッセージを送信することで通信します。

Quarkusで verticles をデプロイすることができます。次をサポートしています:

  • 生の verticle - io.vertx.core.AbstractVerticle を拡張するJavaクラス

  • Mutiny verticle - io.smallrye.mutiny.vertx.core.AbstractVerticle を拡張するJavaクラス

Verticleのデプロイ

verticleをデプロイするには、deployVerticle メソッドを使用します。

@Inject Vertx vertx;

// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });

Vert.x の Mutiny 版を使用する場合、 deployVerticle メソッドは Uni を返すため、実際のデプロイメントを行うにはサブスクリプションをトリガーする必要があることに注意してください。

アプリケーションの初期化時にVerticleをデプロイする方法については、次の例で説明します。

@ApplicationScopedのBeanをVerticleとして使用する

一般的に、Vert.xのバーティクルはCDI Beanではありません。そのため、依存性注入は使用できません。しかし、QuarkusではVerticleをBeanとしてデプロイすることができます。この場合、CDI(QuarkusではArc)がインスタンスの作成を担当することに注意してください。

次のスニペットはその例です。

package io.quarkus.vertx.verticles;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {

    @ConfigProperty(name = "address") String address;

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer(address)
                .handler(m -> m.replyAndForget("hello"))
                .completionHandler();
    }
}

vertx のインスタンスを注入する必要はなく、代わりに AbstractVerticle の protected フィールドを利用します。

そして、Verticleインスタンスをデプロイします。

package io.quarkus.vertx.verticles;

import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class VerticleDeployer {

    public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
         vertx.deployVerticle(verticle).await().indefinitely();
    }
}

露出しているすべての AbstractVerticle をデプロイしたい場合は、次のようにします。

public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
    for (AbstractVerticle verticle : verticles) {
        vertx.deployVerticle(verticle).await().indefinitely();
    }
}

複数のvirticleのインスタンスの作成

@ApplicationScoped を使用する場合、Verticleのインスタンスは1つになります。複数のVerticleのインスタンスを持つことは、それらの間で負荷を共有するのに役立ちます。各々のインスタンスは、異なるI/Oスレッド(Vert.xイベントループ)に関連付けられます。

Verticleの複数のインスタンスをデプロイするには、 @ApplicationScoped の代わりに @Dependent スコープを使用します。

package org.acme.verticle;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;

import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;

@Dependent
public class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer("address")
                .handler(m -> m.reply("Hello from " + this))
                .completionHandler();
    }
}

そして、verticleを次のようにデプロイします:

package org.acme.verticle;

import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

@ApplicationScoped
public class MyApp {

    void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
        vertx
                .deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
                .await().indefinitely();
    }
}

init メソッドは、 Instance<MyVerticle> を受け取ります。そして、 deployVerticle メソッドにサプライヤーを渡します。サプライヤーは get() メソッドを呼び出しているだけです。 @Dependent スコープのおかげで、呼び出すたびに新しいインスタンスが返されます。最後に、希望する数のインスタンスを DeploymentOptions に渡します。例えば、前の例では2つです。サプライヤーを2回呼び出し、Verticleの2つのインスタンスを作成します。

Event Busの使用

Vert.xには、Quarkusアプリケーションから使用できる イベントバスが組み込まれています。そのため、アプリケーションコンポーネント(CDI Bean、リソース…​)は、非同期イベントを使用して相互に作用することができ、疎結合を促進します。

イベントバスでは、 仮想アドレスメッセージ を送信します。イベントバスには、3種類の配送メカニズムが用意されています。

  • point-to-point - メッセージを送信すると、1人の消費者がそれを受信します。複数の消費者がそのアドレスを聞く場合は、ラウンドロビンが適用されます。

  • publish/subscribe - メッセージを発行し、そのアドレスを聞いているすべてのコンシューマーがメッセージを受信します。

  • request/reply - メッセージを送信し、応答を期待します。受信者は非同期的にメッセージに応答することができます。

これらの配信メカニズムはすべてノンブロッキングであり、リアクティブなアプリケーションを構築するための基本的な要素の一つとなっています。

イベントの消費

Vert.x APIを使用してコンシューマーを登録することができますが、Quarkusには宣言型のサポートがあります。イベントを消費するには、 io.quarkus.vertx.ConsumeEvent アノテーションを使用します。

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 設定されていない場合、アドレスはBeanの完全修飾名となります。例えば、このスニペットでは、 org.acme.vertx.GreetingService となります。
2 メソッドのパラメータはメッセージボディです。メソッドが 何か を返す場合は、それがメッセージのレスポンスとなります。

アドレスの設定

@ConsumeEvent アノテーションでアドレスを設定することができます。

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 greeting アドレスに送信されたメッセージを受信します。

The address value can be a property expression. In this case, the configured value is used instead: @ConsumeEvent("${my.consumer.address}"). Additionally, the property expression can specify a default value: @ConsumeEvent("${my.consumer.address:defaultAddress}").

Config Property Example
@ConsumeEvent("${my.consumer.address}")   (1)
public String consume(String name) {
    return name.toLowerCase();
}
1 Receive the messages sent to the address configured with the my.consumer.address key.
If no config property with the specified key exists and no default value is set then the application startup fails.

イベントの非同期処理

これまでの例では、同期処理を行っています。 io.smallrye.mutiny.Uni または java.util.concurrent.CompletionStage を返却することで、非同期処理も可能です。

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public Uni<String> process(String name) {
        // return an Uni completed when the processing is finished.
        // You can also fail the Uni explicitly
    }
}
Mutiny

前の例はMutinyリアクティブ型を使用しています。Mutinyに慣れていない方は、 Mutiny - 直感的なリアクティブプログラミングライブラリをご覧ください。

イベントのブロッキング処理

デフォルトでは、イベントを消費するコードは、I/Oスレッドで呼び出されるため、 ノンブロッキング でなければなりません。処理がブロッキングの場合は、 @io.smallrye.common.annotation.Blocking アノテーションを使用してください。

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

あるいは、 @ConsumeEvent アノテーションの blocking 属性を使用することもできます。

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

@Blocking を使用する場合、 @ConsumeEventblocking 属性の値は無視されます。

イベントへの返信

@ConsumeEvent でアノテーションされたメソッドの 戻り 値は、受信したメッセージへの応答に使用されます。たとえば、次のスニペットでは、返された String が応答となります。

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

また、 Uni<T>CompletionStage<T> を返すことで、非同期応答を扱うこともできます:

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

コンテキスト伝搬エクステンションを使えば、 executor を注入することができます。

@Inject Executor executor;

ファイヤー・アンド・フォーゲット・インタラクションの実装

受信したメッセージに返信する必要はありません。通常、 fire and forget のインタラクションでは、メッセージは消費され、送信者はそのことを知る必要はありません。このパターンを実装するために、コンシューマー・メソッドは void を返します。

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

(イベントの代わりに)メッセージを消費

ペイロード を直接使用する前の例とは異なり、 Message を直接使用することもできます。

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

失敗のハンドリング

@ConsumeEvent でアノテーションされたメソッドが例外を発生させた場合、

  • 返信ハンドラが設定されている場合、失敗はコード ConsumeEvent#FAILURE_CODE と例外メッセージを含む io.vertx.core.eventbus.ReplyException を通じて送信者に伝えられます。

  • リプライ・ハンドラが設定されていない場合、例外は再スローされ(必要であれば RuntimeException でラップされる)、デフォルトの例外ハンドラ、 すなわち io.vertx.core.Vertx#exceptionHandler() で処理されます。

メッセージの送信

メッセージの送信と公開にはVert.x Event busを使用します。

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                            (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
1 Event busの注入
2 アドレス greeting にメッセージを送信します。メッセージのペイロードは name です

EventBus オブジェクトは、以下のメソッドを提供します。

  1. send 特定のアドレスへのメッセージの送信 - 一人のコンシューマーがメッセージを受信する。

  2. publish 特定のアドレスへのメッセージの送信 - 全てのコンシューマーがメッセージを受け取る。

  3. request メッセージを送信し、返信を期待する

// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);

仮想スレッドのイベント処理

@ConsumeEvent でアノテーションされたメソッドが @RunOnVirtualThread でもアノテーションされている場合、 メソッドは仮想スレッド上で呼び出されます。 各イベントは異なる仮想スレッド上で起動されます。

この機能を使うには以下が必要です:

  1. Javaランタイムが仮想スレッドをサポートしていること。

  2. メソッドがブロッキング・シグネチャを使っていること。

2つ目のポイントは、オブジェクトまたは void を返すメソッドだけが @RunOnVirtualThread を使えるということです。 UniCompletionStage を返すメソッドは、仮想スレッドでは実行 できません

詳しくは バーチャル・スレッド・ガイド をお読みください。

コーデックの使用

The https://vertx.io/docs/vertx-core/java/event_bus[Vert.x Event Bus] uses https://vertx.io/docs/vertx-core/java/message_codecs[codecs] to _serialize and deserialize message objects. Quarkus provides a default codec for local delivery. This codec is automatically used for return types and message body parameters of local consumers, i.e. methods annotated with @ConsumeEvent whete ConsumeEvent#local() == true (which is the default).

So that you can exchange the message objects as follows:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}

特定のコーデックを使用したい場合は、両サイドで明示的に設定する必要があります。

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", name,
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 メッセージの送信に使用するコーデックの名前を設定
2 メッセージの受信に使用するコーデックの設定

HTTPとEvent Busの組み合わせ

グリーティングHTTPエンドポイントに戻りましょう。非同期メッセージパッシングを使用して、別のBeanへの呼出に委譲してみましょう。これは、request/replyディスパッチメカニズムを使用しています。Jakarta RESTエンドポイント内でビジネスロジックを実装する代わりに、メッセージを送信しています。別の Bean がこのメッセージを消費し、応答は reply メカニズムを使用して送信されます。

HTTPエンドポイントクラスでは、イベントバスを注入し、 request メソッドを使用して、イベントバスにメッセージを送信し、応答を期待します。

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/bus")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 greeting アドレスに name を送信し、レスポンスを要求します。
2 レスポンスを受け取ったら、ボディを抽出してユーザーに送信
HTTP メソッドは Uni を返します。RESTEasy Reactive を使用している場合は、 Uni のサポートが組み込まれています。 従来の RESTEasyを使用している場合は、 quarkus resteasy-mutiny エクステンションをプロジェクトに追加する必要があります。

greeting のアドレスをリッスンするコンシューマーが必要です。このコンシューマーは、同じクラスでも、次のような別のBeanでも構いません。

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

このBeanは、名前を受け取り、グリーティングメッセージを返します。

これにより、 /bus/quarkus のすべてのHTTPリクエストは、イベントバスにメッセージを送信し、応答を待ち、その応答が到着すると、HTTPレスポンスを書き込みます。

Hello Quarkus

より理解しやすくするために、HTTP リクエスト/レスポンスがどのように処理されたかを詳しく見てみましょう。

  1. リクエストは、 greeting メソッドで受信されます。

  2. name を含むメッセージがイベントバスに送信される

  3. 別のBeanがこのメッセージを受信し、応答を計算します

  4. この応答は、返信メカニズムを使って送り返されます

  5. 送信者が返信を受信すると、その内容がHTTPレスポンスに書き込まれます

SockJSによるブラウザとの双方向通信

Vert.xが提供するSockJSブリッジは、ブラウザアプリケーションとQuarkusアプリケーションがイベントバスを使って通信できるようにします。双方を接続します。そのため、双方が相手側で受信したメッセージを送信することができます。3つの配信メカニズムをサポートしています。

SockJSは、Quarkusアプリケーションとブラウザの間の通信チャネルをネゴシエートします。WebSocketがサポートされている場合はそれを使用し、そうでない場合はSSEや長いポーリングなどにダウングレードします。

SockJSを使用するためには、ブリッジの設定、特に通信に使用されるアドレスの設定が必要です。

package org.acme;

import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;

@ApplicationScoped
public class SockJsExample {

    @Inject
    Vertx vertx;

    public void init(@Observes Router router) {
        SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
        Router bridge = sockJSHandler.bridge(new SockJSBridgeOptions()
                .addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
        router.route("/eventbus/*").subRouter(bridge);

        AtomicInteger counter = new AtomicInteger();
        vertx.setPeriodic(1000,
                ignored -> vertx.eventBus().publish("ticks", counter.getAndIncrement()));
    }

}

このコードは、ticks アドレスをターゲットとするすべてのメッセージを接続されたブラウザに送信するように SockJS ブリッジを設定します。 設定に関するより詳細な説明は、 Vert.x SockJS Bridge ドキュメント に記載されています。

ブラウザは、メッセージを消費するために、 vertx-eventbus JavaScriptライブラリを使用する必要があります。

<!doctype html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>SockJS example - Quarkus</title>
    <script src="https://code.jquery.com/jquery-3.3.1.min.js"
            integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
    <script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>

<h1>SockJS Examples</h1>

<p><strong>Last Tick:</strong> <span id="tick"></span></p>

</body>
<script>
    var eb = new EventBus('/eventbus');

    eb.onopen = function () {

        eb.registerHandler('ticks', function (error, message) {
            $("#tick").html(message.body);
        });
    }

</script>
</html>

ネイティブ・トランスポートの使用

ネイティブ・トランスポートは、ネイティブ実行可能ファイルではサポートされていません。
To use io_uring, refer to the io_uringを使用 section.

Vert.xでは、 Nettyのネイティブトランスポート を使用することができ、特定のプラットフォームでパフォーマンスが向上します。 これを有効にするには、プラットフォームに適した依存関係を含める必要があります。 通常、アプリケーションをプラットフォームに依存しないようにするためには、両方用意するのがよいでしょう。 Netty は賢いので、未対応のプラットフォームには全く依存しないなど、正しい方を使用します:

pom.xml
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-epoll</artifactId>
  <classifier>linux-x86_64</classifier>
</dependency>

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-kqueue</artifactId>
  <classifier>osx-x86_64</classifier>
</dependency>
build.gradle
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")

implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")

また、Vert.xでネイティブトランスポートを使用するように明示的に設定する必要があります。 application.properties に次を追加します。

quarkus.vertx.prefer-native-transport=true

あるいは、 application.yml で次のように設定します。

quarkus:
  vertx:
    prefer-native-transport: true

順調にいけばquarkusは以下のようにログ出力します。

[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true

Linuxのネイティブ・トランスポート

Linuxでは、以下のソケットオプションを有効にすることができます。

  • SOREUSEPORT

quarkus.http.so-reuse-port=true
  • TCP_QUICKACK

quarkus.http.tcp-quick-ack=true
  • TCP_CORK

quarkus.http.tcp-cork=true
  • TCP_FASTOPEN

quarkus.http.tcp-fast-open=true

macOSのネイティブ・トランスポート

MacOS Sierra以上では、以下のソケットオプションを有効にすることができます。

  • SOREUSEPORT

quarkus.http.so-reuse-port=true

Vert.x コンテキスト対応スケジューラの使用

Some Mutiny operators need to schedule work on an executor thread pool. A good example is .onItem().delayIt().by(Duration.ofMillis(10) as it needs such an executor to delay emissions.

デフォルトのエクゼキュータは、 io.smallrye.mutiny.infrastructure.Infrastructure によって返却され、Quarkusによってすでに設定、管理されています。

That being said, there are cases where you need to make sure that an operation is run on a Vert.x (duplicated) context and not just on any random thread.

The io.smallrye.mutiny.vertx.core.ContextAwareScheduler interface offers an API to obtain context-aware schedulers. Such a scheduler is configured with:

  1. a delegate ScheduledExecutorService of your choice (hint: you can reuse Infrastructure.getDefaultWorkerPool()), and

  2. a context fetching strategy among:

    • an explicit Context, or

    • calling Vertx::getOrCreateContext() either on the current thread or later when the scheduling request happens, or

    • calling Vertx::currentContext(), which fails if the current thread is not a Vert.x thread.

Here is a sample where ContextAwareScheduler is used:

class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        vertx.getOrCreateContext().put("foo", "bar");

        var delegate = Infrastructure.getDefaultWorkerPool();
        var scheduler = ContextAwareScheduler.delegatingTo(delegate)
            .withCurrentContext();

        return Uni.createFrom().voidItem()
                .onItem().delayIt().onExecutor(scheduler).by(Duration.ofMillis(10))
                .onItem().invoke(() -> {
                    // Prints "bar"
                    var ctx = vertx.getOrCreateContext();
                    System.out.println(ctx.get("foo"));
                });
    }
}

In this example a scheduler is created by capturing the context of the Vert.x event-loop that calls asyncStart(). The delayIt operator uses that scheduler, and we can check that the context that we get in invoke is a Vert.x duplicated context where the data for key "foo" has been propagated.

Unixドメインソケットの使用

Unixドメインソケットでリスニングすることにより、quarkusサービスへの接続が同じホストから確立されている場合、TCPのオーバーヘッドを省略することができます。これは、サービスへのアクセスがプロキシを経由する場合に起こります。Envoyのようなプロキシを使ってサービスメッシュを設定している場合によく見られます。

これは、 [native-transport] をサポートするプラットフォームでのみ動作します。

適切な ネイティブ・トランスポートの使用 を有効にし、以下の環境プロパティを設定します。

quarkus.http.domain-socket=/var/run/io.quarkus.app.socket
quarkus.http.domain-socket-enabled=true

quarkus.vertx.prefer-native-transport=true

これだけでは、デフォルトで 0.0.0.0:8080 で開かれる tcp ソケットを無効にすることはできません。これは次のように明示的に無効にすることができます。

quarkus.http.host-enabled=false

これらのプロパティは、Javaの -D コマンドライン・パラメータまたは application.properties で設定できます。

ネイティブのトランスポート依存関係を追加することを忘れないでください。 詳細については、ネイティブ・トランスポートの使用 を参照してください。
アプリケーションがソケットに書き込むための適切なアクセス許可を持っていることを確認してください。

io_uringを使用

io_uring はネイティブ実行可能ファイルではサポートされていません。
io_uring サポートは実験的です。

io_uring is a Linux kernel interface that allows you to send and receive data asynchronously. It provides unified semantics for both file and network I/O. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets. It has the potential to provide modest performance benefits to network I/O on its own and greater benefits for mixed file and network I/O application workloads.

io_uring についてもっと知るには、以下のリンクをお勧めします:

  • Why you should use io_uring for network I/O: The main benefit of io_uring for network I/O is a modern asynchronous API that is straightforward to use and provides unified semantics for file and network I/O. A potential performance benefit of io_uring for network I/O is reducing the number of syscalls. This could provide the biggest benefit for high volumes of small operations where the overhead of system calls can be significant.

  • The Backend Revolution and Why io_uring Is So Important: The io_uring API uses two ring buffers for communication between application and kernel (hence the API name) and designed in a way that enables natural batching of requests and responses. Besides, it provides a way to submit multiple requests in one system call, which can reduce overhead.

  • What exactly is io_uring?: io_uring is a Linux kernel interface to efficiently allow you to send and receive data asynchronously. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets.

To use io_uring, you need to add two dependencies to your project and enable native transport. First add the following dependencies to your project:

pom.xml
<dependency>
    <groupId>io.netty.incubator</groupId>
    <artifactId>netty-incubator-transport-native-io_uring</artifactId>
    <version>0.0.21.Final</version> <!-- Update this version (https://github.com/netty/netty-incubator-transport-io_uring/tags) -->
    <classifier>linux-x86_64</classifier>
</dependency>
<dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-io_uring-incubator</artifactId>
</dependency>
build.gradle
// Update the io_uring version by picking the latest from https://github.com/netty/netty-incubator-transport-io_uring/tags
implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final")
implementation("io.vertx:vertx-io_uring-incubator")

Then, in the application.properties, add:

quarkus.vertx.prefer-native-transport=true
Linuxマシンでio_uringは使えますか?

Linuxマシンで io_uring が使用できるかどうかを確認するには、以下のコマンドを実行してください:

> grep io_uring_setup /proc/kallsyms
0000000000000000 t __pfx_io_uring_setup
0000000000000000 t io_uring_setup
0000000000000000 T __pfx___x64_sys_io_uring_setup
0000000000000000 T __x64_sys_io_uring_setup
0000000000000000 T __pfx___ia32_sys_io_uring_setup
0000000000000000 T __ia32_sys_io_uring_setup
0000000000000000 d event_exit__io_uring_setup
0000000000000000 d event_enter__io_uring_setup
0000000000000000 d __syscall_meta__io_uring_setup
0000000000000000 d args__io_uring_setup
0000000000000000 d types__io_uring_setup
0000000000000000 d __event_exit__io_uring_setup
0000000000000000 d __event_enter__io_uring_setup
0000000000000000 d __p_syscall_meta__io_uring_setup

もし上記のように表示されたら、 io_uring を使用することが可能です。

トラブルシューティング

io_uring support is still experimental. Check the Netty io_uring FAQ if you see some odd behavior. Also, the netty io_uring was slower than epoll issue describes a few configuration mistakes.

ドメイン・ソケットはio_uringではまだサポートされていません。
Vert.xの非同期ファイルシステムAPIは、まだio_uringを使用していません。

読み取り専用環境へのデプロイ

ファイルシステムが読み取り専用の環境では、次のようなエラーが発生することがあります。

java.lang.IllegalStateException: Failed to create cache dir

/tmp/ が書き込み可能である場合、これは vertx.cacheDirBase プロパティが /tmp/ のディレクトリを指すように設定することで修正できます。例えば、OpenShift では環境変数 JAVA_OPTS を作成して -Dvertx.cacheDirBase=/tmp/vertx という値を設定します。

Vert.x設定のカスタマイズ

マネージド Vert.x インスタンスの設定は、application.properties ファイルを使用して提供できますが、special bean を使用することもできます。 io.quarkus.vertx.VertxOptionsCustomizer インターフェイスを公開する CDI Bean を使用して、Vert.x 設定をカスタマイズできます。 たとえば、次のカスタマイザは tmp ベース ディレクトリを変更します。

@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {

    @Override
    public void accept(VertxOptions options) {
        options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
    }
}

customizer Beanは、(アプリケーションの設定に由来する) VertxOptions を受け取り、それを修正することができます。