Reactive MessagingによるQuarkus仮想スレッドのサポート
このガイドでは、Quarkusでメッセージ処理アプリケーションを記述する際に、Java仮想スレッドを利用する方法について説明します。
このガイドでは、Reactive Messaging エクステンションを使用した仮想スレッドの使用に焦点を当てます。 Java 仮想スレッド全般と、REST サービスに対する Quarkus 仮想スレッドのサポートについて、詳しくは Quarkus 仮想スレッドのサポートを使用してシンプルなりアクティブ RESTORE サービスを作成する を参照してください。 特定の Reactive Messaging エクステンションのリファレンスガイドについては、Apache Kafka リファレンスガイド、Reactive Messaging AMQP 1.0 コネクター、Reactive Messaging RabbitMQ コネクター、または Apache Pulsar リファレンスガイド を参照してください。 |
デフォルトでは、Reactive Messaging はイベントループスレッドでメッセージ処理メソッドを呼び出します。 このトピックの詳細は、Quarkus Reactive Architecture ドキュメント を参照してください。 ただし、外部サービスの呼び出しやデータベース操作などのブロッキング処理と Reactive Messaging を組み合わせる必要がある場合もあります。 これには、処理が ブロッキング であり、ワーカースレッドで実行する必要があることを示す @Blocking アノテーションを使用できます。 ブロッキング処理の詳細は、SmallRye Reactive Messaging ドキュメント を参照してください。
Quarkus のリアクティブメッセージングにおける仮想スレッドのサポートは、メッセージ処理をイベントループスレッドまたはワーカースレッドで実行するのではなく、仮想スレッドにオフロードすることを目的としています。
メッセージコンシューマーメソッドで仮想スレッドのサポートを有効にするには、@RunOnVirtualThread アノテーションをメソッドに追加します。 JDK に互換性がある場合 (Java 19 以降のバージョン、推奨は 21 以降)、各着信メッセージが新しい仮想スレッドにオフロードされます。 こうすることで、仮想スレッドがマウントされているプラットフォームスレッドをブロックせずに、ブロッキング操作を実行できるようになります。
Reactive Messaging Kafka エクステンションの使用例
仮想スレッドで Kafka レコードを処理する方法の例を見てみましょう。 まず、ビルドファイルにリアクティブメッセージングエクステンションの依存関係があることを確認します。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
implementation("io.quarkus:quarkus-messaging-kafka")
Java 19 以降 (推奨は 21 以降) を使用していることを確認する必要があります。これは、次のように pom.xml
ファイルで強制できます。
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
</properties>
アプリケーションを実行します。
java -jar target/quarkus-app/quarkus-run.jar
または、 quarkus-maven-plugin
設定に以下を注入して Quarkus 開発モードを使用します。
<maven.compiler.release>21</maven.compiler.release>
その後、 @Incoming
アノテーションが付けられたコンシューマーメソッドで、アノテーション @RunOnVirtualThread
を使い始めることができます。
次の例では、REST クライアント を使用して、REST エンドポイントへのブロッキング呼び出しを行います。
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PriceConsumer {
@RestClient (2)
PriceAlertService alertService;
@Incoming("prices")
@RunOnVirtualThread (1)
public void consume(double price) {
if (price > 90.0) {
alertService.alert(price); (3)
}
}
@Outgoing("prices-out") (4)
public Multi<Double> randomPriceGenerator() {
return Multi.createFrom().<Random, Double>generator(Random::new, (r, e) -> {
e.emit(r.nextDouble(100));
return r;
});
}
}
1 | @Incoming メソッドの @RunOnVirtualThread アノテーションにより、メソッドが仮想スレッドで呼び出されます。 |
2 | REST クライアントスタブには @RestClient アノテーションが注入されます。 |
3 | alert メソッドは、REST 呼び出しが返されるまで仮想スレッドをブロックします。 |
4 | この @Outgoing メソッドは、ランダムな価格を生成し、アプリケーションによって消費される Kafka トピックにその価格を書き込みます。 |
デフォルトでは、Reactive Messaging のメッセージ処理はメッセージの順序を維持しながら順番に実行されることに注意してください。
同様に、 @Blocking(ordered = false)`アノテーションはこの動作を変更します。
`@RunOnVirtualThread
を使用すると、順序を保持せずにメッセージの同時処理が強制されます。
@RunOnVirtualThread アノテーションの使用
@RunOnVirtualThread に適格なメソッドシグネチャー
@Blocking
でアノテーションを付けることができるメソッドのみが @RunOnVirtualThreads
を使用できます。
適格なメソッドシグネチャーは次のとおりです。
-
@Outgoing("channel-out") O generator()
-
@Outgoing("channel-out") Message<O> generator()
-
@Incoming("channel-in") @Outgoing("channel-out") O process(I in)
-
@Incoming("channel-in") @Outgoing("channel-out") Message<O> process(I in)
-
@Incoming("channel-in") void consume(I in)
-
@Incoming("channel-in") Uni<Void> consume(I in)
-
@Incoming("channel-in") Uni<Void> consume(Message<I> msg)
-
@Incoming("channel-in") CompletionStage<Void> consume(I in)
-
@Incoming("channel-in") CompletionStage<Void> consume(Message<I> msg)
メソッドとクラスでの @RunOnVirtualThread アノテーションの使用
@RunOnVirtualThread
アノテーションは、次のような形で使用できます。
-
リアクティブメッセージングメソッドで直接使用 - このメソッドはブロッキングとみなされ、仮想スレッドで実行されます。
-
リアクティブメッセージングメソッドを含むクラスで使用 -
@Blocking
を持つこのクラスアノテーションのメソッドは、通常のワーカースレッドを使用するように設定されたプール名がアノテーションによって定義されている場合を除き、仮想スレッドで実行されます。
たとえば、次のようにメソッドで直接 @RunOnVirtualThread
を使用できます。
@ApplicationScoped
public class MyBean {
@Incoming("in")
@Outgoing("out")
@RunOnVirtualThread
public String process(String s) {
// Called on a new virtual thread for every incoming message
}
}
または、クラス自体で @RunOnVirtualThread
を使用することもできます。
@ApplicationScoped
@RunOnVirtualThread
public class MyBean {
@Incoming("in1")
@Outgoing("out1")
public String process(String s) {
// Called on the event loop - no @Blocking annotation
}
@Incoming("in2")
@Outgoing("out2")
@Blocking
public String process(String s) {
// Call on a new virtual thread for every incoming message
}
@Incoming("in3")
@Outgoing("out3")
@Blocking("my-worker-pool")
public String process(String s) {
// Called on a regular worker thread from the pool named "my-worker-pool"
}
}
最大同時実行数の制御
仮想スレッドの軽量な性質を活用するために、 @RunOnVirtualThread
アノテーションが付けられたメソッドのデフォルトの最大同時実行数は 1024 です。
プラットフォームスレッドとは異なり、仮想スレッドはプールされず、メッセージごとに作成されます。したがって、最大同時実行数はすべての @RunOnVirtualThread
メソッドに個別に適用されます。
同時実行レベルをカスタマイズする方法は 2 つあります。
-
@RunOnVirtualThread
アノテーションは、ワーカー名を指定するための @Blocking アノテーションと一緒に使用できます。@Incoming("prices") @RunOnVirtualThread @Blocking("my-worker") public void consume(double price) { //... }
次に、たとえば、このメソッドの最大同時実行数を 30 に設定するには、設定プロパティー
smallrye.messaging.worker.my-worker.max-concurrency=30
を使用して設定します。 -
ワーカー名が設定されていないすべての
@RunOnVirtualThread
メソッドに対して、設定プロパティーsmallrye.messaging.worker.<virtual-thread>.max-concurrency
を使用できます。