The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

QuarkusアプリケーションでApache Kafkaを開始する

Apache Kafka は、メッセージの保持、再生機能、コンシューマーグループなどのユニークな特性を提供する分散型ストリーミングプラットフォームとして広く普及しています。Kafka は非常に拡張性が高く、耐障害性に優れており、多くの近代的なシステムの要になりつつあります。とはいえ、Kafka単体で存在するものではなく、アプリケーションに適したメッセージング技術を選択するのは難しいことです。 このリンク先 のように、あなたの決定を後押しするために役立つ記事がたくさんあります。この記事は Kafka についてのもので、AMQP についての同等の記事は後日公開される予定です。

この投稿では、QuarkusアプリケーションでApache Kafkaを10ステップ未満で使い始める方法を学びます。イベント駆動型マイクロサービスを構築するための宣言的アプローチである Reactive Messaging を使用しますが、 ベア Kafka APIやKafka Streamsを使用することもできます。

完全なコードは GitHubから入手できます。

ステップ 1 - プロジェクトの生成

最初の一歩として、正しい依存関係を持つ新しいプロジェクト構造を取得してみましょう。 https://code.quarkus.io にアクセスして、グループIDとアーティファクトIDを入力します。そして、エクステンションのリストで以下を選択します:

  • SmallRye Reactive Messaging - Kafka Connector

  • RESTEasy Jackson

getting started kafka code

"Example Code"を無効にすることで、生成されたプロジェクトにサンプルコードが含まれないようにすることができます。

そして、 Generate your application をクリックして、プロジェクトをzipファイルとしてダウンロードし、解凍して、お気に入りのIDEにロードします。

生成された pom.xml を開くと、 quarkus-smallrye-reactive-messaging-kafkaquarkus-resteasy-jackson の依存関係が宣言されていることがわかります。

ステップ2 - 何を交換するか?

交換するものが必要です。あまりオリジナリティを持たずに、 Movie オブジェクトを送受信するとしましょう。プロジェクトの中で、以下の内容の org.acme.Movie クラスを作成します:

package org.acme;

public class Movie {

    public String title;
    public int year;

}

Kafka では、レコードを生成、消費します。レコードにはキーと値が含まれています。例えば、映画の公開年をキーにして、タイトルを値にするとします。

また、これらのレコードを送信するトピックを決める必要があります。単純化して、トピックを movies としましょう。

ステップ3 - アプリケーションの設定

上記の通り、Reactive Messagingを使用します。Reactive Messagingを使用する場合、あるチャンネルにメッセージを送信し、別のチャンネルからメッセージを受信します。これらのチャンネルは、基礎となるメッセージング技術に設定によってマッピングされます。このアプリケーションでは、受信および配信チャンネルが movies Kafkaチャンネルを使用することを指定する必要があります。 src/main/resources/application.properties で、以下の内容を追加します:

# The Kafka broker location (defaults to localhost:9092)
kafka.bootstrap.servers=localhost:9092

# Configuring the incoming channel (reading from Kafka)
mp.messaging.incoming.movies-in.connector=smallrye-kafka
mp.messaging.incoming.movies-in.topic=movies
mp.messaging.incoming.movies-in.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.movies-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Configuring the outgoing channel (writing to Kafka)
mp.messaging.outgoing.movies-out.connector=smallrye-kafka
mp.messaging.outgoing.movies-out.topic=movies
mp.messaging.outgoing.movies-out.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer

ブローカーの場所を kafka.bootstrap.servers で設定した後、 movies-in (レコードを受信)と movies-out (レコードを配信)の 2 つのチャンネルを設定します。

mp.messaging.incoming.movies-in というプレフィックスを使ってチャネルを設定します。 connector 属性は、このチャネルを担当するコネクタ (ここでは Kafka コネクタ) を示しています。また、キーと値のデシリアライザも設定する必要があります。

アウトバウンド movies-out チャネルを設定するには、 mp.messaging.outgoing.movies-out のプレフィックスを使用します。そのチャネルを担当するコネクタを示すだけでなく、キーと値のシリアライザも設定する必要があります。

ステップ 4 - Kafkaにムービーを公開する

さて、いよいよKafkaにレコードを送信します。以下の内容で org.acme.MovieProducer クラスを作成します:

package org.acme;

import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped
public class MovieProducer {

    @Inject @Channel("movies-out")
    Emitter<Record<Integer, String>> emitter;

    public void sendMovieToKafka(Movie movie) {
        emitter.send(Record.of(movie.year, movie.title));
    }
}

このクラスでは、 Emitter 、つまりチャンネルへのメッセージ送信を担当するオブジェクトを注入します。このエミッタは movies-out チャンネルにアタッチされています (つまり、Kafka にメッセージを送信します)。映画の公開年をキーに、タイトルを値にして Record オブジェクトを送信しています。

そのため、残りのアプリケーションは sendMovieToKafka メソッドを使って、Kafka トピックにムービーを送信するだけです。

ステップ5 - 映画を消費する

今度は反対側に目を向けて、Kafkaからムービーを取り出してみましょう。

package org.acme;

import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MovieConsumer {

    private final Logger logger = Logger.getLogger(MovieConsumer.class);

    @Incoming("movies-in")
    public void receive(Record<Integer, String> record) {
        logger.infof("Got a movie: %d - %s", record.key(), record.value());
    }
}

ここでは、 @Incoming アノテーションを使用して、受信したレコードごとに receive メソッドを呼び出すようにQuarkusに指示しています。

ステップ6 - RESTエンドポイントからのムービーの送信

REST エンドポイントから Kafka にメッセージを送信することはよくあることです。そのためには、以下のような内容の org.acme.MovieResource クラスを作成します:

package org.acme;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class MovieResource {

    @Inject
    MovieProducer producer;

    @POST
    public Response send(Movie movie) {
        producer.sendMovieToKafka(movie);
        // Return an 202 - Accepted response.
        return Response.accepted().build();
    }
}

ステップ7 - これを実行してみましょう!

さて、まずはKafkaブローカーが必要です。 Apache Kafka のクイックスタート に従うか、以下の docker-compose.yaml ファイルを使用してください:

version: '2'

services:

  zookeeper:
    image: strimzi/kafka:0.20.1-kafka-2.6.0
    command: [
        "sh", "-c",
        "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: strimzi/kafka:0.20.1-kafka-2.6.0
    command: [
        "sh", "-c",
        "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

プロジェクト内に docker-compose.yaml ファイルをコピーし、ターミナルから、ブローカーを起動します: docker-compose up -d

そして、アプリケーションを次のように実行します:

./mvnw quarkus:dev

アプリケーションは開発モードで動作します。つまり、コードをまだ変更することが出来、変更した場合、自動的にリロードされます。

別の端末で、次のようなHTTP POSTリクエストをいくつか送信します:

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"The Shawshank Redemption"}' \
http://localhost:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1972, "title":"The Godfather"}' \
http://localhost:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2008, "title":"The Dark Knight"}' \
http://localhost:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"Pulp Fiction"}' \
http://localhost:8080/

curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2010, "title":"Inception"}' \
http://localhost:8080/

アプリケーションを実行しているターミナルでは、以下のように表示されます:

...
2021-01-13 09:29:41,087 INFO  [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 1994 - Pulp Fiction
2021-01-13 09:29:41,114 INFO  [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 2010 - Inception
...

動いています!

ステップ8 - ネイティブパッケージ

GraalVMが正しくインストールされ、設定されている場合 、このアプリケーションをネイティブ実行可能ファイルとしてパッケージ化することができます:

./mvnw package -Pnative

そして、 ./target/getting-started-kafka-1.0.0-SNAPSHOT-runner でネイティブ実行可能ファイルを実行すると、Kafkaを使用したQuarkusアプリケーションが数ミリ秒で起動し、消費するメモリー量はとんでもない量:100レコードを取り込んでも30MB未満!になります。

$ rss getting-started-kafka-1.0.0-SNAPSHOT-runner
PID 0M COMMAND
49321 30M ./target/getting-started-kafka-1.0.0-SNAPSHOT-runner

まとめ

10分もしないうちに、Apache Kafkaを使った新しいQuarkusアプリケーションができました。さらに進みたい場合は、 Kafkaガイドをチェックしてください。