MutinyとReactiverse
よく次の質問をされます:「QuarkusでEclipse Vert.xを使うにはどうすればいいですか?」確かに、QuarkusではVert.xを使用することができます。 Verticles をデプロイしたり、イベントバスと通信したり、Vert.xエコシステムのものなら何でも使えます。しかし、QuarkusでVert.xのMutinyバリアントを使用して、Quarkusが提供する他のReactive APIとシームレスな体験を得ることもできます。これについてはすでにいくつかの記事で触れていますが、具体的なブログ記事に値する内容です。というわけで、ここで紹介します。
Eclipse Vert.x
Vert.x はリアクティブなアプリケーションを構築するためのツールキットです。Vert.x のエコシステムは膨大です。HTTP やデータアクセス機能からマイクロサービスやセキュリティー機能を介したメッセージングクライアントまで、Vert.x のエコシステムは非常に多様で汎用性に富んでいます。その多様性を理解するには、Vert Vert.xのドキュメントをチェックしてください。そのため、Vert.xはWebアプリケーション、IoTゲートウェイ、銀行アプリケーションなど、多くの分野で人気があります。
ご存知のように、QuarkusはVert.xをベースにしています。裏側では、マネージドのVert.xのインスタンスが存在し、Quarkusに力を与えています。
QuarkusがHTTPエンドポイントを提供する場合、フードの下には、リクエストとレスポンスを処理するVert.x HTTPサーバーがあります。これは、メッセージング、gRPC、およびほぼすべてのI/Oにも当てはまります。
Vert.xの「素の」APIとその仲間たち
Vert.xは複数のAPIを提供しています。ここでは「素」のものに注目してみましょう。
Vert.x のリアクティブな性質にしたがって、API のほとんどを非同期メソッドが占めています。これらのメソッドは構文規則に従っています:
public void doSomething(param1, param2, Handler<AsyncResult<T>> handler) {
// ...
}
興味深いのは最後のパラメーターです。これは関数、より正確にはコールバックで、操作が完了したときや失敗したときに呼び出されます。実際、Vert.xの非同期の性質上、 try/catch
ブロックを使用することはできません。そのため、結果に応じて呼び出される継続コールバックを渡す必要があります。
AsyncResult
は、この結果をキャプチャする構造体です。これは、操作によって生成された結果 ( <T>
型)、または失敗した場合の失敗を含みます。
例を挙げてみましょう:
vertx.fileSystem()
.readFile("my-file.txt", ar -> {
if (ar.failed()) {
System.out.println("D'oh! Cannot read the file: " + ar.cause());
} else {
System.out.println("File content is: " + ar.result());
}
});
このコードはファイルを読み込み、非同期操作であるため、ファイルが読み込まれたときにコールバックを呼び出します。 readFile
メソッドは、ファイルの完全な内容を読み込み、 バッファ に蓄積します。コールバックは、ファイルの内容 ( ar.result()
) または失敗を含む非同期の結果を受け取ります。Vert.x は、操作が完了または失敗したときにこのコールバックを呼び出します。
Vert.x は ReadStream
と WriteStream
クラスのおかげでストリームもサポートしています。 ReadStream
は読み込み可能なデータのストリームを表します。そのため、ストリームを通過するすべての項目で呼び出されるコールバックをアタッチすることができます。 WriteStream
はデータソースです。アイテムを WriteStream.
にプッシュすることができます。 これらのアイテムは ReadStream
によって消費されます。
vertx.fileSystem()
.open("my-file.txt", new OpenOptions().setRead(true), ar -> {
if (ar.failed()) {
System.out.println(
"D'oh! Cannot read the file: " + ar.cause()
);
} else {
AsyncFile file = ar.result();
// AsyncFile is a read stream, we can read from it:
file
.exceptionHandler(t ->
System.out.println("Failure while reading the file: " + t)
)
// Reads the file chunk by chunk
.handler(buffer ->
System.out.println("Received buffer: " + buffer)
);
}
});
Vert.x ストリームは、Reactive Streams を実装していません。Vert.x は別のバックプレッシャープロトコルを提供します。 |
なぜこれらの API シェーピングルールが重要なのか?Vert.xは単一のAPIを提供していません。上で紹介した「素の」APIは、提案されているAPIの1つに過ぎません。他にもKotlinでのAPIやRX JavaでのAPIなども提供されています。
これらの API は 生成され ます。Vert.x は、Vert.x の "むき出しの" API を他の API に変換するコード ジェネレーターを提供します。すべてのメソッドは整形されているので、ジェネレータはどのように適応されるべきかを理解しています。
生成されたコードは、異なる API を公開しており、各メソッドは「素」の API にデリゲートします。非同期メソッドとストリームは異なる変換を行うことができるので、生成されたAPIは適切なイディオムを使用します。
Vert.x Mutiny API
Mutiny はイベント駆動型のリアクティブプログラミングライブラリです。Vert.xとは関係ありませんが、Vert.x API用のMutiny版を生成するコードジェネレータを書いています。
<span class="image"><img src="/assets/images/posts/mutiny-vertx/mutiny.png" alt="mutiny"></span>
変換は簡単です。
-
io.vertx
パッケージ ⇒io.vertx.mutiny
パッケージ -
非同期メソッド⇒メソッドが返す
Uni<T>
-
ReadStreams<T>
⇒消費することができます。Multi<T>
-
WriteStreams<T>
⇒ リアクティブストリームとして消費することができます。Subscriber<T>
また、MutinyがReactive Streamsを実装しているため、Vert.xのバックプレッシャープロトコルをReactive Streamsに適応させています。
例えば、上から1番目の例では、次のようになります:
Uni<Buffer> uni = vertx.fileSystem().readFile("my-file.txt");
uni.subscribe()
.with(it -> System.out.println("File content is: " + it));
2つのAPIの違いの1つは、怠惰性に関連しています。Vert.x の "素の" API は、メソッドが呼び出されるとすぐに操作をトリガーします。Mutiny 版では、操作をトリガーするためにサブスクリプションを期待しています。 |
上からのストリームの例は以下のようになります。
Uni<AsyncFile> uni = vertx.fileSystem()
.open("my-file.txt", new OpenOptions().setRead(true));
uni
// Gets a Multi to read the file:
.onItem().transformToMulti(asyncFile -> asyncFile.toMulti())
// Gets the buffers one by one:
.subscribe().with(
buffer -> System.out.println("Received buffer: " + buffer)
);
もう少し補足
Mutiny版は、前のセクションで公開されたルールを適用するだけではありません。非同期メソッドに対しても提供します:
-
xAndAwait()
メソッド - 結果を受け取るまで呼び出し元のスレッドをブロックします。失敗した場合はRuntimeException
を発生させます -
xAndForget()
methods - 操作をトリガし、結果を破棄します。
// Read the content of the file in a blocking manner:
Buffer content = vertx.fileSystem().readFileAndAwait("my-file.txt");
// Open and close the file
// Closing the file is an asynchronous operation (returning a Uni).
// We trigger the operation and discard the outcome
vertx.fileSystem().open("my-file.txt", new OpenOptions().setRead(true))
.subscribe().with(file -> file.closeAndForget());
このAPIはどこにありますか?
執筆時点では、Vert.xコアとVert.xクライアント(MongoDB、Redis、Webクライアント、Mqttなど)のみを提供しています。私たちはVert.x スタック全体をカバーするようにサポートを拡張しているところです。
Mutinyクライアントを使用するには、適切な依存関係をプロジェクトに追加する必要があります。 依存関係のリストを見て、必要なものを選んでください。
例えば、Vert.x Web クライアントの Mutiny 版を使用するには、以下の依存関係を追加します:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
<version>...</version>
</dependency>
依存関係ができたら、あとはWebクライアントのインスタンスを作成するだけです:
@Inject Vertx vertx; // Inject the managed io.vertx.mutiny.core.Vertx instance
private WebClient client;
@PostConstruct
public void init() {
client = WebClient.create(vertx, new WebClientOptions()
.setDefaultHost("localhost")
.setDefaultPort(8082)
);
}
private Uni<String> call(String path) {
return client
.get(path).send()
.onItem().transform(HttpResponse::bodyAsString);
}
何か足りないものがあった場合は、 SmallRye Reactive Utils に課題を提起してください。 |
Javadocは こちら にあります。 |