The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Quarkusでのコンテキスト伝搬

従来のブロッキングコードでは ThreadLocal変数を使用してコンテキストオブジェクトを格納し、パラメータとして持ち回ることを避けていました。多くのQuarkusのエクステンションは、適切に動作するためにこれらのコンテキストオブジェクトを必要とします。例えば、 RESTEasyArCTransactionなどです。

リアクティブ/非同期コードを書く場合、「後で」実行されるコードブロックのパイプラインに作業を切り 込まなければならず、実際には、定義したメソッドがreturnされた後に実行されます。そのため、 try/finally ブロックや ThreadLocal 変数は動作しなくなります。なぜならば、呼び出し元が finally ブロックを実行した後に、 リアクティブコードは別のスレッドで実行されるからです。

MicroProfile コンテキスト伝搬 の実装である SmallRye コンテキスト伝搬 は、リアクティブ/非同期設定でQuarkusエクステンションが正しく動作するようにするために作られました。これは、スレッドローカルにあったコンテキスト値を取得し、コードが呼び出されたときにそれらを復元することで動作します。

ソリューション

次のセクションで紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。

Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.gitアーカイブ をダウンロードします。

このソリューションは context-propagation-quickstart directoryにあります。

設定

If you are using Mutiny (the quarkus-mutiny extension), you just need to add the quarkus-smallrye-context-propagation extension to enable context propagation.

つまり、ビルドファイルに以下の依存関係を追加してください:

pom.xml
<!-- RESTEasy Reactive extension if not already included -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-resteasy-reactive</artifactId>
</dependency>
<!-- Context Propagation extension -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-context-propagation</artifactId>
</dependency>
build.gradle
// RESTEasy Reactive extension if not already included
implementation("io.quarkus:quarkus-resteasy-reactive")
// Context Propagation extension
implementation("io.quarkus:quarkus-smallrye-context-propagation")

これにより、ArC、RESTEasy、トランザクションを使用している場合は、コンテキスト伝播をすることができます。

Mutinyとの使用例

Mutiny

このセクションでは、Mutinyのリアクティブ型を使用しています。もしMutinyに慣れていない場合は、まず Mutiny - 直感的なリアクティブプログラミングライブラリ を参照してください。

Kafkaトピックから次の3つのアイテムを読み込み、 Panacheを使ったHibernate ORMでデータベースに格納してから(すべて同じトランザクションで)、クライアントに返すRESTエンドポイントを書くとしたら、次のようになります:

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;

    @Transactional
    @GET
    @Path("/prices")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Publisher<Double> prices() {
        // get the next three prices from the price stream
        return Multi.createFrom().publisher(prices)
                .select().first(3)
                // The items are received from the event loop, so cannot use Hibernate ORM (classic)
                // Switch to a worker thread, the transaction will be propagated
                .emitOn(Infrastructure.getDefaultExecutor())
                .map(price -> {
                    // store each price before we send them
                    Price priceEntity = new Price();
                    priceEntity.value = price;
                    // here we are all in the same transaction
                    // thanks to context propagation
                    priceEntity.persist();
                    return price;
                    // the transaction is committed once the stream completes
                });
    }

コンテキスト伝播のための Mutiny サポートのおかげで、これは、そのまま直ぐに動作することに注目してください。3つのアイテムは同じトランザクションを使用して保持され、このトランザクションはストリームが完了するとコミットされます。

CompletionStage の使用例

CompletionStageを使っている場合は、手動でのコンテキストの伝播が必要です。そのためには、すべてのコンテキストを伝搬する ThreadContext または ManagedExecutor を注入することができます。例えば、ここでは Vert.x Web Clientを使用してStar Warsの人々のリストを取得し、 Panacheを使用したHibernate ORMを使用してデータベースに格納し(すべて同じトランザクションで)、 JacksonまたはJSON-Bを使用してJSONとしてクライアントに返しています:

    @Inject ThreadContext threadContext;
    @Inject ManagedExecutor managedExecutor;
    @Inject Vertx vertx;

    @Transactional
    @GET
    @Path("/people")
    public CompletionStage<List<Person>> people() throws SystemException {
        // Create a REST client to the Star Wars API
        WebClient client = WebClient.create(vertx,
                         new WebClientOptions()
                          .setDefaultHost("swapi.dev")
                          .setDefaultPort(443)
                          .setSsl(true));
        // get the list of Star Wars people, with context capture
        return threadContext.withContextCapture(client.get("/api/people/").send())
                .thenApplyAsync(response -> {
                    JsonObject json = response.bodyAsJsonObject();
                    List<Person> persons = new ArrayList<>(json.getInteger("count"));
                    // Store them in the DB
                    // Note that we're still in the same transaction as the outer method
                    for (Object element : json.getJsonArray("results")) {
                        Person person = new Person();
                        person.name = ((JsonObject) element).getString("name");
                        person.persist();
                        persons.add(person);
                    }
                    return persons;
                }, managedExecutor);
    }

ThreadContext または ManagedExecutor を使用することで、ほとんどの有用な関数型と CompletionStage をラップしてコンテキストを伝播させることができます。

注入された ManagedExecutor は、Quarkus のスレッドプールを使用しています。

どのコンテキストが伝搬されるかをオーバーライドする

デフォルトでは、利用可能なすべてのコンテキストが伝搬されます。しかし、この動作をいくつかの方法で上書きすることができます。

設定の使用

以下の設定プロパティでは、伝搬されるコンテキストのデフォルトセットを指定できます。

設定キー 説明 デフォルト値

mp.context.ThreadContext.propagated

伝播されたコンテキストのコンマで区切られたセット

Remaining (すべての非明示的にリスト化されたコンテキスト)

mp.context.ThreadContext.cleared

クリアされたコンテキストのコンマで区切りのセット

None(コンテキストなし)。ただし、伝播されたセットとクリアされたセットのどちらにも Remaining が含まれていない場合は、デフォルトで Remaining (明示されていないすべてのコンテキスト)となります

mp.context.ThreadContext.unchanged

変更されていないコンテキストのコンマで区切られたセット

None (コンテキストなし)

以下のコンテキストは、Quarkusで設定不要で利用可能か、エクステンションを追加するかによって使用できます:

Context Name ネーム定数 説明

None

ThreadContext.NONE

コンテキストの空のセットを指定するために使用できますが、値を空に設定しても機能します。

Remaining

ThreadContext.ALL_REMAINING

他のセットに明示されていないすべてのコンテキスト

Transaction

ThreadContext.TRANSACTION

JTAトランザクションコンテキスト

CDI

ThreadContext.CDI

CDI(ArC)のコンテキスト

Servlet

N/A

サーブレットコンテキスト

JAX-RS

N/A

RESTEasy Classic コンテキスト

Application

ThreadContext.APPLICATION

現在の ThreadContextClassLoader

伝播されたコンテキストをアノテーションで上書き

Mutinyが使用しているような自動コンテキスト伝搬を、特定のメソッドでオーバーライドするためには @CurrentThreadContext アノテーションを使用します:

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;

    @GET
    @Path("/prices")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    // Get rid of all context propagation, since we don't need it here
    @CurrentThreadContext(unchanged = ThreadContext.ALL_REMAINING)
    public Publisher<Double> prices() {
        // get the next three prices from the price stream
        return Multi.createFrom().publisher(prices)
                .select().first(3);
    }

CDIインジェクションを用いた伝搬されたコンテキストのオーバーライド

また、インジェクションポイントに @ThreadContextConfigアノテーションを使うことでカスタムメイドの ThreadContext を注入することもできます:

    // Get the prices stream
    @Inject
    @Channel("prices") Publisher<Double> prices;
    // Get a ThreadContext that doesn't propagate context
    @Inject
    @ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
    SmallRyeThreadContext threadContext;

    @GET
    @Path("/prices")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Publisher<Double> prices() {
        // Get rid of all context propagation, since we don't need it here
        try(CleanAutoCloseable ac = SmallRyeThreadContext.withThreadContext(threadContext)){
            // get the next three prices from the price stream
            return Multi.createFrom().publisher(prices)
                    .select().first(3);
        }
    }

同様に、 @ManagedExecutorConfig アノテーションを使用して設定された ManagedExecutor のインスタンスをインジェクトする類似の方法があります:

    // Custom ManagedExecutor with different async limit, queue and no propagation
    @Inject
    @ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
    ManagedExecutor configuredCustomExecutor;

ManagedExecutorとThreadContextの構成されたCDIインスタンスの共有

If you need to inject the same ManagedExecutor or ThreadContext into several places and share its capacity, you can name the instance with @NamedInstance annotation. @NamedInstance is a CDI qualifier and all injections of the same type and name will therefore share the same underlying instance. If you also need to customize your instance, you can do so using @ManagedExecutorConfig/ThreadContextConfig annotation on one of its injection points:

    // Custom configured ManagedExecutor with name
    @Inject
    @ManagedExecutorConfig(maxAsync = 2, maxQueued = 3, cleared = ThreadContext.ALL_REMAINING)
    @NamedInstance("myExecutor")
    ManagedExecutor sharedConfiguredExecutor;

    // Since this executor has the same name, it will be the same instance as above
    @Inject
    @NamedInstance("myExecutor")
    ManagedExecutor sameExecutor;

    // Custom ThreadContext with a name
    @Inject
    @ThreadContextConfig(unchanged = ThreadContext.ALL_REMAINING)
    @NamedInstance("myContext")
    ThreadContext sharedConfiguredThreadContext;

    // Given equal value of @NamedInstance, this ThreadContext will be the same as the above one
    @Inject
    @NamedInstance("myContext")
    ThreadContext sameContext;

CDIのためのコンテキスト伝搬

CDI の観点からは、 @RequestScoped, @ApplicationScoped, @Singleton Beanは伝播され、他のスレッドで利用可能です。 @Dependent BeanやカスタムスコープされたBeanは、CDI コンテキスト伝播を介して自動的に伝播されることはありません。

@ApplicationScoped@Singleton のBeanは常にアクティブなスコープであり、そのため対処が簡単です - コンテキスト伝播タスクは、CDI コンテナーが動作している限り、これらのBeanで動作します。しかし、 @RequestScoped Beanは話が違います。手動で有効化/無効化すると、HTTP リクエストや他のリクエスト/タスクにバインドされます。この場合、元のスレッドがリクエストの終了に到達すると、コンテキストを終了し、それらのBeanで @PreDestroy を呼び出し、コンテキストからクリアされることに注意しなければなりません。その後、他のスレッドからこれらのBeanにアクセスしようとすると、予期せぬ動作をすることがあります。したがって、コンテキスト伝播を介してリクエストスコープされたBeanを使用するすべてのタスクは、元のリクエストの持続時間を超えないような方法で実行されることを確認することが推奨されます。

上記で説明した動作のため、CDI で Context Propagation を使用する際には @PreDestroy@RequestScoped Beanで使用しないことをお勧めします。