RESTEasy Reactive - ブロックするか、しないべきか、それが問題だ

2021年1月、Quarkusチームは、QuarkusでHTTP APIを提供するための斬新な方法であるRESTEasy Reactiveを発表しました。この発表以来、RESTEasy Reactiveの採用は非常に好調で、まもなくHTTP APIを実装するデフォルトの方法にする予定です。

しかし、ちょっと待ってください。私が使っている命令型APIはどうなるのでしょうか?今のQuarkusを使うにはリアクティブプログラミングを学ぶ必要があるのでしょうか?はっきりさせておきましょう: いいえ。このブログ記事では、スムーズで透明性の高い移行を実現するためにRESTEasy reactiveに加えたいくつかの変更点をご紹介します。

QuarkusにおけるHTTP APIの簡単な歴史

Quarkusは、その誕生以来、HTTP APIを提供することができました。 RESTEasyの搭載は、Quarkusの最初のベータリリースの主要なマイルストーンでした。RESTEasy classic では、 @GET, @Path, @POST…​のようなよく知られたJAX-RSアノテーションを使用してHTTP APIを開発します。

package org.acme;

import javax.ws.rs.GET;
import javax.ws.rs.Path;

@Path("/hello")
public class GreetingResource {

   @GET
   public String hello() {
       return "Hello";
   }
}

RESTEasy classic は、HTTP リクエストに関連付けられたワーカースレッド上で、HTTP エンドポイント(前のスニペットの hello メソッド)を呼び出します。これはよく理解されているモデルで、簡単に理解できます。しかし、ワーカースレッドに依存することで、同時実行の制限が発生します。

Quarkusの中核にリアクティブを導入しても、RESTEasy _classic_はこのディスパッチ戦略を維持していました。このことが、Quarkusのエコシステムを分断していました。一方では、RESTEasy _classic_やHibernate ORMを使用する命令型の陣営があり、もう一方では、Reactive Routes、Vert.x API、その他のリアクティブなエクステンションを使用するリアクティブ型の陣営がありました。どちらもフードの下ではQuarkusのリアクティブエンジンを使用していましたが、リアクティブ派はより効率的な方法で使用していました。

命令型と反応型の考え方の統一を受けて、Quarkus 1.11では、RESTEasy reactive を導入しました。これは、Quarkusのリアクティブアーキテクチャの上にJAX-RSモデルを実装した新しいものです。これは、同様の開発モデルとはるかに優れたスループットを提供します。RESTEasy _reactive_のアーキテクチャと利点については、ここでは詳しく説明しません。Georgiosは2つの投稿でそれらをカバーしています。 RESTEasy Reactive introductionMassive performance without headachesです。

ユーザーの視点から見ると、RESTEasyの classic_と _reactive の主な違いは、HTTPエンドポイントのメソッドの呼び出し方です。

  • classic- 常にワーカースレッドで動作します。

  • reactive - I/Oスレッドまたはワーカースレッド(そして開発者であるあなたが選択可能)

何故そんなに重要なのかと思われるかもしれません。スレッドは高価です。特にコンテナやクラウド上ではリソースが限られています。I/O スレッドを使用すると、追加のスレッドを作成せずに済み(メモリ消費量の改善)、コンテキストスイッチを回避できます(応答時間の改善)。Emmanuelは、 A IO thread and a worker thread walk into a bar: a microbenchmark storyのブログ記事でそのメリットを説明しています。

ブロックするかしないか、それが問題だ。

RESTEasy reactive を導入したとき、デフォルトでノンブロッキングのアプローチを使用することにしました。特に明記されていない場合は、I/OスレッドでHTTPエンドポイント・メソッドを呼び出します。このモデルは優れたパフォーマンスをもたらし、 @Blocking アノテーションの使用により十分にシンプルなものとなりました。

ここ数ヶ月、RESTEasy _reactive_の採用は信じられないほど進んでいます。多くの質問や、当然ながらバグレポートも寄せられています。中心となる質問は、Hibernate ORMの使い方についてです。

Hibernate ORM classic(Hibernate reactive もあります)はブロッキングなので、 @Blocking アノテーションを使用せずにRESTEasy _reactive_で使用することはできません。このアノテーションは、(I/Oスレッドの代わりに)ワーカースレッドを使用するようにディスパッチ戦略を変更します。

結果として得られたモデルは、我々にとっては効率的でわかりやすいものに見えましたが、意識していないユーザーは多くのものを目にしています:

You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.: java.lang.IllegalStateException: You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.

エラーメッセージは明示的です。しかし、このような壁のようなテキストがターミナルに印刷されていても、私たちが満足することはほとんどありません。

"じゃあ、デフォルトでブロッキングをしよう" と言われるかもしれませんが、そんな単純な話ではありません。I/Oスレッドで呼び出されることが期待されるリアクティブAPIをワーカースレッドで呼び出すことは、I/OスレッドでブロッキングAPIを呼び出すことと同じくらい危険です。

新しい世界、新しいルール!

Quarkus 2.2.0では、メソッドのシグネチャに基づいた新しいディスパッチ戦略を導入しました。Quarkusのビルドタイムアプローチでは、ビルド時にメソッドがI/Oスレッドとワーカースレッドのどちらで呼び出されるべきかを賢く推測し、ランタイムのオーバーヘッドを削減することができます。

以下の表は、新しいルールをまとめたものです。

メソッドシグネチャ ディスパッチ戦略

T method(…​)

ワーカースレッド

Uni<T> method(…​)

I/Oスレッド

CompletionStage<T> method(…​)

I/Oスレッド

Multi<T> method(…​)

I/Oスレッド

Publisher<T> method(…​)

I/Oスレッド

@Transactional CompletionStage<T> method(…​)

ワーカースレッド

基本的に、 同期メソッドはワーカースレッドがデフォルトとなり、非同期メソッドはI/Oスレッドがデフォルトとなります(ただし、明示的に別の方法が指定されている場合を除く)。もちろん、 @Blocking@NonBlocking のアノテーションを使って、この動作をオーバーライドすることができます。 @Transactional アノテーションは、デフォルトのルールの例外であり、しばしばブロッキングリソース(エンティティマネージャなど)にアクセスしていることを意味します。

それによって何が変わるのでしょうか?

この新しい戦略が、効率性や柔軟性を損なうことなく、ユーザーエクスペリエンスを向上させることができるか、いくつかの例を挙げて説明します。

Hello RESTEasy Reactive

RESTEasy reactive を使用しても、上記の hello の例は変わりません。

package org.acme;

import javax.ws.rs.GET;
import javax.ws.rs.Path;

@Path("/hello")
public class GreetingResource {

   @GET
   public String hello() {
       return "Hello";
   }
}

そのメソッドは、同期シグネチャを持っているため、ワーカースレッドで呼び出されます。以前(Quarkus 2.2以前)のRESTEasyの _reactive_では、I/Oスレッドで呼び出されていました。この動作に戻すには、 @NonBlocking を追加します。

        package org.acme;

import io.smallrye.common.annotation.NonBlocking;

import javax.ws.rs.GET; import javax.ws.rs.Path;

@Path("/hello") public class GreetingResource {

   @GET
   @NonBlocking
   public String hello() {
       return "Hello";
   }
}

Alternatively, you can return a Uni:

package org.acme;

import io.smallrye.mutiny.Uni;

import javax.ws.rs.GET;
import javax.ws.rs.Path;

@Path("/hello")
public class GreetingResource {

   @GET
   public Uni<String> hello() {
       return Uni.createFrom().item("Hello");
   }
}

Hibernate ORMとの連携

ユーザーからのフィードバックを受けて、RESTEasy reactive でHibernate classic を使用する場合を考えてみましょう。

package org.acme;

import org.jboss.resteasy.reactive.RestQuery;

import javax.ws.rs.GET;
import javax.ws.rs.Path;

@Path("/fruit")
public class FruitResource {

   @GET
   public Fruit getFruit(@RestQuery String name) {
       return Fruit.find("name", name).firstResult();
   }
}

シグネチャが同期であるため、 @Blocking を使用する必要はありません。もう壁のようなテキストは必要ありません。

Hibernate Reactiveとの統合

Hibernate reactive を使用する場合、Mutiny APIを使用することになるので、コードは次のようになります。

package org.acme;

import io.smallrye.mutiny.Uni;
import org.jboss.resteasy.reactive.RestQuery;


import javax.ws.rs.GET;
import javax.ws.rs.Path;

@Path("/fruit")
public class FruitResource {

   @GET
   public Uni<Fruit> getFruit(@RestQuery String name) {
       return Fruit.find("name", name).firstResult();
   }
}

このメソッドはI/Oスレッドで実行されます。これはHibernate reactive が期待していることです。

Kafkaとの連携

HTTPとKafkaを組み合わせる場合(リアクティブメッセージングを使用する場合)は、エミッターを使用します。エミッターのタイプ( Emitter または MutinyEmitter )に応じて、 send メソッドは CompletionStage または Uni を返します。そのため、以下のHTTPメソッドはI/Oスレッドで実行されます。

package org.acme;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;
import org.eclipse.microprofile.reactive.messaging.Channel;

import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/fruit")
public class FruitResource {

   @Channel("kafka")
   MutinyEmitter<Fruit> emitter;

   @POST
   public Uni<Void> writeToKafka(Fruit fruit) {
       return emitter.send(fruit);
   }
}

これを同期シグネチャに変更すると、ワーカースレッドで実行されます。

package org.acme;

import io.smallrye.reactive.messaging.MutinyEmitter;
import org.eclipse.microprofile.reactive.messaging.Channel;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import java.time.Duration;

@Path("/fruit")
public class FruitResource {

   @Channel("kafka")
   MutinyEmitter<Fruit> emitter;

   @POST
   public void writeToKafka(Fruit fruit) {
       System.out.println(Thread.currentThread().getName());
       emitter.send(fruit).await().atMost(Duration.ofSeconds(5));
   }
}

RESTEasy Reactive、Hibernate ORM、Kafkaの組み合わせ

それでは、Resteasy reactive、Hibernate ORM classic、Kafkaを組み合わせて、エンティティを永続化し、それをKafkaトピックに書き込んでみましょう。

package org.acme;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;
import org.eclipse.microprofile.reactive.messaging.Channel;

import javax.transaction.Transactional;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/fruit")
public class FruitResource {

   @Channel("kafka")
   MutinyEmitter<Fruit> emitter;

   @POST
   @Transactional
   public Uni<Void> persistAndWriteToKafka(Fruit fruit) {
       System.out.println(Thread.currentThread().getName());
       fruit.persist();
       return emitter.send(fruit);
   }
}

このメソッドは、シグネチャにもかかわらず、ワーカースレッドで実行されます。 @Transactional アノテーションにより、ワーカースレッドを使用するディスパッチ戦略が設定されています。

まとめ

Quarkus 2.2では、RESTEasy reactive のディスパッチ戦略がよりスマートになり、開発者のエクスペリエンスが向上しました。

  • リアクティブな方法を学ぶ必要はなく、命令型のコードを使い続けることができます。

  • スレッドについて考える必要はありません。Quarkusがあなたのためにそれを行います。

  • 柔軟性が失われるわけではなく、決定を覆すことができます。

Quarkus 2.3から、QuarkusチームはRESTEasy reactive をHTTP APIを実装するためのデフォルトの方法にすることを考えています。これは、RESTEasy classic エクステンションが廃止されることを意味するものではなく、RESTEasy reactive が負担なくより多くのことを提供できる段階に到達することを意味しています。