Using HTTP with Reactive Messaging
このガイドでは、Quarkus アプリケーションが MicroProfile Reactive Messaging を使用して HTTP メッセージを消費および生成する方法を説明します。
前提条件
このガイドを完成させるには、以下が必要です:
-
15分未満
-
IDE
-
JDK 11+ がインストールされ、
JAVA_HOME
が適切に設定されていること -
Apache Maven 3.8.1+
-
ネイティブモードで実行したい場合は、GraalVM、Docker、Podman がインストールされていること
アーキテクチャ
このガイドでは、複数の通貨のコストを持つ HTTP メッセージを消費し、各コストをユーロの値に変換するサービス、すなわち CostConverter
を実装します。
ユーザーが簡単にサービスを試すことができるように、コストを合計したHTTPリソース( CostCollector
)と、新しいコストを追加して合計を見るためのシンプルなWebページを実装します。
ソリューション
次のセクションで紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。
Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.git
、 アーカイブ をダウンロードします。
ソリューションは reactive-messaging-http-quickstart
ディレクトリ にあります。
Mavenプロジェクトの作成
まず、新しいプロジェクトが必要です。以下のコマンドで新規プロジェクトを作成します。
mvn io.quarkus.platform:quarkus-maven-plugin:2.11.1.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=reactive-messaging-http-quickstart \
-Dextensions="reactive-messaging-http" \
-DnoExamples
cd reactive-messaging-http-quickstart
このコマンドは、Reactive MessagingとHTTPコネクタエクステンションをインポートしてMavenプロジェクトを生成します。
コンバーター
以下の内容の src/main/java/org/acme/reactivehttp/CostConverter.java
ファイルを作成します。
package org.acme.reactivehttp;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;
/**
* A bean consuming costs in multiple currencies and producing prices in EUR from them
*/
@ApplicationScoped
public class CostConverter {
private static final Logger log = Logger.getLogger(CostConverter.class);
private static final Map<String, Double> conversionRatios = new HashMap<>();
static {
conversionRatios.put("CHF", 0.93);
conversionRatios.put("USD", 0.84);
conversionRatios.put("PLN", 0.22);
conversionRatios.put("EUR", 1.);
}
@Incoming("incoming-costs") (1)
@Outgoing("outgoing-costs") (2)
double convert(Cost cost) { (3)
Double conversionRatio = conversionRatios.get(cost.getCurrency().toUpperCase());
if (conversionRatio == null) {
return 0.;
}
return conversionRatio * cost.getValue();
}
}
1 | incoming-costs ストリームからメッセージを消費します。 |
2 | 返された値を outgoing-costs ストリームにディスパッチします。 |
3 | Consume an event with payload of type Cost and produce a double . In the case of consuming an arbitrary object, the reactive-messaging-http extension will attempt to deserialize the request body as a JSON object. |
Cost
クラスを定義してみましょう。
package org.acme.reactivehttp;
public class Cost {
private double value;
private String currency;
public double getValue() {
return value;
}
public void setValue(double value) {
this.value = value;
}
public String getCurrency() {
return currency;
}
public void setCurrency(String currency) {
this.currency = currency;
}
}
次のステップでは、 application.properties
ファイルに両方のストリームの設定を作成します。
HTTPコネクタの設定
HTTP コネクタを設定する必要があります。これは application.properties
ファイルで行います。キーは以下のように構成されています。
mp.messaging.[outgoing|incoming].{channel-name}.{property}=value
channel-name
セグメントは、 @Incoming
および @Outgoing
アノテーションで設定された値と一致する必要があります。
-
incoming-costs
→ コストを受け取るソース -
outgoing-costs
→ 換算されたコストを受け取るシンク
mp.messaging.outgoing.outgoing-costs.connector=quarkus-http
# here we are using a URL pointing to an endpoint
# you can use e.g. an environment variable to change it
mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.port}/cost-collector
# we need to use a different port for tests:
%test.mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.test-port}/cost-collector
# POST is the default method. Another possibility is PUT
mp.messaging.outgoing.outgoing-costs.method=POST
mp.messaging.incoming.incoming-costs.connector=quarkus-http
# the incoming-costs channel will be fed via an endpoint on the `/costs` path
mp.messaging.incoming.incoming-costs.path=/costs
# POST is the default method. Another possibility is PUT
mp.messaging.incoming.incoming-costs.method=POST
コストコレクター
メッセージを変換して受け渡す様子を説明するために、送信コストを受け取って集計するエンドポイントを追加してみましょう。これは通常のJAX-RSエンドポイントです。
package org.acme.reactivehttp;
import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@Path("/cost-collector")
@ApplicationScoped
public class CostCollector {
private double sum = 0;
@POST
public void consumeCost(String valueAsString) {
sum += Double.parseDouble(valueAsString);
}
@GET
public double getSum() {
return sum;
}
}
HTMLページ
アプリケーションと便利にやりとりするために、簡単なWebページを作成してみましょう。
このページでは、コストを追加するためのフォームと、現在のコストの合計の情報を提供します。ページは定期的に /cost-collector
から現在の合計を要求することによって合計を更新します。
以下の内容の src/main/resources/META-INF/resources/index.html
ファイルを作成します。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Costs</title>
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
<h2>Add a cost</h2>
<div>
<div>
<label for="value">Value</label>
<input type="text" id="value">
</div>
<div>
<label for="currency">Currency</label>
<select id="currency">
<option value="CHF">Swiss franc</option>
<option value="USD">United States dollar</option>
<option value="CHF">Polish złoty</option>
</select>
</div>
<input type="button" onclick="add()" value="Add">
</div>
<h2>Last cost</h2>
<div class="row">
<p class="col-md-12">The total cost is <strong><span id="content">N/A</span> €</strong>.</p>
</div>
</div>
</body>
<script>
add = function() {
var value = document.getElementById('value').value;
var currency = document.getElementById('currency').value;
var cost = {
value: document.getElementById('value').value,
currency: document.getElementById('currency').value
};
fetch('costs', { method: 'POST', body: JSON.stringify(cost) });
}
updateCost = function() {
fetch('cost-collector').then(response => response.text()).then(sum =>
document.getElementById('content').textContent=sum
);
}
window.setInterval(updateCost, 500);
</script>
</html>
さらに詳しく
HTTP コネクタのオプション
quarkus-http
コネクタの全オプション。
# OUTGOING
# The target URL
mp.messaging.outgoing.<channelName>.url=http://localhost:8213
# Message payload serializer, optional, implementation of `io.quarkus.reactivemessaging.http.runtime.serializers.Serializer`
mp.messaging.outgoing.<channelName>.serializer=com.example.MySerializer
# The number of attempts to make for sending a request to a remote endpoint. Must not be less than zero
# Zero by default
mp.messaging.outgoing.<channelName>.maxRetries=3
# Configures the random factor when using back-off with maxRetries > 0. 0.5 by default
mp.messaging.outgoing.<channelName>.jitter=0.3
# Configures a back-off delay between attempts to send a request.
# A random factor (jitter) is applied to increase the delay when several failures happen.
mp.messaging.outgoing.<channelName>.delay=1s
#The HTTP method (either `POST` or `PUT`), `POST` by default
mp.messaging.outgoing.<channelName>.method=PUT
#INCOMING
# The HTTP method (either `POST` or `PUT`, `POST` by default
mp.messaging.incoming.<channelName>.method=POST
# The path of the endpoint
mp.messaging.incoming.<channelName>.path=/my-reactive-ws-endpoint
# HTTP endpoint buffers messages if a consumer is not able to keep up. This setting specifies the size of the buffer.
# 8 by default.
mp.messaging.incoming.<channelName>.buffer-size=13
リアクティブメッセージング
このエクステンションは、MicroProfile Reactive Messaging を利用してデータストリーミングアプリケーションを構築します。
さらに詳しく知りたい場合は、Quarkusで使用されている実装である SmallRye Reactive Messaging のドキュメントを確認してください。