The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

ストリームを覗く

Mutiny はイベント駆動型のリアクティブプログラミングライブラリです。他のリアクティブ・プログラミング・ライブラリと同様に、主要な構成要素としてストリームを使用します。これらのストリームはイベントを伝え、あなたのコードはこれらのイベントを処理します。ほとんどの場合、あなたのコードはアイテムイベントと失敗イベントにしか興味がありません。しかし、キャンセル、リクエスト、完了などの他の種類のイベントもあります。

何が起こっているのかをよりよく理解したり、特定の副作用を実装したりするために、これらの様々なイベントを見る必要があることは稀ではありません。例えば、完了イベントの後にリソースを閉じたり、失敗やキャンセル時にメッセージをログに記録したりする必要があるかもしれません。

イベントの種類ごとに、その特定のイベントを処理するための方法を提供する関連 グループ があります: onItem(), onFailure(), onCompletion() など。これらのグループは、配信に影響を与えずに様々なイベントを覗くための 2 つの方法を提供しています: invokecall 。これらのグループは、受信したイベントを変換するのではなく、何かが起こったことを通知して反応させてくれます。

invoke メソッド

invoke メソッドは同期で、何も戻りません。Mutinyは、観測されたストリームがイベントをディスパッチするときに、設定されたコールバックを呼び出します。

Uni<Integer> u = uni.onItem()
    .invoke(i -> System.out.println("Received item: " + i));
Multi<Integer> m = multi.onItem()
    .invoke(i -> System.out.println("Received item: " + i));

上で述べたように、 invoke は同期です。Mutinyはコールバックを呼び出し、コールバックが戻ってきたらイベントを下流に伝播させます。ディスパッチをブロックします。

invoke

もちろん、ブロックしないことを強くお勧めします。

次のスニペットは、さまざまな種類のイベントをログに記録する方法を示しています。

multi
  .onSubscribe().invoke(() -> System.out.println("⬇️ Subscribed"))
  .onItem().invoke(i -> System.out.println("⬇️ Received item: " + i))
  .onFailure().invoke(f -> System.out.println("⬇️ Failed with " + f))
  .onCompletion().invoke(() -> System.out.println("⬇️ Completed"))
  .onCancellation().invoke(() -> System.out.println("⬆️ Cancelled"))
  .onRequest().invoke(l -> System.out.println("⬆️ Requested: " + l))
前のコードスニペットの矢印は、イベントが上流(ソース)から来るのか、下流(コンシューマ)から来るのかを示しています。

invoke メソッドは、1 つのケースを除いて、イベントを変更しません。 invoke コールバックが例外をスローした場合、ダウンストリームは実際のイベントを取得せず、代わりに失敗イベントを取得します。

失敗イベントを観測しているときに、コールバックが例外を投げると、Mutinyは元の失敗とコールバックの失敗を集約した CompositeException を伝播します。

呼び出し方法

invoke とは異なり、 call は非同期で、コールバックは Uni<?> オブジェクトを返します。

call リソースを閉じるなど、非同期の副作用を実装する必要がある場合によく使われます。

call

Mutinyは、コールバックによって返されたUniがアイテムを放出するまで、元のイベントを下流にディスパッチしません。

multi
    .onItem().call(i ->
        Uni.createFrom().nullItem()
            .onItem().delayIt().by(Duration.ofSeconds(1))
    )

As shown in the previous snippet, you can use this approach to delay items. But the primary use case is about completing asynchronous actions:

multi
    .onCompletion().call(() -> resource.close())

Under the hood, Mutiny gets the Uni (by invoking the callback) and subscribes to it. It observes the item or failure event from that Uni. It discards the item value as only the emission matters in this case.

If the callback throws an exception or the produced Uni produces a failure, Mutiny propagates that failure (or a CompositeException) downstream, replacing the original event.

まとめ

The invoke and call methods are handy when you need to observe a stream without changing the transiting events.

Use invoke for implementing synchronous side-effects or logging events. The asynchronous nature of call makes it perfect for implementing asynchronous side-effects, such as closing resources, flushing data, delay items, and so on.

The following table highlights the key differences:

メソッド

invoke

call

種類

同期

非同期

戻り値の型

void

Uni<?>

主なユースケース

ロギング

リソースのクローズ、データのフラッシュ

これらのメソッドは、関連するグループ内のあらゆる種類のイベントに対して利用可能です。