スキーマレジストリとJSONスキーマと共にApache Kafkaを使用する
This guide shows how your Quarkus application can use Apache Kafka, JSON Schema serialized records, and connect to a schema registry (such as the Confluent Schema Registry or Apicurio Registry).
特に Kafka や Kafka in Quarkus を使い慣れていない場合は、最初に リアクティブメッセージングでの Apache Kafka の使用 ガイドを確認することをお勧めします。
前提条件
このガイドを完成させるには、以下が必要です:
-
ざっと 30 minutes
-
IDE
-
JDK 17+がインストールされ、
JAVA_HOME
が適切に設定されていること -
Apache Maven 3.9.9
-
Docker と Docker Compose、または Podman 、および Docker Compose
-
使用したい場合は、 Quarkus CLI
-
ネイティブ実行可能ファイルをビルドしたい場合、MandrelまたはGraalVM(あるいはネイティブなコンテナビルドを使用する場合はDocker)をインストールし、 適切に設定していること
アーキテクチャ
このガイドでは、REST リソース、つまりムービー DTO を消費して Kafka トピックに配置する MovieResource
を実装します。
次に、同じトピックからメッセージを消費および収集するコンシューマーを実装します。収集されたメッセージは、 Server-Sent Events を介して別のリソースである ConsumedMovieResource
によって公開されます。
The Movies will be serialized and deserialized using JSON Schema. The schema, describing the Movie, is stored in Apicurio Registry. The same concept applies if you are using the Confluent JSON Schema serde and Confluent Schema Registry.
ソリューション
次の章で紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。
Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.git
、 アーカイブ をダウンロードします。
The solution is located in the kafka-json-schema-quickstart
directory.
Mavenプロジェクトの作成
まず、新しいプロジェクトが必要です。以下のコマンドで新規プロジェクトを作成します。
Windowsユーザーの場合:
-
cmdを使用する場合、(バックスラッシュ
\
を使用せず、すべてを同じ行に書かないでください)。 -
Powershellを使用する場合は、
-D
パラメータを二重引用符で囲んでください。例:"-DprojectArtifactId=kafka-json-schema-quickstart"
If you use Confluent Schema Registry, you don’t need the |
Json Schema
Json Schema is a data serialization system. Data structures are described using schemas.
The first thing we need to do is to create a schema describing the Movie
structure.
Create a file called src/main/resources/json-schema.json
with the schema for our record (Kafka message):
{
"$id": "https://example.com/person.schema.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Movie",
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "The movie's title."
},
"year": {
"type": "integer",
"description": "The movie's year."
}
}
}
Note that auto-generating the Java class from the JSON Schema definition is not possible. Therefore, you must define the Java class as follows so it can be used by the serialization process:
package org.acme.kafka;
public class Movie {
private String title;
private Integer year;
public Movie() {
}
public Movie(String title, Integer year) {
this.title = title;
this.year = year;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public Integer getYear() {
return year;
}
public void setYear(Integer year) {
this.year = year;
}
}
Movie
プロデューサー
スキーマを定義したら、MovieResource
を実装できます。
MovieResource
を開き、Movie
DTO の Emitter
を挿入し、Movie
を消費する @POST
メソッドを実装して Emitter
経由で送信してください。
package org.acme.kafka;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
@Path("/movies")
public class MovieResource {
private static final Logger LOGGER = Logger.getLogger(MovieResource.class);
@Channel("movies")
Emitter<Movie> emitter;
@POST
public Response enqueueMovie(Movie movie) {
LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
emitter.send(movie);
return Response.accepted().build();
}
}
Now, we need to map the movies
channel (the Emitter
emits to this channel) to a Kafka topic and also map the schema to be used on this channel.
To achieve this, edit the application.properties
file, and add the following content:
# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka
# disable automatic detection of the serializers
quarkus.messaging.kafka.serializer-autodetection.enabled=false
# Set the value serializer for the channel `movies`
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies
# set the schema to be used for the channel `movies`. Note that this property accepts just a name or a path and the serializer will look for the resource on the classpath.
mp.messaging.outgoing.movies.apicurio.registry.artifact.schema.location=json-schema.json
# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true
Note that unlike in the avro serialization, autodetect can’t be used with JSON Schema, so we must define the If you use Confluent Schema Registry, in this case you must define the |
Movie
コンシューマー
So, we can write records into Kafka containing our Movie
data.
That data is serialized using JSON Schema.
Now, it’s time to implement a consumer for them.
movies-from-kafka
チャネルからの Movie
メッセージを消費し、Server-Sent Event を介して公開する ConsumedMovieResource
を作成します。
package org.acme.kafka;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;
import io.smallrye.mutiny.Multi;
@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {
@Channel("movies-from-kafka")
Multi<Movie> movies;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> stream() {
return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
}
}
アプリケーションのコードの最後のビットは、application.properties
の movies-from-kafka
チャネルの設定です。
# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies
# set the deserializer for the incoming channel
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer
# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
Again, unlike with Avro, we have to define the If you use Confluent Schema Registry, you must configure |
アプリケーションの実行
アプリケーションを開発モードで実行します。
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
Dev Services により、Kafka ブローカーと Apicurio Registry インスタンスは自動的に開始されます。詳細については、Dev Services for Kafka および Dev Services for Apicurio Registry を参照してください。
You might have noticed that we didn’t configure the schema registry URL anywhere. This is because Dev Services for Apicurio Registry configures all Kafka channels in Quarkus Messaging to use the automatically started registry instance. ネイティブ API に加えて、Apicurio Registry は Confluent Schema Registry と API 互換のエンドポイントも公開します。したがって、この自動設定は、Apicurio Registry serde と Confluent Schema Registry serde の両方で機能します。 ただし、Confluent Schema Registry 自体の実行に対する Dev Services のサポートはないことに注意してください。Confluent Schema Registry の実行インスタンスを使用する場合は、その URL を Kafka ブローカーの URL と併せて設定します。
|
2 番目のターミナルで、ConsumedMovieResource
リソースを curl
でクエリーします。
curl -N http://localhost:8080/consumed-movies
3番目のターミナルで、いくつかの movie をポストします。
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Shawshank Redemption","year":1994}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Godfather","year":1972}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Dark Knight","year":2008}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"12 Angry Men","year":1957}' \
http://localhost:8080/movies
2 番目のターミナルに表示されているものを確認します。次の行に何かが表示されているはずです。
data:'The Shawshank Redemption' from 1994
data:'The Godfather' from 1972
data:'The Dark Knight' from 2008
data:'12 Angry Men' from 1957
JVM またはネイティブモードでの実行
開発モードまたはテストモードで実行していない場合は、独自の Kafka ブローカーと Apicurio Registry を起動する必要があります。そのための最も簡単な方法は、docker-compose
を使用して適切なコンテナーを起動することです。
Confluent Schema Registry を使用している場合は、すでに Kafka ブローカーと Confluent Schema Registry インスタンスが実行および設定されています。ここに記した`docker-compose` や Apicurio Registry の設定に関する説明は無視してかまいません。 |
プロジェクトのルートに、次の内容の docker-compose.yaml
ファイルを作成します。
version: '2'
services:
zookeeper:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
schema-registry:
image: apicurio/apicurio-registry-mem:2.4.2.Final
ports:
- 8081:8080
depends_on:
- kafka
environment:
QUARKUS_PROFILE: prod
アプリケーションを開始する前に、まず Kafka ブローカーと Apicurio Registry を起動します。
docker-compose up
コンテナを停止するには、docker-compose down を使用します。docker-compose rm を使用してコンテナをクリーンアップすることもできます。
|
次のコマンドでアプリケーションをビルドできます。
quarkus build
./mvnw install
./gradlew build
その後、次のコマンドを使用して JVM モードで実行します。
java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar
デフォルトでは、アプリケーションは localhost:9092 でリッスンしている Kafka ブローカーに接続しようとします。ブートストラップサーバーは、java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar を使用して設定できます。
|
コマンドラインでレジストリー URL を指定する方法は便利ではないため、prod
プロファイルに対してのみ設定プロパティーを追加します。
%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2
以下でネイティブ実行可能ファイルをビルドできます。
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
次に、それを実行します。
./target/kafka-json-schema-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092
アプリケーションのテスト
上記のとおり、Dev Services for Kafka と Apicurio Registry は、Kafka ブローカーと Apicurio Registry インスタンスを開発モードおよびテスト用に自動で開始し、設定します。したがって、Kafka および Apicurio Registry を自分で設定する必要はなく、テストの作成に集中できます。
まず、REST クライアントと Awaitility のテスト依存関係をビルドファイルに追加します。
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")
テストでは、ムービーをループで送信し、送信したものを ConsumedMovieResource
が返すかどうかを確認します。
package org.acme.kafka;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@QuarkusTest
public class MovieResourceTest {
@TestHTTPResource("/consumed-movies")
URI consumedMovies;
@Test
public void testHelloEndpoint() throws InterruptedException {
// create a client for `ConsumedMovieResource` and collect the consumed resources in a list
Client client = ClientBuilder.newClient();
WebTarget target = client.target(consumedMovies);
List<String> received = new CopyOnWriteArrayList<>();
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));
// in a separate thread, feed the `MovieResource`
ExecutorService movieSender = startSendingMovies();
source.open();
// check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
await().atMost(5, SECONDS).until(() -> received.size() >= 2);
assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
"'12 Angry Men' from 1957"));
source.close();
// shutdown the executor that is feeding the `MovieResource`
movieSender.shutdownNow();
movieSender.awaitTermination(5, SECONDS);
}
private ExecutorService startSendingMovies() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
while (true) {
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
.when()
.post("/movies")
.then()
.statusCode(202);
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"12 Angry Men\",\"year\":1957}")
.when()
.post("/movies")
.then()
.statusCode(202);
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
break;
}
}
});
return executorService;
}
}
プロジェクトと一緒に生成された MovieResourceTest を変更しました。このテストクラスには、ネイティブ実行可能ファイルに対して同じテストを実行するサブクラ ス`NativeMovieResourceIT` があります。実行するには、次を実行します。
|
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
手動セットアップ
Dev Services を使用できず、Kafka ブローカーと Apicurio Registry インスタンスを手動で開始する場合は、QuarkusTestResourceLifecycleManager を定義します。
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<version>0.105.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;
import java.util.HashMap;
import java.util.Map;
import org.testcontainers.containers.GenericContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;
public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();
private GenericContainer<?> registry;
@Override
public Map<String, String> start() {
kafka.start();
registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
.withExposedPorts(8080)
.withEnv("QUARKUS_PROFILE", "prod");
registry.start();
Map<String, String> properties = new HashMap<>();
properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
"http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
return properties;
}
@Override
public void stop() {
registry.stop();
kafka.stop();
}
}
@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
...
}
互換性のあるバージョンのApicurio Registryの使用
The quarkus-apicurio-registry-json-schema
extension depends on recent versions of Apicurio Registry client,
and most versions of Apicurio Registry server and client are backwards compatible.
For some you need to make sure that the client used by Serdes is compatible with the server.
For example, with Apicurio Dev Service if you set the image name to use version 2.1.5.Final
:
quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final
You need to make sure that apicurio-registry-serdes-json-schema-serde
dependency
and the REST client apicurio-common-rest-client-vertx
dependency are set to compatible versions:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-json-schema</artifactId>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
<version>2.1.5.Final</version>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-jdk</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
<version>0.1.5.Final</version>
</dependency>
dependencies {
implementation(platform("io.quarkus.platform:quarkus-bom:2.12.3.Final"))
...
implementation("io.quarkus:quarkus-apicurio-registry-json-schema")
implementation("io.apicurio:apicurio-registry-serdes-json-schema-serde") {
exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
exclude group: "io.apicurio", module: "apicurio-registry-client"
exclude group: "io.apicurio", module: "apicurio-registry-common"
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-client") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-common") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-common-rest-client-vertx") {
version {
strictly "0.1.5.Final"
}
}
}
apicurio-registry-client
および apicurio-common-rest-client-vertx
の既知の旧互換バージョンは以下の通りです。
-
apicurio-registry-client
2.1.5.Final と 0.1.5.Finalapicurio-common-rest-client-vertx
-
apicurio-registry-client
2.3.1.Final と 0.1.13.Finalapicurio-common-rest-client-vertx
Confluent Schema Registryの使用
If you want to use the Confluent Schema Registry, you need the quarkus-confluent-registry-json-schema
extension, instead of the quarkus-apicurio-registry-json-schema
extension.
Also, you need to add a few dependencies and a custom Maven repository to your pom.xml
/ build.gradle
file:
<dependencies>
...
<!-- the extension -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema</artifactId>
</dependency>
<!-- Confluent registry libraries use Jakarta REST client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<version>7.2.0</version>
<exclusions>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<!-- io.confluent:kafka-json-schema-serializer is only available from this repository: -->
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
repositories {
...
maven {
url "https://packages.confluent.io/maven/"
}
}
dependencies {
...
implementation("io.quarkus:quarkus-confluent-registry-json-schema")
// Confluent registry libraries use Jakarta REST client
implementation("io.quarkus:quarkus-rest-client")
implementation("io.confluent:kafka-json-schema-serializer:7.2.0") {
exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
}
}
In JVM mode, any version of io.confluent:kafka-json-schema-serializer
can be used.
In native mode, Quarkus supports the following versions: 6.2.x
, 7.0.x
, 7.1.x
, 7.2.x
, 7.3.x
.
For version 7.4.x
and 7.5.x
, due to an issue with the Confluent Schema Serializer, you need to add another dependency:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
dependencies {
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}
それ以外のバージョンでは、ネイティブコンフィギュレーションの調整が必要な場合があります。