The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Using WebSockets with Reactive Messaging

This guide demonstrates how your Quarkus application can utilize SmallRye Reactive Messaging to consume and produce messages via WebSockets. WebSockets support is a part of the Reactive Messaging HTTP extension (quarkus-reactive-messaging-http).

前提条件

このガイドを完成させるには、以下が必要です:

  • less than 15 minutes

  • IDE

  • JDK 11+ がインストールされ、 JAVA_HOME が適切に設定されていること

  • Apache Maven 3.8.1+

  • ネイティブモードで実行したい場合は、GraalVM、Docker、Podman がインストールされていること

アーキテクチャ

In this guide we will implement a service, namely CostConverter that consumes messages with costs in multiple currencies and converts each cost to its value in Euro.

ユーザーが簡単にサービスを試すことができるように、コストを合計したHTTPリソース( CostCollector )と、新しいコストを追加して合計を見るためのシンプルなWebページを実装します。

アーキテクチャー

ソリューション

次のセクションで紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。

Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.gitアーカイブ をダウンロードします。

The solution is located in the reactive-messaging-websockets-quickstart directory.

Creating the Maven Project

まず、新しいプロジェクトが必要です。以下のコマンドで新規プロジェクトを作成します。

mvn io.quarkus:quarkus-maven-plugin:2.11.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=reactive-messaging-websockets-quickstart \
    -Dextensions="reactive-messaging-http,resteasy-reactive-jackson" \
    -DnoExamples
cd reactive-messaging-websockets-quickstart

This command generates a Maven project with Reactive Messaging HTTP connector and RESTEasy Reactive with Jackson support extensions.

コンバーター

Create the src/main/java/org/acme/reactivews/CostConverter.java file, with the following content:

package org.acme.reactivews;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;

/**
 * A bean consuming costs in multiple currencies and producing prices in EUR from them
 */
@ApplicationScoped
public class CostConverter {

    private static final Map<String, Double> conversionRatios = new HashMap<>();

    static {
        conversionRatios.put("CHF", 0.93);
        conversionRatios.put("USD", 0.84);
        conversionRatios.put("PLN", 0.22);
        conversionRatios.put("EUR", 1.0);
    }

    @Incoming("incoming-costs") (1)
    @Outgoing("outgoing-costs") (2)
    Cost convert(Cost cost) { (3)
        Double conversionRatio = conversionRatios.get(cost.getCurrency().toUpperCase());
        if (conversionRatio == null) {
            return cost;
        }
        return new Cost(conversionRatio * cost.getValue(), "EUR");
    }
}
1 incoming-costs ストリームからメッセージを消費します。
2 返された値を outgoing-costs ストリームにディスパッチします。
3 Consume an event with payload of type Cost and produce another Cost, with value converted to Euro if we know the conversion ratio. In the case of consuming an arbitrary object, the reactive-messaging-http extension will attempt to deserialize the request body as a JSON object.
Unlike the outbound HTTP connector, the outbound WebSocket connector does not check if the message is consumed by the remote endpoint. It may happen that a failure to receive a message is not reported, e.g. if the remote side closes the WebSocket connection in a crucial moment.

Cost クラスを定義してみましょう。

package org.acme.reactivews;

public class Cost {
    private double value;
    private String currency;

    public Cost(double value, String currency) {
        this.value = value;
        this.currency = currency;
    }

    public Cost() {
    }

    public void setValue(double value) {
        this.value = value;
    }

    public void setCurrency(String currency) {
        this.currency = currency;
    }

    public double getValue() {
        return value;
    }

    public String getCurrency() {
        return currency;
    }
}

次のステップでは、 application.properties ファイルに両方のストリームの設定を作成します。

Configuring the Web Socket connector

We need to configure the Web Socket connector. This is done in the application.properties file. The keys are structured as follows:

mp.messaging.[outgoing|incoming].{channel-name}.{property}=value

channel-name セグメントは、 @Incoming および @Outgoing アノテーションで設定された値と一致する必要があります。

  • incoming-costs → an inbound WebSocket that receives costs

  • outgoing-costs → an outbound WebSocket that pushes converted costs

mp.messaging.outgoing.outgoing-costs.connector=quarkus-websocket

# the WebSockets are exposed on the same port as HTTP
# for non-test profiles, it is quarkus.http.port...
mp.messaging.outgoing.outgoing-costs.url=ws://localhost:${quarkus.http.port}/cost-collector

# for the test profile it is quarkus.http.test-port
%test.mp.messaging.outgoing.outgoing-costs.url=ws://localhost:${quarkus.http.test-port}/cost-collector


mp.messaging.incoming.incoming-costs.connector=quarkus-websocket
# the incoming-costs channel will be fed via a Web Socket endpoint on the `/costs` path
mp.messaging.incoming.incoming-costs.path=/costs

mp.messaging.incoming.collector.path=/cost-collector
mp.messaging.incoming.collector.connector=quarkus-websocket

コストコレクター

To illustrate that converting messages and passing them through works, let’s add a CostCollector class that consumes the Web Socket messages and exposes information about the sum of collected costs via REST:

package org.acme.reactivews;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.GET;
import javax.ws.rs.Path;

@Path("/collected-costs")
@ApplicationScoped
public class CostCollector {
    private double sum;

    @GET
    // expose the sum of the collected costs
    public synchronized double getCosts() {
        return sum;
    }

    @Incoming("collector")
    // consume costs from collector channel
    synchronized void collect(Cost cost) {
        if ("EUR".equals(cost.getCurrency())) {
            sum += cost.getValue();
        }
    }
}

One more thing we have to do is to configure the collector channel in application.properties:

mp.messaging.incoming.collector.path=/cost-collector
mp.messaging.incoming.collector.connector=quarkus-websocket

The HTML page

アプリケーションと便利にやりとりするために、簡単なWebページを作成してみましょう。

このページでは、コストを追加するためのフォームと、現在のコストの合計の情報を提供します。ページは定期的に /cost-collector から現在の合計を要求することによって合計を更新します。

Change the src/main/resources/META-INF/resources/index.html file, with the following content:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Costs</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">

    <h2>Add a cost</h2>
    <div>
        <div>
            <label for="value">Value</label>
            <input type="text" id="value">
        </div>
        <div>
            <label for="currency">Currency</label>
            <select id="currency">
                <option value="CHF">Swiss franc</option>
                <option value="USD">United States dollar</option>
                <option value="PLN">Polish złoty</option>
                <option value="EUR">Euro</option>
            </select>
        </div>
        <input type="button" onclick="add()" value="Add">
    </div>


    <h2>Summary</h2>
    <div class="row">
        <p class="col-md-12">The total cost is <strong><span id="content">N/A</span>&nbsp;&euro;</strong>.</p>
    </div>
</div>
</body>
<script>
    var webSocket = new WebSocket(`ws://${document.location.host}/costs`);
    add = function() {
        const cost = {
            value: document.getElementById('value').value,
            currency: document.getElementById('currency').value
        };

        webSocket.send(JSON.stringify(cost));
    }

    updateCost = function() {
        fetch('collected-costs').then(response => response.text()).then(sum =>
            document.getElementById('content').textContent=sum
        );
    }

    window.setInterval(updateCost, 500);
</script>
</html>

実行

以下でアプリケーションを実行します。

./mvnw quarkus:dev

ブラウザで http://localhost:8080/index.html を開いてください。

ネイティブ実行

以下ででネイティブ実行可能ファイルをビルドすることができます。

./mvnw package -Pnative

さらに詳しく

WebSockets

All quarkus-websocket connector options:

# OUTGOING

# The target URL
mp.messaging.outgoing.<channelName>.url=ws://localhost:8234/

# Message serializer, optional, implementation of `io.quarkus.reactivemessaging.http.runtime.serializers.Serializer`
mp.messaging.outgoing.<channelName>.serializer=com.example.MySerializer

# The number of retries to make for sending a message to a remote websocket endpoint.
# A value greater than 0 is advised. Otherwise, a web socket timeout can result in a dropped message
# The default value is 1
mp.messaging.outgoing.<channelName>.maxRetries=1

# Configures the random factor when using back-off with maxAttempts > 1, 0.5 by default
mp.messaging.outgoing.<channelName>.jitter=0.7

# Configures a back-off delay between attempts to send a request.
# A random factor (jitter) is applied to increase the delay when several failures happen.
mp.messaging.outgoing.<channelName>.delay=2s


# INCOMING

# The path of the endpoint
mp.messaging.incoming.<channelName>.path=/my-ws-endpoint

# Web socket endpoint buffers messages if a consumer is not able to keep up.
# This setting specifies the size of the buffer. 8 by default
mp.messaging.incoming.<channelName>.buffer-size=3

リアクティブメッセージング

このエクステンションは、MicroProfile Reactive Messaging を利用してデータストリーミングアプリケーションを構築します。

さらに詳しく知りたい場合は、Quarkusで使用されている実装である SmallRye Reactive Messaging のドキュメントを確認してください。