仮想スレッドを使ったさらにシンプルなQuarkusでのリアクティブRESTサービスの作成
このガイドではQuarkusでRESTサービスを記述する際にJava 19の仮想スレッドを利用する方法について説明します。
これは仮想スレッドを使用してリアクティブ REST サービスを記述するためのリファレンスガイドです。リアクティブ REST サービスの軽い導入としてはJSON RESTサービスの実装 を参照してください。詳細な説明については RESTEASY REACTIVE を使用して REST サービスを作成する を参照してください。 |
仮想スレッドとは何ですか?
用語解説
- OSスレッド
-
オペレーティングシステムによって管理される「スレッド(より糸)のような」なデータ構造。
- プラットフォームスレッド
-
Java 19 まではThread クラスのすべてのインスタンスはプラットフォームスレッド、つまり OSスレッドのラッパーでした。プラットフォームスレッドを作成するとOSスレッドが作成され、プラットフォームスレッドをブロックするとOSスレッドがブロックされます。
- 仮想スレッド
-
軽量で、JVMが管理するスレッド。 Thread クラスを拡張していますが、1つの特定のOSスレッドに縛られることはありません。したがって、仮想スレッドのスケジューリングはJVMの責任です。
- キャリアスレッド
-
仮想スレッドを実行するために使用されるプラットフォームスレッドをキャリアと呼びます。これは Thread や VirtualThreadとは別のクラスではなく、機能的な呼称です。
仮想スレッドとプラットフォームスレッドの違い
ここではその概要を説明しますが、詳細はは JEP 425 を確認してください。
仮想スレッドはI/Oバウンドワークロードのためのプラットフォームスレッドの安価な代替を提供することを目的としたJava 19から利用可能な機能です。
これまでプラットフォームスレッドはJVMの並行処理単位でした。これはOSの構造のラッパーです。つまり、Javaプラットフォームスレッドを作成することは、実際にはOSに"スレッド(より糸)のような"構造を作成することになるのです。
一方で仮想スレッドはJVMによって管理されます。実行するにはプラットフォームスレッド(その仮想スレッドのキャリアとして機能する)にマウントされる必要があります。なので次のような特徴を持つように設計されています:
- 軽量
-
仮想スレッドはプラットフォームスレッドよりもメモリ上で占有する領域が小さくなります。したがってヒープを爆発させずにプラットフォームスレッドよりも多くの仮想スレッドを同時に使用することが可能になります。デフォルトでは、プラットフォームスレッドは約1MBのスタックで作成され、仮想スレッドのスタックは"使った分だけ"です。これらの数字と仮想スレッドの他の論拠はLoomプロジェクトのリードディベロッパーが行ったプレゼンテーションで知ることができます: https://youtu.be/lIq-x_iI-kc?t=543 。
- 安価に作成できる
-
Javaでプラットフォームスレッドを作成するのは時間がかかります。現時点ではスレッドの起動にかかる時間を最小限にするために、一度作成したスレッドを再利用するプーリングなどの技術を使うことが強く推奨されています(同様にメモリ消費を抑えるためにスレッドの最大数を制限することも推奨されています)。仮想スレッドは必要なときに作る使い捨ての存在であり、スレッドをプールしたり異なるタスクに再利用したりすることは推奨されません。
- 安価なブロック
-
ブロッキングI/Oを実行する場合、JavaプラットフォームのスレッドにラップされたOSスレッドは待ち行列に入れられ、新しいスレッドコンテキストをCPUコアにロードするためにコンテキストスイッチが発生します。この操作には時間がかかります。仮想スレッドはJVMによって管理されるためブロッキング処理を実行してもOSスレッドがブロックされることはありません。その状態は単にヒープに格納され、別の仮想スレッドが同じJavaプラットフォームのスレッドで実行されます。
仮想スレッドはI/Oバウンドワークロードにのみに有効
私たちはプラットフォームスレッドよりもはるかに多くの仮想スレッドを作成できることを知っています。長い計算を行うために仮想スレッドを使いたくなるかもしれません(CPUバウンドワークロード)。これは逆効果とまでは言いませんが無駄になります。CPUバウンドとはI/Oの完了を待つ間にスレッドをすばやくスワップすることではなく、実際に何かを計算するためにスレッドをCPUコアに接続したままにしておくことです。このシナリオでは数十個のCPUコアに対して数千個のスレッドを持つことは無意味であり、仮想スレッドがCPUバウンドワークロードの性能を向上させることはありません。
リアクティブなRESTサービスに仮想スレッドを導入する
仮想スレッドは使い捨てのエンティティなので、quarkus-loomの基本的な考え方では、エンドポイントハンドラをイベントループ(RESTeasy-reactiveの場合)やプラットフォームワーカスレッドで実行せずに新しい仮想スレッドでオフロードします。
そのためには、エンドポイントに @RunOnVirtualThread アノテーションを追加すれば十分です。JDKに互換性がある場合(Java 19またはそれ以降のバージョン)はエンドポイントは仮想スレッドにオフロードされます。この場合は仮想スレッドがマウントされているプラットフォームスレッドをブロックせずにブロック操作を実行できるようになります。
このアノテーションは @Blocking アノテーションが付けられたエンドポイント、またはそのシグネチャのためにブロッキングとみなされるエンドポイントとの組み合わせでのみ使用することができます。詳しくは 実行モデル、ブロッキング、ノンブロッキング をご確認ください。
はじめに
次のインポートをビルドファイルに追加します:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive</artifactId>
</dependency>
implementation("io.quarkus:quarkus-resteasy-reactive")
また、Javaのバージョン19を使用する必要があります。これは、pom.xmlファイルで次のように指定することができます:
<properties>
<maven.compiler.source>19</maven.compiler.source>
<maven.compiler.target>19</maven.compiler.target>
</properties>
仮想スレッドはまだ実験的な機能なので、 --enable-preview
フラグを付けてアプリケーションを起動する必要があります:
java --enable-preview -jar target/quarkus-app/quarkus-run.jar
以下の例は3つのエンドポイントの違いを示しています。これらのエンドポイントはすべてデータベースにクエリーを投げてクライアントに結果を返しています。
-
1つ目は伝統的なブロッキングスタイルを採用しており、そのシグネチャからブロッキングとみなされます。
-
2つ目はMutinyのリアクティブストリームを宣言的なスタイルで使用し、そのシグネチャによりノンブロッキングとみなされます。
-
3つ目はMutinyのリアクティブストリームを同期的に使うもので"リアクティブ型"を返さないのでブロッキングとみなされ、 @RunOnVirtualThread アノテーションが使えます。
Mutinyを使用する場合は仮想スレッドで使用するための代替の"xAndAwait"メソッドが提供されます。これらは、I/Oの完了を待っても、キャリアスレッドを"拘束"して性能を悪化させないようにするものです。拘束とは、 このセクション で説明する現象です。
つまり、mutiny環境は仮想スレッドにとって安全な環境なのです。Mutinyが提供する保証については後で詳しく説明します。
package org.acme.rest;
import org.acme.fortune.model.Fortune;
import org.acme.fortune.repository.FortuneRepository;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Uni;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import java.util.List;
import java.util.Random;
@Path("")
public class FortuneResource {
@GET
@Path("/blocking")
public Fortune blocking() {
var list = repository.findAllBlocking();
return pickOne(list);
}
@GET
@Path("/reactive")
public Uni<Fortune> reactive() {
return repository.findAllAsync()
.map(this::pickOne);
}
@GET
@Path("/virtual")
@RunOnVirtualThread
public Fortune virtualThread() {
var list = repository.findAllAsyncAndAwait();
return pickOne(list);
}
}
複雑なロジックの簡略化
先ほどの例はささいなもので、命令型スタイルが複雑なリアクティブ操作をいかに簡略化できるかをとらえていません。次はもっと複雑な例です。エンドポイントはデータベース内のすべての運勢(fortune)を取得し、 結果をクライアントに返す前にそれぞれの運勢に引用文を追加します。
package org.acme.rest;
import org.acme.fortune.model.Fortune;
import org.acme.fortune.repository.FortuneRepository;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Uni;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import java.util.List;
import java.util.Random;
@Path("")
public class FortuneResource {
private final FortuneRepository repository;
public Uni<List<String>> getQuotesAsync(int size){
//...
//asynchronously returns a list of quotes from an arbitrary source
}
@GET
@Path("/quoted-blocking")
public List<Fortune> getAllQuotedBlocking() {
// we get the list of fortunes
var fortunes = repository.findAllBlocking();
// we get the list of quotes
var quotes = getQuotesAsync(fortunes.size()).await().indefinitely();
// we append each quote to each fortune
for(int i=0; i < fortunes.size(); i ++){
fortunes.get(i).title += " - " + quotes.get(i);
}
return fortunes;
}
@GET
@Path("/quoted-reactive")
public Uni<List<Fortune>> getAllQuotedReactive() {
// we first fetch the list of resource and we memoize it
// to avoid fetching it again everytime need it
var fortunes = repository.findAllAsync().memoize().indefinitely();
// once we get a result for fortunes,
// we know its size and can thus query the right number of quotes
var quotes = fortunes.onItem().transformToUni(list -> getQuotesAsync(list.size()));
// we now need to combine the two reactive streams
// before returning the result to the user
return Uni.combine().all().unis(fortunes, quotes).asTuple().onItem().transform(tuple -> {
var todoList = tuple.getItem1();
//can await it since it is already resolved
var quotesList = tuple.getItem2();
for(int i=0; i < todoList.size(); i ++){
todoList.get(i).title += " - " + quotesList.get(i);
}
return todoList;
});
}
@GET
@RunOnVirtualThread
@Path("/quoted-virtual-thread")
public List<Fortune> getAllQuotedVirtualThread() {
//we get the list of fortunes
var fortunes = repository.findAllAsyncAndAwait();
//we get the list of quotes
var quotes = getQuotesAsync(fortunes.size()).await().indefinitely();
//we append each quote to each fortune
for(int i=0; i < fortunes.size(); i ++){
fortunes.get(i).title += " - " + quotes.get(i);
}
return fortunes;
}
}
拘束の場合
仮想スレッドがそのキャリア(マウントされているプラットフォームスレッド)を"拘束"する場合があります。この場合はプラットフォームスレッドは通常のブロッキングシナリオと同じようにブロックされます。
JEP 425 によると、これは2つの状況で発生する場合があります:
-
when a virtual thread performs a blocking operation inside a
synchronized
block or method -
ネイティブメソッドや外部関数内でブロッキング操作を実行した場合
自分たちのコードでこのような状況を回避するのはかなり簡単ですが、使用する依存関係をすべて検証するのは困難です。典型的な例としては、仮想スレッドの実験をしているときに postgresqlのJDBCドライバ を使うと拘束が多発することに気づきました。
postgresqlのJDBCの問題
これまでの実験から、仮想スレッドがJDBCドライバを使ってデータベースに問い合わせを行う操作をしている間、仮想スレッドがキャリアスレッドを拘束することが分かっています。
最初の例で使用した findAllBlocking()
メソッドのコードを表示してみましょう。
//import ...
@ApplicationScoped
public class FortuneRepository {
// ...
public List<Fortune> findAllBlocking() {
List<Fortune> fortunes = new ArrayList<>();
Connection conn = null;
try {
conn = db.getJdbcConnection();
var preparedStatement = conn.prepareStatement(SELECT_ALL);
ResultSet rs = preparedStatement.executeQuery();
while (rs.next()) {
fortunes.add(create(rs));
}
rs.close();
preparedStatement.close();
} catch (SQLException e) {
logger.warn("Unable to retrieve fortunes from the database", e);
} finally {
close(conn);
}
return fortunes;
}
//...
}
実際のクエリは ResultSet rs = preparedStatement.executeQuery();
で実行されます。以下は、postgresql-jdbc ドライバ 42.5.0 で実装されている方法です:
class PgPreparedStatement extends PgStatement implements PreparedStatement {
// ...
/*
* A Prepared SQL query is executed and its ResultSet is returned
*
* @return a ResultSet that contains the data produced by the * query - never null
*
* @exception SQLException if a database access error occurs
*/
@Override
public ResultSet executeQuery() throws SQLException {
synchronized (this) {
if (!executeWithFlags(0)) {
throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA);
}
return getSingleResultSet();
}
}
// ...
}
この synchronized
のブロックが原因です。ロックに置き換えることは良い解決策ですが、それだけでは十分ではありません。 synchronized
ブロックは、 executeWithFlags(int flag)
でも使用されています。postgresqlのjdbcドライバが仮想スレッドに準拠していることを確認するためには体系的なレビューが必要になります。
リアクティブドライバによる救いの手
vertx-sql-client はリアクティブクライアントであり、データベースとのトランザクションの完了を待っている間はブロックしないことになっています。しかしながら、 smallrye-mutiny-vertx-sqlclient を使用する場合、トランザクションの完了を待つ可変メソッドを使用し、ブロッキングの挙動を模倣することが可能です。
以下はブロッキングがリアクティブメソッドに置き換えられているだけの、先ほど見た FortuneRepository
です。
//import ...
@ApplicationScoped
public class FortuneRepository {
// ...
public Uni<List<Fortune>> findAllAsync() {
return db.getPool()
.preparedQuery(SELECT_ALL).execute()
.map(this::createListOfFortunes);
}
public List<Fortune> findAllAsyncAndAwait() {
var rows = db.getPool().preparedQuery(SELECT_ALL)
.executeAndAwait();
return createListOfFortunes(rows);
}
//...
}
postgresqlのjdbcドライバ とは逆に、 synchronized
ブロックが使用されるべきではない場所で使用されることはなく、 await
動作は、拘束を発生させないロックとラッチを使用して実装されています。
smallrye-mutiny-vertx-sqlclient の同期メソッドと仮想スレッドを使用することで、同期ブロッキングスタイルを使用でき、キャリアスレッドの拘束を回避し、純粋なリアクティブ実装に近い性能を得ることができます。
性能に関するポイント
Our experiments seem to indicate that Quarkus with virtual threads will scale better than Quarkus blocking (offloading the computation on a pool of platform worker threads) but not as well as Quarkus reactive. The memory consumption especially might be an issue: if your system needs to keep its memory footprint low we would advise you stick to using reactive constructs.
この性能低下は仮想スレッドそのものが原因ではなく、Vert.x/Netty(Quarkusの基盤となるリアクティブエンジン)と仮想スレッドの間の相互作用に起因しているようです。このことは、これから説明する問題にも表れています。
Nettyの問題
For JSON serialization, Netty uses their custom implementation of thread locals, FastThreadLocal
to store buffers. When using virtual threads in quarkus, the number of virtual threads simultaneously living in the service is directly related to the incoming traffic. It is possible to get hundreds of thousands, if not millions, of them.
もし、データをJSONにシリアライズする必要がある場合、多数の FastThreadLocal
のインスタンスを作成することになることから大量のメモリを消費し、ガベージコレクタにかかる負担も大きくなってしまいます。これは最終的にアプリケーションの性能に影響を与え、スケーラビリティを阻害することになります。
これはリアクティブスタックと仮想スレッドのミスマッチの好例です。根本的な仮説が全く異なるため、結果として最適化も異なってきます。Nettyは少数のイベントループ(QuarkusのデフォルトではCPUのコア数と同じ数のイベントループ)を使用するシステムを想定していますが、何十万ものスレッドを取得します。私たちが仮想スレッドを使ってどのような未来を描いているかについては、 このメール を参照してください。
Nettyの問題に対する我々の解決策
アップストリームのNettyに手を加えることなくこのリソースの浪費を避けるために、ビルド時にスレッドローカルの作成を担当するクラスのバイトコードを変更するエクステンションを書きました。このエクステンションを使うと、TechempowerスイートのJsonのSerializationのテストにおけるQuarkusの仮想スレッドのパフォーマンスが80%近く向上し、リアクティブエンドポイントとほぼ同等になりました。
使用するにはこれを依存関係として追加する必要があります:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-netty-loom-adaptor</artifactId>
</dependency>
また、このエクステンションで行う作業の中には、特別なアクセスが必要なものがあります、次のことが必要になります
-
-Dnet.bytebuddy.experimental
フラグを付けてコンパイルをする -
実行時に
--add-opens java.base/java.lang=ALL-UNNAMED
フラグをつけてjava.base.lang
モジュールを開く
このエクステンションはあくまでパフォーマンス向上のためのもので、使わなくても全く問題ありません。
devモードについて
Quarkusをdevモードで使用したい場合、このガイドで説明したフラグを手動で指定することはできません。 代わりに以下のように quarkus-maven-plugin
の設定で指定します。
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
<configuration>
<source>19</source>
<target>19</target>
<compilerArgs>
<arg>--enable-preview</arg>
<arg>-Dnet.bytebuddy.experimental</arg>
</compilerArgs>
<jvmArgs>--enable-preview --add-opens java.base/java.lang=ALL-UNNAMED</jvmArgs>
</configuration>
</plugin>
If you don’t want to specify the opening of the java.lang
module in your pom.xml file, you can also specify it as an argument when you start the dev mode.
quarkus-maven-pluginの設定はシンプルになるでしょう:
<configuration>
<source>19</source>
<target>19</target>
<compilerArgs>
<arg>--enable-preview</arg>
<arg>-Dnet.bytebuddy.experimental</arg>
</compilerArgs>
<jvmArgs>--enable-preview</jvmArgs>
</configuration>
また、コマンドは次のようになります:
mvn quarkus:dev -Dopen-lang-package