Using WebSockets with Reactive Messaging
This guide demonstrates how your Quarkus application can utilize SmallRye Reactive Messaging to consume and produce messages via WebSockets. WebSockets support is a part of the Reactive Messaging HTTP extension (quarkus-reactive-messaging-http
).
前提条件
このガイドを完成させるには、以下が必要です:
-
less than 15 minutes
-
IDE
-
JDK 11+ がインストールされ、
JAVA_HOME
が適切に設定されていること -
Apache Maven 3.8.1+
-
ネイティブモードで実行したい場合は、GraalVM、Docker、Podman がインストールされていること
アーキテクチャ
In this guide we will implement a service, namely CostConverter
that consumes messages with costs in multiple currencies and converts each cost to its value in Euro.
ユーザーが簡単にサービスを試すことができるように、コストを合計したHTTPリソース( CostCollector
)と、新しいコストを追加して合計を見るためのシンプルなWebページを実装します。

ソリューション
次のセクションで紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。
Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.git
、 アーカイブ をダウンロードします。
The solution is located in the reactive-messaging-websockets-quickstart
directory.
Creating the Maven Project
まず、新しいプロジェクトが必要です。以下のコマンドで新規プロジェクトを作成します。
mvn io.quarkus:quarkus-maven-plugin:2.11.1.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=reactive-messaging-websockets-quickstart \
-Dextensions="reactive-messaging-http,resteasy-reactive-jackson" \
-DnoExamples
cd reactive-messaging-websockets-quickstart
This command generates a Maven project with Reactive Messaging HTTP connector and RESTEasy Reactive with Jackson support extensions.
コンバーター
Create the src/main/java/org/acme/reactivews/CostConverter.java
file, with the following content:
package org.acme.reactivews;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;
/**
* A bean consuming costs in multiple currencies and producing prices in EUR from them
*/
@ApplicationScoped
public class CostConverter {
private static final Map<String, Double> conversionRatios = new HashMap<>();
static {
conversionRatios.put("CHF", 0.93);
conversionRatios.put("USD", 0.84);
conversionRatios.put("PLN", 0.22);
conversionRatios.put("EUR", 1.0);
}
@Incoming("incoming-costs") (1)
@Outgoing("outgoing-costs") (2)
Cost convert(Cost cost) { (3)
Double conversionRatio = conversionRatios.get(cost.getCurrency().toUpperCase());
if (conversionRatio == null) {
return cost;
}
return new Cost(conversionRatio * cost.getValue(), "EUR");
}
}
1 | incoming-costs ストリームからメッセージを消費します。 |
2 | 返された値を outgoing-costs ストリームにディスパッチします。 |
3 | Consume an event with payload of type Cost and produce another Cost , with value converted to Euro if we know the conversion ratio. In the case of consuming an arbitrary object, the reactive-messaging-http extension will attempt to deserialize the request body as a JSON object. |
Unlike the outbound HTTP connector, the outbound WebSocket connector does not check if the message is consumed by the remote endpoint. It may happen that a failure to receive a message is not reported, e.g. if the remote side closes the WebSocket connection in a crucial moment. |
Cost
クラスを定義してみましょう。
package org.acme.reactivews;
public class Cost {
private double value;
private String currency;
public Cost(double value, String currency) {
this.value = value;
this.currency = currency;
}
public Cost() {
}
public void setValue(double value) {
this.value = value;
}
public void setCurrency(String currency) {
this.currency = currency;
}
public double getValue() {
return value;
}
public String getCurrency() {
return currency;
}
}
次のステップでは、 application.properties
ファイルに両方のストリームの設定を作成します。
Configuring the Web Socket connector
We need to configure the Web Socket connector. This is done in the application.properties
file. The keys are structured as follows:
mp.messaging.[outgoing|incoming].{channel-name}.{property}=value
channel-name
セグメントは、 @Incoming
および @Outgoing
アノテーションで設定された値と一致する必要があります。
-
incoming-costs
→ an inbound WebSocket that receives costs -
outgoing-costs
→ an outbound WebSocket that pushes converted costs
mp.messaging.outgoing.outgoing-costs.connector=quarkus-websocket
# the WebSockets are exposed on the same port as HTTP
# for non-test profiles, it is quarkus.http.port...
mp.messaging.outgoing.outgoing-costs.url=ws://localhost:${quarkus.http.port}/cost-collector
# for the test profile it is quarkus.http.test-port
%test.mp.messaging.outgoing.outgoing-costs.url=ws://localhost:${quarkus.http.test-port}/cost-collector
mp.messaging.incoming.incoming-costs.connector=quarkus-websocket
# the incoming-costs channel will be fed via a Web Socket endpoint on the `/costs` path
mp.messaging.incoming.incoming-costs.path=/costs
mp.messaging.incoming.collector.path=/cost-collector
mp.messaging.incoming.collector.connector=quarkus-websocket
コストコレクター
To illustrate that converting messages and passing them through works, let’s add a CostCollector
class that consumes the Web Socket messages and exposes information about the sum of collected costs via REST:
package org.acme.reactivews;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@Path("/collected-costs")
@ApplicationScoped
public class CostCollector {
private double sum;
@GET
// expose the sum of the collected costs
public synchronized double getCosts() {
return sum;
}
@Incoming("collector")
// consume costs from collector channel
synchronized void collect(Cost cost) {
if ("EUR".equals(cost.getCurrency())) {
sum += cost.getValue();
}
}
}
One more thing we have to do is to configure the collector
channel in application.properties
:
mp.messaging.incoming.collector.path=/cost-collector
mp.messaging.incoming.collector.connector=quarkus-websocket
The HTML page
アプリケーションと便利にやりとりするために、簡単なWebページを作成してみましょう。
このページでは、コストを追加するためのフォームと、現在のコストの合計の情報を提供します。ページは定期的に /cost-collector
から現在の合計を要求することによって合計を更新します。
Change the src/main/resources/META-INF/resources/index.html
file, with the following content:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Costs</title>
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
<h2>Add a cost</h2>
<div>
<div>
<label for="value">Value</label>
<input type="text" id="value">
</div>
<div>
<label for="currency">Currency</label>
<select id="currency">
<option value="CHF">Swiss franc</option>
<option value="USD">United States dollar</option>
<option value="PLN">Polish złoty</option>
<option value="EUR">Euro</option>
</select>
</div>
<input type="button" onclick="add()" value="Add">
</div>
<h2>Summary</h2>
<div class="row">
<p class="col-md-12">The total cost is <strong><span id="content">N/A</span> €</strong>.</p>
</div>
</div>
</body>
<script>
var webSocket = new WebSocket(`ws://${document.location.host}/costs`);
add = function() {
const cost = {
value: document.getElementById('value').value,
currency: document.getElementById('currency').value
};
webSocket.send(JSON.stringify(cost));
}
updateCost = function() {
fetch('collected-costs').then(response => response.text()).then(sum =>
document.getElementById('content').textContent=sum
);
}
window.setInterval(updateCost, 500);
</script>
</html>
さらに詳しく
WebSockets
All quarkus-websocket
connector options:
# OUTGOING
# The target URL
mp.messaging.outgoing.<channelName>.url=ws://localhost:8234/
# Message serializer, optional, implementation of `io.quarkus.reactivemessaging.http.runtime.serializers.Serializer`
mp.messaging.outgoing.<channelName>.serializer=com.example.MySerializer
# The number of retries to make for sending a message to a remote websocket endpoint.
# A value greater than 0 is advised. Otherwise, a web socket timeout can result in a dropped message
# The default value is 1
mp.messaging.outgoing.<channelName>.maxRetries=1
# Configures the random factor when using back-off with maxAttempts > 1, 0.5 by default
mp.messaging.outgoing.<channelName>.jitter=0.7
# Configures a back-off delay between attempts to send a request.
# A random factor (jitter) is applied to increase the delay when several failures happen.
mp.messaging.outgoing.<channelName>.delay=2s
# INCOMING
# The path of the endpoint
mp.messaging.incoming.<channelName>.path=/my-ws-endpoint
# Web socket endpoint buffers messages if a consumer is not able to keep up.
# This setting specifies the size of the buffer. 8 by default
mp.messaging.incoming.<channelName>.buffer-size=3
リアクティブメッセージング
このエクステンションは、MicroProfile Reactive Messaging を利用してデータストリーミングアプリケーションを構築します。
さらに詳しく知りたい場合は、Quarkusで使用されている実装である SmallRye Reactive Messaging のドキュメントを確認してください。