The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.
このページを編集

スキーマレジストリとJSONスキーマと共にApache Kafkaを使用する

このガイドでは、Quarkus アプリケーションで Apache Kafka、https://json-schema.org/[JSON スキーマ] でシリアライズされたレコードを使用し、スキーマレジストリー (Confluent Schema RegistryApicurio Registry など) に接続する方法を説明します。

特に Kafka や Kafka in Quarkus を使い慣れていない場合は、最初に リアクティブメッセージングでの Apache Kafka の使用 ガイドを確認することをお勧めします。

前提条件

このガイドを完成させるには、以下が必要です:

  • ざっと 30 minutes

  • IDE

  • JDK 17+がインストールされ、 JAVA_HOME が適切に設定されていること

  • Apache Maven 3.9.12

  • Docker と Docker Compose、または Podman 、および Docker Compose

  • 使用したい場合は、 Quarkus CLI

  • ネイティブ実行可能ファイルをビルドしたい場合、MandrelまたはGraalVM(あるいはネイティブなコンテナビルドを使用する場合はDocker)をインストールし、 適切に設定していること

アーキテクチャ

このガイドでは、REST リソース、つまりムービー DTO を消費して Kafka トピックに配置する MovieResource を実装します。

次に、同じトピックからメッセージを消費および収集するコンシューマーを実装します。収集されたメッセージは、 Server-Sent Events を介して別のリソースである ConsumedMovieResource によって公開されます。

Movies は、JSON スキーマを使用してシリアライズおよびデシリアライズされます。 Movie を記述するスキーマは、Apicurio Registry に保存されます。 Confluent JSON Schema serde および Confluent Schema Registry を使用している場合も、同じ概念が適用されます。

ソリューション

次の章で紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。

Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.gitアーカイブ をダウンロードします。

ソリューションは kafka-json-schema-quickstart ディレクトリー にあります。

Mavenプロジェクトの作成

まず、新しいプロジェクトが必要です。以下のコマンドで新規プロジェクトを作成します。

コマンドラインインタフェース
quarkus create app org.acme:kafka-json-schema-quickstart \
    --extension='rest-jackson,messaging-kafka,apicurio-registry-json-schema' \
    --no-code
cd kafka-json-schema-quickstart

Gradleプロジェクトを作成するには、 --gradle または --gradle-kotlin-dsl オプションを追加します。

Quarkus CLIのインストールと使用方法の詳細については、 Quarkus CLI ガイドを参照してください。

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.34.6:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-json-schema-quickstart \
    -Dextensions='rest-jackson,messaging-kafka,apicurio-registry-json-schema' \
    -DnoCode
cd kafka-json-schema-quickstart

Gradleプロジェクトを作成するには、 -DbuildTool=gradle または -DbuildTool=gradle-kotlin-dsl オプションを追加します。

Windowsユーザーの場合:

  • cmdを使用する場合、(バックスラッシュ \ を使用せず、すべてを同じ行に書かないでください)。

  • Powershellを使用する場合は、 -D パラメータを二重引用符で囲んでください。例: "-DprojectArtifactId=kafka-json-schema-quickstart"

Confluent Schema Registry を使用する場合、quarkus-apicurio-registry-json-schema エクステンションは必要ありません。 代わりに、quarkus-confluent-registry-json-schema エクステンションとさらにいくつかの依存関係が必要です。 詳細は、Confluent Schema Registryの使用 を参照してください。

JSON スキーマ

JSON スキーマはデータのシリアライズシステムです。データ構造はスキーマを使用して記述されます。 最初に Movie 構造を記述するスキーマを作成する必要があります。 レコード (Kafka メッセージ) のスキーマを使用して、src/main/resources/json-schema.json というファイルを作成します。

{
  "$id": "https://example.com/person.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Movie",
  "type": "object",
  "properties": {
    "title": {
      "type": "string",
      "description": "The movie's title."
    },
    "year": {
      "type": "integer",
      "description": "The movie's year."
    }
  }
}

JSON スキーマ定義から Java クラスを自動生成することはできないことに注意してください。したがって、シリアライズプロセスで使用できるように、次のように Java クラスを定義する必要があります。

package org.acme.kafka;

public class Movie {

    private String title;
    private Integer year;

    public Movie() {
    }

    public Movie(String title, Integer year) {
        this.title = title;
        this.year = year;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public Integer getYear() {
        return year;
    }

    public void setYear(Integer year) {
        this.year = year;
    }
}

Movie プロデューサー

スキーマを定義したら、MovieResource を実装できます。

MovieResource を開き、Movie DTO の Emitter を挿入し、Movie を消費する @POST メソッドを実装して Emitter 経由で送信してください。

package org.acme.kafka;

import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

@Path("/movies")
public class MovieResource {
    private static final Logger LOGGER = Logger.getLogger(MovieResource.class);

    @Channel("movies")
    Emitter<Movie> emitter;

    @POST
    public Response enqueueMovie(Movie movie) {
        LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
        emitter.send(movie);
        return Response.accepted().build();
    }

}

ここで、movies チャネル (Emitter がこのチャネルに出力) を Kafka トピックに マップ し、このチャネルで使用されるスキーマも マップ する必要があります。 これを実現するには、application.properties ファイルを編集し、次のコンテンツを追加します。

# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka

# disable automatic detection of the serializers
quarkus.messaging.kafka.serializer-autodetection.enabled=false

# Set the value serializer for the channel `movies`
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer

# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies

# set the schema to be used for the channel `movies`. Note that this property accepts just a name or a path and the serializer will look for the resource on the classpath.
mp.messaging.outgoing.movies.apicurio.registry.artifact.schema.location=json-schema.json

# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true

Avro シリアライゼーションとは異なり、自動検出 は JSON スキーマでは使用できないため、value.serializer を定義する必要があることに注意してください。 Avro の場合と同様に、apicurio.registry.auto-register プロパティーを定義する必要があります。

Confluent Schema Registry を使用する場合、値 io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer を使用して value.serializer も定義する必要があります。 これは自動的に検出されます。 Confluent Schema Registry における apicurio.registry.auto-register の類似物は、auto.register.schemas と呼ばれるものです。 これはデフォルトで true に設定されているため、この例では設定する必要はありません。 自動スキーマ登録を無効にする場合は、明示的に false に設定できます。

Movie コンシューマー

Movie データを含むレコードを Kafka に書き込むことができます。 そのデータは JSON スキーマを使用してシリアライズされます。 では、それらのコンシューマーを実装してみましょう。

movies-from-kafka チャネルからの Movie メッセージを消費し、Server-Sent Event を介して公開する ConsumedMovieResource を作成します。

package org.acme.kafka;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {

    @Channel("movies-from-kafka")
    Multi<Movie> movies;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<String> stream() {
        return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
    }
}

アプリケーションのコードの最後のビットは、application.propertiesmovies-from-kafka チャネルの設定です。

# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka

# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies

# set the deserializer for the incoming channel
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer

# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false

mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest

ここでも、Avro とは異なり、value.deserializer を定義する必要があります。

Confluent Schema Registry を使用する場合は、値 ´io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer´ を使用して value.deserializer も設定する必要があります。 どちらも自動的に検出されます。

アプリケーションの実行

アプリケーションを開発モードで実行します。

コマンドラインインタフェース
quarkus dev
Maven
./mvnw quarkus:dev
Gradle
./gradlew --console=plain quarkusDev

Dev Services により、Kafka ブローカーと Apicurio Registry インスタンスは自動的に開始されます。詳細については、Dev Services for Kafka および Dev Services for Apicurio Registry を参照してください。

スキーマレジストリのURLをどこにも設定していないことに気づいたかもしれません。 これは、Dev Services for Apicurio Registryによって、Quarkus MessagingのすべてのKafkaチャンネルが、自動的に起動されたレジストリインスタンスを使用するように設定されるためです。

ネイティブ API に加えて、Apicurio Registry は Confluent Schema Registry と API 互換のエンドポイントも公開します。したがって、この自動設定は、Apicurio Registry serde と Confluent Schema Registry serde の両方で機能します。

ただし、Confluent Schema Registry 自体の実行に対する Dev Services のサポートはないことに注意してください。Confluent Schema Registry の実行インスタンスを使用する場合は、その URL を Kafka ブローカーの URL と併せて設定します。

kafka.bootstrap.servers=PLAINTEXT://localhost:9092
mp.messaging.connector.smallrye-kafka.schema.registry.url=http://localhost:8081

2 番目のターミナルで、ConsumedMovieResource リソースを curl でクエリーします。

curl -N http://localhost:8080/consumed-movies

3番目のターミナルで、いくつかの movie をポストします。

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Shawshank Redemption","year":1994}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Godfather","year":1972}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"The Dark Knight","year":2008}' \
  http://localhost:8080/movies

curl --header "Content-Type: application/json" \
  --request POST \
  --data '{"title":"12 Angry Men","year":1957}' \
  http://localhost:8080/movies

2 番目のターミナルに表示されているものを確認します。次の行に何かが表示されているはずです。

data:'The Shawshank Redemption' from 1994

data:'The Godfather' from 1972

data:'The Dark Knight' from 2008

data:'12 Angry Men' from 1957

JVM またはネイティブモードでの実行

開発モードまたはテストモードで実行していない場合は、独自の Kafka ブローカーと Apicurio Registry を起動する必要があります。そのための最も簡単な方法は、docker-compose を使用して適切なコンテナーを起動することです。

Confluent Schema Registry を使用している場合は、すでに Kafka ブローカーと Confluent Schema Registry インスタンスが実行および設定されています。ここに記した`docker-compose` や Apicurio Registry の設定に関する説明は無視してかまいません。

プロジェクトのルートに、次の内容の docker-compose.yaml ファイルを作成します。

services:

  kafka:
    image: quay.io/strimzi/kafka:latest-kafka-4.1.0
    command: [
      "sh", "-c",
      "./bin/kafka-storage.sh format --standalone -t $$(./bin/kafka-storage.sh random-uuid) -c ./config/server.properties && ./bin/kafka-server-start.sh ./config/server.properties"
    ]
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"

  schema-registry:
    image: quay.io/apicurio/apicurio-registry:3.1.7
    ports:
      - 8081:8080
    depends_on:
      - kafka
    environment:
      QUARKUS_PROFILE: prod

アプリケーションを開始する前に、まず Kafka ブローカーと Apicurio Registry を起動します。

docker-compose up
コンテナを停止するには、docker-compose down を使用します。docker-compose rm を使用してコンテナをクリーンアップすることもできます。

次のコマンドでアプリケーションをビルドできます。

コマンドラインインタフェース
quarkus build
Maven
./mvnw install
Gradle
./gradlew build

その後、次のコマンドを使用して JVM モードで実行します。

java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v3 -jar target/quarkus-app/quarkus-run.jar
デフォルトでは、アプリケーションは localhost:9092 でリッスンしている Kafka ブローカーに接続しようとします。ブートストラップサーバーは、java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar を使用して設定できます。

コマンドラインでレジストリー URL を指定する方法は便利ではないため、prod プロファイルに対してのみ設定プロパティーを追加します。

%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v3

以下でネイティブ実行可能ファイルをビルドできます。

コマンドラインインタフェース
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

次に、それを実行します。

./target/kafka-json-schema-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092

アプリケーションのテスト

上記のとおり、Dev Services for Kafka と Apicurio Registry は、Kafka ブローカーと Apicurio Registry インスタンスを開発モードおよびテスト用に自動で開始し、設定します。したがって、Kafka および Apicurio Registry を自分で設定する必要はなく、テストの作成に集中できます。

まず、REST クライアントと Awaitility のテスト依存関係をビルドファイルに追加します。

pom.xml
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest-client</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")

テストでは、ムービーをループで送信し、送信したものを ConsumedMovieResource が返すかどうかを確認します。

package org.acme.kafka;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;

@QuarkusTest
public class MovieResourceTest {

    @TestHTTPResource("/consumed-movies")
    URI consumedMovies;

    @Test
    public void testHelloEndpoint() throws InterruptedException {
        // create a client for `ConsumedMovieResource` and collect the consumed resources in a list
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target(consumedMovies);

        List<String> received = new CopyOnWriteArrayList<>();

        SseEventSource source = SseEventSource.target(target).build();
        source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));

        // in a separate thread, feed the `MovieResource`
        ExecutorService movieSender = startSendingMovies();

        source.open();

        // check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
        await().atMost(5, SECONDS).until(() -> received.size() >= 2);
        assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
                "'12 Angry Men' from 1957"));
        source.close();

        // shutdown the executor that is feeding the `MovieResource`
        movieSender.shutdownNow();
        movieSender.awaitTermination(5, SECONDS);
    }

    private ExecutorService startSendingMovies() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            while (true) {
                given()
                        .contentType(ContentType.JSON)
                        .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
                .when()
                        .post("/movies")
                .then()
                        .statusCode(202);

                given()
                        .contentType(ContentType.JSON)
                        .body("{\"title\":\"12 Angry Men\",\"year\":1957}")
                .when()
                        .post("/movies")
                .then()
                        .statusCode(202);

                try {
                    Thread.sleep(200L);
                } catch (InterruptedException e) {
                    break;
                }
            }
        });
        return executorService;
    }

}
プロジェクトと一緒に生成された MovieResourceTest を変更しました。このテストクラスには、ネイティブ実行可能ファイルに対して同じテストを実行するサブクラ ス`NativeMovieResourceIT` があります。実行するには、次を実行します。
コマンドラインインタフェース
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.native.enabled=true

手動セットアップ

Dev Services を使用できず、Kafka ブローカーと Apicurio Registry インスタンスを手動で開始する場合は、QuarkusTestResourceLifecycleManager を定義します。

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>strimzi-test-container</artifactId>
    <version>0.112.0</version>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
build.gradle
testImplementation("io.strimzi:strimzi-test-container:0.112.0") {
    exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;

import java.util.HashMap;
import java.util.Map;

import org.testcontainers.containers.GenericContainer;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;

public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {

    private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();

    private GenericContainer<?> registry;

    @Override
    public Map<String, String> start() {
        kafka.start();
        registry = new GenericContainer<>("apicurio/apicurio-registry:3.0.7")
                .withExposedPorts(8080)
                .withEnv("QUARKUS_PROFILE", "prod");
        registry.start();
        Map<String, String> properties = new HashMap<>();
        properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
                "http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v3");
        properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
        return properties;
    }

    @Override
    public void stop() {
        registry.stop();
        kafka.stop();
    }
}
@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
    ...
}

Apicurio Registry 2.x から 3.x への移行

Apicurio Registry 3.x では、スキーマ ID 形式が 8 バイト (long) から 4 バイト (int) 識別子に変わる 破壊的な変更 が導入されています。これは、v2 と v3 のプロデューサー/コンシューマー間のメッセージ互換性に影響します。

スキーマ ID 形式の変更

Apicurio Registry 3.x では、スキーマ ID 形式が 8 バイト (long) から 4 バイト (int) 識別子に変更されました。これは、明示的な設定なしには、v2 で生成されたメッセージを v3 アプリケーションで消費できないこと (およびその逆も) を意味します。

移行シナリオ

新規アプリケーション (既存の v2 メッセージなし): 設定は不要です。v3 のデフォルトが自動的に使用されます。

既存の v2 メッセージの消費: v2 で生成されたメッセージを読み取る必要があるチャネルには、 Legacy8ByteIdHandler を設定します。

# Per-channel configuration for consuming v2 messages
mp.messaging.incoming.my-channel.apicurio.registry.id-handler=io.apicurio.registry.serde.Legacy8ByteIdHandler

# Or configure globally for all channels
mp.messaging.connector.smallrye-kafka.apicurio.registry.id-handler=io.apicurio.registry.serde.Legacy8ByteIdHandler

v2 互換メッセージの生成: ダウンストリームのコンシューマーがまだ v2 を使用している場合は、プロデューサーを設定します。

mp.messaging.outgoing.my-channel.apicurio.registry.id-handler=io.apicurio.registry.serde.Legacy8ByteIdHandler

標準 ID ハンドラー: 固定形式

v2 (8 バイト) と v3 (4 バイト) の両形式は同じマジックバイトを使用します。標準 ID ハンドラー (Legacy8ByteIdHandler および Default4ByteIdHandler) は、設定に基づいて固定数のバイトを読み取ります。メッセージごとの自動検出はありません。

単一のトピックに混合メッセージ (8 バイト ID のものと 4 バイト ID のもの) が含まれている場合、標準 ID ハンドラーを使用するコンシューマーは失敗します。 Legacy8ByteIdHandler で設定されたコンシューマーは常に 8 バイトを読み取り、 Default4ByteIdHandler で設定されたコンシューマーは常に 4 バイトを読み取ります。不一致は破損や逆シリアル化エラーを引き起こします。

標準 ID ハンドラーを使用する場合、段階的な移行が機能するのは以下の場合のみです。

  • 各トピックのメッセージが単一の形式 (v2 または v3 のいずれか一方のみ) であること

  • チャネルごとの設定が各トピックを正しい ID ハンドラーにルーティングすること

同じ Apicurio Registry インスタンスで両方の形式を使用するプロデューサーとコンシューマーを持つことはできますが、同じトピック内でそれらを混在させることはできません。

混合 v2/v3 メッセージを含むトピックの場合、Optimistic フォールバック ID ハンドラー セクションで説明されている OptimisticFallbackIdHandler の使用を検討してください。

既存の v2 メッセージを含むトピックの移行パス

既に v2 メッセージを含むトピックの場合、有効な移行アプローチは次のとおりです。

  1. 停止とドレイン: プロデューサーを停止 → トピックを完全にドレイン → すべてのサービスをアップグレード → v3 設定で再起動

  2. 新規トピック移行: 新しい v3 トピックを作成し、トラフィックをそちらに移行

  3. 一時的な障害の許容: 切り替え期間中の一時的な障害を許容する

  4. Optimistic フォールバックハンドラー: 段階的な移行には OptimisticFallbackIdHandler を使用します (以下を参照)。

Optimistic フォールバック ID ハンドラー

Apicurio Registry は、v2 から v3 への段階的な移行に役立つ OptimisticFallbackIdHandler を提供します。このハンドラーは次のとおりです。

  • 4バイト (v3) の ID を持つ新しいメッセージを 書き込みます

  • 4バイト (v3) と 8バイト (v2) の両方の ID を 読み取ります

# Configure the optimistic fallback handler for migration
mp.messaging.connector.smallrye-kafka.apicurio.registry.id-handler=io.apicurio.registry.serde.OptimisticFallbackIdHandler

# Or per-channel
mp.messaging.incoming.my-channel.apicurio.registry.id-handler=io.apicurio.registry.serde.OptimisticFallbackIdHandler
OptimisticFallbackIdHandler は、4 バイト ID と 8 バイト ID を区別するために次の仮定を置きます: スキーマ ID は 0 より大きく、最大整数値よりも小さいこと。これはほとんどのユースケースで通常当てはまりますが、このハンドラーを使用する前にスキーマ ID がこの制約を満たしていることを確認する必要があります。

このアプローチにより、次のような段階的な移行が可能になります。

  1. まず、コンシューマーを OptimisticFallbackIdHandler でアップグレードします (v2 と v3 の両方のメッセージを読み取ることができます)。

  2. 次に、プロデューサーをアップグレードします (v3 形式で書き込みを開始します)。

  3. すべての v2 メッセージが消費されたら、オプションで Default4ByteIdHandler に切り替えます。

Apicurio Registry 2.x サーバーとの互換性

Apicurio Registry 3.x クライアントライブラリーは、引き続き Apicurio Registry 2.x サーバーで動作します。ただし、Apicurio Registry 2.x は現在、積極的にメンテナンスされていません。

Confluent Schema Registryの使用

Confluent Schema Registry を使用する場合は、quarkus-apicurio-registry-json-schema エクステンションではなく、quarkus-confluent-registry-json-schema エクステンションが必要です。 また、pom.xml / build.gradle ファイルにいくつかの依存関係とカスタム Maven リポジトリーを追加する必要があります。

pom.xml
<dependencies>
    ...
    <!-- the extension -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-confluent-registry-json-schema</artifactId>
    </dependency>
    <!-- Confluent registry libraries use Jakarta REST client -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-client</artifactId>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-json-schema-serializer</artifactId>
        <version>7.2.0</version>
        <exclusions>
            <exclusion>
                <groupId>jakarta.ws.rs</groupId>
                <artifactId>jakarta.ws.rs-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

<repositories>
    <!-- io.confluent:kafka-json-schema-serializer is only available from this repository: -->
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
build.gradle
repositories {
    ...

    maven {
        url "https://packages.confluent.io/maven/"
    }
}

dependencies {
    ...

    implementation("io.quarkus:quarkus-confluent-registry-json-schema")

    // Confluent registry libraries use Jakarta REST client
    implementation("io.quarkus:quarkus-rest-client")

    implementation("io.confluent:kafka-json-schema-serializer:7.2.0") {
        exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
    }
}

JVM モードでは、io.confluent:kafka-json-schema-serializer の任意のバージョンを使用できます。 ネイティブモードでは、Quarkus はバージョン 6.2.x7.0.x7.1.x7.2.x7.3.x をサポートします。

バージョン 7.4.x および 7.5.x では、Confluent スキーマシリアライザーに問題があるため、別の依存関係を追加する必要があります:

pom.xml
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-csv</artifactId>
</dependency>
build.gradle
dependencies {
    implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}

それ以外のバージョンでは、ネイティブコンフィギュレーションの調整が必要な場合があります。

関連コンテンツ