Schema RegistryとAvroと共にApache Kafkaを使用
このガイドでは、Quarkus アプリケーションが Apache Kafka、 Avro でシリアライズされたレコードを使用し、スキーマレジストリー (Confluent スキーマレジストリー または Apicurio レジストリー) に接続する方法を説明しています。
特に Kafka や Kafka in Quarkus を使い慣れていない場合は、最初に リアクティブメッセージングでの Apache Kafka の使用 ガイドを確認することをお勧めします。
前提条件
このガイドを完成させるには、以下が必要です:
-
ざっと 30 minutes
-
IDE
-
JDK 17+がインストールされ、
JAVA_HOMEが適切に設定されていること -
Apache Maven 3.9.12
-
Docker と Docker Compose、または Podman 、および Docker Compose
-
使用したい場合は、 Quarkus CLI
-
ネイティブ実行可能ファイルをビルドしたい場合、MandrelまたはGraalVM(あるいはネイティブなコンテナビルドを使用する場合はDocker)をインストールし、 適切に設定していること
アーキテクチャ
このガイドでは、REST リソース、つまりムービー DTO を消費して Kafka トピックに配置する MovieResource を実装します。
次に、同じトピックからメッセージを消費および収集するコンシューマーを実装します。収集されたメッセージは、 Server-Sent Events を介して別のリソースである ConsumedMovieResource によって公開されます。
Movies は、Avro を使用してシリアライズおよびデシリアライズされます。Movie を記述するスキーマは、Apicurio Registry に保存されます。Confluent Avro serde および Confluent Schema Registry を使用している場合も同じ概念が適用されます。
Maven プロジェクトの作成
まず、新しいプロジェクトが必要です。以下のコマンドで新規プロジェクトを作成します。
Windowsユーザーの場合:
-
cmdを使用する場合、(バックスラッシュ
\を使用せず、すべてを同じ行に書かないでください)。 -
Powershellを使用する場合は、
-Dパラメータを二重引用符で囲んでください。例:"-DprojectArtifactId=kafka-avro-schema-quickstart"
|
Confluent Schema Registry を使用する場合、 |
Avro スキーマ
Apache Avro はデータシリアル化システムです。データ構造はスキーマを使用して記述されます。最初に Movie 構造を記述するスキーマを作成する必要があります。レコードのスキーマ (Kafka メッセージ) を使用して、 src/main/avro/movie.avsc というファイルを作成します。
{
"namespace": "org.acme.kafka.quarkus",
"type": "record",
"name": "Movie",
"fields": [
{
"name": "title",
"type": "string"
},
{
"name": "year",
"type": "int"
}
]
}
次のコマンドでプロジェクトをビルドする場合:
quarkus build
./mvnw install
./gradlew build
movies.avsc は、 target/generated-sources/avsc ディレクトリーに配置された Movie.java ファイルにコンパイルされます。
Avro 構文とサポートされているタイプの詳細については、 Avro 仕様 を参照してください。
Quarkus を使用すると、Avro スキーマを処理するために特定の Maven プラグインを使用する必要はありません。これはすべて、 quarkus-avro エクステンションによって行われます。
|
次のコマンドでプロジェクトを実行する場合:
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
スキーマファイルに加えた変更は、生成された Java ファイルに自動的に適用されます。
Movie プロデューサー
スキーマを定義したら、 MovieResource を実装できます。
MovieResource を開き、 Movie DTO の Emitter を挿入し、 Movie を消費する @POST メソッドを実装して Emitter 経由で送信してください。
package org.acme.kafka;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
@Path("/movies")
public class MovieResource {
private static final Logger LOGGER = Logger.getLogger(MovieResource.class);
@Channel("movies")
Emitter<Movie> emitter;
@POST
public Response enqueueMovie(Movie movie) {
LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
emitter.send(movie);
return Response.accepted().build();
}
}
ここで、 movies チャネル (Emitter はこのチャネルに出力) を Kafka トピックに マップ する必要があります。これを実現するには、 application.properties ファイルを編集し、次のコンテンツを追加します。
# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies
# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true
|
Confluent Schema Registry を使用する場合は、 |
Movie コンシューマー
Movie データを含むレコードを Kafka に書き込むことができます。そのデータは Avro を使用してシリアライズされます。では、それらのコンシューマーを実装してみましょう。
movies-from-kafka チャネルからの Movie メッセージを消費し、Server-Sent Event を介して公開する ConsumedMovieResource を作成します。
package org.acme.kafka;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;
import io.smallrye.mutiny.Multi;
@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {
@Channel("movies-from-kafka")
Multi<Movie> movies;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> stream() {
return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
}
}
アプリケーションのコードの最後のビットは、 application.properties の movies-from-kafka チャネルの設定です。
# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies
# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
|
Confluent Schema Registry を使用する場合は、 |
アプリケーションの実行
アプリケーションを開発モードで実行します。
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
Dev Services により、Kafka ブローカーと Apicurio Registry インスタンスは自動的に開始されます。詳細については、Dev Services for Kafka および Dev Services for Apicurio Registry を参照してください。
|
スキーマレジストリのURLをどこにも設定していないことに気づいたかもしれません。 これは、Dev Services for Apicurio Registryによって、Quarkus MessagingのすべてのKafkaチャンネルが、自動的に起動されたレジストリインスタンスを使用するように設定されるためです。 ネイティブ API に加えて、Apicurio Registry は Confluent Schema Registry と API 互換のエンドポイントも公開します。したがって、この自動設定は、Apicurio Registry serde と Confluent Schema Registry serde の両方で機能します。 ただし、Confluent Schema Registry 自体の実行に対する Dev Services のサポートはないことに注意してください。Confluent Schema Registry の実行インスタンスを使用する場合は、その URL を Kafka ブローカーの URL と併せて設定します。
|
2 番目のターミナルで、 ConsumedMovieResource リソースを curl でクエリーします。
curl -N http://localhost:8080/consumed-movies
3番目のターミナルで、いくつかの movie をポストします。
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Shawshank Redemption","year":1994}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Godfather","year":1972}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Dark Knight","year":2008}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"12 Angry Men","year":1957}' \
http://localhost:8080/movies
2 番目のターミナルに表示されているものを確認します。次の行に何かが表示されているはずです。
data:'The Shawshank Redemption' from 1994
data:'The Godfather' from 1972
data:'The Dark Knight' from 2008
data:'12 Angry Men' from 1957
JVM またはネイティブモードでの実行
開発モードまたはテストモードで実行していない場合は、独自の Kafka ブローカーと Apicurio Registry を起動する必要があります。そのための最も簡単な方法は、 docker-compose を使用して適切なコンテナーを起動することです。
| Confluent Schema Registry を使用している場合は、すでに Kafka ブローカーと Confluent Schema Registry インスタンスが実行および設定されています。ここに記した`docker-compose` や Apicurio Registry の設定に関する説明は無視してかまいません。 |
プロジェクトのルートに、次の内容の docker-compose.yaml ファイルを作成します。
services:
kafka:
image: quay.io/strimzi/kafka:latest-kafka-4.1.0
command: [
"sh", "-c",
"./bin/kafka-storage.sh format --standalone -t $$(./bin/kafka-storage.sh random-uuid) -c ./config/server.properties && ./bin/kafka-server-start.sh ./config/server.properties"
]
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
schema-registry:
image: quay.io/apicurio/apicurio-registry:3.1.7
ports:
- 8081:8080
depends_on:
- kafka
environment:
QUARKUS_PROFILE: prod
アプリケーションを開始する前に、まず Kafka ブローカーと Apicurio Registry を起動します。
docker-compose up
コンテナを停止するには、 docker-compose down を使用します。 docker-compose rm を使用してコンテナをクリーンアップすることもできます。
|
次のコマンドでアプリケーションをビルドできます。
quarkus build
./mvnw install
./gradlew build
その後、次のコマンドを使用して JVM モードで実行します。
java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v3 -jar target/quarkus-app/quarkus-run.jar
デフォルトでは、アプリケーションは localhost:9092 でリッスンしている Kafka ブローカーに接続しようとします。ブートストラップサーバーは、 java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar を使用して設定できます。
|
コマンドラインでレジストリー URL を指定する方法は便利ではないため、 prod プロファイルに対してのみ設定プロパティーを追加します。
%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v3
以下でネイティブ実行可能ファイルをビルドできます。
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
次に、それを実行します。
./target/kafka-avro-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092
アプリケーションのテスト
上記のとおり、Dev Services for Kafka と Apicurio Registry は、Kafka ブローカーと Apicurio Registry インスタンスを開発モードおよびテスト用に自動で開始し、設定します。したがって、Kafka および Apicurio Registry を自分で設定する必要はなく、テストの作成に集中できます。
まず、REST クライアントと Awaitility のテスト依存関係をビルドファイルに追加します。
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")
テストでは、ムービーをループで送信し、送信したものを ConsumedMovieResource が返すかどうかを確認します。
package org.acme.kafka;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@QuarkusTest
public class MovieResourceTest {
@TestHTTPResource("/consumed-movies")
URI consumedMovies;
@Test
public void testHelloEndpoint() throws InterruptedException {
// create a client for `ConsumedMovieResource` and collect the consumed resources in a list
Client client = ClientBuilder.newClient();
WebTarget target = client.target(consumedMovies);
List<String> received = new CopyOnWriteArrayList<>();
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));
// in a separate thread, feed the `MovieResource`
ExecutorService movieSender = startSendingMovies();
source.open();
// check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
await().atMost(5, SECONDS).until(() -> received.size() >= 2);
assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
"'12 Angry Men' from 1957"));
source.close();
// shutdown the executor that is feeding the `MovieResource`
movieSender.shutdownNow();
movieSender.awaitTermination(5, SECONDS);
}
private ExecutorService startSendingMovies() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
while (true) {
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
.when()
.post("/movies")
.then()
.statusCode(202);
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"12 Angry Men\",\"year\":1957}")
.when()
.post("/movies")
.then()
.statusCode(202);
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
break;
}
}
});
return executorService;
}
}
プロジェクトと一緒に生成された MovieResourceTest を変更しました。このテストクラスには、ネイティブ実行可能ファイルに対して同じテストを実行するサブクラ ス`NativeMovieResourceIT` があります。実行するには、次を実行します。
|
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
手動セットアップ
Dev Services を使用できず、Kafka ブローカーと Apicurio Registry インスタンスを手動で開始する場合は、QuarkusTestResourceLifecycleManager を定義します。
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<version>0.112.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
testImplementation("io.strimzi:strimzi-test-container:0.112.0") {
exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;
import java.util.HashMap;
import java.util.Map;
import org.testcontainers.containers.GenericContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;
public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();
private GenericContainer<?> registry;
@Override
public Map<String, String> start() {
kafka.start();
registry = new GenericContainer<>("apicurio/apicurio-registry:3.0.7")
.withExposedPorts(8080)
.withEnv("QUARKUS_PROFILE", "prod");
registry.start();
Map<String, String> properties = new HashMap<>();
properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
"http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v3");
properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
return properties;
}
@Override
public void stop() {
registry.stop();
kafka.stop();
}
}
@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
...
}
Migrating from Apicurio Registry 2.x to 3.x
| Apicurio Registry 3.x introduces a breaking change in schema ID format from 8-byte (long) to 4-byte (int) identifiers. This affects message compatibility between v2 and v3 producers/consumers. |
Schema ID Format Change
Apicurio Registry 3.x changed the schema ID format from 8-byte (long) to 4-byte (int) identifiers. This means messages produced with v2 cannot be consumed by v3 applications (and vice versa) without explicit configuration.
Migration Scenarios
New applications (no existing v2 messages): No configuration needed. The v3 defaults will be used automatically.
Consuming existing v2 messages: Configure the Legacy8ByteIdHandler for channels that need to read v2-produced messages:
# Per-channel configuration for consuming v2 messages
mp.messaging.incoming.my-channel.apicurio.registry.id-handler=io.apicurio.registry.serde.Legacy8ByteIdHandler
# Or configure globally for all channels
mp.messaging.connector.smallrye-kafka.apicurio.registry.id-handler=io.apicurio.registry.serde.Legacy8ByteIdHandler
Producing v2-compatible messages: If downstream consumers still use v2, configure the producer:
mp.messaging.outgoing.my-channel.apicurio.registry.id-handler=io.apicurio.registry.serde.Legacy8ByteIdHandler
Standard ID Handlers: Fixed Format
Both v2 (8-byte) and v3 (4-byte) formats use the same magic byte.
The standard ID handlers (Legacy8ByteIdHandler and Default4ByteIdHandler) read a fixed number of bytes based on configuration—there is no per-message auto-detection.
If a single topic contains mixed messages (some with 8-byte IDs, some with 4-byte IDs), consumers using standard ID handlers will fail.
A consumer configured with Legacy8ByteIdHandler always reads 8 bytes; one with Default4ByteIdHandler always reads 4 bytes.
Mismatches cause corruption or deserialization errors.
|
With standard ID handlers, gradual migration only works when:
-
Each topic has messages in ONE format (either v2 or v3, not both)
-
Per-channel configuration routes each topic to the correct ID handler
You can have producers and consumers using both formats in the same Apicurio Registry instance, but not mixing them in the same topic.
For topics with mixed v2/v3 messages, consider using the OptimisticFallbackIdHandler described in the Optimistic Fallback ID Handler section.
|
Migration Paths for Topics with Existing v2 Messages
For topics that already contain v2 messages, valid migration approaches are:
-
Stop and drain: Stop producers → drain topic completely → upgrade all services → restart with v3 configuration
-
New topic migration: Create a new v3 topic and migrate traffic to it
-
Accept transient failures: Accept transient failures during the switchover period
-
Optimistic fallback handler: Use the
OptimisticFallbackIdHandlerfor gradual migration (see below)
Optimistic Fallback ID Handler
Apicurio Registry provides an OptimisticFallbackIdHandler that can help with gradual migration from v2 to v3.
This handler:
-
Writes new messages with 4-byte (v3) IDs
-
Reads both 4-byte (v3) and 8-byte (v2) IDs
# Configure the optimistic fallback handler for migration
mp.messaging.connector.smallrye-kafka.apicurio.registry.id-handler=io.apicurio.registry.serde.OptimisticFallbackIdHandler
# Or per-channel
mp.messaging.incoming.my-channel.apicurio.registry.id-handler=io.apicurio.registry.serde.OptimisticFallbackIdHandler
The OptimisticFallbackIdHandler makes the following assumption to distinguish between 4-byte and 8-byte IDs: schema IDs are greater than 0 and smaller than the maximum integer value.
This is typically true for most use cases, but you should verify your schema IDs meet this constraint before using this handler.
|
This approach enables a gradual migration where:
-
Upgrade consumers first with
OptimisticFallbackIdHandler(they can read both v2 and v3 messages) -
Then upgrade producers (they start writing v3 format)
-
Once all v2 messages are consumed, optionally switch to
Default4ByteIdHandler
Confluent Schema Registryの使用
Confluent Schema Registry を使用する場合、 quarkus-apicurio-registry-avro エクステンションの代わりに、 quarkus-confluent-registry-avro エクステンションと、さらにいくつかの依存関係が必要です。また、Confluent Maven リポジトリーを pom.xml / build.gradle ファイルに追加する必要があります。
<dependencies>
...
<!-- the extension -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
</dependency>
<!-- Confluent registry libraries use Jakarta REST client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.2.0</version>
<exclusions>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<!-- io.confluent:kafka-avro-serializer is only available from this repository: -->
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
repositories {
...
maven {
url "https://packages.confluent.io/maven/"
}
}
dependencies {
...
implementation("io.quarkus:quarkus-confluent-registry-avro")
// Confluent registry libraries use Jakarta REST client
implementation("io.quarkus:quarkus-rest-client")
implementation("io.confluent:kafka-avro-serializer:7.2.0") {
exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
}
}
JVMモードでは、 io.confluent:kafka-avro-serializer の任意のバージョンを使用できます。
ネイティブモードでは、Quarkusは以下のバージョンをサポートしています: 6.2.x 7.0.x 7.1.x 7.2.x 7.3.x 。
バージョン 7.4.x 以降では、Confluent スキーマシリアライザーの問題のため、別の依存関係を追加する必要があります:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
dependencies {
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}
それ以外のバージョンでは、ネイティブコンフィギュレーションの調整が必要な場合があります。
Avro コード生成の詳細
このガイドでは、Quarkus コード生成メカニズムを使用して、Avro スキーマから Java ファイルを生成しました。
内部で、メカニズムは org.apache.avro:avro-compiler を使用します。
次の設定プロパティーを使用して、動作を変更できます。
-
avro.codegen.[avsc|avdl|avpr].imports- 最初にコンパイルされるべきファイルやディレクトリのリストで、後にコンパイルされるスキーマからインポートできるようにします。インポートされたファイルは互いに参照しあってはならないことに注意してください。すべてのパスはsrc/[main|test]/avroディレクトリ、またはビルドシステムによって設定された任意のソースディレクトリのavroサブディレクトリからの相対パスでなければなりません。カンマで区切られたリストとして渡されます。 -
avro.codegen.stringType- Avro 文字列に使用する Java タイプ。CharSequence、String、Utf8のいずれかである可能性があります。デフォルトはStringです。 -
avro.codegen.createOptionalGetters- 要求されたタイプの Optional を返す`getOptional…` メソッドを生成できるようにします。デフォルトはfalseです。 -
avro.codegen.enableDecimalLogicalType- 10 進型に Java クラスを使用するかどうかを決定します。デフォルトはfalseです。 -
avro.codegen.createSetters- レコードのフィールドにセッターを作成するかどうかを決定します。デフォルトはfalseです。 -
avro.codegen.gettersReturnOptional- 要求されたタイプの Optional を返すget…メソッドを生成できるようにします。デフォルトはfalseです。 -
avro.codegen.optionalGettersForNullableFieldsOnlyは、gettersReturnOptionalオプションと連携して機能します。設定されている場合、Optionalゲッターは null 許容フィールドに対してのみ生成されます。フィールドが必須の場合、通常のゲッターが生成されます。デフォルトはfalseです。
さらに詳しく
-
Quarkus での Kafka、Schema Registry、Avro の使用方法 - ガイドの基となるブログ投稿。Avro の紹介とスキーマレジストリーの概念について説明しています。