The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Apache Kafka を使用した SmallRye Reactive Messaging の概要

このガイドでは、Quarkus アプリケーションが SmallRye Reactive Messaging を利用して Apache Kafka とやりとりする仕組みを説明します。

前提条件

このガイドを完成させるには、以下が必要です:

  • 約15分

  • IDE

  • JDK 11+ がインストールされ、 JAVA_HOME が適切に設定されていること

  • Apache Maven 3.8.1+

  • Docker and Docker Compose or Podman, and Docker Compose

  • 使用したい場合、 Quarkus CLI

  • ネイティブ実行可能ファイルをビルドしたい場合、MandrelまたはGraalVM(あるいはネイティブなコンテナビルドを使用する場合はDocker)をインストールし、 適切に設定していること

アーキテクチャ

このガイドでは、Kafka と通信する 2 つのアプリケーションを開発します。最初のアプリケーションは 見積りリクエスト を Kafka に送信し、見積り トピックからの Kafka メッセージを消費します。2 つ目のアプリケーションは 見積りリクエスト を受信し、見積り を送り返します。

アーキテクチャー

The first application, the producer, will let the user request some quotes over an HTTP endpoint. For each quote request a random identifier is generated and returned to the user, to mark the quote request as pending. At the same time, the generated request id is sent over a Kafka topic quote-requests.

プロデューサー

2 つ目のアプリケーションである processor は、quote-requests トピックから読み取り、見積りにランダムな価格を設定し、quotes という名前の Kafka トピックに送信します。

最後に、プロデューサー は見積りを読み取り、サーバーから送信されたイベントを使用してブラウザーに送信します。したがって、ユーザーには、見積り価格が 保留 から受信した価格にリアルタイムで更新されていることがわかります。

ソリューション

次の章で紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。

Git リポジトリーのクローンを作成: git clonehttps://github.com/quarkusio/quarkus-quickstarts.git、または アーカイブ をダウンロードします。

ソリューションは kafka-quickstart ディレクトリー にあります。

Maven プロジェクトの作成

まず、プロデューサープロセッサー の 2 つのプロジェクトを作成する必要があります。

ターミナルで プロデューサー プロジェクトを作成するには、次のコマンドを実行します。

CLI
quarkus create app org.acme:kafka-quickstart-producer \
    --extension=resteasy-reactive-jackson,smallrye-reactive-messaging-kafka \
    --no-code

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストール方法については、Quarkus CLIガイドをご参照ください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:2.11.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart-producer \
    -Dextensions="resteasy-reactive-jackson,smallrye-reactive-messaging-kafka" \
    -DnoCode

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

このコマンドは、プロジェクト構造を作成し、使用する 2 つの Quarkus エクステンションを選択します。

  1. RESTEasy Reactive とその Jackson サポート (JSON を処理するため) による HTTP エンドポイントの提供。

  2. リアクティブメッセージング用の Kafka コネクター

同じディレクトリーから processor プロジェクトを作成するには、次のコマンドを実行します。

CLI
quarkus create app org.acme:kafka-quickstart-processor \
    --extension=smallrye-reactive-messaging-kafka \
    --no-code

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストール方法については、Quarkus CLIガイドをご参照ください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:2.11.1.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart-processor \
    -Dextensions="smallrye-reactive-messaging-kafka" \
    -DnoCode

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

その時点で、次の構造になっているはずです。

.
├── kafka-quickstart-processor
│  ├── README.md
│  ├── mvnw
│  ├── mvnw.cmd
│  ├── pom.xml
│  └── src
│     └── main
│        ├── docker
│        ├── java
│        └── resources
│           └── application.properties
└── kafka-quickstart-producer
   ├── README.md
   ├── mvnw
   ├── mvnw.cmd
   ├── pom.xml
   └── src
      └── main
         ├── docker
         ├── java
         └── resources
            └── application.properties

お気に入りの IDE で 2 つのプロジェクトを開きます。

Dev Services

開発モードを使用するとき、またはテストのために、Kafka ブローカーを起動する必要はありません。Quarkus が自動的にブローカーを開始します。詳細については、Dev Services for Kafka を参照してください。

見積りオブジェクト

Quote クラスは、プロデューサー プロジェクトと プロセッサー プロジェクトの両方で使用されます。簡単にするために、ここではクラスを複製します。どちらのプロジェクトでも、次の内容の src/main/java/org/acme/kafka/model/Quote.java ファイルを作成します。

package org.acme.kafka.model;

public class Quote {

    public String id;
    public int price;

    /**
    * Default constructor required for Jackson serializer
    */
    public Quote() { }

    public Quote(String id, int price) {
        this.id = id;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Quote{" +
                "id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}

Quote オブジェクトの JSON 表現は、Kafka トピックに送信されるメッセージ、および Web ブラウザーに送信されるサーバー送信イベントで使用されます。

Quarkus には、JSON Kafka メッセージを処理する機能が組み込まれています。次のセクションでは、Jackson のシリアライザー/デシリアライザークラスを作成します。

見積りリクエストの送信

プロデューサー プロジェクト内に、src/main/java/org/acme/kafka/producer/QuotesResource.java ファイルを作成し、次のコンテンツを追加します。

package org.acme.kafka.producer;

import java.util.UUID;

import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.acme.kafka.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/quotes")
public class QuotesResource {

    @Channel("quote-requests")
    Emitter<String> quoteRequestEmitter; (1)

    /**
     * Endpoint to generate a new quote request id and send it to "quote-requests" Kafka topic using the emitter.
     */
    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
        UUID uuid = UUID.randomUUID();
        quoteRequestEmitter.send(uuid.toString()); (2)
        return uuid.toString(); (3)
    }
}
1 リアクティブメッセージングの Emitter を挿入して、quote-requests チャネルにメッセージを送信します。
2 ポストリクエストで、ランダムな UUID を生成し、エミッターを使用してそれを Kafka トピックに送信します。
3 同じ UUID をクライアントに返します。

quote-requests チャネルは、クラスパス上の唯一のコネクターであるため、Kafka トピックとして管理されます。特に明記されていない限り、この例のように、Quarkus はチャネル名をトピック名として使用します。したがって、この例では、アプリケーションは quote-requests トピックに書き込みます。Quarkus は、EmitterString 値を生成することを検出するため、シリアライザーも自動的に設定します。

複数のコネクターがある場合は、アプリケーション設定で使用するコネクターを指定する必要があります。

見積りリクエストの処理

ここでは、見積りリクエストを使用して価格を提示します。プロセッサー プロジェクト内に、src/main/java/org/acme/kafka/Processor/QuotesProcessor.java ファイルを作成し、次のコンテンツを追加します。

package org.acme.kafka.processor;

import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.acme.kafka.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Blocking;

/**
 * A bean consuming data from the "quote-requests" Kafka topic (mapped to "requests" channel) and giving out a random quote.
 * The result is pushed to the "quotes" Kafka topic.
 */
@ApplicationScoped
public class QuotesProcessor {

    private Random random = new Random();

    @Incoming("requests") (1)
    @Outgoing("quotes")   (2)
    @Blocking             (3)
    public Quote process(String quoteRequest) throws InterruptedException {
        // simulate some hard working task
        Thread.sleep(200);
        return new Quote(quoteRequest, random.nextInt(100));
    }
}
1 メソッドが requests チャネルからのアイテムを消費することを示します。
2 メソッドによって返されるオブジェクトが quotes チャネルに送信されることを示します。
3 処理が blocking であり、呼び出し元のスレッドで実行できないことを示します。

quote-requests トピックからのすべての Kafka record について、Reactive Messaging は`process` メソッドを呼び出し、返された Quote オブジェクトを quotes チャネルに送信します。この場合、application.properties ファイルでチャネルを設定して、requests および quotes チャネルを設定する必要があります。

%dev.quarkus.http.port=8081

# Configure the incoming `quote-requests` Kafka topic
mp.messaging.incoming.requests.topic=quote-requests
mp.messaging.incoming.requests.auto.offset.reset=earliest

この場合、1 つの受信コネクター設定と 1 つの送信コネクター設定があり、それぞれに異なる名前が付けられていることに注意してください。設定キーは次のように構成されています。

mp.messaging.[outgoing|incoming].{channel-name}.property=value

channel-name セグメントは、 @Incoming および @Outgoing アノテーションで設定された値と一致する必要があります。

  • quote-requests → この Kafka トピックから見積りリクエストを読みます

  • quotes → この Kafka トピックに見積りを書き込みます

この設定の詳細については、Kafka ドキュメントの プロデューサー設定 and コンシューマー設定 セクションを参照してください。これらのプロパティーは、kafka という接頭辞で設定されます。設定プロパティーの完全なリストは、Kafka リファレンスガイド - 設定 にあります。

mp.messaging.incoming.requests.auto.offset.reset=earliest は、コンシューマーグループにコミットされたオフセットがない場合に、最初のオフセットからトピックの読み取りを開始するようにアプリケーションに指示します。つまり、プロセッサーアプリケーションを起動する前に送信されたメッセージも処理します。

シリアライザーまたはデシリアライザーを設定する必要はありません。Quarkus はそれらを検出し、何も見つからない場合は、JSON シリアル化を使用してそれらを生成します。

見積りの受信

プロデューサー プロジェクトに戻ります。QuotesResource を変更して、Kafka からの見積りを消費し、サーバー送信イベントを介してクライアントに送り返します。

import io.smallrye.mutiny.Multi;

...

@Channel("quotes")
Multi<Quote> quotes; (1)

/**
 * Endpoint retrieving the "quotes" Kafka topic and sending the items to a server sent event.
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
    return quotes; (3)
}
1 @Channel 修飾子を使用して quotes チャネルを挿入します。
2 Server Sent Events を使用してコンテンツが送信されたことを示します。
3 ストリーム (Reactive Stream) を返します。

Quarkus は quotes チャネルを quotes Kafka トピックに自動的に関連付けるため、何も設定する必要はありません。また、Quote クラスのデシリアライザーも生成します。

Kafka でのメッセージのシリアライズ

この例では、Jackson を使用して Kafka メッセージをシリアライズ/デシリアライズしました。メッセージのシリアルライズに関するその他のオプションについては、Kafka リファレンスガイド - シリアル化 を参照してください。

スキーマレジストリーを使用した契約ファーストのアプローチを採用することを強くお勧めします。スキーマレジストリーと Avro で Apache Kafka を使用する方法の詳細については、スキーマレジストリーと Avro での Apache Kafka の使用方法 ガイドを参照してください。

HTML ページ

見積りをリクエストし、SSE で取得した価格を表示する HTML ページの最終調整を行います。

プロデューサー プロジェクト内に、次の内容で src/main/resources/META-INF/resources/quotes.html ファイルを作成します。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Prices</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <div class="card">
        <div class="card-body">
            <h2 class="card-title">Quotes</h2>
            <button class="btn btn-info" id="request-quote">Request Quote</button>
            <div class="quotes"></div>
        </div>
    </div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
    $("#request-quote").click((event) => {
        fetch("/quotes/request", {method: "POST"})
        .then(res => res.text())
        .then(qid => {
            var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
            $(".quotes").prepend(row);
        });
    });

    var source = new EventSource("/quotes");
    source.onmessage = (event) => {
      var json = JSON.parse(event.data);
      $(`#${json.id}`).html((index, html) => {
        return html.replace("Pending", `\$\xA0${json.price}`);
      });
    };
</script>
</html>

ここでは何も特別なことはありません。ユーザーがボタンをクリックすると、見積りをリクエストするための HTTP リクエストが作成され、保留中の見積りがリストに追加されます。SSE を介して受け取った見積もりごとに、リスト内の対応するアイテムが更新されます。

実行

両方のアプリケーションを実行する必要があります。1 つの端末で、次を実行します。

mvn -f kafka-quickstart-producer quarkus:dev

別の端末で、次を実行します。

mvn -f kafka-quickstart-processor quarkus:dev

Quarkus は、Kafka ブローカーを自動的に起動し、アプリケーションを設定して、異なるアプリケーション間で Kafka ブローカーインスタンスを共有します。詳細については、Dev Services for Kafka を参照してください。

ブラウザーで http://localhost:8080/quotes.html を開き、ボタンをクリックして見積りをリクエストします。

JVM またはネイティブモードでの実行

開発モードまたはテストモードで実行していない場合は、Kafka ブローカーを起動する必要があります。 Apache Kafka Web サイト に記載された手順に従うか、次の内容で docker-compose.yaml ファイルを作成できます。

version: '3.5'

services:

  zookeeper:
    image: quay.io/strimzi/kafka:0.23.0-kafka-2.8.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs
    networks:
      - kafka-quickstart-network

  kafka:
    image: quay.io/strimzi/kafka:0.23.0-kafka-2.8.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    networks:
      - kafka-quickstart-network

  producer:
    image: quarkus-quickstarts/kafka-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    ports:
      - "8080:8080"
    networks:
      - kafka-quickstart-network

  processor:
    image: quarkus-quickstarts/kafka-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: processor
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    depends_on:
      - kafka
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    networks:
      - kafka-quickstart-network

networks:
  kafka-quickstart-network:
    name: kafkaquickstart

最初に、次のコマンドを使用して両方のアプリケーションを JVM モードでビルドします。

mvn -f kafka-quickstart-producer package
mvn -f kafka-quickstart-processor package

パッケージ化したら、docker-compose up を実行します。

これは開発クラスターであり、本番では使用しないでください。

アプリケーションをネイティブ実行可能ファイルとしてビルドし、実行することもできます。まず、両方のアプリケーションをネイティブとしてコンパイルします。

mvn -f kafka-quickstart-producer package -Dnative -Dquarkus.native.container-build=true
mvn -f kafka-quickstart-processor package -Dnative -Dquarkus.native.container-build=true

次のコマンドでシステムを実行します。

export QUARKUS_MODE=native
docker-compose up --build

詳細

このガイドでは、Quarkus を使用して Kafka とやりとりする方法を示しました。 SmallRye Reactive Messaging を利用して、データストリーミングアプリケーションを構築します。

機能と設定オプションの完全なリストについては、Apache Kafka エクステンションのリファレンスガイド を確認してください。

このガイドでは、Apache Kafka と対話するための Apache Kafka フレームワークについて説明します。Kafka の Quarkus エクステンションでは、Kafka クライアントの直接使用 も可能です。