The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Using Apache Kafka Streams

このガイドでは、QuarkusアプリケーションがApache Kafka Streams APIを利用して、Apache Kafkaベースのストリーム処理アプリケーションを実装する方法を説明します。

前提条件

このガイドを完成させるには、以下が必要です:

  • ざっと 30 minutes

  • IDE

  • JDK 11+ がインストールされ、 JAVA_HOME が適切に設定されていること

  • Apache Maven 3.8.1+

  • Docker and Docker Compose or Podman, and Docker Compose

  • 使用したい場合、 Quarkus CLI

  • ネイティブ実行可能ファイルをビルドしたい場合、MandrelまたはGraalVM(あるいはネイティブなコンテナビルドを使用する場合はDocker)をインストールし、 適切に設定していること

事前に Kafka quickstart を読んでおくことをお勧めします。

Kafka Streams用のQuarkusエクステンションを使用すると、Quarkus Dev Modeをサポートすることで、開発期間を非常に短縮することができます(例: ./mvnw compile quarkus:dev を参照)。Kafka Streamsトポロジーのコードを変更した後、次の入力メッセージが到着すると、アプリケーションが自動的にリロードされます。

推奨される開発セットアップは、処理されたトピックに対して一定の間隔 (たとえば毎秒) でテストメッセージを作成するプロデューサを用意し、 kafkacat のようなツールを使用してストリーミング アプリケーションの出力トピックを観察することです。開発モードを使用すると、保存時にストリーミングアプリケーションの最新バージョンによって生成された出力トピッ ク上のメッセージを即座に見ることができます。

最高の開発環境を実現するために、以下の設定を Kafka ブローカーに適用することをお勧めします。

group.min.session.timeout.ms=250

また、以下の設定をQuarkusの `application.properties`で指定します。

kafka-streams.consumer.session.timeout.ms=250
kafka-streams.consumer.heartbeat.interval.ms=200

これらの設定を併用することで、アプリケーションを開発モードで再起動した後に、非常に迅速にブローカに再接続できるようになります。

アーキテクチャ

このガイドでは、(ランダムな)温度値を 1 つのコンポーネント ( generator ) で生成します。これらの値は、与えられた気象観測所に関連付けられ、Kafka トピック ( temperature-values ) に書き込まれます。別のトピック ( weather-stations ) には、気象観測所自体に関するマスターデータ (id と名前) だけが格納されています。

2 つ目のコンポーネント ( aggregator ) は、2 つの Kafka トピックから読み込み、ストリーミングパイプラインで処理します。

  • ウェザーステーション ID では、この2つのトピックが結合されています。

  • 各気象台ごとに最低、最高、平均気温が決定されます。

  • この集約されたデータは、第三のトピック ( temperatures-aggregated ) に書き出されます。

出力トピックを検査することで、データを調べることができます。Kafka Streams の 対話型クエリ を公開することで、各気象観測所の最新の結果を単純な REST クエリで取得することができます。

全体的なアーキテクチャはこんな感じです。

Architecture,

ソリューション

次のセクションで紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。

Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.gitアーカイブ をダウンロードします。

ソリューションは kafka-streams-quickstart ディレクトリ にあります。

Producer Maven プロジェクトの作成

まず、温度値プロデューサを持つ新しいプロジェクトが必要です。以下のコマンドで新規プロジェクトを作成します。

CLI
quarkus create app org.acme:kafka-streams-quickstart-producer \
    --extension=kafka \
    --no-code
mv kafka-streams-quickstart-producer producer

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストール方法については、Quarkus CLIガイドをご参照ください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:2.11.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-streams-quickstart-producer \
    -Dextensions="kafka" \
    -DnoCode
mv kafka-streams-quickstart-producer producer

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

このコマンドは、Reactive Messaging と Kafka コネクタエクステンションをインポートして Maven プロジェクトを生成します。

すでにQuarkusプロジェクトが設定されている場合は、プロジェクトのベースディレクトリーで以下のコマンドを実行することで、プロジェクトに smallrye-reactive-messaging-kafka エクステンションを追加することができます。

CLI
quarkus extension add 'quarkus-smallrye-reactive-messaging-kafka'
Maven
./mvnw quarkus:add-extension -Dextensions="quarkus-smallrye-reactive-messaging-kafka"
Gradle
./gradlew addExtension --extensions="quarkus-smallrye-reactive-messaging-kafka"

This will add the following to your build file:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-smallrye-reactive-messaging-kafka")

温度値プロデューサー

以下の内容の producer/src/main/java/org/acme/kafka/streams/producer/generator/ValuesGenerator.java ファイルを作成します。

package org.acme.kafka.streams.producer.generator;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

/**
 * A bean producing random temperature data every second.
 * The values are written to a Kafka topic (temperature-values).
 * Another topic contains the name of weather stations (weather-stations).
 * The Kafka configuration is specified in the application configuration.
 */
@ApplicationScoped
public class ValuesGenerator {

    private static final Logger LOG = Logger.getLogger(ValuesGenerator.class);

    private Random random = new Random();

    private List<WeatherStation> stations = List.of(
                        new WeatherStation(1, "Hamburg", 13),
                        new WeatherStation(2, "Snowdonia", 5),
                        new WeatherStation(3, "Boston", 11),
                        new WeatherStation(4, "Tokio", 16),
                        new WeatherStation(5, "Cusco", 12),
                        new WeatherStation(6, "Svalbard", -7),
                        new WeatherStation(7, "Porthsmouth", 11),
                        new WeatherStation(8, "Oslo", 7),
                        new WeatherStation(9, "Marrakesh", 20));

    @Outgoing("temperature-values")                                        (1)
    public Multi<Record<Integer, String>> generate() {
        return Multi.createFrom().ticks().every(Duration.ofMillis(500))    (2)
                .onOverflow().drop()
                .map(tick -> {
                    WeatherStation station = stations.get(random.nextInt(stations.size()));
                    double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                            .setScale(1, RoundingMode.HALF_UP)
                            .doubleValue();

                    LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                    return Record.of(station.id, Instant.now() + ";" + temperature);
                });
    }

    @Outgoing("weather-stations")                                          (3)
    public Multi<Record<Integer, String>> weatherStations() {
        return Multi.createFrom().items(stations.stream()
            .map(s -> Record.of(
                    s.id,
                    "{ \"id\" : " + s.id +
                    ", \"name\" : \"" + s.name + "\" }"))
        );
    }

    private static class WeatherStation {

        int id;
        String name;
        int averageTemperature;

        public WeatherStation(int id, String name, int averageTemperature) {
            this.id = id;
            this.name = name;
            this.averageTemperature = averageTemperature;
        }
    }
}
1 返却された Multi から temperature-values にアイテムを発送するように Reactive Messaging に指示します。
2 このメソッドは、0.5 秒ごとにランダムな温度値を放出する Mutiny ストリーム ( Multi ) を返します。
3 返された Multi (気象観測所の静的リスト) から weather-stations にアイテムをディスパッチするように、Reactive Messaging に指示します。

この 2 つのメソッドは、それぞれ temperature-valuesweather-stations という名前のストリームにアイテムが送信される リアクティブストリーム を返します。

トピック構成

2つのチャンネルは、Quarkus設定ファイル application.properties を使用してKafkaトピックにマッピングされます。そのためには、ファイル producer/src/main/resources/application.properties に次のように追加します。

# Configure the Kafka broker location
kafka.bootstrap.servers=localhost:9092

mp.messaging.outgoing.temperature-values.connector=smallrye-kafka
mp.messaging.outgoing.temperature-values.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.temperature-values.value.serializer=org.apache.kafka.common.serialization.StringSerializer

mp.messaging.outgoing.weather-stations.connector=smallrye-kafka
mp.messaging.outgoing.weather-stations.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.weather-stations.value.serializer=org.apache.kafka.common.serialization.StringSerializer

これは、Kafka ブートストラップサーバー、2 つのトピック、および対応する (デ)シリアライザを設定します。さまざまな設定オプションの詳細については、Kafka ドキュメントの Producer 設定Consumer 設定 のセクションを参照してください。

アグリゲータMavenプロジェクトの作成

プロデューサアプリケーションを用意したら、Kafka Streams パイプラインを実行するアグリゲータアプリケーションを実装しましょう。このように別のプロジェクトを作成します。

CLI
quarkus create app org.acme:kafka-streams-quickstart-aggregator \
    --extension=kafka-streams,resteasy-reactive-jackson \
    --no-code
mv kafka-streams-quickstart-aggregator aggregator

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストール方法については、Quarkus CLIガイドをご参照ください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:2.11.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-streams-quickstart-aggregator \
    -Dextensions="kafka-streams,resteasy-reactive-jackson" \
    -DnoCode
mv kafka-streams-quickstart-aggregator aggregator

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

This creates the aggregator project with the Quarkus extension for Kafka Streams and with the Jackson support for RESTEasy Reactive.

すでにQuarkusプロジェクトが設定されている場合は、プロジェクトのベースディレクトリーで以下のコマンドを実行することで、プロジェクトに kafka-streams エクステンションを追加することができます。

CLI
quarkus extension add 'kafka-streams'
Maven
./mvnw quarkus:add-extension -Dextensions="kafka-streams"
Gradle
./gradlew addExtension --extensions="kafka-streams"

これにより、 pom.xml に以下が追加されます:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kafka-streams</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-kafka-streams")

パイプラインの実装

ストリーム処理アプリケーションの実装を開始しましょう。温度測定、気象観測所を表現し、集約された値を追跡するためのいくつかの値オブジェクトを作成することから始めましょう。

まず、次の内容でファイル aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/WeatherStation.java を作成します。

package org.acme.kafka.streams.aggregator.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection (1)
public class WeatherStation {

    public int id;
    public String name;
}
1 @RegisterForReflection アノテーションは、ネイティブコンパイル時にクラスとそのメンバーを保持するようQuarkusに指示します。 @RegisterForReflection アノテーションの詳細については、 ネイティブアプリケーションのヒントのページを参照してください。

次に、指定されたステーションの温度測定値を表すファイル aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/TemperatureMeasurement.java です。

package org.acme.kafka.streams.aggregator.model;

import java.time.Instant;

public class TemperatureMeasurement {

    public int stationId;
    public String stationName;
    public Instant timestamp;
    public double value;

    public TemperatureMeasurement(int stationId, String stationName, Instant timestamp,
            double value) {
        this.stationId = stationId;
        this.stationName = stationName;
        this.timestamp = timestamp;
        this.value = value;
    }
}

そして最後に aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/Aggregation.java 、イベントがストリーミング・パイプラインで処理されている間、集約された値を追跡するために使用されます。

package org.acme.kafka.streams.aggregator.model;

import java.math.BigDecimal;
import java.math.RoundingMode;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Aggregation {

    public int stationId;
    public String stationName;
    public double min = Double.MAX_VALUE;
    public double max = Double.MIN_VALUE;
    public int count;
    public double sum;
    public double avg;

    public Aggregation updateFrom(TemperatureMeasurement measurement) {
        stationId = measurement.stationId;
        stationName = measurement.stationName;

        count++;
        sum += measurement.value;
        avg = BigDecimal.valueOf(sum / count)
                .setScale(1, RoundingMode.HALF_UP).doubleValue();

        min = Math.min(min, measurement.value);
        max = Math.max(max, measurement.value);

        return this;
    }
}

次に、実際のストリーミングクエリの実装自体を aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/TopologyProducer.java ファイルで作成してみましょう。そのために必要なのは、Kafka Streams Topology を返す CDI プロデューサメソッドを宣言することだけです。

package org.acme.kafka.streams.aggregator.streams;

import java.time.Instant;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;

import org.acme.kafka.streams.aggregator.model.Aggregation;
import org.acme.kafka.streams.aggregator.model.TemperatureMeasurement;
import org.acme.kafka.streams.aggregator.model.WeatherStation;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

import io.quarkus.kafka.client.serialization.ObjectMapperSerde;

@ApplicationScoped
public class TopologyProducer {

    static final String WEATHER_STATIONS_STORE = "weather-stations-store";

    private static final String WEATHER_STATIONS_TOPIC = "weather-stations";
    private static final String TEMPERATURE_VALUES_TOPIC = "temperature-values";
    private static final String TEMPERATURES_AGGREGATED_TOPIC = "temperatures-aggregated";

    @Produces
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        ObjectMapperSerde<WeatherStation> weatherStationSerde = new ObjectMapperSerde<>(
                WeatherStation.class);
        ObjectMapperSerde<Aggregation> aggregationSerde = new ObjectMapperSerde<>(Aggregation.class);

        KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                WEATHER_STATIONS_STORE);

        GlobalKTable<Integer, WeatherStation> stations = builder.globalTable( (1)
                WEATHER_STATIONS_TOPIC,
                Consumed.with(Serdes.Integer(), weatherStationSerde));

        builder.stream(                                                       (2)
                        TEMPERATURE_VALUES_TOPIC,
                        Consumed.with(Serdes.Integer(), Serdes.String())
                )
                .join(                                                        (3)
                        stations,
                        (stationId, timestampAndValue) -> stationId,
                        (timestampAndValue, station) -> {
                            String[] parts = timestampAndValue.split(";");
                            return new TemperatureMeasurement(station.id, station.name,
                                    Instant.parse(parts[0]), Double.valueOf(parts[1]));
                        }
                )
                .groupByKey()                                                 (4)
                .aggregate(                                                   (5)
                        Aggregation::new,
                        (stationId, value, aggregation) -> aggregation.updateFrom(value),
                        Materialized.<Integer, Aggregation> as(storeSupplier)
                            .withKeySerde(Serdes.Integer())
                            .withValueSerde(aggregationSerde)
                )
                .toStream()
                .to(                                                          (6)
                        TEMPERATURES_AGGREGATED_TOPIC,
                        Produced.with(Serdes.Integer(), aggregationSerde)
                );

        return builder.build();
    }
}
1 weather-stations テーブルは、各気象台の現在の状態を表す GlobalKTable に読み込まれます。
2 temperature-values トピックは KStream に読み込まれます。このトピックに新しいメッセージが到着するたびに、パイプラインはこの測定のために処理されます。
3 temperature-values トピックからのメッセージは、トピックのキー (ウェザーステーション ID) を使用して、対応するウェザーステーションと結合されます。
4 値はメッセージキー(ウェザーステーションID)によってグループ化されます。
5 各グループ内では、最小値と最大値を追跡し、そのステーションのすべての測定値の平均値を計算することで、そのステーションのすべての測定値が集約されます( Aggregation タイプを参照)。
6 パイプラインの結果は temperatures-aggregated トピックに書き出しています。

Kafka Streams エクステンションは、Quarkusの設定ファイル application.properties で設定します。ファイル aggregator/src/main/resources/application.properties を以下の内容で作成します。

quarkus.kafka-streams.bootstrap-servers=localhost:9092
quarkus.kafka-streams.application-server=${hostname}:8080
quarkus.kafka-streams.topics=weather-stations,temperature-values

# pass-through options
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.metrics.recording.level=DEBUG

quarkus.kafka-streams bootstrap-servers と は、それぞれ Kafka Streams プロパティー と にマップされます。 application-server bootstrap.servers application.server topics は Quarkus に固有のもので、アプリケーションは Kafka Streams エンジンを起動する前に、指定したすべてのトピックが存在するのを待ちます。これは、アプリケーションの起動時にまだ存在しないトピックの作成をグレースフルに待つために行われます。

あるいは、上記の ジェネレーター プロジェクトで行ったように、 quarkus.kafka-streams.bootstrap-servers の代わりに kafka.bootstrap.servers を使用することもできます。

kafka-streams ネームスペース内のすべてのプロパティーは、そのまま Kafka Streams エンジンに渡されます。プロパティーの値を変更するには、アプリケーションの再構築が必要です。

アプリケーションのビルドと実行

produceraggregator のアプリケーションをビルドできるようになりました。

./mvnw clean package -f producer/pom.xml
./mvnw clean package -f aggregator/pom.xml

Quarkusの開発モードを使ってホストマシン上で直接実行するのではなく、コンテナーイメージにパッケージ化してDocker Compose経由で起動します。これは、後で aggregator のアグリゲーションを複数のノードにスケーリングすることを実証するために行います。

Quarkusがデフォルトで作成した Dockerfile は、Kafka Streamsパイプラインを実行するために、 aggregator アプリケーションに1つの調整が必要です。そのためには、 aggregator/src/main/docker/Dockerfile.jvm ファイルを編集して、 FROM fabric8/java-alpine-openjdk8-jre の行を FROM fabric8/java-centos-openjdk8-jdk に置き換えます。

次に、2 つのアプリケーションを起動するための Docker Compose ファイル ( docker-compose.yaml ) を作成し、Apache Kafka と ZooKeeper と同様に以下のようにします。

version: '3.5'

services:
  zookeeper:
    image: strimzi/kafka:0.19.0-kafka-2.5.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs
    networks:
      - kafkastreams-network
  kafka:
    image: strimzi/kafka:0.19.0-kafka-2.5.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT} --override num.partitions=$${KAFKA_NUM_PARTITIONS}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_NUM_PARTITIONS: 3
    networks:
      - kafkastreams-network

  producer:
    image: quarkus-quickstarts/kafka-streams-producer:1.0
    build:
      context: producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    networks:
      - kafkastreams-network

  aggregator:
    image: quarkus-quickstarts/kafka-streams-aggregator:1.0
    build:
      context: aggregator
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS: kafka:9092
    networks:
      - kafkastreams-network

networks:
  kafkastreams-network:
    name: ks

produceraggregator のコンテナーイメージをビルドして、すべてのコンテナーを起動するには、 docker-compose up --build を実行します。

QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS の代わりに、 KAFKA_BOOTSTRAP_SERVERS を使うこともできます。

producer アプリケーションから、"temperature-values" トピックに送信されたメッセージに関するログステートメントが表示されるはずです。

ここで debezium/tooling イメージのインスタンスを実行し、他のすべてのコンテナーが実行しているのと同じネットワークにアタッチします。このイメージは、 kafkacathttpie などの便利なツールを提供しています。

docker run --tty --rm -i --network ks debezium/tooling:1.1

ツールコンテナー内で、 kafkacatを 実行して、ストリーミングパイプラインの結果を調べます。

kafkacat -b kafka:9092 -C -o beginning -q -t temperatures-aggregated

{"avg":34.7,"count":4,"max":49.4,"min":16.8,"stationId":9,"stationName":"Marrakesh","sum":138.8}
{"avg":15.7,"count":1,"max":15.7,"min":15.7,"stationId":2,"stationName":"Snowdonia","sum":15.7}
{"avg":12.8,"count":7,"max":25.5,"min":-13.8,"stationId":7,"stationName":"Porthsmouth","sum":89.7}
...

プロデューサが温度測定値を出力し続けると、新しい値が表示され、送信トピックの各値は、表現された気象観測所の最小、最大、および平均温度値を表示します。

インタラクティブクエリ

temperatures-aggregated のトピックを購読することは、新しい気温の値に反応するための素晴らしい方法です。しかし、特定の気象観測所の最新の集計値だけに興味があるのであれば、少しもったいないです。そこで、Kafka Streams の対話型クエリが威力を発揮します。ステートストアをクエリするシンプルな REST エンドポイントを公開することで、Kafka トピックを購読しなくても最新の集計結果を取得することができます。

まず、ファイル aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/InteractiveQueries.java 内に InteractiveQueries を作成することから始めましょう。

KafkaStreamsPipeline クラスに、与えられたキーの現在の状態を取得するメソッドをもう一つ追加しました。

package org.acme.kafka.streams.aggregator.streams;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.acme.kafka.streams.aggregator.model.Aggregation;
import org.acme.kafka.streams.aggregator.model.WeatherStationData;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

@ApplicationScoped
public class InteractiveQueries {

    @Inject
    KafkaStreams streams;

    public GetWeatherStationDataResult getWeatherStationData(int id) {
        Aggregation result = getWeatherStationStore().get(id);

        if (result != null) {
            return GetWeatherStationDataResult.found(WeatherStationData.from(result)); (1)
        }
        else {
            return GetWeatherStationDataResult.notFound();                             (2)
        }
    }

    private ReadOnlyKeyValueStore<Integer, Aggregation> getWeatherStationStore() {
        while (true) {
            try {
                return streams.store(TopologyProducer.WEATHER_STATIONS_STORE, QueryableStoreTypes.keyValueStore());
            } catch (InvalidStateStoreException e) {
                // ignore, store not ready yet
            }
        }
    }
}
1 指定されたステーションIDの値が見つかったので、その値が返されます。
2 存在しないステーションがクエリされたか、指定されたステーションに測定がまだ存在しないため、値が見つかりませんでした。

また、メソッドの戻り値の型もファイル aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/GetWeatherStationDataResult.java に作成します。

package org.acme.kafka.streams.aggregator.streams;

import java.util.Optional;
import java.util.OptionalInt;

import org.acme.kafka.streams.aggregator.model.WeatherStationData;

public class GetWeatherStationDataResult {

    private static GetWeatherStationDataResult NOT_FOUND =
            new GetWeatherStationDataResult(null);

    private final WeatherStationData result;

    private GetWeatherStationDataResult(WeatherStationData result) {
        this.result = result;
    }

    public static GetWeatherStationDataResult found(WeatherStationData data) {
        return new GetWeatherStationDataResult(data);
    }

    public static GetWeatherStationDataResult notFound() {
        return NOT_FOUND;
    }

    public Optional<WeatherStationData> getResult() {
        return Optional.ofNullable(result);
    }
}

また、気象台の実際の集計結果を表す aggregator/src/main/java/org/acme/kafka/streams/aggregator/model/WeatherStationData.java を作成します。

package org.acme.kafka.streams.aggregator.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class WeatherStationData {

    public int stationId;
    public String stationName;
    public double min = Double.MAX_VALUE;
    public double max = Double.MIN_VALUE;
    public int count;
    public double avg;

    private WeatherStationData(int stationId, String stationName, double min, double max,
            int count, double avg) {
        this.stationId = stationId;
        this.stationName = stationName;
        this.min = min;
        this.max = max;
        this.count = count;
        this.avg = avg;
    }

    public static WeatherStationData from(Aggregation aggregation) {
        return new WeatherStationData(
                aggregation.stationId,
                aggregation.stationName,
                aggregation.min,
                aggregation.max,
                aggregation.count,
                aggregation.avg);
    }
}

これで、 getWeatherStationData() を呼び出してクライアントにデータを返すシンプルな REST エンドポイント ( aggregator/src/main/java/org/acme/kafka/streams/aggregator/rest/WeatherStationEndpoint.java ) を追加することができます。

package org.acme.kafka.streams.aggregator.rest;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import org.acme.kafka.streams.aggregator.streams.GetWeatherStationDataResult;
import org.acme.kafka.streams.aggregator.streams.KafkaStreamsPipeline;

@ApplicationScoped
@Path("/weather-stations")
public class WeatherStationEndpoint {

    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/data/{id}")
    public Response getWeatherStationData(int id) {
        GetWeatherStationDataResult result = interactiveQueries.getWeatherStationData(id);

        if (result.getResult().isPresent()) {  (1)
            return Response.ok(result.getResult().get()).build();
        }
        else {
            return Response.status(Status.NOT_FOUND.getStatusCode(),
                    "No data found for weather station " + id).build();
        }
    }
}
1 値が取得されたかどうかに応じて、その値を返すか、404 レスポンスを返すかのどちらかを選択します。

このコードを用意して、Docker Composeでアプリケーションと aggregator サービスをリビルドしましょう。

./mvnw clean package -f aggregator/pom.xml
docker-compose stop aggregator
docker-compose up --build -d

これにより、 aggregator コンテナーが再構築され、サービスが再起動されます。これが完了したら、サービスの REST API を呼び出して、既存のステーションの 1 つの温度データを取得することができます。そのためには、前に起動したツーリングコンテナで httpie を使用します。

http aggregator:8080/weather-stations/data/1

HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 85
Content-Type: application/json
Date: Tue, 18 Jun 2019 19:29:16 GMT

{
    "avg": 12.9,
    "count": 146,
    "max": 41.0,
    "min": -25.6,
    "stationId": 1,
    "stationName": "Hamburg"
}

スケールアウト

Kafka Streams の非常に興味深い特性は、それらがスケールアウト可能であるということです。つまり、同じパイプラインを実行している複数のアプリケーションインスタンス間で負荷や状態を分散させることができます。各ノードには集約結果のサブセットが含まれますが、Kafka Streams は与えられたキーをホストしているノードの情報を取得するための API を提供しています。アプリケーションは、他のインスタンスから直接データを取得するか、クライアントにその他のノードの場所を指定するだけです。

aggregator アプリケーションの複数のインスタンスを起動すると、全体のアーキテクチャがこのようになります。

アーキテクチャ

InteractiveQueries クラスは、この分散型アーキテクチャ用に少し調整する必要があります。

public GetWeatherStationDataResult getWeatherStationData(int id) {
    StreamsMetadata metadata = streams.metadataForKey(                  (1)
            TopologyProducer.WEATHER_STATIONS_STORE,
            id,
            Serdes.Integer().serializer()
    );

    if (metadata == null || metadata == StreamsMetadata.NOT_AVAILABLE) {
        LOG.warn("Found no metadata for key {}", id);
        return GetWeatherStationDataResult.notFound();
    }
    else if (metadata.host().equals(host)) {                            (2)
        LOG.info("Found data for key {} locally", id);
        Aggregation result = getWeatherStationStore().get(id);

        if (result != null) {
            return GetWeatherStationDataResult.found(WeatherStationData.from(result));
        }
        else {
            return GetWeatherStationDataResult.notFound();
        }
    }
    else {                                                              (3)
        LOG.info(
            "Found data for key {} on remote host {}:{}",
            id,
            metadata.host(),
            metadata.port()
        );
        return GetWeatherStationDataResult.foundRemotely(metadata.host(), metadata.port());
    }
}

public List<PipelineMetadata> getMetaData() {                           (4)
    return streams.allMetadataForStore(TopologyProducer.WEATHER_STATIONS_STORE)
            .stream()
            .map(m -> new PipelineMetadata(
                    m.hostInfo().host() + ":" + m.hostInfo().port(),
                    m.topicPartitions()
                        .stream()
                        .map(TopicPartition::toString)
                        .collect(Collectors.toSet()))
            )
            .collect(Collectors.toList());
}
1 与えられたウェザーステーションIDのストリームメタデータが取得されます。
2 与えられたキー(ウェザーステーションID)はローカルのアプリケーションノードによって管理されています。
3 与えられたキーは別のアプリケーションノードによって管理されています; この場合、そのノードに関する情報(ホストとポート)が返されます。
4 getMetaData() メソッドが追加され、アプリケーション・クラスター内の全ノードのリストを呼び出し元に提供するようになりました。

GetWeatherStationDataResult のタイプは、それに合わせて調整する必要があります。

package org.acme.kafka.streams.aggregator.streams;

import java.util.Optional;
import java.util.OptionalInt;

import org.acme.kafka.streams.aggregator.model.WeatherStationData;

public class GetWeatherStationDataResult {

    private static GetWeatherStationDataResult NOT_FOUND =
            new GetWeatherStationDataResult(null, null, null);

    private final WeatherStationData result;
    private final String host;
    private final Integer port;

    private GetWeatherStationDataResult(WeatherStationData result, String host,
            Integer port) {
        this.result = result;
        this.host = host;
        this.port = port;
    }

    public static GetWeatherStationDataResult found(WeatherStationData data) {
        return new GetWeatherStationDataResult(data, null, null);
    }

    public static GetWeatherStationDataResult foundRemotely(String host, int port) {
        return new GetWeatherStationDataResult(null, host, port);
    }

    public static GetWeatherStationDataResult notFound() {
        return NOT_FOUND;
    }

    public Optional<WeatherStationData> getResult() {
        return Optional.ofNullable(result);
    }

    public Optional<String> getHost() {
        return Optional.ofNullable(host);
    }

    public OptionalInt getPort() {
        return port != null ? OptionalInt.of(port) : OptionalInt.empty();
    }
}

Also, the return type for getMetaData() must be defined (aggregator/src/main/java/org/acme/kafka/streams/aggregator/streams/PipelineMetadata.java):

package org.acme.kafka.streams.aggregator.streams;

import java.util.Set;

public class PipelineMetadata {

    public String host;
    public Set<String> partitions;

    public PipelineMetadata(String host, Set<String> partitions) {
        this.host = host;
        this.partitions = partitions;
    }
}

最後に、RESTエンドポイントクラスを更新する必要があります。

package org.acme.kafka.streams.aggregator.rest;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import org.acme.kafka.streams.aggregator.streams.GetWeatherStationDataResult;
import org.acme.kafka.streams.aggregator.streams.KafkaStreamsPipeline;
import org.acme.kafka.streams.aggregator.streams.PipelineMetadata;

@ApplicationScoped
@Path("/weather-stations")
public class WeatherStationEndpoint {

    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/data/{id}")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response getWeatherStationData(int id) {
        GetWeatherStationDataResult result = interactiveQueries.getWeatherStationData(id);

        if (result.getResult().isPresent()) {                     (1)
            return Response.ok(result.getResult().get()).build();
        }
        else if (result.getHost().isPresent()) {                  (2)
            URI otherUri = getOtherUri(result.getHost().get(), result.getPort().getAsInt(),
                    id);
            return Response.seeOther(otherUri).build();
        }
        else {                                                    (3)
            return Response.status(Status.NOT_FOUND.getStatusCode(),
                    "No data found for weather station " + id).build();
        }
    }

    @GET
    @Path("/meta-data")
    @Produces(MediaType.APPLICATION_JSON)
    public List<PipelineMetadata> getMetaData() {                 (4)
        return interactiveQueries.getMetaData();
    }

    private URI getOtherUri(String host, int port, int id) {
        try {
            return new URI("http://" + host + ":" + port + "/weather-stations/data/" + id);
        }
        catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }
}
1 データはローカルで見つかったので、それを返す。
2 データは他のノードで管理されているので、指定されたキーのデータが他のノードに保存されている場合は、リダイレクト(HTTPステータスコード303)で返信する。
3 指定されたウェザーステーションIDに対するデータが見つからなかった。
4 アプリケーションクラスターを形成しているすべてのホストの情報を表示する。

ここで再び aggregator サービスを停止してリビルドします。そして、3つのインスタンスを起動してみましょう。

./mvnw clean package -f aggregator/pom.xml
docker-compose stop aggregator
docker-compose up --build -d --scale aggregator=3

3つのインスタンスのいずれかでREST APIを呼び出す場合、要求されたウェザーステーションIDの集約は、クエリを受信したノードにローカルに格納されるか、他の2つのノードのいずれかに格納されるかのどちらかであるかもしれません。

Docker Composeのロードバランサーがラウンドロビン方式で aggregator サービスにリクエストを配信するので、実際のノードを直接呼び出すことにします。アプリケーションはREST経由ですべてのホスト名の情報を公開しています。

http aggregator:8080/weather-stations/meta-data

HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 202
Content-Type: application/json
Date: Tue, 18 Jun 2019 20:00:23 GMT

[
    {
        "host": "2af13fe516a9:8080",
        "partitions": [
            "temperature-values-2"
        ]
    },
    {
        "host": "32cc8309611b:8080",
        "partitions": [
            "temperature-values-1"
        ]
    },
    {
        "host": "1eb39af8d587:8080",
        "partitions": [
            "temperature-values-0"
        ]
    }
]

レスポンスに表示されている 3 つのホストのうちの 1 つからデータを取得します (実際のホスト名は異なります)。

http 2af13fe516a9:8080/weather-stations/data/1

そのノードがキー「1」のデータを保持している場合は、このようなレスポンスが得られます。

HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 74
Content-Type: application/json
Date: Tue, 11 Jun 2019 19:16:31 GMT

{
  "avg": 11.9,
  "count": 259,
  "max": 50.0,
  "min": -30.1,
  "stationId": 1,
  "stationName": "Hamburg"
}

そうでない場合、サービスはリダイレクトを送信します。

HTTP/1.1 303 See Other
Connection: keep-alive
Content-Length: 0
Date: Tue, 18 Jun 2019 20:01:03 GMT
Location: http://1eb39af8d587:8080/weather-stations/data/1

また、 --follow option を渡すことで httpie が自動的にリダイレクトに従うようにすることもできます。

http --follow 2af13fe516a9:8080/weather-stations/data/1

ネイティブ実行

Kafka Streams用のQuarkusエクステンションを使用すると、GraalVMを介してストリーム処理アプリケーションをネイティブ実行することができます。

To run both the producer and aggregator applications in native mode, the Maven builds can be executed using -Dnative:

./mvnw clean package -f producer/pom.xml -Dnative -Dnative-image.container-runtime=docker
./mvnw clean package -f aggregator/pom.xml -Dnative -Dnative-image.container-runtime=docker

ここで、 QUARKUS_MODE という名前の環境変数を作成し、値を"native"に設定します。

export QUARKUS_MODE=native

これは、 produceraggregator のイメージをビルドする際に正しい Dockerfile を使用するために Docker Compose ファイルで使用されます。Kafka Streams アプリケーションは、ネイティブモードでは 50 MB 未満の RSS で動作します。そのためには、 aggregator/src/main/docker/Dockerfile.native のプログラム呼び出しに Xmx オプションを追加します。

CMD ["./application", "-Dquarkus.http.host=0.0.0.0", "-Xmx32m"]

ここで、上記のようにDocker Composeを起動します(コンテナーイメージのリビルドを忘れずに)。

Kafka Streams のヘルスチェック

quarkus-smallrye-health のエクステンションを使用している場合は、 quarkus-kafka-streams が自動的に以下を追加します。

  • quarkus.kafka-streams.topics プロパティーで宣言されたすべてのトピックが作成されているかどうかを検証するための Readiness ヘルスチェック

  • Kafka Streams の状態に基づく Liveness ヘルスチェック

そのため、アプリケーションの /q/health エンドポイントにアクセスすると、Kafka Streams の状態や、利用可能なトピックや不足しているトピックについての情報を得ることができます。

これは、ステータスが DOWN になった場合の例です。

curl -i http://aggregator:8080/q/health

HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 454

{
    "status": "DOWN",
    "checks": [
        {
            "name": "Kafka Streams state health check",  (1)
            "status": "DOWN",
            "data": {
                "state": "CREATED"
            }
        },
        {
            "name": "Kafka Streams topics health check",  (2)
            "status": "DOWN",
            "data": {
                "available_topics": "weather-stations,temperature-values",
                "missing_topics": "hygrometry-values"
            }
        }
    ]
}
1 Liveness ヘルスチェック。 /q/health/live エンドポイントでも利用可能。
2 Rediness ヘルスチェック。 /q/health/ready エンドポイントでも利用可能。

そのため、ご覧のように quarkus.kafka-streams.topics のいずれかが欠けているか、Kafka Streams の stateRUNNING でないとすぐにステータスが DOWN になります。

トピックがない場合、 Kafka Streams topics health check .の data フィールドに available_topics キーは表示されません。また、トピックがない場合は、 Kafka Streams topics health checkdata フィールドに missing_topics キーは表示されません。

もちろん、 quarkus-kafka-streams エクステンションのヘルスチェックを無効にすることもできます。application.properties の中で quarkus.kafka-streams.health.enabledfalse にしてください。

言うまでもなく、それぞれのエンドポイント /q/health/live/q/health/ready に対して自前の Liveness および Rediness プローブを作成することもできます。

Liveness ヘルスチェック

Liveness チェックの一例をご紹介します。

curl -i http://aggregator:8080/q/health/live

HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 225

{
    "status": "DOWN",
    "checks": [
        {
            "name": "Kafka Streams state health check",
            "status": "DOWN",
            "data": {
                "state": "CREATED"
            }
        }
    ]
}

stateKafkaStreams.State Enum から来ています。

Rediness ヘルスチェック

ここでは、Rediness チェックの一例をご紹介します。

curl -i http://aggregator:8080/q/health/ready

HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 265

{
    "status": "DOWN",
    "checks": [
        {
            "name": "Kafka Streams topics health check",
            "status": "DOWN",
            "data": {
                "missing_topics": "weather-stations,temperature-values"
            }
        }
    ]
}

さらに詳しく

This guide has shown how you can build stream processing applications using Quarkus and the Kafka Streams APIs, both in JVM and native modes. For running your KStreams application in production, you could also add health checks and metrics for the data pipeline. Refer to the Quarkus guides on Micrometer, SmallRye Metrics, and SmallRye Health to learn more.

設定リファレンス

ビルド時に固定される設定プロパティ - それ以外の設定プロパティは実行時に上書き可能

Configuration property

タイプ

デフォルト

Whether a health check is published in case the smallrye-health extension is present (defaults to true).

Environment variable: QUARKUS_KAFKA_STREAMS_HEALTH_ENABLED

boolean

true

A unique identifier for this Kafka Streams application. If not set, defaults to quarkus.application.name.

Environment variable: QUARKUS_KAFKA_STREAMS_APPLICATION_ID

string

${quarkus.application.name}

A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s). If not set, fallback to kafka.bootstrap.servers, and if not set either use localhost:9012.

Environment variable: QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS

list of host:port

localhost:9012

A unique identifier of this application instance, typically in the form host:port.

Environment variable: QUARKUS_KAFKA_STREAMS_APPLICATION_SERVER

string

A comma-separated list of topic names. The pipeline will only be started once all these topics are present in the Kafka cluster and ignore.topics is set to false.

Environment variable: QUARKUS_KAFKA_STREAMS_TOPICS

list of string

Timeout to wait for topic names to be returned from admin client. If set to 0 (or negative), topics check is ignored.

Environment variable: QUARKUS_KAFKA_STREAMS_TOPICS_TIMEOUT

Duration

10S

The schema registry key. Different schema registry libraries expect a registry URL in different configuration properties. For Apicurio Registry, use apicurio.registry.url. For Confluent schema registry, use schema.registry.url.

Environment variable: QUARKUS_KAFKA_STREAMS_SCHEMA_REGISTRY_KEY

string

schema.registry.url

The schema registry URL.

Environment variable: QUARKUS_KAFKA_STREAMS_SCHEMA_REGISTRY_URL

string

Environment variable: QUARKUS_KAFKA_STREAMS_SECURITY_PROTOCOL

string

SASL mechanism used for client connections

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_MECHANISM

string

JAAS login context parameters for SASL connections in the format used by JAAS configuration files

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_JAAS_CONFIG

string

The fully qualified name of a SASL client callback handler class

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_CLIENT_CALLBACK_HANDLER_CLASS

string

The fully qualified name of a SASL login callback handler class

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_LOGIN_CALLBACK_HANDLER_CLASS

string

The fully qualified name of a class that implements the Login interface

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_LOGIN_CLASS

string

The Kerberos principal name that Kafka runs as

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_KERBEROS_SERVICE_NAME

string

Kerberos kinit command path

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_KERBEROS_KINIT_CMD

string

Login thread will sleep until the specified window factor of time from last refresh

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR

double

Percentage of random jitter added to the renewal time

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_KERBEROS_TICKET_RENEW_JITTER

double

Percentage of random jitter added to the renewal time

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN

long

Login refresh thread will sleep until the specified window factor relative to the credential’s lifetime has been reached-

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_LOGIN_REFRESH_WINDOW_FACTOR

double

The maximum amount of random jitter relative to the credential’s lifetime

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_LOGIN_REFRESH_WINDOW_JITTER

double

The desired minimum duration for the login refresh thread to wait before refreshing a credential

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_LOGIN_REFRESH_MIN_PERIOD

Duration

The amount of buffer duration before credential expiration to maintain when refreshing a credential

Environment variable: QUARKUS_KAFKA_STREAMS_SASL_LOGIN_REFRESH_BUFFER

Duration

The SSL protocol used to generate the SSLContext

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_PROTOCOL

string

The name of the security provider used for SSL connections

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_PROVIDER

string

A list of cipher suites

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_CIPHER_SUITES

string

The list of protocols enabled for SSL connections

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_ENABLED_PROTOCOLS

string

Trust store type

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_TRUSTSTORE_TYPE

string

Trust store location

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_TRUSTSTORE_LOCATION

string

Trust store password

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_TRUSTSTORE_PASSWORD

string

Trust store certificates

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_TRUSTSTORE_CERTIFICATES

string

Key store type

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_KEYSTORE_TYPE

string

Key store location

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_KEYSTORE_LOCATION

string

Key store password

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_KEYSTORE_PASSWORD

string

Key store private key

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_KEYSTORE_KEY

string

Key store certificate chain

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_KEYSTORE_CERTIFICATE_CHAIN

string

Password of the private key in the key store

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_KEY_PASSWORD

string

The algorithm used by key manager factory for SSL connections

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_KEYMANAGER_ALGORITHM

string

The algorithm used by trust manager factory for SSL connections

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_TRUSTMANAGER_ALGORITHM

string

The endpoint identification algorithm to validate server hostname using server certificate

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM

string

https

The SecureRandom PRNG implementation to use for SSL cryptography operations

Environment variable: QUARKUS_KAFKA_STREAMS_SSL_SECURE_RANDOM_IMPLEMENTATION

string

期間フォーマットについて

期間のフォーマットは標準の java.time.Duration フォーマットを使用します。詳細は Duration#parse() javadoc を参照してください。

数値で始まる期間の値を指定することもできます。この場合、値が数値のみで構成されている場合、コンバーターは値を秒として扱います。そうでない場合は、 PT が暗黙的に値の前に付加され、標準の java.time.Duration 形式が得られます。