The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Quarkusでのコンテキスト伝搬

Traditional blocking code uses ThreadLocal variables to store contextual objects in order to avoid passing them as parameters everywhere. Many Quarkus extensions require those contextual objects to operate properly: RESTEasy Reactive, ArC and Transaction for example.

リアクティブ/非同期コードを書く場合、「後で」実行されるコードブロックのパイプラインに作業を切り 込まなければならず、実際には、定義したメソッドがreturnされた後に実行されます。そのため、 try/finally ブロックや ThreadLocal 変数は動作しなくなります。なぜならば、呼び出し元が finally ブロックを実行した後に、 リアクティブコードは別のスレッドで実行されるからです。

MicroProfile Context Propagation の実装である SmallRye Context Propagation は、リアクティブ/非同期設定でQuarkusエクステンションが正しく動作するようにするために作られました。これは、スレッドローカルにあったコンテキスト値を取得し、コードが呼び出されたときにそれらを復元することで動作します。

ソリューション

次の章で紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。

Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.gitアーカイブ をダウンロードします。

ソリューションは context-propagation-quickstart ディレクトリ にあります。

設定

Mutiny ( quarkus-mutiny のエクステンション) を使用している場合は、コンテキスト伝播を有効にするために quarkus-smallrye-context-propagation のエクステンションを追加するだけです。

つまり、ビルドファイルに以下の依存関係を追加してください:

pom.xml
<!-- RESTEasy Reactive extension if not already included -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-resteasy-reactive</artifactId>
</dependency>
<!-- Context Propagation extension -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-context-propagation</artifactId>
</dependency>
build.gradle
// RESTEasy Reactive extension if not already included
implementation("io.quarkus:quarkus-resteasy-reactive")
// Context Propagation extension
implementation("io.quarkus:quarkus-smallrye-context-propagation")

これにより、ArC、RESTEasy、トランザクションを使用している場合は、コンテキスト伝播をすることができます。

Mutinyとの使用例

Mutiny

このセクションでは、Mutinyのリアクティブ型を使用しています。もしMutinyに慣れていない場合は、まず Mutiny - 直感的なリアクティブプログラミングライブラリ を参照してください。

Kafkaトピックから次の3つのアイテムを読み込み、 Panacheを使ったHibernate ORMでデータベースに格納してから(すべて同じトランザクションで)、クライアントに返すRESTエンドポイントを書くとしたら、次のようになります:

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;

    @Transactional
    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Publisher<Double> prices() {
        // get the next three prices from the price stream
        return Multi.createFrom().publisher(prices)
                .select().first(3)
                // The items are received from the event loop, so cannot use Hibernate ORM (classic)
                // Switch to a worker thread, the transaction will be propagated
                .emitOn(Infrastructure.getDefaultExecutor())
                .map(price -> {
                    // store each price before we send them
                    Price priceEntity = new Price();
                    priceEntity.value = price;
                    // here we are all in the same transaction
                    // thanks to context propagation
                    priceEntity.persist();
                    return price;
                    // the transaction is committed once the stream completes
                });
    }

コンテキストの伝播のための Mutiny サポートのおかげで、これは、そのまま直ぐに動作することに注目してください。3つのアイテムは同じトランザクションを使用して保持され、このトランザクションはストリームが完了するとコミットされます。

CompletionStage の使用例

If you are using CompletionStage you need manual context propagation. You can do that by injecting a ThreadContext or ManagedExecutor that will propagate every context. For example, here we use the Vert.x Web Client to get the list of Star Wars people, then store them in the database using Hibernate ORM with Panache (all in the same transaction) before returning them to the client as JSON using Jackson or JSON-B:

    @Inject ThreadContext threadContext;
    @Inject ManagedExecutor managedExecutor;
    @Inject Vertx vertx;

    @Transactional
    @GET
    @Path("/people")
    public CompletionStage<List<Person>> people() throws SystemException {
        // Create a REST client to the Star Wars API
        WebClient client = WebClient.create(vertx,
                         new WebClientOptions()
                          .setDefaultHost("swapi.dev")
                          .setDefaultPort(443)
                          .setSsl(true));
        // get the list of Star Wars people, with context capture
        return threadContext.withContextCapture(client.get("/api/people/").send())
                .thenApplyAsync(response -> {
                    JsonObject json = response.bodyAsJsonObject();
                    List<Person> persons = new ArrayList<>(json.getInteger("count"));
                    // Store them in the DB
                    // Note that we're still in the same transaction as the outer method
                    for (Object element : json.getJsonArray("results")) {
                        Person person = new Person();
                        person.name = ((JsonObject) element).getString("name");
                        person.persist();
                        persons.add(person);
                    }
                    return persons;
                }, managedExecutor);
    }

ThreadContext または ManagedExecutor を使用することで、ほとんどの有用な関数型と CompletionStage をラップしてコンテキストを伝播させることができます。

注入された ManagedExecutor は、Quarkus のスレッドプールを使用しています。

どのコンテキストが伝搬されるかをオーバーライドする

デフォルトでは、利用可能なすべてのコンテキストが伝搬されます。しかし、この動作をいくつかの方法で上書きすることができます。

設定の使用

以下の設定プロパティでは、伝搬されるコンテキストのデフォルトセットを指定できます。

設定プロパティ 説明 デフォルト値

mp.context.ThreadContext.propagated

伝播されたコンテキストのコンマで区切られたセット

Remaining (すべての非明示的にリスト化されたコンテキスト)

mp.context.ThreadContext.cleared

クリアされたコンテキストのコンマで区切りのセット

None(コンテキストなし)。ただし、伝播されたセットとクリアされたセットのどちらにも Remaining が含まれていない場合は、デフォルトで Remaining (明示されていないすべてのコンテキスト)となります

mp.context.ThreadContext.unchanged

変更されていないコンテキストのコンマで区切られたセット

None (コンテキストなし)

以下のコンテキストは、Quarkusで設定不要で利用可能か、エクステンションを追加するかによって使用できます:

コンテキスト名 ネーム定数 説明

None

ThreadContext.NONE

コンテキストの空のセットを指定するために使用できますが、値を空に設定しても機能します。

Remaining

ThreadContext.ALL_REMAINING

他のセットに明示されていないすべてのコンテキスト

Transaction

ThreadContext.TRANSACTION

JTAトランザクションコンテキスト

CDI

ThreadContext.CDI

CDI(ArC)のコンテキスト

Servlet

N/A

サーブレットコンテキスト

Jakarta REST

N/A

RESTEasy ReactiveまたはRESTEasy Classicのコンテキスト

Application

ThreadContext.APPLICATION

現在の ThreadContextClassLoader

伝播されたコンテキストをアノテーションで上書き

Mutinyが使用しているような自動コンテキスト伝搬を、特定のメソッドでオーバーライドするためには @CurrentThreadContext アノテーションを使用します:

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    // Get rid of all context propagation, since we don't need it here
    @CurrentThreadContext(propagated = {}, unchanged = ThreadContext.ALL_REMAINING)
    public Publisher<Double> prices() {
        // get the next three prices from the price stream
        return Multi.createFrom().publisher(prices)
                .select().first(3);
    }

CDIインジェクションを用いた伝搬されたコンテキストのオーバーライド

また、インジェクションポイントに @ThreadContextConfigアノテーションを使うことでカスタムメイドの ThreadContext を注入することもできます:

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;
    // Get a ThreadContext that doesn't propagate context
    @Inject
    @ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
    SmallRyeThreadContext threadContext;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Publisher<Double> prices() {
        // Get rid of all context propagation, since we don't need it here
        try(CleanAutoCloseable ac = SmallRyeThreadContext.withThreadContext(threadContext)){
            // get the next three prices from the price stream
            return Multi.createFrom().publisher(prices)
                    .select().first(3);
        }
    }

同様に、 @ManagedExecutorConfig アノテーションを使用して設定された ManagedExecutor のインスタンスをインジェクトする類似の方法があります:

    // Custom ManagedExecutor with different async limit, queue and no propagation
    @Inject
    @ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
    ManagedExecutor configuredCustomExecutor;

ManagedExecutorとThreadContextの構成されたCDIインスタンスの共有

同じ ManagedExecutorThreadContext を複数の場所に注入し、その容量を共有したい場合、そのインスタンスに @NamedInstance アノテーションで名前を付けることが出来ます。 @NamedInstance はCDIアノテーションであり、同じ型と名前のすべての注入は、したがって、同じ基礎となるインスタンスを共有します。インスタンスをカスタマイズする必要がある場合は、インジェクションポイントの1つで @ManagedExecutorConfig/ ThreadContextConfig アノテーションを使用して、カスタマイズすることができます。

    // Custom configured ManagedExecutor with name
    @Inject
    @ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
    @NamedInstance("myExecutor")
    ManagedExecutor sharedConfiguredExecutor;

    // Since this executor has the same name, it will be the same instance as above
    @Inject
    @NamedInstance("myExecutor")
    ManagedExecutor sameExecutor;

    // Custom ThreadContext with a name
    @Inject
    @ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
    @NamedInstance("myContext")
    ThreadContext sharedConfiguredThreadContext;

    // Given equal value of @NamedInstance, this ThreadContext will be the same as the above one
    @Inject
    @NamedInstance("myContext")
    ThreadContext sameContext;

CDIのためのコンテキスト伝搬

CDI の観点からは、 @RequestScoped, @ApplicationScoped, @Singleton Beanは伝播され、他のスレッドで利用可能です。 @Dependent BeanやカスタムスコープされたBeanは、CDI コンテキスト伝播を介して自動的に伝播されることはありません。

@ApplicationScoped@Singleton のBeanは常にアクティブなスコープであり、そのため対処が簡単です - コンテキスト伝播タスクは、CDI コンテナーが動作している限り、これらのBeanで動作します。しかし、 @RequestScoped Beanは話が違います。手動で有効化/無効化すると、HTTP リクエストや他のリクエスト/タスクにバインドされます。この場合、元のスレッドがリクエストの終了に到達すると、コンテキストを終了し、それらのBeanで @PreDestroy を呼び出し、コンテキストからクリアされることに注意しなければなりません。その後、他のスレッドからこれらのBeanにアクセスしようとすると、予期せぬ動作をすることがあります。したがって、コンテキストの伝播を介してリクエストスコープされたBeanを使用するすべてのタスクは、元のリクエストの持続時間を超えないような方法で実行されることを確認することが推奨されます。

上記で説明した動作のため、CDI で Context Propagation を使用する際には @PreDestroy@RequestScoped Beanで使用しないことをお勧めします。