The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Using HTTP with Reactive Messaging

このガイドでは、Quarkus アプリケーションが MicroProfile Reactive Messaging を使用して HTTP メッセージを消費および生成する方法を説明します。

前提条件

このガイドを完成させるには、以下が必要です:

  • 15分未満

  • IDE

  • JDK 11+ がインストールされ、 JAVA_HOME が適切に設定されていること

  • Apache Maven 3.8.1+

  • ネイティブモードで実行したい場合は、GraalVM、Docker、Podman がインストールされていること

アーキテクチャ

このガイドでは、複数の通貨のコストを持つ HTTP メッセージを消費し、各コストをユーロの値に変換するサービス、すなわち CostConverter を実装します。

ユーザーが簡単にサービスを試すことができるように、コストを合計したHTTPリソース( CostCollector )と、新しいコストを追加して合計を見るためのシンプルなWebページを実装します。

ソリューション

次のセクションで紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。

Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.gitアーカイブ をダウンロードします。

ソリューションは reactive-messaging-http-quickstart ディレクトリ にあります。

Mavenプロジェクトの作成

まず、新しいプロジェクトが必要です。以下のコマンドで新規プロジェクトを作成します。

mvn io.quarkus.platform:quarkus-maven-plugin:2.11.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=reactive-messaging-http-quickstart \
    -Dextensions="reactive-messaging-http" \
    -DnoExamples
cd reactive-messaging-http-quickstart

このコマンドは、Reactive MessagingとHTTPコネクタエクステンションをインポートしてMavenプロジェクトを生成します。

コンバーター

以下の内容の src/main/java/org/acme/reactivehttp/CostConverter.java ファイルを作成します。

package org.acme.reactivehttp;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;

/**
 * A bean consuming costs in multiple currencies and producing prices in EUR from them
 */
@ApplicationScoped
public class CostConverter {

    private static final Logger log = Logger.getLogger(CostConverter.class);

    private static final Map<String, Double> conversionRatios = new HashMap<>();

    static {
        conversionRatios.put("CHF", 0.93);
        conversionRatios.put("USD", 0.84);
        conversionRatios.put("PLN", 0.22);
        conversionRatios.put("EUR", 1.);
    }

    @Incoming("incoming-costs") (1)
    @Outgoing("outgoing-costs") (2)
    double convert(Cost cost) { (3)
        Double conversionRatio = conversionRatios.get(cost.getCurrency().toUpperCase());
        if (conversionRatio == null) {
            return 0.;
        }
        return conversionRatio * cost.getValue();
    }
}
1 incoming-costs ストリームからメッセージを消費します。
2 返された値を outgoing-costs ストリームにディスパッチします。
3 Consume an event with payload of type Cost and produce a double. In the case of consuming an arbitrary object, the reactive-messaging-http extension will attempt to deserialize the request body as a JSON object.

Cost クラスを定義してみましょう。

package org.acme.reactivehttp;

public class Cost {
    private double value;
    private String currency;

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getCurrency() {
        return currency;
    }

    public void setCurrency(String currency) {
        this.currency = currency;
    }
}

次のステップでは、 application.properties ファイルに両方のストリームの設定を作成します。

HTTPコネクタの設定

HTTP コネクタを設定する必要があります。これは application.properties ファイルで行います。キーは以下のように構成されています。

mp.messaging.[outgoing|incoming].{channel-name}.{property}=value

channel-name セグメントは、 @Incoming および @Outgoing アノテーションで設定された値と一致する必要があります。

  • incoming-costs → コストを受け取るソース

  • outgoing-costs → 換算されたコストを受け取るシンク

mp.messaging.outgoing.outgoing-costs.connector=quarkus-http

# here we are using a URL pointing to an endpoint
# you can use e.g. an environment variable to change it
mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.port}/cost-collector

# we need to use a different port for tests:
%test.mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.test-port}/cost-collector

# POST is the default method. Another possibility is PUT
mp.messaging.outgoing.outgoing-costs.method=POST


mp.messaging.incoming.incoming-costs.connector=quarkus-http

# the incoming-costs channel will be fed via an endpoint on the `/costs` path
mp.messaging.incoming.incoming-costs.path=/costs

# POST is the default method. Another possibility is PUT
mp.messaging.incoming.incoming-costs.method=POST

コストコレクター

メッセージを変換して受け渡す様子を説明するために、送信コストを受け取って集計するエンドポイントを追加してみましょう。これは通常のJAX-RSエンドポイントです。

package org.acme.reactivehttp;

import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/cost-collector")
@ApplicationScoped
public class CostCollector {

    private double sum = 0;

    @POST
    public void consumeCost(String valueAsString) {
        sum += Double.parseDouble(valueAsString);
    }

    @GET
    public double getSum() {
        return sum;
    }

}

HTMLページ

アプリケーションと便利にやりとりするために、簡単なWebページを作成してみましょう。

このページでは、コストを追加するためのフォームと、現在のコストの合計の情報を提供します。ページは定期的に /cost-collector から現在の合計を要求することによって合計を更新します。

以下の内容の src/main/resources/META-INF/resources/index.html ファイルを作成します。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Costs</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">

    <h2>Add a cost</h2>
    <div>
        <div>
            <label for="value">Value</label>
            <input type="text" id="value">
        </div>
        <div>
            <label for="currency">Currency</label>
            <select id="currency">
                <option value="CHF">Swiss franc</option>
                <option value="USD">United States dollar</option>
                <option value="CHF">Polish złoty</option>
            </select>
        </div>
        <input type="button" onclick="add()" value="Add">
    </div>


    <h2>Last cost</h2>
    <div class="row">
        <p class="col-md-12">The total cost is <strong><span id="content">N/A</span>&nbsp;&euro;</strong>.</p>
    </div>
</div>
</body>
<script>
    add = function() {
        var value = document.getElementById('value').value;
        var currency = document.getElementById('currency').value;

        var cost = {
            value: document.getElementById('value').value,
            currency: document.getElementById('currency').value
        };

        fetch('costs', { method: 'POST', body: JSON.stringify(cost) });
    }

    updateCost = function() {
        fetch('cost-collector').then(response => response.text()).then(sum =>
            document.getElementById('content').textContent=sum
        );
    }

    window.setInterval(updateCost, 500);
</script>
</html>

実行

以下でアプリケーションを実行します。

./mvnw quarkus:dev

ブラウザで http://localhost:8080/index.html を開いてください。

ネイティブ実行

以下ででネイティブ実行可能ファイルをビルドすることができます。

./mvnw package -Pnative

さらに詳しく

HTTP コネクタのオプション

quarkus-http コネクタの全オプション。

# OUTGOING

# The target URL
mp.messaging.outgoing.<channelName>.url=http://localhost:8213

# Message payload serializer, optional, implementation of `io.quarkus.reactivemessaging.http.runtime.serializers.Serializer`
mp.messaging.outgoing.<channelName>.serializer=com.example.MySerializer

# The number of attempts to make for sending a request to a remote endpoint. Must not be less than zero
# Zero by default
mp.messaging.outgoing.<channelName>.maxRetries=3

# Configures the random factor when using back-off with maxRetries > 0. 0.5 by default
mp.messaging.outgoing.<channelName>.jitter=0.3

# Configures a back-off delay between attempts to send a request.
# A random factor (jitter) is applied to increase the delay when several failures happen.
mp.messaging.outgoing.<channelName>.delay=1s

#The HTTP method (either `POST` or `PUT`), `POST` by default
mp.messaging.outgoing.<channelName>.method=PUT

#INCOMING
# The HTTP method (either `POST` or `PUT`, `POST` by default
mp.messaging.incoming.<channelName>.method=POST

# The path of the endpoint
mp.messaging.incoming.<channelName>.path=/my-reactive-ws-endpoint

# HTTP endpoint buffers messages if a consumer is not able to keep up. This setting specifies the size of the buffer.
# 8 by default.
mp.messaging.incoming.<channelName>.buffer-size=13

リアクティブメッセージング

このエクステンションは、MicroProfile Reactive Messaging を利用してデータストリーミングアプリケーションを構築します。

さらに詳しく知りたい場合は、Quarkusで使用されている実装である SmallRye Reactive Messaging のドキュメントを確認してください。