Quarkusでのコンテキスト伝搬
Traditional blocking code uses ThreadLocal
variables to store contextual objects in order to avoid
passing them as parameters everywhere. Many Quarkus extensions require those contextual objects to operate
properly: RESTEasy Reactive, ArC and Transaction
for example.
リアクティブ/非同期コードを書く場合、「後で」実行されるコードブロックのパイプラインに作業を切り 込まなければならず、実際には、定義したメソッドがreturnされた後に実行されます。そのため、 try/finally
ブロックや ThreadLocal
変数は動作しなくなります。なぜならば、呼び出し元が finally
ブロックを実行した後に、 リアクティブコードは別のスレッドで実行されるからです。
MicroProfile Context Propagation の実装である SmallRye Context Propagation は、リアクティブ/非同期設定でQuarkusエクステンションが正しく動作するようにするために作られました。これは、スレッドローカルにあったコンテキスト値を取得し、コードが呼び出されたときにそれらを復元することで動作します。
ソリューション
次の章で紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。
Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.git
、 アーカイブ をダウンロードします。
ソリューションは context-propagation-quickstart
ディレクトリ にあります。
設定
Mutiny ( quarkus-mutiny
のエクステンション) を使用している場合は、コンテキスト伝播を有効にするために quarkus-smallrye-context-propagation
のエクステンションを追加するだけです。
つまり、ビルドファイルに以下の依存関係を追加してください:
<!-- RESTEasy Reactive extension if not already included -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive</artifactId>
</dependency>
<!-- Context Propagation extension -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-context-propagation</artifactId>
</dependency>
// RESTEasy Reactive extension if not already included
implementation("io.quarkus:quarkus-resteasy-reactive")
// Context Propagation extension
implementation("io.quarkus:quarkus-smallrye-context-propagation")
これにより、ArC、RESTEasy、トランザクションを使用している場合は、コンテキスト伝播をすることができます。
Mutinyとの使用例
Mutiny
このセクションでは、Mutinyのリアクティブ型を使用しています。もしMutinyに慣れていない場合は、まず Mutiny - 直感的なリアクティブプログラミングライブラリ を参照してください。 |
Kafkaトピックから次の3つのアイテムを読み込み、 Panacheを使ったHibernate ORMでデータベースに格納してから(すべて同じトランザクションで)、クライアントに返すRESTエンドポイントを書くとしたら、次のようになります:
// Get the prices stream
@Inject
@Channel("prices") Publisher<Double> prices;
@Transactional
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Publisher<Double> prices() {
// get the next three prices from the price stream
return Multi.createFrom().publisher(prices)
.select().first(3)
// The items are received from the event loop, so cannot use Hibernate ORM (classic)
// Switch to a worker thread, the transaction will be propagated
.emitOn(Infrastructure.getDefaultExecutor())
.map(price -> {
// store each price before we send them
Price priceEntity = new Price();
priceEntity.value = price;
// here we are all in the same transaction
// thanks to context propagation
priceEntity.persist();
return price;
// the transaction is committed once the stream completes
});
}
コンテキストの伝播のための Mutiny サポートのおかげで、これは、そのまま直ぐに動作することに注目してください。3つのアイテムは同じトランザクションを使用して保持され、このトランザクションはストリームが完了するとコミットされます。
CompletionStage
の使用例
If you are using CompletionStage
you need manual context propagation. You can do that by injecting a ThreadContext
or ManagedExecutor
that will propagate every context. For example, here we use the Vert.x Web Client
to get the list of Star Wars people, then store them in the database using
Hibernate ORM with Panache (all in the same transaction) before returning
them to the client as JSON using Jackson or JSON-B:
@Inject ThreadContext threadContext;
@Inject ManagedExecutor managedExecutor;
@Inject Vertx vertx;
@Transactional
@GET
@Path("/people")
public CompletionStage<List<Person>> people() throws SystemException {
// Create a REST client to the Star Wars API
WebClient client = WebClient.create(vertx,
new WebClientOptions()
.setDefaultHost("swapi.dev")
.setDefaultPort(443)
.setSsl(true));
// get the list of Star Wars people, with context capture
return threadContext.withContextCapture(client.get("/api/people/").send())
.thenApplyAsync(response -> {
JsonObject json = response.bodyAsJsonObject();
List<Person> persons = new ArrayList<>(json.getInteger("count"));
// Store them in the DB
// Note that we're still in the same transaction as the outer method
for (Object element : json.getJsonArray("results")) {
Person person = new Person();
person.name = ((JsonObject) element).getString("name");
person.persist();
persons.add(person);
}
return persons;
}, managedExecutor);
}
ThreadContext
または ManagedExecutor
を使用することで、ほとんどの有用な関数型と CompletionStage
をラップしてコンテキストを伝播させることができます。
注入された |
どのコンテキストが伝搬されるかをオーバーライドする
デフォルトでは、利用可能なすべてのコンテキストが伝搬されます。しかし、この動作をいくつかの方法で上書きすることができます。
設定の使用
以下の設定プロパティでは、伝搬されるコンテキストのデフォルトセットを指定できます。
設定プロパティ | 説明 | デフォルト値 |
---|---|---|
|
伝播されたコンテキストのコンマで区切られたセット |
|
|
クリアされたコンテキストのコンマで区切りのセット |
|
|
変更されていないコンテキストのコンマで区切られたセット |
|
以下のコンテキストは、Quarkusで設定不要で利用可能か、エクステンションを追加するかによって使用できます:
コンテキスト名 | ネーム定数 | 説明 |
---|---|---|
|
コンテキストの空のセットを指定するために使用できますが、値を空に設定しても機能します。 |
|
|
他のセットに明示されていないすべてのコンテキスト |
|
|
JTAトランザクションコンテキスト |
|
|
CDI(ArC)のコンテキスト |
|
|
N/A |
サーブレットコンテキスト |
|
N/A |
RESTEasy ReactiveまたはRESTEasy Classicのコンテキスト |
|
現在の |
伝播されたコンテキストをアノテーションで上書き
Mutinyが使用しているような自動コンテキスト伝搬を、特定のメソッドでオーバーライドするためには @CurrentThreadContext
アノテーションを使用します:
// Get the prices stream
@Inject
@Channel("prices") Publisher<Double> prices;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
// Get rid of all context propagation, since we don't need it here
@CurrentThreadContext(propagated = {}, unchanged = ThreadContext.ALL_REMAINING)
public Publisher<Double> prices() {
// get the next three prices from the price stream
return Multi.createFrom().publisher(prices)
.select().first(3);
}
CDIインジェクションを用いた伝搬されたコンテキストのオーバーライド
また、インジェクションポイントに @ThreadContextConfig
アノテーションを使うことでカスタムメイドの ThreadContext
を注入することもできます:
// Get the prices stream
@Inject
@Channel("prices") Publisher<Double> prices;
// Get a ThreadContext that doesn't propagate context
@Inject
@ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
SmallRyeThreadContext threadContext;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Publisher<Double> prices() {
// Get rid of all context propagation, since we don't need it here
try(CleanAutoCloseable ac = SmallRyeThreadContext.withThreadContext(threadContext)){
// get the next three prices from the price stream
return Multi.createFrom().publisher(prices)
.select().first(3);
}
}
同様に、 @ManagedExecutorConfig
アノテーションを使用して設定された ManagedExecutor
のインスタンスをインジェクトする類似の方法があります:
// Custom ManagedExecutor with different async limit, queue and no propagation
@Inject
@ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
ManagedExecutor configuredCustomExecutor;
ManagedExecutorとThreadContextの構成されたCDIインスタンスの共有
同じ ManagedExecutor
や ThreadContext
を複数の場所に注入し、その容量を共有したい場合、そのインスタンスに @NamedInstance
アノテーションで名前を付けることが出来ます。 @NamedInstance
はCDIアノテーションであり、同じ型と名前のすべての注入は、したがって、同じ基礎となるインスタンスを共有します。インスタンスをカスタマイズする必要がある場合は、インジェクションポイントの1つで @ManagedExecutorConfig
/ ThreadContextConfig
アノテーションを使用して、カスタマイズすることができます。
// Custom configured ManagedExecutor with name
@Inject
@ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
@NamedInstance("myExecutor")
ManagedExecutor sharedConfiguredExecutor;
// Since this executor has the same name, it will be the same instance as above
@Inject
@NamedInstance("myExecutor")
ManagedExecutor sameExecutor;
// Custom ThreadContext with a name
@Inject
@ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
@NamedInstance("myContext")
ThreadContext sharedConfiguredThreadContext;
// Given equal value of @NamedInstance, this ThreadContext will be the same as the above one
@Inject
@NamedInstance("myContext")
ThreadContext sameContext;
CDIのためのコンテキスト伝搬
CDI の観点からは、 @RequestScoped
, @ApplicationScoped
, @Singleton
Beanは伝播され、他のスレッドで利用可能です。 @Dependent
BeanやカスタムスコープされたBeanは、CDI コンテキスト伝播を介して自動的に伝播されることはありません。
@ApplicationScoped
と @Singleton
のBeanは常にアクティブなスコープであり、そのため対処が簡単です - コンテキスト伝播タスクは、CDI コンテナーが動作している限り、これらのBeanで動作します。しかし、 @RequestScoped
Beanは話が違います。手動で有効化/無効化すると、HTTP リクエストや他のリクエスト/タスクにバインドされます。この場合、元のスレッドがリクエストの終了に到達すると、コンテキストを終了し、それらのBeanで @PreDestroy
を呼び出し、コンテキストからクリアされることに注意しなければなりません。その後、他のスレッドからこれらのBeanにアクセスしようとすると、予期せぬ動作をすることがあります。したがって、コンテキストの伝播を介してリクエストスコープされたBeanを使用するすべてのタスクは、元のリクエストの持続時間を超えないような方法で実行されることを確認することが推奨されます。
上記で説明した動作のため、CDI で Context Propagation を使用する際には |