The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Mutinyによる非同期入門

Mutinyは、直感的なリアクティブプログラミングライブラリです。Mutinyは、Quarkusでリアクティブアプリケーションを書くための主要なモデルです。

イベントドリブンなリアクティブプログラミングAPI

Mutinyは、他のリアクティブ・プログラミング・ライブラリとは大きく異なります。プログラムを設計するために異なるアプローチをとります。Mutinyでは、すべてがイベントドリブンです:イベントを受け取り、それに反応します。このイベント駆動型の側面は、分散システムの非同期性を受け入れ、継続性を表現するエレガントで正確な方法を提供します。

Mutinyには、2つ型を提供します。両方ともイベントドリブンであり、遅延評価されます。

  • Uni は、単一のイベント(アイテムまたは失敗)を発します。Unisは、0または1の結果を返す非同期アクションを表現するのに便利です。例えば、メッセージ・ブローカーのキューにメッセージを送信した結果などがその例です。

  • Multi は複数のイベント(n個のアイテム、1つの失敗または1つの完了)を放出します。Multisはアイテムのストリームを表現することができ、無限に広がる可能性があります。例えば、メッセージ・ブローカのキューからメッセージを受信するのが良い例です。

この2つのタイプは、あらゆるタイプのインタラクションを表現することができます。これらはイベントソースです。ユーザーはこれらを監視し( サブスクリプション )、アイテムの出力、失敗、またはバインドされたMultiの場合は完了イベントが発行されると通知を受け取ります。あなた(サブスクライバー)がイベントを受け取ると、それを処理することができます(例えば、変換したり、フィルタリングしたり)。Mutinyでは、 _onX().action()_のようなコードを書くことになりますが、これは「アイテムXの場合actionを実施」と読み替えられます。

Mutinyとそのコンセプトについてもっと知りたい方は、 Mutinyのリファレンスドキュメント を参照してください。

QuarkusでのMutiny

Mutinyは、Quarkusのリアクティブ機能を扱う際の主要なAPIです。 つまり、ほとんどのエクステンションは、UniやMultisを返すAPI(リアクティブデータソースやレストクライアントなど)を公開するか、メソッドがUniやMultiを返す場合(Quarkus REST(旧RESTEasy Reactive)やReactive Messagingなど)を理解することで、Mutinyをサポートしています。

これらの統合により、MutinyはQuarkusで開発されたすべてのリアクティブアプリケーションのための主要で凝集性のあるモデルとなっています。さらに、Mutinyのアーキテクチャでは、きめ細かなデッドコードの排除が可能で、ネイティブにコンパイルした場合(QuarkusのネイティブモードやGraalVMのネイティブイメージコンパイラなど)、メモリ使用量が改善されます。

なぜ別のリアクティブプログラミングAPIなのか?

熟練したリアクティブ開発者は、既存のリアクティブプログラミングAPIがあるにもかかわらず、Quarkusがまた新たなリアクティブプログラミングAPIを導入したことを不思議に思うかもしれません。しかし、Mutinyはまったく異なる角度からアプローチしています。

イベントドリブン - Mutinyはイベントを設計の中核に据えています。Mutinyでは、イベントを観測し、それに反応し、エレガントで読みやすい処理パイプラインを作成します。関数型プログラミングの博士号は必要ありません。

Navigable - インテリジェントなコード補完機能があっても、何百ものメソッドを持つクラスは混乱します。Mutinyは、必要な演算子に向かってあなたを導く、ナビゲーブルで明示的なAPIを提供します。

ノンブロッキングI/O - Mutinyは、ノンブロッキングI/O( Quarkusの動力源)を備えたアプリケーションの非同期性を制御するための完璧なコンパニオンです。宣言的にオペレーションを構成し、データを変換し、進行を強制し、障害から回復するなどの機能があります。

非同期の世界のために作られた - Mutinyは、イベントドリブンなマイクロサービス、メッセージベースのアプリケーション、ネットワークユーティリティ、データストリーム処理、そしてもちろんリアクティブなアプリケーションなど、あらゆる非同期アプリケーションで使用することができます。

Reactive Streamsとコンバータ内蔵 - Mutinyは Reactive Streamsの仕様に基づいているため、他のリアクティブプログラミングライブラリと統合することができます。さらに、他のポピュラーなライブラリと対話するためのコンバータも提供しています。

Mutiny とI/Oスレッド

Quarkusは リアクティブエンジンを搭載しており、リアクティブアプリケーションを開発する際には、数少ないI/Oスレッドの1つでコードが実行されます。覚えておいてほしいのは、これらのスレッドを絶対にブロックしてはいけないということで、ブロックしてしまうとモデルが崩壊してしまいます。つまり、ブロッキングI/Oは使えないのです。代わりに、I/O操作をスケジュールして、継続を渡す必要があります。

Reactive Execution Model and I/O Threads

Mutinyのイベントドリブンパラダイムは、これに合わせて作られています。I/O操作が正常に完了すると、それを表現するUniはアイテムイベントをエミットします。それが失敗すると、それは失敗イベントをエミットします。継続は、イベントドリブンなAPIを使ってシンプルかつ自然に表現されます。

事例で見るMutiny

多くのQuarkusエクステンションがMutiny APIを公開しています。このセクションではMongoDBエクステンションを使用してMutinyの使用方法を説明します。

周期表の元素を表す簡単な構造を想像してみましょう。

public class Element {

   public String name;
   public String symbol;
   public int position;

   public Element(String name, String symbol, int position) {
       this.name = name;
       this.symbol = symbol;
       this.position = position;
   }

   public Element() {
        // Use by JSON mappers
   }
}

この構造体には、元素の名前、シンボル、位置が入っています。元素を取得して Mongo コレクションに格納するには、次のようなコードを使います。

@ApplicationScoped
public class ElementService {

   final ReactiveMongoCollection<Element> collection;

   @Inject
   ElementService(ReactiveMongoClient client) {
       collection = client.getDatabase("quarkus")
               .getCollection("elements", Element.class);
   }

   public void add(Element element) {
       Uni<InsertOneResult> insertion = collection.insertOne(element);
       insertion
           .onItem().transform(r -> r.getInsertedId().asString())
           .subscribe().with(
               result -> System.out.println("inserted " + result),
               failure -> System.out.println("D'oh" + failure));
   }

   public void getAll() {
       collection.find()
           .subscribe().with(
              element -> System.out.println("Element: " + element),
             failure -> System.out.println("D'oh! " + failure),
             () -> System.out.println("No more elements")
       );
   }

}

まず、Mongo のクライアントをインジェクションします。リアクティブ版 ( io.quarkus.mongodb.reactive.ReactiveMongoClient) を使っていることに注意しましょう。initialize メソッドでは、要素を挿入するためのコレクションを取得して保存します。

add メソッドは、コレクションに要素を挿入します。このメソッドはパラメータとして要素を受け取り、コレクションのリアクティブAPIを使用します。データベースとのやりとりにはI/Oが伴います。リアクティブな原則では、インタラクションが完了するのを待つ間のブロッキングを禁じています。代わりに、操作をスケジューリングし、継続を渡します。 insertOne メソッドはUniを返す、つまり非同期の操作です。これがスケジュールされたI/Oです。 .onItem() .onItem() では、観測されたUniがアイテムを放出したときに何が起こるべきかを設定することができます。この例では、スケジュールされたI/Oが完了したときです。この例では、挿入されたドキュメントIDを抽出しています。最後のステップはサブスクリプションです。これがなければ何も起こりません。購読することで、操作が開始されます。サブスクリプション・メソッドは、成功時には id の値を、挿入が失敗したときには失敗を返すというハンドラーを定義することもできます。

次に、2つ目のメソッドを見てみましょう。これは、格納されているすべての要素を取得します。この場合、複数のアイテム(格納されている要素ごとに1つ)を返しますので、 Multi を使用しています。挿入については、保存されている要素の取得にはI/Oが必要です。 find が私たちの操作です。Uniに関しては、オペレーションをトリガーするためにサブスクライブする必要があります。サブスクライバーは、アイテムイベント、失敗イベント、またはすべての要素を受信したときの完了イベントを受け取ります。

UniまたはMultiへのサブスクライブは不可欠です。無しには操作を実行できません。 Quarkusでは、サブスクリプションを処理するエクステンションがあります。 例えば、Quarkus RESTでは、HTTPメソッドがUniまたはMultiを返すことができ、Quarkus RESTがサブスクリプションを処理します。

Mutinyパターン

前節の例は、あえて単純化したものでした。ここでは、いくつかの共通するパターンを見てみましょう。

イベントの観察

次を利用して様々な種類のイベントを観察することができます:

on{event}().invoke(ev → System.out.println(ev));

例えば、アイテム使用の場合、次を使用して下さい: onItem().invoke(item → …​);

失敗の場合は、次を使用してください: onFailure().invoke(failure → …​)

invoke メソッドは同期型です。時には、非同期のアクションを実行する必要があります。このような場合は call を使用して、 onItem().call(item → someAsyncAction(item)) のようにします。なお、 call はアイテムを変更するのではなく、非同期アクションを呼び出すだけで、このアクションが完了すると、元のアイテムを下流に放出します。

アイテムのトランスフォーム

最初の計測パターンは、受け取ったアイテムイベントを変換することで構成されています。前節で見たように、挿入が成功したことを示したり、データベースに格納されている要素を示したりします。

アイテムの変換には onItem().transform(item → …​.) を使用します。

変換の詳細については、 Mutinyのドキュメント を参照してください。

シーケンシャル合成

シーケンシャル合成では、依存性のある非同期操作を連鎖させることができます。これは onItem().transformToUni(item → …​) を使って実現しています。 transform とは異なり、 transformToUni に渡される関数は Uni を返します。

Uni<String> uni1 = …
uni1
.onItem().transformToUni(item -> anotherAsynchronousAction(item));

非同期変換の詳細については、 Mutinyのドキュメント を参照してください。

エラーハンドリング

これまではアイテムのイベントだけを処理してきましたが、失敗の処理は必須です。 onFailure() を使って失敗を処理することができます。

例えば、 onFailure().recoverWithItem(fallback) を使ったフォールバックアイテムでリカバリーできます。

Uni<String> uni1 = …
uni1
.onFailure().recoverWithItem(“my fallback value”);

以下のようにして操作をリトライすることもできます。

Uni<String> uni1 = …
uni1
.onFailure().retry().atMost(5);

失敗のリカバリーについての詳細は、 失敗の処理のドキュメント および リトライのドキュメント を参照してください。

イベント、アクション

以下の表は、UniとMultiで受信できるイベントの一覧です。それぞれのイベントは、独自のグループ(onX)に関連付けられています。2つ目の表は、イベントを受けてできる古典的なアクションの一覧です。グループによってはより多くの可能性があることに注意してください。

上流からのイベント 下流からのイベント

Uni

Subscription (1), Item (0..1), failure (0..1)

Cancellation

Multi

Subscription (1), Item (0..n), failure (0..1), completion (0..1)

Cancellation, Request

イベントの全リストは、 イベントのドキュメントで確認してください。

アクション API コメント

transform

onItem().transform(Function<I, O> function);

同期関数を使って、イベントを別のイベントに変換します。 ダウンストリームは、関数の結果(変換に失敗した場合は失敗)を受け取ります。

transformToUni

onItem().transformToUni(Function<I, Uni<O>> function);

非同期関数を使って、イベントを別のイベントに変換する。下流側は、生成されたUniが発するアイテム(変換に失敗した場合は失敗)を受け取ります。生成されたUniが失敗を発した場合、その失敗は下流に渡されます。

invoke

onItem().invoke(Consumer<I> consumer)

同期型コンシューマーを呼び出します。これは特に副作用のあるアクションを実行するのに便利です。ダウンストリームは元のイベントを受け取りますが、コンシューマが例外を投げた場合は失敗となります。

call

onItem().call(Function<I, Uni<?>>)

非同期関数を呼び出します。これは、非同期の副作用のあるアクションを実行する際に特に便利です。ダウンストリームは、元のイベントを受け取りますが、コンシューマーが例外を投げた場合や生成されたUniが失敗を発した場合は失敗となります。

fail

onItem().failWith(Function<I, Throwable>)

イベントを受信したときに失敗を出力します。

complete (Multi のみ)

onItem().complete()

イベントを受信すると完了イベントを発行します。

その他のパターン

Mutinyには他にもたくさんの機能があります。Mutinyの ドキュメントでは、イベントの全リストとその処理方法など、さらに多くの例を見ることができます。

よくある質問として、以下のようなガイドがあります。

ショートカット

Uniを使うとき、 onItem() を書かなければならないのは面倒なことです。幸いなことに、Mutinyにはコードをより簡潔にするための一連のショートカットが用意されています。

ショートカット 等価表現

uni.map(x → y)

uni.onItem().transform(x → y)

uni.flatMap(x → uni2)

uni.onItem().transformToUni(x → uni2)

uni.chain(x → uni2)

uni.onItem().transformToUni(x → uni2)

uni.invoke(x → System.out.println(x))

uni.onItem().invoke(x → System.out.println(x))

uni.call(x → uni2)

uni.onItem().call(x → uni2)

uni.eventually(() → System.out.println("eventually"))

uni.onItemOrFailure().invoke((ignoredItem, ignoredException) → System.out.println("eventually"))

uni.eventually(() → uni2)

uni.onItemOrFailure().call((ignoredItem, ignoredException) → uni2)

uni.replaceWith(x)

uni.onItem().transform(ignored → x)

uni.replaceWith(uni2)

uni.onItem().transformToUni(ignored → uni2)

uni.replaceIfNullWith(x)

uni.onItem().ifNull().continueWith(x)

Reactive Streams

Mutinyは Reactive Streamsを使用しています。 MultiPublisher を実装し、back-pressureプロトコルを実施しています。エミッションは、下流のサブスクライバから発せられるリクエストによって制約されます。したがって、サブスクライバーに過負荷をかけることはありません。なお、場合によっては、このプロトコルに従えないこともあります(Multiが時間や測定センサーなどの制御できないイベントを放出するため)。この場合、Mutinyは、 onOverflow() を使ってオーバーフローを制御する方法を提供します。

Uni は、Reactive Streams Publisher を実装していません。 Uni は1つのイベントしか発することができないので、 Uni をサブスクライブすることで、結果を使用する意思を表明することができ、リクエストプロトコルのセレモニーは必要ありません。

MutinyとVert.x

Vert.xは、リアクティブなアプリケーションやシステムを構築するためのツールキットです。Vert.xは、リアクティブな原則(ノンブロッキングおよび非同期)に従ったライブラリの巨大なエコシステムを提供します。Vert.xは、リアクティブ機能を提供するため、Quarkusの重要な部分となっています。

さらに、Vert.x API全体をQuarkusで使用することができます。まとまりのある体験を提供するために、Vert.x APIはMutinyのバリアントを使用して、つまりUniとMultiを返すこともできます。

このAPIの詳細については、 https://quarkus.io/blog/mutiny-vertx/ を参照してください。

QuarkusにおけるMutiny インテグレーション

QuarkusにおけるMutinyのインテグレーションは、単なるライブラリにとどまりません。Mutinyは、QuarkusとMutinyを密接に統合するためのフックを公開しています。

  • I/O スレッドで実行している場合は awaittoIterable を呼び出すと失敗し、I/O スレッドのブロックを防ぎます。

  • log() オペレーターは、Quarkus のロガーを使用しています。

  • Mutinyのデフォルトのスレッドプールは、Quarkusのワーカースレッドプールです。

  • Mutiny UniおよびMultiを使用する場合、Context伝搬はデフォルトで有効です。

インフラ統合の詳細については、 https://smallrye.io/smallrye-mutiny/latest/guides/framework-integration/ を参照してください。

関連コンテンツ