The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

MutinyとReactiverse

よく次の質問をされます:「QuarkusでEclipse Vert.xを使うにはどうすればいいですか?」確かに、QuarkusではVert.xを使用することができます。 Verticles をデプロイしたり、イベントバスと通信したり、Vert.xエコシステムのものなら何でも使えます。しかし、QuarkusでVert.xのMutinyバリアントを使用して、Quarkusが提供する他のReactive APIとシームレスな体験を得ることもできます。これについてはすでにいくつかの記事で触れていますが、具体的なブログ記事に値する内容です。というわけで、ここで紹介します。

Eclipse Vert.x

Vert.x はリアクティブなアプリケーションを構築するためのツールキットです。Vert.x のエコシステムは膨大です。HTTP やデータアクセス機能からマイクロサービスやセキュリティー機能を介したメッセージングクライアントまで、Vert.x のエコシステムは非常に多様で汎用性に富んでいます。その多様性を理解するには、Vert Vert.xのドキュメントをチェックしてください。そのため、Vert.xはWebアプリケーション、IoTゲートウェイ、銀行アプリケーションなど、多くの分野で人気があります。

ご存知のように、QuarkusはVert.xをベースにしています。裏側では、マネージドのVert.xのインスタンスが存在し、Quarkusに力を与えています。

architecture

QuarkusがHTTPエンドポイントを提供する場合、フードの下には、リクエストとレスポンスを処理するVert.x HTTPサーバーがあります。これは、メッセージング、gRPC、およびほぼすべてのI/Oにも当てはまります。

Vert.xの「素の」APIとその仲間たち

Vert.xは複数のAPIを提供しています。ここでは「素」のものに注目してみましょう。

Vert.x のリアクティブな性質にしたがって、API のほとんどを非同期メソッドが占めています。これらのメソッドは構文規則に従っています:

public void doSomething(param1, param2, Handler<AsyncResult<T>> handler) {
    // ...
}

興味深いのは最後のパラメーターです。これは関数、より正確にはコールバックで、操作が完了したときや失敗したときに呼び出されます。実際、Vert.xの非同期の性質上、 try/catch ブロックを使用することはできません。そのため、結果に応じて呼び出される継続コールバックを渡す必要があります。

AsyncResult は、この結果をキャプチャする構造体です。これは、操作によって生成された結果 ( <T> 型)、または失敗した場合の失敗を含みます。

例を挙げてみましょう:

vertx.fileSystem()
    .readFile("my-file.txt", ar -> {
        if (ar.failed()) {
            System.out.println("D'oh! Cannot read the file: " + ar.cause());
        } else {
            System.out.println("File content is: " + ar.result());
        }
    });

このコードはファイルを読み込み、非同期操作であるため、ファイルが読み込まれたときにコールバックを呼び出します。 readFile メソッドは、ファイルの完全な内容を読み込み、 バッファ に蓄積します。コールバックは、ファイルの内容 ( ar.result() ) または失敗を含む非同期の結果を受け取ります。Vert.x は、操作が完了または失敗したときにこのコールバックを呼び出します。

Vert.x は ReadStreamWriteStream クラスのおかげでストリームもサポートしています。 ReadStream は読み込み可能なデータのストリームを表します。そのため、ストリームを通過するすべての項目で呼び出されるコールバックをアタッチすることができます。 WriteStream はデータソースです。アイテムを WriteStream. にプッシュすることができます。 これらのアイテムは ReadStream によって消費されます。

vertx.fileSystem()
    .open("my-file.txt", new OpenOptions().setRead(true), ar -> {
        if (ar.failed()) {
            System.out.println(
                    "D'oh! Cannot read the file: " + ar.cause()
            );
        } else {
            AsyncFile file = ar.result();
            // AsyncFile is a read stream, we can read from it:
            file
                    .exceptionHandler(t ->
                        System.out.println("Failure while reading the file: " + t)
                    )
                    // Reads the file chunk by chunk
                    .handler(buffer ->
                        System.out.println("Received buffer: " + buffer)
                    );
        }
    });
Vert.x ストリームは、Reactive Streams を実装していません。Vert.x は別のバックプレッシャープロトコルを提供します。

なぜこれらの API シェーピングルールが重要なのか?Vert.xは単一のAPIを提供していません。上で紹介した「素の」APIは、提案されているAPIの1つに過ぎません。他にもKotlinでのAPIやRX JavaでのAPIなども提供されています。

これらの API は 生成され ます。Vert.x は、Vert.x の "むき出しの" API を他の API に変換するコード ジェネレーターを提供します。すべてのメソッドは整形されているので、ジェネレータはどのように適応されるべきかを理解しています。

generation

生成されたコードは、異なる API を公開しており、各メソッドは「素」の API にデリゲートします。非同期メソッドとストリームは異なる変換を行うことができるので、生成されたAPIは適切なイディオムを使用します。

Vert.x Mutiny API

Mutiny はイベント駆動型のリアクティブプログラミングライブラリです。Vert.xとは関係ありませんが、Vert.x API用のMutiny版を生成するコードジェネレータを書いています。

<span class="image"><img src="/assets/images/posts/mutiny-vertx/mutiny.png" alt="mutiny"></span>

変換は簡単です。

  • io.vertx パッケージ ⇒ io.vertx.mutiny パッケージ

  • 非同期メソッド⇒メソッドが返す Uni<T>

  • ReadStreams<T> ⇒消費することができます。 Multi<T>

  • WriteStreams<T> ⇒ リアクティブストリームとして消費することができます。 Subscriber<T>

また、MutinyがReactive Streamsを実装しているため、Vert.xのバックプレッシャープロトコルをReactive Streamsに適応させています。

例えば、上から1番目の例では、次のようになります:

Uni<Buffer> uni = vertx.fileSystem().readFile("my-file.txt");
uni.subscribe()
  .with(it -> System.out.println("File content is: " + it));
2つのAPIの違いの1つは、怠惰性に関連しています。Vert.x の "素の" API は、メソッドが呼び出されるとすぐに操作をトリガーします。Mutiny 版では、操作をトリガーするためにサブスクリプションを期待しています。

上からのストリームの例は以下のようになります。

Uni<AsyncFile> uni = vertx.fileSystem()
        .open("my-file.txt", new OpenOptions().setRead(true));
uni
    // Gets a Multi to read the file:
    .onItem().transformToMulti(asyncFile -> asyncFile.toMulti())
    // Gets the buffers one by one:
    .subscribe().with(
       buffer -> System.out.println("Received buffer: " + buffer)
);

もう少し補足

Mutiny版は、前のセクションで公開されたルールを適用するだけではありません。非同期メソッドに対しても提供します:

  • xAndAwait() メソッド - 結果を受け取るまで呼び出し元のスレッドをブロックします。失敗した場合は RuntimeException を発生させます

  • xAndForget() methods - 操作をトリガし、結果を破棄します。

// Read the content of the file in a blocking manner:
Buffer content   = vertx.fileSystem().readFileAndAwait("my-file.txt");

// Open and close the file
// Closing the file is an asynchronous operation (returning a Uni).
// We trigger the operation and discard the outcome
vertx.fileSystem().open("my-file.txt", new OpenOptions().setRead(true))
    .subscribe().with(file -> file.closeAndForget());

このAPIはどこにありますか?

執筆時点では、Vert.xコアとVert.xクライアント(MongoDB、Redis、Webクライアント、Mqttなど)のみを提供しています。私たちはVert.x スタック全体をカバーするようにサポートを拡張しているところです。

Mutinyクライアントを使用するには、適切な依存関係をプロジェクトに追加する必要があります。 依存関係のリストを見て、必要なものを選んでください。

例えば、Vert.x Web クライアントの Mutiny 版を使用するには、以下の依存関係を追加します:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
  <version>...</version>
</dependency>

依存関係ができたら、あとはWebクライアントのインスタンスを作成するだけです:

@Inject Vertx vertx; // Inject the managed io.vertx.mutiny.core.Vertx instance

private WebClient client;

@PostConstruct
public void init() {
  client = WebClient.create(vertx, new WebClientOptions()
    .setDefaultHost("localhost")
    .setDefaultPort(8082)
  );
}

private Uni<String> call(String path) {
  return client
    .get(path).send()
    .onItem().transform(HttpResponse::bodyAsString);
}
何か足りないものがあった場合は、 SmallRye Reactive Utils に課題を提起してください。
Javadocは こちら にあります。

今後の道:Vert.x 4!

Vert.x 4が間もなく登場します!既にQuarkusと異なるサテライトプロジェクトで移行作業を行っています。Vert.x 4では、新しいジェネレータが実装されており(同じコード生成アプローチを踏襲しています)、スムーズなアップグレードへの道が開かれています。