スキーマレジストリとJSONスキーマと共にApache Kafkaを使用する
このガイドでは、Quarkus アプリケーションで Apache Kafka、https://json-schema.org/[JSON スキーマ] でシリアライズされたレコードを使用し、スキーマレジストリー (Confluent Schema Registry や Apicurio Registry など) に接続する方法を説明します。
特に Kafka や Kafka in Quarkus を使い慣れていない場合は、最初に リアクティブメッセージングでの Apache Kafka の使用 ガイドを確認することをお勧めします。
前提条件
このガイドを完成させるには、以下が必要です:
-
ざっと 30 minutes
-
IDE
-
JDK 17+がインストールされ、
JAVA_HOME
が適切に設定されていること -
Apache Maven 3.9.9
-
Docker と Docker Compose、または Podman 、および Docker Compose
-
使用したい場合は、 Quarkus CLI
-
ネイティブ実行可能ファイルをビルドしたい場合、MandrelまたはGraalVM(あるいはネイティブなコンテナビルドを使用する場合はDocker)をインストールし、 適切に設定していること
アーキテクチャ
このガイドでは、REST リソース、つまりムービー DTO を消費して Kafka トピックに配置する MovieResource
を実装します。
次に、同じトピックからメッセージを消費および収集するコンシューマーを実装します。収集されたメッセージは、 Server-Sent Events を介して別のリソースである ConsumedMovieResource
によって公開されます。
Movies は、JSON スキーマを使用してシリアライズおよびデシリアライズされます。 Movie を記述するスキーマは、Apicurio Registry に保存されます。 Confluent JSON Schema serde および Confluent Schema Registry を使用している場合も、同じ概念が適用されます。
ソリューション
次の章で紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。
Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.git
、 アーカイブ をダウンロードします。
ソリューションは kafka-json-schema-quickstart
ディレクトリー にあります。
Mavenプロジェクトの作成
まず、新しいプロジェクトが必要です。以下のコマンドで新規プロジェクトを作成します。
Windowsユーザーの場合:
-
cmdを使用する場合、(バックスラッシュ
\
を使用せず、すべてを同じ行に書かないでください)。 -
Powershellを使用する場合は、
-D
パラメータを二重引用符で囲んでください。例:"-DprojectArtifactId=kafka-json-schema-quickstart"
Confluent Schema Registry を使用する場合、 |
JSON スキーマ
JSON スキーマはデータのシリアライズシステムです。データ構造はスキーマを使用して記述されます。
最初に Movie
構造を記述するスキーマを作成する必要があります。
レコード (Kafka メッセージ) のスキーマを使用して、src/main/resources/json-schema.json
というファイルを作成します。
{
"$id": "https://example.com/person.schema.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Movie",
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "The movie's title."
},
"year": {
"type": "integer",
"description": "The movie's year."
}
}
}
JSON スキーマ定義から Java クラスを自動生成することはできないことに注意してください。したがって、シリアライズプロセスで使用できるように、次のように Java クラスを定義する必要があります。
package org.acme.kafka;
public class Movie {
private String title;
private Integer year;
public Movie() {
}
public Movie(String title, Integer year) {
this.title = title;
this.year = year;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public Integer getYear() {
return year;
}
public void setYear(Integer year) {
this.year = year;
}
}
Movie
プロデューサー
スキーマを定義したら、MovieResource
を実装できます。
MovieResource
を開き、Movie
DTO の Emitter
を挿入し、Movie
を消費する @POST
メソッドを実装して Emitter
経由で送信してください。
package org.acme.kafka;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
@Path("/movies")
public class MovieResource {
private static final Logger LOGGER = Logger.getLogger(MovieResource.class);
@Channel("movies")
Emitter<Movie> emitter;
@POST
public Response enqueueMovie(Movie movie) {
LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
emitter.send(movie);
return Response.accepted().build();
}
}
ここで、movies
チャネル (Emitter
がこのチャネルに出力) を Kafka トピックに マップ し、このチャネルで使用されるスキーマも マップ する必要があります。
これを実現するには、application.properties
ファイルを編集し、次のコンテンツを追加します。
# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka
# disable automatic detection of the serializers
quarkus.messaging.kafka.serializer-autodetection.enabled=false
# Set the value serializer for the channel `movies`
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies
# set the schema to be used for the channel `movies`. Note that this property accepts just a name or a path and the serializer will look for the resource on the classpath.
mp.messaging.outgoing.movies.apicurio.registry.artifact.schema.location=json-schema.json
# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true
Avro シリアライゼーションとは異なり、自動検出 は JSON スキーマでは使用できないため、 Confluent Schema Registry を使用する場合、値 |
Movie
コンシューマー
Movie
データを含むレコードを Kafka に書き込むことができます。
そのデータは JSON スキーマを使用してシリアライズされます。
では、それらのコンシューマーを実装してみましょう。
movies-from-kafka
チャネルからの Movie
メッセージを消費し、Server-Sent Event を介して公開する ConsumedMovieResource
を作成します。
package org.acme.kafka;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;
import io.smallrye.mutiny.Multi;
@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {
@Channel("movies-from-kafka")
Multi<Movie> movies;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> stream() {
return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
}
}
アプリケーションのコードの最後のビットは、application.properties
の movies-from-kafka
チャネルの設定です。
# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies
# set the deserializer for the incoming channel
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer
# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
ここでも、Avro とは異なり、 Confluent Schema Registry を使用する場合は、値 ´io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer´ を使用して |
アプリケーションの実行
アプリケーションを開発モードで実行します。
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
Dev Services により、Kafka ブローカーと Apicurio Registry インスタンスは自動的に開始されます。詳細については、Dev Services for Kafka および Dev Services for Apicurio Registry を参照してください。
スキーマレジストリのURLをどこにも設定していないことに気づいたかもしれません。 これは、Dev Services for Apicurio Registryによって、Quarkus MessagingのすべてのKafkaチャンネルが、自動的に起動されたレジストリインスタンスを使用するように設定されるためです。 ネイティブ API に加えて、Apicurio Registry は Confluent Schema Registry と API 互換のエンドポイントも公開します。したがって、この自動設定は、Apicurio Registry serde と Confluent Schema Registry serde の両方で機能します。 ただし、Confluent Schema Registry 自体の実行に対する Dev Services のサポートはないことに注意してください。Confluent Schema Registry の実行インスタンスを使用する場合は、その URL を Kafka ブローカーの URL と併せて設定します。
|
2 番目のターミナルで、ConsumedMovieResource
リソースを curl
でクエリーします。
curl -N http://localhost:8080/consumed-movies
3番目のターミナルで、いくつかの movie をポストします。
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Shawshank Redemption","year":1994}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Godfather","year":1972}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Dark Knight","year":2008}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"12 Angry Men","year":1957}' \
http://localhost:8080/movies
2 番目のターミナルに表示されているものを確認します。次の行に何かが表示されているはずです。
data:'The Shawshank Redemption' from 1994
data:'The Godfather' from 1972
data:'The Dark Knight' from 2008
data:'12 Angry Men' from 1957
JVM またはネイティブモードでの実行
開発モードまたはテストモードで実行していない場合は、独自の Kafka ブローカーと Apicurio Registry を起動する必要があります。そのための最も簡単な方法は、docker-compose
を使用して適切なコンテナーを起動することです。
Confluent Schema Registry を使用している場合は、すでに Kafka ブローカーと Confluent Schema Registry インスタンスが実行および設定されています。ここに記した`docker-compose` や Apicurio Registry の設定に関する説明は無視してかまいません。 |
プロジェクトのルートに、次の内容の docker-compose.yaml
ファイルを作成します。
version: '2'
services:
zookeeper:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
schema-registry:
image: apicurio/apicurio-registry-mem:2.4.2.Final
ports:
- 8081:8080
depends_on:
- kafka
environment:
QUARKUS_PROFILE: prod
アプリケーションを開始する前に、まず Kafka ブローカーと Apicurio Registry を起動します。
docker-compose up
コンテナを停止するには、docker-compose down を使用します。docker-compose rm を使用してコンテナをクリーンアップすることもできます。
|
次のコマンドでアプリケーションをビルドできます。
quarkus build
./mvnw install
./gradlew build
その後、次のコマンドを使用して JVM モードで実行します。
java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar
デフォルトでは、アプリケーションは localhost:9092 でリッスンしている Kafka ブローカーに接続しようとします。ブートストラップサーバーは、java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar を使用して設定できます。
|
コマンドラインでレジストリー URL を指定する方法は便利ではないため、prod
プロファイルに対してのみ設定プロパティーを追加します。
%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2
以下でネイティブ実行可能ファイルをビルドできます。
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
次に、それを実行します。
./target/kafka-json-schema-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092
アプリケーションのテスト
上記のとおり、Dev Services for Kafka と Apicurio Registry は、Kafka ブローカーと Apicurio Registry インスタンスを開発モードおよびテスト用に自動で開始し、設定します。したがって、Kafka および Apicurio Registry を自分で設定する必要はなく、テストの作成に集中できます。
まず、REST クライアントと Awaitility のテスト依存関係をビルドファイルに追加します。
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")
テストでは、ムービーをループで送信し、送信したものを ConsumedMovieResource
が返すかどうかを確認します。
package org.acme.kafka;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@QuarkusTest
public class MovieResourceTest {
@TestHTTPResource("/consumed-movies")
URI consumedMovies;
@Test
public void testHelloEndpoint() throws InterruptedException {
// create a client for `ConsumedMovieResource` and collect the consumed resources in a list
Client client = ClientBuilder.newClient();
WebTarget target = client.target(consumedMovies);
List<String> received = new CopyOnWriteArrayList<>();
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));
// in a separate thread, feed the `MovieResource`
ExecutorService movieSender = startSendingMovies();
source.open();
// check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
await().atMost(5, SECONDS).until(() -> received.size() >= 2);
assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
"'12 Angry Men' from 1957"));
source.close();
// shutdown the executor that is feeding the `MovieResource`
movieSender.shutdownNow();
movieSender.awaitTermination(5, SECONDS);
}
private ExecutorService startSendingMovies() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
while (true) {
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
.when()
.post("/movies")
.then()
.statusCode(202);
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"12 Angry Men\",\"year\":1957}")
.when()
.post("/movies")
.then()
.statusCode(202);
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
break;
}
}
});
return executorService;
}
}
プロジェクトと一緒に生成された MovieResourceTest を変更しました。このテストクラスには、ネイティブ実行可能ファイルに対して同じテストを実行するサブクラ ス`NativeMovieResourceIT` があります。実行するには、次を実行します。
|
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
手動セットアップ
Dev Services を使用できず、Kafka ブローカーと Apicurio Registry インスタンスを手動で開始する場合は、QuarkusTestResourceLifecycleManager を定義します。
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<version>0.105.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;
import java.util.HashMap;
import java.util.Map;
import org.testcontainers.containers.GenericContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;
public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();
private GenericContainer<?> registry;
@Override
public Map<String, String> start() {
kafka.start();
registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
.withExposedPorts(8080)
.withEnv("QUARKUS_PROFILE", "prod");
registry.start();
Map<String, String> properties = new HashMap<>();
properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
"http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
return properties;
}
@Override
public void stop() {
registry.stop();
kafka.stop();
}
}
@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
...
}
互換性のあるバージョンのApicurio Registryの使用
quarkus-apicurio-registry-json-schema
エクステンションは、最近のバージョンの Apicurio Registry クライアントに依存しています。
Apicurio Registry サーバーおよびクライアントのほとんどのバージョンには後方互換性があります。
場合によっては、Serdes で使用するクライアントがサーバーと互換性があることを確認する必要があります。
例えば、Apicurio Dev Serviceで、バージョン 2.1.5.Final
を使用するようにイメージ名を設定した場合:
quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final
apicurio-registry-serdes-json-schema-serde
の依存関係と REST クライアント apicurio-common-rest-client-vertx
の依存関係が、互換性のあるバージョンに設定されていることを確認する必要があります。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-json-schema</artifactId>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
<version>2.1.5.Final</version>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-jdk</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
<version>0.1.5.Final</version>
</dependency>
dependencies {
implementation(platform("io.quarkus.platform:quarkus-bom:2.12.3.Final"))
...
implementation("io.quarkus:quarkus-apicurio-registry-json-schema")
implementation("io.apicurio:apicurio-registry-serdes-json-schema-serde") {
exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
exclude group: "io.apicurio", module: "apicurio-registry-client"
exclude group: "io.apicurio", module: "apicurio-registry-common"
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-client") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-common") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-common-rest-client-vertx") {
version {
strictly "0.1.5.Final"
}
}
}
apicurio-registry-client
および apicurio-common-rest-client-vertx
の既知の旧互換バージョンは以下の通りです。
-
apicurio-registry-client
2.1.5.Final と 0.1.5.Finalapicurio-common-rest-client-vertx
-
apicurio-registry-client
2.3.1.Final と 0.1.13.Finalapicurio-common-rest-client-vertx
Confluent Schema Registryの使用
Confluent Schema Registry を使用する場合は、quarkus-apicurio-registry-json-schema
エクステンションではなく、quarkus-confluent-registry-json-schema
エクステンションが必要です。
また、pom.xml
/ build.gradle
ファイルにいくつかの依存関係とカスタム Maven リポジトリーを追加する必要があります。
<dependencies>
...
<!-- the extension -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema</artifactId>
</dependency>
<!-- Confluent registry libraries use Jakarta REST client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<version>7.2.0</version>
<exclusions>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<!-- io.confluent:kafka-json-schema-serializer is only available from this repository: -->
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
repositories {
...
maven {
url "https://packages.confluent.io/maven/"
}
}
dependencies {
...
implementation("io.quarkus:quarkus-confluent-registry-json-schema")
// Confluent registry libraries use Jakarta REST client
implementation("io.quarkus:quarkus-rest-client")
implementation("io.confluent:kafka-json-schema-serializer:7.2.0") {
exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
}
}
JVM モードでは、io.confluent:kafka-json-schema-serializer
の任意のバージョンを使用できます。
ネイティブモードでは、Quarkus はバージョン 6.2.x
、7.0.x
、7.1.x
、7.2.x
、7.3.x
をサポートします。
バージョン 7.4.x
および 7.5.x
では、Confluent スキーマシリアライザーに問題があるため、別の依存関係を追加する必要があります:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
dependencies {
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}
それ以外のバージョンでは、ネイティブコンフィギュレーションの調整が必要な場合があります。