
Apache Kafka Reference Guide

This guide demonstrates how your Quarkus application can utilize SmallRye Reactive Messaging to interact with Apache Kafka.

1. Introduction

Apache Kafka is a popular open-source distributed event streaming platform. It is commonly used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Similar to a message queue or an enterprise messaging platform, it lets you:

  • publish (write) and subscribe to (read) streams of events, called records,

  • store streams of records durably and reliably inside topics,

  • process streams of records as they occur or retrospectively.

And all these functionalities are provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner.

2. Quarkus Extension for Apache Kafka

Quarkus provides support for Apache Kafka through the SmallRye Reactive Messaging framework. Based on the Eclipse MicroProfile Reactive Messaging specification 2.0, it proposes a flexible programming model bridging CDI and event-driven programming.

This guide provides an in-depth look at Apache Kafka and the SmallRye Reactive Messaging framework. For a quick start, take a look at Getting Started to SmallRye Reactive Messaging with Apache Kafka.

You can add the smallrye-reactive-messaging-kafka extension to your project by running the following command in your project base directory:

CLI
quarkus extension add 'smallrye-reactive-messaging-kafka'
Maven
./mvnw quarkus:add-extension -Dextensions="smallrye-reactive-messaging-kafka"
Gradle
./gradlew addExtension --extensions="smallrye-reactive-messaging-kafka"

This will add the following to your build file:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-smallrye-reactive-messaging-kafka")

The extension includes kafka-clients version 3.1.0 as a transitive dependency and is compatible with Kafka brokers version 2.x.

3. Configuring the SmallRye Kafka Connector

Because the SmallRye Reactive Messaging framework supports different messaging backends like Apache Kafka, AMQP, Apache Camel, JMS, MQTT, etc., it employs a generic vocabulary:

  • Applications send and receive messages. A message wraps a payload and can be extended with some metadata. With the Kafka connector, a message corresponds to a Kafka record.

  • Messages transit on channels. Application components connect to channels to publish and consume messages. The Kafka connector maps channels to Kafka topics.

  • Channels are connected to message backends using connectors. Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Kafka is named smallrye-kafka.

A minimal configuration for the Kafka connector with an incoming channel looks like the following:

%prod.kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.incoming.prices.connector=smallrye-kafka (2)
1 Configure the broker location for the production profile. You can configure it globally or per channel using the mp.messaging.incoming.$channel.bootstrap.servers property. In dev mode and when running tests, Dev Services for Kafka automatically starts a Kafka broker. When not provided, this property defaults to localhost:9092.
2 Configure the connector to manage the prices channel. By default, the topic name is the same as the channel name. You can configure the topic attribute to override it.
The %prod prefix indicates that the property is only used when the application runs in prod mode (so not in dev or test). Refer to the Profile documentation for further details.
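For example, if the prices channel should read from a topic named price-updates (an illustrative topic name) rather than the default prices topic, you could add:

mp.messaging.incoming.prices.topic=price-updates
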
Connector auto-attachment

If you have a single connector on your classpath, you can omit the connector attribute configuration. Quarkus automatically associates orphan channels to the (unique) connector found on the classpath. Orphan channels are outgoing channels without a downstream consumer or incoming channels without an upstream producer.

This auto-attachment can be disabled using:

quarkus.reactive-messaging.auto-connector-attachment=false

4. Receiving messages from Kafka

Continuing from the previous minimal configuration, your Quarkus application can receive the message payload directly:

import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

There are several other ways your application can consume incoming messages:

Message
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(IncomingKafkaRecordMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (commit the offset)
    return msg.ack();
}

The Message type lets the consuming method access the incoming message metadata and handle the acknowledgment manually. We’ll explore different acknowledgment strategies in Commit Strategies.

If you want to access the Kafka record objects directly, use:

ConsumerRecord
@Incoming("prices")
public void consume(ConsumerRecord<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    Double value = record.value(); // Can be `null` if the incoming record has no value
    String topic = record.topic();
    int partition = record.partition();
    // ...
}

ConsumerRecord is provided by the underlying Kafka client and can be injected directly into the consumer method. Another, simpler approach consists in using Record:

Record
@Incoming("prices")
public void consume(Record<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    Double value = record.value(); // Can be `null` if the incoming record has no value
}

Record is a simple wrapper around key and payload of the incoming Kafka record.

@Channel

Alternatively, your application can inject a Multi into your bean and subscribe to its events as shown in the following example:

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.Channel;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

This is a good example of how to integrate a Kafka consumer with another downstream, in this example exposing it as a Server-Sent Events endpoint.

When consuming messages with @Channel, the application code is responsible for the subscription. In the example above, the RESTEasy Reactive endpoint handles that for you.

The following types can be injected as channels:

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

As with the previous Message example, if your injected channel receives payloads (Multi<T>), it acknowledges the message automatically and supports multiple subscribers. If your injected channel receives Message (Multi<Message<T>>), you are responsible for the acknowledgment and broadcasting. We will explore sending broadcast messages in Broadcasting messages on multiple consumers.

Injecting @Channel("prices") or having @Incoming("prices") does not automatically configure the application to consume messages from Kafka. You need to configure an inbound connector with mp.messaging.incoming.prices... or have an @Outgoing("prices") method somewhere in your application (in which case, prices will be an in-memory channel).
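As an illustration, the following minimal sketch provides such an @Outgoing("prices") method (the InMemoryPriceGenerator class name is illustrative); without any mp.messaging.incoming.prices.* connector configuration, prices then remains an in-memory channel feeding the consumers shown above:

import java.time.Duration;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class InMemoryPriceGenerator {

    private final Random random = new Random();

    // Emits a random price every second on the in-memory "prices" channel
    @Outgoing("prices")
    public Multi<Double> generate() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(tick -> random.nextDouble());
    }
}
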

4.1. Blocking processing

Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for further details on this topic. However, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, you need to use the @Blocking annotation indicating that the processing is blocking and should not be run on the caller thread.

For example, the following code shows how you can store incoming payloads to a database using Hibernate with Panache:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

The complete example is available in the kafka-panache-quickstart directory.

There are two @Blocking annotations:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

They have the same effect. Thus, you can use either of them. The first one provides more fine-grained tuning, such as the worker pool to use and whether it preserves the order. The second one, also used with other reactive features of Quarkus, uses the default worker pool and preserves the order.

Detailed information on the usage of @Blocking annotation can be found in SmallRye Reactive Messaging – Handling blocking execution.

@Transactional

If your method is annotated with @Transactional, it will be considered blocking automatically, even if the method is not annotated with @Blocking.

4.2. Acknowledgment Strategies

All messages received by a consumer must be acknowledged. In the absence of acknowledgment, the processing is considered in error. If the consumer method receives a Record or a payload, the message is acked on method return, also known as Strategy.POST_PROCESSING. If the consumer method returns another reactive stream or a CompletionStage, the message is acked when the downstream message is acked. You can override the default behavior to ack the message on arrival (Strategy.PRE_PROCESSING), or to not ack the message at all (Strategy.NONE), on the consumer method, as in the following example:

@Incoming("prices")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void process(double price) {
    // process price
}

If the consumer method receives a Message, the acknowledgment strategy is Strategy.MANUAL and the consumer method is in charge of acking/nacking the message.

@Incoming("prices")
public CompletionStage<Void> process(Message<Double> msg) {
    // process price
    return msg.ack();
}

As mentioned above, the method can also override the acknowledgment strategy to PRE_PROCESSING or NONE.

4.3. Commit Strategies

When a message produced from a Kafka record is acknowledged, the connector invokes a commit strategy. These strategies decide when the consumer offset for a specific topic/partition is committed. Committing an offset indicates that all previous records have been processed. It is also the position where the application would restart the processing after a crash recovery or a restart.

Committing every offset has performance penalties as Kafka offset management can be slow. However, not committing the offset often enough may lead to message duplication if the application crashes between two commits.

The Kafka connector supports three strategies:

  • throttled keeps track of received messages and commits an offset of the latest acked message in sequence (meaning, all previous messages were also acked). This strategy guarantees at-least-once delivery even if the channel performs asynchronous processing. The connector tracks the received records and periodically (period specified by auto.commit.interval.ms, default: 5000 ms) commits the highest consecutive offset. The connector will be marked as unhealthy if a message associated with a record is not acknowledged in throttled.unprocessed-record-max-age.ms (default: 60000 ms). Indeed, this strategy cannot commit the offset as soon as a single record processing fails (see Error Handling Strategies to configure what happens on failing processing). If throttled.unprocessed-record-max-age.ms is set to less than or equal to 0, it does not perform any health check verification. Such a setting might lead to running out of memory if there are "poison pill" messages (that are never acked). This strategy is the default if enable.auto.commit is not explicitly set to true.

  • latest commits the record offset received by the Kafka consumer as soon as the associated message is acknowledged (if the offset is higher than the previously committed offset). This strategy provides at-least-once delivery if the channel processes the message without performing any asynchronous processing. This strategy should not be used in a high-load environment, as offset commits are expensive. However, it reduces the risk of duplicates.

  • ignore performs no commit. This strategy is the default when the consumer is explicitly configured with enable.auto.commit set to true. It delegates the offset commit to the underlying Kafka client. When enable.auto.commit is true this strategy DOES NOT guarantee at-least-once delivery. SmallRye Reactive Messaging processes records asynchronously, so offsets may be committed for records that have been polled but not yet processed. In case of a failure, only records that were not committed yet will be re-processed.

The Kafka connector disables the Kafka auto commit when it is not explicitly enabled. This behavior differs from the traditional Kafka consumer. If high throughput is important for you, and you are not limited by the downstream, we recommend either of the following (see the configuration sketch after this list):

  • use the throttled policy,

  • or set enable.auto.commit to true and annotate the consuming method with @Acknowledgment(Acknowledgment.Strategy.NONE).
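As a minimal sketch, the commit strategy is selected per channel with the commit-strategy attribute; here the prices channel from the earlier examples keeps the default throttled strategy but tunes the commit interval (the interval value is illustrative, and the auto.commit.interval.ms attribute is passed through to the connector as described above):

mp.messaging.incoming.prices.commit-strategy=throttled
mp.messaging.incoming.prices.auto.commit.interval.ms=1000
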

4.4. Error Handling Strategies

If a message produced from a Kafka record is nacked, a failure strategy is applied. The Kafka connector supports three strategies:

  • fail: fail the application, no more records will be processed (default strategy). The offset of the record that has not been processed correctly is not committed.

  • ignore: the failure is logged, but the processing continues. The offset of the record that has not been processed correctly is committed.

  • dead-letter-queue: the offset of the record that has not been processed correctly is committed, but the record is written to a Kafka dead letter topic.

The strategy is selected using the failure-strategy attribute.
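For example, to switch the prices channel from the earlier examples to the dead letter queue strategy, you could configure:

mp.messaging.incoming.prices.failure-strategy=dead-letter-queue
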

In the case of dead-letter-queue, you can configure the following attributes:

  • dead-letter-queue.topic: the topic to use to write the records not processed correctly, default is dead-letter-topic-$channel, with $channel being the name of the channel.

  • dead-letter-queue.key.serializer: the serializer used to write the record key on the dead letter queue. By default, it deduces the serializer from the key deserializer.

  • dead-letter-queue.value.serializer: the serializer used to write the record value on the dead letter queue. By default, it deduces the serializer from the value deserializer.

The record written on the dead letter queue contains a set of additional headers about the original record:

  • dead-letter-reason: the reason of the failure

  • dead-letter-cause: the cause of the failure if any

  • dead-letter-topic: the original topic of the record

  • dead-letter-partition: the original partition of the record (integer mapped to String)

  • dead-letter-offset: the original offset of the record (long mapped to String)
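As a sketch of how these headers can be inspected, the snippet below assumes an additional incoming channel (here named dead-letters, which you would configure against the dead letter topic with an appropriate deserializer) and reads the dead-letter-reason header from the record metadata:

@Incoming("dead-letters")
public CompletionStage<Void> processDeadLetters(Message<Double> msg) {
    var metadata = msg.getMetadata(IncomingKafkaRecordMetadata.class).orElseThrow();
    var reason = metadata.getHeaders().lastHeader("dead-letter-reason");
    // The header value is a byte array; decode it before logging or inspecting it
    String failureReason = reason == null ? "unknown" : new String(reason.value());
    // Acknowledge the message once the failure has been recorded
    return msg.ack();
}
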

4.4.1. Retrying processing

You can combine Reactive Messaging with SmallRye Fault Tolerance, and retry processing if it failed:

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
public void consume(String v) {
   // ... retry if this method throws an exception
}

You can configure the delay, the number of retries, the jitter, etc.

If your method returns a Uni or CompletionStage, you need to add the @NonBlocking annotation:

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
@NonBlocking
public Uni<String> consume(String v) {
   // ... retry if this method throws an exception or the returned Uni produce a failure
}
The @NonBlocking annotation is only required with SmallRye Fault Tolerance 5.1.0 and earlier. Starting with SmallRye Fault Tolerance 5.2.0 (available since Quarkus 2.1.0.Final), it is not necessary. See SmallRye Fault Tolerance documentation for more information.

The incoming messages are acknowledged only once the processing completes successfully. So, it commits the offset after the successful processing. If the processing still fails, even after all retries, the message is nacked and the failure strategy is applied.

4.4.2. Handling Deserialization Failures

When a deserialization failure occurs, you can intercept it and provide a failure strategy. To achieve this, you need to create a bean implementing DeserializationFailureHandler<T> interface:

@ApplicationScoped
@Identifier("failure-retry") // Set the name of the failure handler
public class MyDeserializationFailureHandler
    implements DeserializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public JsonObject decorateDeserialization(Uni<JsonObject> deserialization, String topic, boolean isKey,
            String deserializer, byte[] data, Headers headers) {
        return deserialization
                    .onFailure().retry().atMost(3)
                    .await().atMost(Duration.ofMillis(200));
    }
}

To use this failure handler, the bean must be exposed with the @Identifier qualifier and the connector configuration must specify the attribute mp.messaging.incoming.$channel.[key|value]-deserialization-failure-handler (for key or value deserializers).
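For example, assuming an incoming channel named fruits whose value deserializer produces JsonObject payloads (the channel name is illustrative), the handler defined above could be wired as follows:

mp.messaging.incoming.fruits.value-deserialization-failure-handler=failure-retry
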

The handler is called with details of the deserialization, including the action represented as a Uni<T>. On this deserialization Uni, failure strategies like retry, providing a fallback value, or applying a timeout can be implemented.

4.5. Consumer Groups

In Kafka, a consumer group is a set of consumers which cooperate to consume data from a topic. A topic is divided into a set of partitions. The partitions of a topic are assigned among the consumers in the group, effectively allowing consumption throughput to scale. Note that each partition is assigned to a single consumer from a group. However, a consumer can be assigned multiple partitions if the number of partitions is greater than the number of consumers in the group.

Let’s explore briefly different producer/consumer patterns and how to implement them using Quarkus:

  1. Single consumer thread inside a consumer group

    This is the default behavior of an application subscribing to a Kafka topic: Each Kafka connector will create a single consumer thread and place it inside a single consumer group. Consumer group id defaults to the application name as set by the quarkus.application.name configuration property. It can also be set using the kafka.group.id property.

  2. Multiple consumer threads inside a consumer group

    For a given application instance, the number of consumers inside the consumer group can be configured using the mp.messaging.incoming.$channel.partitions property. The partitions of the subscribed topic will be divided among the consumer threads. Note that if the partitions value exceeds the number of partitions of the topic, some consumer threads won’t be assigned any partitions.

  3. Multiple consumer applications inside a consumer group

    Similar to the previous example, multiple instances of an application can subscribe to a single consumer group, configured via the mp.messaging.incoming.$channel.group.id property, or left to default to the application name. This in turn will divide the partitions of the topic among application instances.

  4. Pub/Sub: Multiple consumer groups subscribed to a topic

    Lastly, different applications can subscribe independently to the same topics using different consumer group ids. For example, messages published to a topic called orders can be consumed independently by two consumer applications, one with mp.messaging.incoming.orders.group.id=invoicing and the second with mp.messaging.incoming.orders.group.id=shipping. Different consumer groups can thus scale independently according to the message consumption requirements, as sketched in the configuration after this list.

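A minimal sketch of this pub/sub pattern: each application sets its own group id on the channel consuming the orders topic (the split into an invoicing and a shipping application is illustrative):

# application.properties of the invoicing application
mp.messaging.incoming.orders.connector=smallrye-kafka
mp.messaging.incoming.orders.group.id=invoicing

# application.properties of the shipping application
mp.messaging.incoming.orders.connector=smallrye-kafka
mp.messaging.incoming.orders.group.id=shipping
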

A common business requirement is to consume and process Kafka records in order. The Kafka broker preserves the order of records inside a partition, not inside a topic. Therefore, it is important to think about how records are partitioned inside a topic. The default partitioner uses the record key hash to compute the partition for a record, or, when the key is not defined, chooses a partition randomly per batch of records.

During normal operation, a Kafka consumer preserves the order of records inside each partition assigned to it. SmallRye Reactive Messaging keeps this order for processing, unless @Blocking(ordered = false) is used (see Blocking processing).

Note that due to consumer rebalances, Kafka consumers only guarantee at-least-once processing of single records, meaning that uncommitted records can be processed again by consumers.

4.5.1. Consumer Rebalance Listener

Inside a consumer group, as new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as rebalancing the group. To handle offset commit and assigned partitions yourself, you can provide a consumer rebalance listener. To achieve this, implement the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener interface and expose it as a CDI bean with the @Identifier qualifier. A common use case is to store offsets in a separate data store to implement exactly-once semantics, or to start the processing at a specific offset.

The listener is invoked every time the consumer topic/partition assignment changes. For example, when the application starts, it invokes the partitionsAssigned callback with the initial set of topics/partitions associated with the consumer. If, later, this set changes, it calls the partitionsRevoked and partitionsAssigned callbacks again, so you can implement custom logic.

Note that the rebalance listener methods are called from the Kafka polling thread and will block the caller thread until completion. That’s because the rebalance protocol has synchronization barriers, and asynchronous code used in a rebalance listener may be executed after the synchronization barrier.

When topics/partitions are assigned or revoked from a consumer, it pauses the message delivery and resumes once the rebalance completes.

If the rebalance listener handles offset commit on behalf of the user (using the NONE commit strategy), the rebalance listener must commit the offset synchronously in the partitionsRevoked callback. We also recommend applying the same logic when the application stops.

Unlike the ConsumerRebalanceListener from Apache Kafka, the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener methods pass the Kafka Consumer and the set of topics/partitions.

In the following example we set up a consumer that always starts on messages from at most 10 minutes ago (or offset 0). First we need to provide a bean that implements io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener and is annotated with io.smallrye.common.annotation.Identifier. We then must configure our inbound connector to use this bean.

package inbound;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.TopicPartition;

import javax.enterprise.context.ApplicationScoped;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

@ApplicationScoped
@Identifier("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

    private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());

    /**
     * When receiving a list of partitions, will search for the earliest offset within 10 minutes
     * and seek the consumer to it.
     *
     * @param consumer   underlying consumer
     * @param partitions set of assigned topic partitions
     */
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long now = System.currentTimeMillis();
        long shouldStartAt = now - 600_000L; // 10 minutes ago

        Map<TopicPartition, Long> request = new HashMap<>();
        for (TopicPartition partition : partitions) {
            LOGGER.info("Assigned " + partition);
            request.put(partition, shouldStartAt);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(request);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> position : offsets.entrySet()) {
            long target = position.getValue() == null ? 0L : position.getValue().offset();
            LOGGER.info("Seeking position " + target + " for " + position.getKey());
            consumer.seek(position.getKey(), target);
        }
    }

}
package inbound;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class KafkaRebalancedConsumer {

    @Incoming("rebalanced-example")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public CompletionStage<Void> consume(IncomingKafkaRecord<Integer, String> message) {
        // We don't need to ACK messages because in this example,
        // we set offset during consumer rebalance
        return CompletableFuture.completedFuture(null);
    }

}

To configure the inbound connector to use the provided listener, we either set the consumer rebalance listener’s identifier:

mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer

Or have the listener’s name be the same as the group id:

mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer

Setting the consumer rebalance listener’s name takes precedence over using the group id.

4.5.2. Using unique consumer groups

If you want to process all the records from a topic (from its beginning), you need:

  1. to set auto.offset.reset = earliest

  2. assign your consumer to a consumer group not used by any other application.

Quarkus generates a UUID that changes between two executions (including in dev mode). So, you are sure no other consumer uses it, and you receive a new unique group id every time your application starts.

You can use that generated UUID as the consumer group as follows:

mp.messaging.incoming.your-channel.auto.offset.reset=earliest
mp.messaging.incoming.your-channel.group.id=${quarkus.uuid}
If the group.id attribute is not set, it defaults to the quarkus.application.name configuration property.

4.6. Receiving Kafka Records in Batches

By default, incoming methods receive each Kafka record individually. Under the hood, Kafka consumer clients poll the broker constantly and receive records in batches, presented inside the ConsumerRecords container.

In batch mode, your application can receive all the records returned by the consumer poll in one go.

To achieve this you need to specify a compatible container type to receive all the data:

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

The incoming method can also receive Message<List<Payload>>, KafkaRecordBatch<Key, Payload>, or ConsumerRecords<Key, Payload> types. They give access to record details such as offset or timestamp:

@Incoming("prices")
public CompletionStage<Void> consumeMessage(KafkaRecordBatch<String, Double> records) {
    for (KafkaRecord<String, Double> record : records) {
        Double payload = record.getPayload();
        String topic = record.getTopic();
        // process messages
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return records.ack();
}

Note that the successful processing of the incoming record batch will commit the latest offsets for each partition received inside the batch. The configured commit strategy will be applied for these records only.

Conversely, if the processing throws an exception, all messages are nacked, applying the failure strategy for all the records inside the batch.

Quarkus autodetects batch types for incoming channels and sets batch configuration automatically. You can configure batch mode explicitly with mp.messaging.incoming.$channel.batch property.
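For example, to force batch mode on the prices channel (normally auto-detected from the method signature), you could set:

mp.messaging.incoming.prices.batch=true
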

5. Sending messages to Kafka

Configuration for the Kafka connector outgoing channels is similar to that of incoming:

%prod.kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.outgoing.prices-out.connector=smallrye-kafka (2)
mp.messaging.outgoing.prices-out.topic=prices (3)
1 Configure the broker location for the production profile. You can configure it globally or per channel using mp.messaging.outgoing.$channel.bootstrap.servers property. In dev mode and when running tests, Dev Services for Kafka automatically starts a Kafka broker. When not provided, this property defaults to localhost:9092.
2 Configure the connector to manage the prices-out channel.
3 By default, the topic name is the same as the channel name. You can configure the topic attribute to override it.

Inside application configuration, channel names are unique. Therefore, if you’d like to configure an incoming and outgoing channel on the same topic, you will need to name channels differently (like in the examples of this guide, mp.messaging.incoming.prices and mp.messaging.outgoing.prices-out).

Then, your application can generate messages and publish them to the prices-out channel. It can use double payloads as in the following snippet:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class KafkaPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

You should not call methods annotated with @Incoming and/or @Outgoing directly from your code. They are invoked by the framework. Having user code invoking them would not have the expected outcome.

Note that the generate method returns a Multi<Double>, which implements the Reactive Streams Publisher interface. This publisher will be used by the framework to generate messages and send them to the configured Kafka topic.

Instead of returning a payload, you can return an io.smallrye.reactive.messaging.kafka.Record to send key/value pairs:

@Outgoing("out")
public Multi<Record<String, Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> Record.of("my-key", random.nextDouble()));
}

The payload can be wrapped inside org.eclipse.microprofile.reactive.messaging.Message to have more control over the written records:

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                            .withKey("my-key")
                            .withTopic("my-key-prices")
                            .withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
                            .build()));
}

OutgoingKafkaRecordMetadata allows setting metadata attributes of the Kafka record, such as key, topic, partition, or timestamp. One use case is to dynamically select the destination topic of a message. In this case, instead of configuring the topic inside your application configuration file, you need to use the outgoing metadata to set the name of the topic.

Other than method signatures returning a Reactive Streams Publisher (Multi being an implementation of Publisher), an outgoing method can also return a single message. In this case, the producer uses this method as a generator to create an infinite stream (see the sketch after the list below).

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

5.1. Sending messages with @Emitter

Sometimes, you need to send messages in an imperative way.

For example, if you need to send a message to a stream when receiving a POST request inside a REST endpoint. In this case, you cannot use @Outgoing because your method has parameters.

In this case, you can use an Emitter.

import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        CompletionStage<Void> ack = priceEmitter.send(price);
    }
}

Sending a payload returns a CompletionStage, completed when the message is acked. If the message transmission fails, the CompletionStage is completed exceptionally with the reason of the nack.

The Emitter configuration is done the same way as the other stream configuration used by @Incoming and @Outgoing.

Using the Emitter you are sending messages from your imperative code to reactive messaging. These messages are stored in a queue until they are sent. If the Kafka producer client can’t keep up with messages trying to be sent over to Kafka, this queue can become a memory hog and you may even run out of memory. You can use @OnOverflow to configure back-pressure strategy. It lets you configure the size of the queue (default is 256) and the strategy to apply when the buffer size is reached. Available strategies are DROP, LATEST, FAIL, BUFFER, UNBOUNDED_BUFFER and NONE.
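As a sketch, the overflow strategy and the queue size can be tuned directly on the injection point (the buffer size here is illustrative):

@Inject
@Channel("price-create")
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 1024)
Emitter<Double> priceEmitter;
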

With the Emitter API, you can also encapsulate the outgoing payload inside Message<T>. As with the previous examples, Message lets you handle the ack/nack cases differently.

import java.util.concurrent.CompletableFuture;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
    }
}

If you prefer using Reactive Stream APIs, you can use MutinyEmitter, whose send method returns a Uni<Void>. You can therefore use Mutiny APIs for handling downstream messages and errors.

import org.eclipse.microprofile.reactive.messaging.Channel;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    MutinyEmitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Uni<String> addPrice(Double price) {
        return priceEmitter.send(price)
                .map(x -> "ok")
                .onFailure().recoverWithItem("ko");
    }
}

It is also possible to block on sending the event to the emitter with the sendAndAwait method. It will only return from the method when the event is acked or nacked by the receiver.
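A minimal sketch using the MutinyEmitter from the previous example (the addPriceBlocking method name is illustrative):

@POST
@Consumes(MediaType.TEXT_PLAIN)
public void addPriceBlocking(Double price) {
    // Blocks the caller thread until the message is acked or nacked
    priceEmitter.sendAndAwait(price);
}
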

Deprecation

The io.smallrye.reactive.messaging.annotations.Emitter, io.smallrye.reactive.messaging.annotations.Channel and io.smallrye.reactive.messaging.annotations.OnOverflow classes are now deprecated and replaced by:

  • org.eclipse.microprofile.reactive.messaging.Emitter

  • org.eclipse.microprofile.reactive.messaging.Channel

  • org.eclipse.microprofile.reactive.messaging.OnOverflow

The new Emitter.send method returns a CompletionStage completed when the produced message is acknowledged.

More information on how to use Emitter can be found in SmallRye Reactive Messaging – Emitters and Channels

5.2. Write Acknowledgement

When the Kafka broker receives a record, its acknowledgement can take time depending on the configuration. Also, the connector stores in memory the records that cannot be written.

By default, the connector does wait for Kafka to acknowledge the record to continue the processing (acknowledging the received Message). You can disable this by setting the waitForWriteCompletion attribute to false.

Note that the acks attribute has a huge impact on the record acknowledgement.

If a record cannot be written, the message is nacked.
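As a sketch, both behaviors are tuned per outgoing channel; here the prices-out channel from the earlier examples stops waiting for the write acknowledgment and only requires the partition leader to ack (the values are illustrative, and acks is passed through to the underlying Kafka producer):

mp.messaging.outgoing.prices-out.waitForWriteCompletion=false
mp.messaging.outgoing.prices-out.acks=1
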

5.3. Backpressure

The Kafka outbound connector handles back-pressure, monitoring the number of in-flight messages waiting to be written to the Kafka broker. The number of in-flight messages is configured using the max-inflight-messages attribute and defaults to 1024.

The connector only sends that amount of messages concurrently. No other messages will be sent until at least one in-flight message gets acknowledged by the broker; the connector then writes the next message to Kafka. Be sure to configure Kafka’s batch.size and linger.ms accordingly.

You can also remove the limit of in-flight messages by setting max-inflight-messages to 0. However, note that the Kafka producer may block if the number of requests reaches max.in.flight.requests.per.connection.

5.4. Retrying message dispatch

When the Kafka producer receives an error from the server, if it is a transient, recoverable error, the client will retry sending the batch of messages. This behavior is controlled by retries and retry.backoff.ms parameters. In addition to this, SmallRye Reactive Messaging will retry individual messages on recoverable errors, depending on the retries and delivery.timeout.ms parameters.

Note that while having retries in a reliable system is a best practice, the max.in.flight.requests.per.connection parameter defaults to 5, meaning that the order of the messages is not guaranteed. If message order is a must for your use case, setting max.in.flight.requests.per.connection to 1 will make sure a single batch of messages is sent at a time, at the expense of limiting the throughput of the producer.

For applying retry mechanism on processing errors, see the section on Retrying processing.

5.5. Handling Serialization Failures

For the Kafka producer client, serialization failures are not recoverable, thus the message dispatch is not retried. In these cases, you may need to apply a failure strategy for the serializer. To achieve this, you need to create a bean implementing the SerializationFailureHandler<T> interface:

@ApplicationScoped
@Identifier("failure-fallback") // Set the name of the failure handler
public class MySerializationFailureHandler
    implements SerializationFailureHandler<JsonObject> { // Specify the expected type

    @Override
    public byte[] decorateSerialization(Uni<byte[]> serialization, String topic, boolean isKey,
        String serializer, Object data, Headers headers) {
        return serialization
                    .onFailure().retry().atMost(3)
                    .await().indefinitely();
    }
}

To use this failure handler, the bean must be exposed with the @Identifier qualifier and the connector configuration must specify the attribute mp.messaging.outgoing.$channel.[key|value]-serialization-failure-handler (for key or value serializers).

The handler is called with details of the serialization, including the action represented as Uni<byte[]>. Note that the method must await on the result and return the serialized byte array.

5.6. In-memory channels

In some use cases, it is convenient to use the messaging patterns to transfer messages inside the same application. When you don’t connect a channel to a messaging backend like Kafka, everything happens in-memory, and the streams are created by chaining methods together. Each chain is still a reactive stream and enforces the back-pressure protocol.

The framework verifies that the producer/consumer chain is complete, meaning that if the application writes messages into an in-memory channel (using a method with only @Outgoing, or an Emitter), it must also consume the messages from within the application (using a method with only @Incoming or using an unmanaged stream).

5.7. Broadcasting messages on multiple consumers

By default, a channel can be linked to a single consumer, using an @Incoming method or a @Channel reactive stream. At application startup, channels are verified to form a chain of consumers and producers with a single consumer and producer. You can override this behavior by setting mp.messaging.$channel.broadcast=true on a channel.

In the case of in-memory channels, the @Broadcast annotation can be used on the @Outgoing method. For example:

import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class MultipleConsumer {

    private final Random random = new Random();

    @Outgoing("in-memory-channel")
    @Broadcast
    double generate() {
        return random.nextDouble();
    }

    @Incoming("in-memory-channel")
    void consumeAndLog(double price) {
        System.out.println(price);
    }

    @Incoming("in-memory-channel")
    @Outgoing("prices2")
    double consumeAndSend(double price) {
        return price;
    }
}

Reciprocally, multiple producers on the same channel can be merged by setting mp.messaging.incoming.$channel.merge=true. On the @Incoming methods, you can control how multiple channels are merged using the @Merge annotation.

5.8. Kafka Transactions

Kafka transactions enable atomic writes to multiple Kafka topics and partitions. The Kafka connector provides the KafkaTransactions custom emitter for writing Kafka records inside a transaction. It can be injected as a regular emitter with @Channel:

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class KafkaTransactionalProducer {

    @Channel("tx-out-example")
    KafkaTransactions<String> txProducer;

    public Uni<Void> emitInTransaction() {
        return txProducer.withTransaction(emitter -> {
            emitter.send(KafkaRecord.of(1, "a"));
            emitter.send(KafkaRecord.of(2, "b"));
            emitter.send(KafkaRecord.of(3, "c"));
            return Uni.createFrom().voidItem();
        });
    }

}

The function given to the withTransaction method receives a TransactionalEmitter for producing records, and returns a Uni that provides the result of the transaction.

  • If the processing completes successfully, the producer is flushed and the transaction is committed.

  • If the processing throws an exception, returns a failing Uni, or marks the TransactionalEmitter for abort, the transaction is aborted.

Kafka transactional producers require configuring acks=all client property, and a unique id for transactional.id, which implies enable.idempotence=true. When Quarkus detects the use of KafkaTransactions for an outgoing channel it configures these properties on the channel, providing a default value of "${quarkus.application.name}-${channelName}" for transactional.id property.

Note that for production use the transactional.id must be unique across all application instances.

While a normal message emitter would support concurrent calls to send methods and consequently queues outgoing messages to be written to Kafka, a KafkaTransactions emitter only supports one transaction at a time. A transaction is considered in progress from the call to the withTransaction until the returned Uni results in success or failure. While a transaction is in progress, subsequent calls to the withTransaction, including nested ones inside the given function, will throw IllegalStateException.

Note that in Reactive Messaging, the execution of processing methods is already serialized, unless @Blocking(ordered = false) is used. If withTransaction can be called concurrently, for example from a REST endpoint, it is recommended to limit the concurrency of the execution. This can be done using the @Bulkhead annotation from MicroProfile Fault Tolerance.

5.8.1. Transaction-aware consumers

If you’d like to consume records only written and committed inside a Kafka transaction you need to configure the isolation.level property on the incoming channel as such:

mp.messaging.incoming.prices-in.isolation.level=read_committed

If you are using Dev Services for Kafka using Redpanda, you need to enable transactions.

6. Processing Messages

Applications streaming data often need to consume some events from a topic, process them and publish the result to a different topic. A processor method can be simply implemented using both the @Incoming and @Outgoing annotations:

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public double process(double price) {
        return price * CONVERSION_RATE;
    }

}

The parameter of the process method is the incoming message payload, whereas the return value will be used as the outgoing message payload. Previously mentioned signatures for parameter and return types are also supported, such as Message<T>, Record<K, V>, etc.

You can apply asynchronous stream processing by consuming and returning reactive stream Multi<T> type:

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public Multi<Double> process(Multi<Integer> prices) {
        return prices.filter(p -> p > 100).map(p -> p * CONVERSION_RATE);
    }

}

6.1. Propagating Record Key

When processing messages, you can propagate incoming record key to the outgoing record.

Enabled with mp.messaging.outgoing.$channel.propagate-record-key=true configuration, record key propagation produces the outgoing record with the same key as the incoming record.

If the outgoing record already contains a key, it won’t be overridden by the incoming record key. If the incoming record does have a null key, the mp.messaging.outgoing.$channel.key property is used.
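For example, to propagate incoming keys on the price-out channel of the processor shown earlier:

mp.messaging.outgoing.price-out.propagate-record-key=true
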

6.2. Exactly-Once Processing

Kafka Transactions allows managing consumer offsets inside a transaction, together with produced messages. This enables coupling a consumer with a transactional producer in a consume-transform-produce pattern, also known as exactly-once processing.

The KafkaTransactions custom emitter provides a way to apply exactly-once processing to an incoming Kafka message inside a transaction.

The following example includes a batch of Kafka records inside a transaction.

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@ApplicationScoped
public class KafkaExactlyOnceProcessor {

    @Channel("prices-out")
    @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 500) (3)
    KafkaTransactions<Integer> txProducer;

    @Incoming("prices-in")
    public Uni<Void> emitInTransaction(KafkaRecordBatch<String, Integer> batch) { (1)
        return txProducer.withTransactionAndAck(batch, emitter -> { (2)
            for (KafkaRecord<String, Integer> record : batch) {
                emitter.send(KafkaRecord.of(record.getKey(), record.getPayload() + 1)); (3)
            }
            return Uni.createFrom().voidItem();
        });
    }

}
1 It is recommended to use exactly-once processing along with the batch consumption mode. While it is possible to use it with a single Kafka message, it’ll have a significant performance impact.
2 The consumed KafkaRecordBatch message is passed to the KafkaTransactions#withTransactionAndAck in order to handle the offset commits and message acks.
3 The send method writes records to Kafka inside the transaction, without waiting for the send receipt from the broker. Messages pending to be written to Kafka will be buffered and flushed before committing the transaction. It is therefore recommended to configure the @OnOverflow bufferSize to fit enough messages, for example max.poll.records, the maximum number of records returned in a poll batch.
  • If the processing completes successfully, before committing the transaction, the topic partition offsets of the given batch message will be committed to the transaction.

  • If the processing needs to abort, after aborting the transaction, the consumer’s position is reset to the last committed offset, effectively resuming the consumption from that offset. If no consumer offset has been committed to a topic-partition, the consumer’s position is reset to the beginning of the topic-partition, even if the offset reset policy is `latest`.

When using exactly-once processing, consumed message offset commits are handled by the transaction and therefore the application should not commit offsets through other means. The consumer should have enable.auto.commit=false (the default) and explicitly set commit-strategy=ignore:

mp.messaging.incoming.prices-in.commit-strategy=ignore
mp.messaging.incoming.prices-in.failure-strategy=ignore

6.2.1. Error handling for the exactly-once processing

The Uni returned from the KafkaTransactions#withTransaction will yield a failure if the transaction fails and is aborted. The application can choose to handle the error case, but if a failing Uni is returned from the @Incoming method, the incoming channel will effectively fail and stop the reactive stream.

The KafkaTransactions#withTransactionAndAck method acks and nacks the message but will not return a failing Uni. Nacked messages will be handled by the failure strategy of the incoming channel (see Error Handling Strategies). Configuring failure-strategy=ignore simply resets the Kafka consumer to the last committed offsets and resumes the consumption from there.

Redpanda does not yet support producer scalability for exactly-once processing. In order to use Kafka exactly-once processing with Quarkus you can configure Dev Services for Kafka to use Strimzi images instead of Redpanda.

7. Accessing Kafka clients directly

In rare cases, you may need to access the underlying Kafka clients. KafkaClientService provides thread-safe access to Producer and Consumer.

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;

import org.apache.kafka.clients.producer.ProducerRecord;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;

@ApplicationScoped
public class PriceSender {

    @Inject
    KafkaClientService clientService;

    void onStartup(@Observes StartupEvent startupEvent) {
        KafkaProducer<String, Double> producer = clientService.getProducer("generated-price");
        producer.runOnSendingThread(client -> client.send(new ProducerRecord<>("prices", 2.4)))
            .await().indefinitely();
    }
}

The KafkaClientService is an experimental API and can change in the future.

You can also get the Kafka configuration injected to your application and create Kafka producer, consumer and admin clients directly:

import io.smallrye.common.annotation.Identifier;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;

@ApplicationScoped
public class KafkaClients {

    @Inject
    @Identifier("default-kafka-broker")
    Map<String, Object> config;

    @Produces
    AdminClient getAdmin() {
        Map<String, Object> copy = new HashMap<>();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            if (AdminClientConfig.configNames().contains(entry.getKey())) {
                copy.put(entry.getKey(), entry.getValue());
            }
        }
        return KafkaAdminClient.create(copy);
    }

}

The default-kafka-broker configuration map contains all application properties prefixed with kafka. or KAFKA_. For more configuration options check out Kafka Configuration Resolution.

8. JSON Serialization

Quarkus has built-in capabilities to deal with JSON Kafka messages.

Imagine we have a Fruit data class as follows:

public class Fruit {

    public String name;
    public double price;

    public Fruit() {
    }

    public Fruit(String name, double price) {
        this.name = name;
        this.price = price;
    }
}

And we want to use it to receive messages from Kafka, apply some price conversion, and send messages back to Kafka.

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

/**
* A bean consuming data from the "fruit-in" channel and applying some price conversion.
* The result is pushed to the "fruit-out" channel.
*/
@ApplicationScoped
public class FruitProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("fruit-in")
    @Outgoing("fruit-out")
    @Broadcast
    public Fruit process(Fruit fruit) {
        fruit.price = fruit.price * CONVERSION_RATE;
        return fruit;
    }

}

To do this, we will need to set up JSON serialization with Jackson or JSON-B.

If JSON serialization is configured correctly, you can also use Publisher<Fruit> and Emitter<Fruit>.

8.1. Serialization via Jackson

Quarkus has built-in support for JSON serialization and deserialization based on Jackson. It will also generate the serializer and deserializer for you, so you do not have to configure anything. When generation is disabled, you can use the provided ObjectMapperSerializer and ObjectMapperDeserializer as explained below.

There is an existing ObjectMapperSerializer that can be used to serialize all data objects via Jackson. You may create an empty subclass if you want to use Serializer/deserializer autodetection.

By default, the ObjectMapperSerializer serializes null as the "null" String. This can be customized by setting the Kafka configuration property json.serialize.null-as-null=true, which serializes null as null. This is handy when using a compacted topic, as null is used as a tombstone to mark which messages to delete during the compaction phase.

The corresponding deserializer class needs to be subclassed. So, let’s create a FruitDeserializer that extends the ObjectMapperDeserializer.

package com.acme.fruit.jackson;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

Finally, configure your channels to use the Jackson serializer and deserializer.

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Now, your Kafka messages will contain a Jackson serialized representation of your Fruit data object. In this case, the deserializer configuration is not necessary as the Serializer/deserializer autodetection is enabled by default.

If you want to deserialize a list of fruits, you need to create a deserializer with a Jackson TypeReference denoting the generic collection used.

package com.acme.fruit.jackson;

import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class ListOfFruitDeserializer extends ObjectMapperDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new TypeReference<List<Fruit>>() {});
    }
}
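
The list deserializer is then configured on a channel like any other deserializer, for example (the channel name is illustrative):

mp.messaging.incoming.fruit-list-in.value.deserializer=com.acme.fruit.jackson.ListOfFruitDeserializer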

8.2. JSON-B によるシリアライズ

First, you need to include the quarkus-jsonb extension.

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jsonb</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-jsonb")

There is an existing JsonbSerializer that can be used to serialize all data objects via JSON-B. You may create an empty subclass if you want to use Serializer/deserializer autodetection.

By default, the JsonbSerializer serializes null as the "null" String. This can be customized by setting the Kafka configuration property json.serialize.null-as-null=true, which serializes null as null. This is handy when using a compacted topic, as null is used as a tombstone to mark which messages to delete during the compaction phase.

The corresponding deserializer class needs to be subclassed. So, let’s create a FruitDeserializer that extends the generic JsonbDeserializer.

package com.acme.fruit.jsonb;

import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class FruitDeserializer extends JsonbDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

Finally, configure your channels to use the JSON-B serializer and deserializer.

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jsonb.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

Now, your Kafka messages will contain a JSON-B serialized representation of your Fruit data object.

If you want to deserialize a list of fruits, you need to create a deserializer with a Type denoting the generic collection used.

package com.acme.fruit.jsonb;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class ListOfFruitDeserializer extends JsonbDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        super(new ArrayList<Fruit>() {}.getClass().getGenericSuperclass());
    }
}
If you don’t want to create a deserializer for each data object, you can use the generic io.vertx.kafka.client.serialization.JsonObjectDeserializer that will deserialize to an io.vertx.core.json.JsonObject. The corresponding serializer can also be used: io.vertx.kafka.client.serialization.JsonObjectSerializer.

9. Avro Serialization

This is described in a dedicated guide: Using Apache Kafka with Schema Registry and Avro.

10. Serializer/deserializer autodetection

When using SmallRye Reactive Messaging with Kafka (io.quarkus:quarkus-smallrye-reactive-messaging-kafka), Quarkus can often automatically detect the correct serializer and deserializer class. This autodetection is based on declarations of @Incoming and @Outgoing methods, as well as injected @Channels.

For example, if you declare

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

and your configuration indicates that the generated-price channel uses the smallrye-kafka connector, then Quarkus will automatically set the value.serializer to Kafka’s built-in IntegerSerializer.

Similarly, if you declare

@Incoming("my-kafka-records")
public void consume(KafkaRecord<Long, byte[]> record) {
    ...
}

and your configuration indicates that the my-kafka-records channel uses the smallrye-kafka connector, then Quarkus will automatically set the key.deserializer to Kafka’s built-in LongDeserializer, as well as the value.deserializer to ByteArrayDeserializer.

Finally, if you declare

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

and your configuration indicates that the price-create channel uses the smallrye-kafka connector, then Quarkus will automatically set the value.serializer to Kafka’s built-in DoubleSerializer.

The full set of types supported by the serializer/deserializer autodetection is:

  • short and java.lang.Short

  • int and java.lang.Integer

  • long and java.lang.Long

  • float and java.lang.Float

  • double and java.lang.Double

  • byte[]

  • java.lang.String

  • java.util.UUID

  • java.nio.ByteBuffer

  • org.apache.kafka.common.utils.Bytes

  • io.vertx.core.buffer.Buffer

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • classes for which a direct implementation of org.apache.kafka.common.serialization.Serializer<T> / org.apache.kafka.common.serialization.Deserializer<T> is present.

    • the implementation needs to specify the type argument T as the (de-)serialized type (a minimal example follows this list)

  • classes generated from Avro schemas, as well as Avro GenericRecord, if Confluent or Apicurio Registry serde is present

    • in case multiple Avro serdes are present, serializer/deserializer must be configured manually for Avro-generated classes, because autodetection is impossible

    • see Using Apache Kafka with Schema Registry and Avro for more information about using Confluent or Apicurio Registry libraries

  • classes for which a subclass of ObjectMapperSerializer / ObjectMapperDeserializer is present, as described in Jackson によるシリアライゼーション

    • it is technically not needed to subclass ObjectMapperSerializer, but in such case, autodetection isn’t possible

  • classes for which a subclass of JsonbSerializer / JsonbDeserializer is present, as described in JSON-B によるシリアライズ

    • it is technically not needed to subclass JsonbSerializer, but in such case, autodetection isn’t possible
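
As an illustration of the direct Serializer<T> implementation mentioned in the list above, a hand-written serializer for Fruit could look like the following sketch (it uses Jackson directly; the class name and error handling are illustrative):

package com.acme.fruit;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// The type argument is Fruit, so the autodetection can link this serializer to Fruit payloads.
public class FruitCustomSerializer implements Serializer<Fruit> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Fruit data) {
        if (data == null) {
            return null;
        }
        try {
            return mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Unable to serialize Fruit", e);
        }
    }
}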

If a serializer/deserializer is set by configuration, it won’t be replaced by the autodetection.

In case you have any issues with serializer autodetection, you can switch it off completely by setting quarkus.reactive-messaging.kafka.serializer-autodetection.enabled=false. If you find you need to do this, please file a bug in the Quarkus issue tracker so we can fix whatever problem you have.

11. JSON Serializer/deserializer generation

Quarkus automatically generates serializers and deserializers for channels where:

  1. the serializer/deserializer is not configured

  2. the auto-detection did not find a matching serializer/deserializer

It uses Jackson underneath.

This generation can be disabled using:

quarkus.reactive-messaging.kafka.serializer-generation.enabled=false
Generation does not support collections such as List<Fruit>. Refer to Jackson によるシリアライゼーション to write your own serializer/deserializer for this case.

12. Using Schema Registry

This is described in a dedicated guide: Using Apache Kafka with Schema Registry and Avro.

13. Health Checks

Quarkusは、Kafkaのヘルスチェックをいくつか提供しています。これらのチェックは、 quarkus-smallrye-health エクステンションと組み合わせて使用します。

13.1. Kafka Broker Readiness Check

When using the quarkus-kafka-client extension, you can enable the readiness health check by setting the quarkus.kafka.health.enabled property to true in your application.properties. This check reports the status of the interaction with a default Kafka broker (configured using kafka.bootstrap.servers). It requires an admin connection with the Kafka broker, and it is disabled by default. If enabled, when you access the /q/health/ready endpoint of your application, you will have information about the connection validation status.
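
For example, a minimal application.properties entry enabling the check would be:

quarkus.kafka.health.enabled=true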

13.2. Kafka Reactive Messaging Health Checks

When using Reactive Messaging and the Kafka connector, each configured channel (incoming or outgoing) provides startup, liveness and readiness checks.

  • The startup check verifies that the communication with Kafka cluster is established.

  • The liveness check captures any unrecoverable failure happening during the communication with Kafka.

  • The readiness check verifies that the Kafka connector is ready to consume/produce messages to the configured Kafka topics.

For each channel, you can disable the checks using:

# Disable both liveness and readiness checks with `health-enabled=false`:

# Incoming channel (receiving records from Kafka)
mp.messaging.incoming.your-channel.health-enabled=false
# Outgoing channel (writing records to Kafka)
mp.messaging.outgoing.your-channel.health-enabled=false

# Disable only the readiness check with `health-readiness-enabled=false`:

mp.messaging.incoming.your-channel.health-readiness-enabled=false
mp.messaging.outgoing.your-channel.health-readiness-enabled=false
You can configure the bootstrap.servers for each channel using mp.messaging.incoming|outgoing.$channel.bootstrap.servers property. Default is kafka.bootstrap.servers.

Reactive Messaging startup and readiness checks offer two strategies. The default strategy verifies that an active connection is established with the broker. This approach is not intrusive as it’s based on built-in Kafka client metrics.

Using the health-topic-verification-enabled=true attribute, the startup probe uses an admin client to check for the list of topics, while the readiness probe for an incoming channel checks that at least one partition is assigned for consumption, and for an outgoing channel checks that the topic used by the producer exists in the broker.

Note that to achieve this, an admin connection is required. You can adjust the timeout for topic verification calls to the broker using the health-topic-verification-timeout configuration.
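
For example, a channel could opt into this topic-verification strategy as follows (the channel name is illustrative; the timeout is in milliseconds):

mp.messaging.incoming.your-channel.health-topic-verification-enabled=true
mp.messaging.incoming.your-channel.health-topic-verification-timeout=5000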

14. Kafka Streams

This is described in a dedicated guide: Using Apache Kafka Streams.

15. Using Snappy for message compression

outgoing チャンネルでは、 compression.type 属性を snappy に設定することで、Snappy 圧縮を有効にすることができます。

mp.messaging.outgoing.fruit-out.compression.type=snappy

JVMモードでは、変更なしで動作します。しかし、アプリケーションをネイティブ実行可能ファイルにコンパイルするには、以下のことが必要です。

  1. GraalVM 21.+の使用

  2. application.properties に `quarkus.kafka.snappy.enabled=true`を追加

ネイティブモードでは、Snappyはデフォルトで無効になっています。Snappyを使用するには、ネイティブライブラリを埋め込み、アプリケーションの起動時にそれを解凍する必要があるからです。

16. Authentication with OAuth

Kafka ブローカーが認証メカニズムとして OAuth を使用している場合は、この認証プロセスを有効にするために Kafka コンシューマーを設定する必要があります。まず、以下の依存関係をアプリケーションに追加します。

pom.xml
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-client</artifactId>
</dependency>
build.gradle
implementation("io.strimzi:kafka-oauth-client")

この依存関係は、OAuth ワークフローを処理するために必要なコールバックハンドラーを提供します。そして、application.properties で追加します。

mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
mp.messaging.connector.smallrye-kafka.sasl.mechanism=OAUTHBEARER
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="team-a-client" \
  oauth.client.secret="team-a-client-secret" \
  oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" ;
mp.messaging.connector.smallrye-kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

quarkus.ssl.native=true

oauth.client.id、 oauth.client.secret、 oauth.token.endpoint.uri の値を更新します。

OAuth authentication works for both JVM and native modes. Since SSL is not enabled by default in native mode, quarkus.ssl.native=true must be added to support JaasClientOauthLoginCallbackHandler, which uses SSL. (See the Using SSL with Native Executables guide for more details.)

17. Kafka アプリケーションのテスト

17.1. ブローカーなしでのテスト

Kafka ブローカーを起動しなくてもアプリケーションをテストできるのは便利です。これを行うには、Kafka コネクターで管理しているチャンネルを インメモリー切り替え できます。

このアプローチは、JVM テストでのみ機能します。インジェクションには対応していないため、ネイティブテストには使用できません。

Let’s say we want to test the following processor application:

@ApplicationScoped
public class BeverageProcessor {

    @Incoming("orders")
    @Outgoing("beverages")
    Beverage process(Order order) {
        System.out.println("Order received " + order.getProduct());
        Beverage beverage = new Beverage();
        beverage.setBeverage(order.getProduct());
        beverage.setCustomer(order.getCustomer());
        beverage.setOrderId(order.getOrderId());
        beverage.setPreparationState("RECEIVED");
        return beverage;
    }

}

First, add the following test dependency to your application:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")

そして、以下のように Quarkus Test Resource を作成します。

public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("orders");     (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("beverages");  (2)
        env.putAll(props1);
        env.putAll(props2);
        return env;  (3)
    }

    @Override
    public void stop() {
        InMemoryConnector.clear();  (4)
    }
}
1 Switch the incoming channel orders (expecting messages from Kafka) to in-memory.
2 Switch the outgoing channel beverages (writing messages to Kafka) to in-memory.
3 インメモリーチャネルを使用するためのアプリケーション設定に必要なすべてのプロパティを含む Map をビルドして返します。
4 テストが停止したら、InMemoryConnector をクリアします (受信したメッセージと送信したメッセージをすべて破棄してください)。

上記で作成したテストリソースを使用して Quarkus テストを作成します。

@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject
    InMemoryConnector connector; (1)

    @Test
    void testProcessOrder() {
        InMemorySource<Order> ordersIn = connector.source("orders");     (2)
        InMemorySink<Beverage> beveragesOut = connector.sink("beverages");  (3)

        Order order = new Order();
        order.setProduct("coffee");
        order.setCustomer("Coffee lover");
        order.setOrderId("1234");

        ordersIn.send(order);  (4)

        await().<List<? extends Message<Beverage>>>until(beveragesOut::received, t -> t.size() == 1); (5)

        Beverage queuedBeverage = beveragesOut.received().get(0).getPayload();
        Assertions.assertEquals("RECEIVED", queuedBeverage.getPreparationState());
        Assertions.assertEquals("coffee", queuedBeverage.getBeverage());
        Assertions.assertEquals("Coffee lover", queuedBeverage.getCustomer());
        Assertions.assertEquals("1234", queuedBeverage.getOrderId());
    }

}
1 テストクラスにインメモリーコネクタ-を挿入します。
2 受信チャンネルを取得します (orders) - テストリソース内でチャンネルがインメモリーに切り替えられている必要があります。
3 Retrieve the outgoing channel (beverages) - the channel must have been switched to in-memory in the test resource.
4 Use the send method to send a message to the orders channel. The application will process this message and send a message to beverages channel.
5 Use the received method on beverages channel to check the messages produced by the application.

With in-memory channels we were able to test application code processing messages without starting a Kafka broker. Note that different in-memory channels are independent, and switching channel connector to in-memory does not simulate message delivery between channels configured to the same Kafka topic.

17.2. Testing using a Kafka broker

If you are using Dev Services for Kafka, a Kafka broker will be started and available throughout the tests, unless it is disabled in the %test profile. While it is possible to connect to this broker using the Kafka Clients API, the Kafka Companion Library proposes an easier way of interacting with a Kafka broker and creating consumer, producer and admin actions inside tests.

For using KafkaCompanion API in tests, start by adding the following dependency:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-test-kafka-companion</artifactId>
    <scope>test</scope>
</dependency>

which provides io.quarkus.test.kafka.KafkaCompanionResource - an implementation of io.quarkus.test.common.QuarkusTestResourceLifecycleManager.

Then use @QuarkusTestResource to configure the Kafka Companion in tests, for example:

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.kafka.InjectKafkaCompanion;
import io.quarkus.test.kafka.KafkaCompanionResource;
import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;

@QuarkusTest
@QuarkusTestResource(KafkaCompanionResource.class)
public class OrderProcessorTest {

    @InjectKafkaCompanion (1)
    KafkaCompanion companion;

    @Test
    void testProcessor() {
        companion.produceStrings().usingGenerator(i -> new ProducerRecord<>("orders", UUID.randomUUID().toString()), 10); (2)

        // Expect that the tested application processes orders from the 'orders' topic and writes to the 'orders-processed' topic

        ConsumerTask<String, String> orders = companion.consumeStrings().fromTopics("orders-processed", 10); (3)
        orders.awaitCompletion(); (4)
        assertEquals(10, orders.count());
    }
}
1 @InjectKafkaCompanion injects the KafkaCompanion instance, configured to access the Kafka broker created for tests.
2 Use KafkaCompanion to create producer task which writes 10 records to 'orders' topic.
3 Create consumer task which subscribes to 'orders-processed' topic and consumes 10 records.
4 Await completion of the consumer task.

If the Kafka Dev Service is available during tests, KafkaCompanionResource uses the created Kafka broker, otherwise it creates a Kafka broker using Strimzi Test Container.

The configuration of the created Kafka broker can be customized using @ResourceArg, for example:

@QuarkusTestResource(value = KafkaCompanionResource.class, initArgs = {
        @ResourceArg(name = "strimzi.kafka.image", value = "quay.io/strimzi/kafka:0.28.0-kafka-3.0.0"), // Image name
        @ResourceArg(name = "kafka.port", value = "9092"), // Fixed port for kafka, by default it will be exposed on a random port
        @ResourceArg(name = "kraft", value = "true"), // Enable Kraft mode
        @ResourceArg(name = "num.partitions", value = "3"), // Other custom broker configurations
})
public class OrderProcessorTest {
    // ...
}

17.2.1. Custom test resource

あるいは、テストリソースで Kafka ブローカを起動することもできます。次のスニペットは、 Testcontainers を使用して Kafka ブローカを起動するテストリソースを示しています。

public class KafkaResource implements QuarkusTestResourceLifecycleManager {

    private final KafkaContainer kafka = new KafkaContainer();

    @Override
    public Map<String, String> start() {
        kafka.start();
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());  (1)
    }

    @Override
    public void stop() {
        kafka.close();
    }
}
1 アプリケーションがこのブローカーに接続するように、Kafka ブートストラップの場所を設定します。

18. Dev Services for Kafka

Kafka関連のエクステンション(例: quarkus-smallrye-reactive-messaging-kafka )が存在する場合、Dev Services for Kafkaは、開発モードやテストの実行時に自動的にKafkaブローカーを起動します。そのため、ブローカーを手動で起動する必要はありません。アプリケーションは自動的に構成されます。

Kafkaブローカーの起動には時間がかかるため、Dev Services for Kafkaでは、1秒程度で起動するKafka互換ブローカーの Redpandaを使用しています。

18.1. Dev Services for Kafkaの有効化/無効化

以下の場合を除き、Dev Services for Kafkaが自動的に有効になります:

  • quarkus.kafka.devservices.enabledfalse に設定されている場合

  • kafka.bootstrap.servers が設定されている場合

  • すべてのReactive Messaging Kafkaチャンネルに bootstrap.servers 属性が設定されている場合

Dev Services for Kafkaでは、ブローカーの起動にDockerを使用しています。お使いの環境でDockerがサポートされていない場合は、ブローカーを手動で起動するか、すでに稼働しているブローカーに接続する必要があります。ブローカーのアドレスは、 kafka.bootstrap.servers を使用して設定できます。
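
For example, to turn Dev Services for Kafka off and point the application at an existing broker, you could set (the broker address is illustrative):

quarkus.kafka.devservices.enabled=false
kafka.bootstrap.servers=my-kafka:9092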

18.2. 共有ブローカー

通常は、アプリケーション間でブローカーを共有する必要があります。Dev Services for Kafkaは、 dev モードで動作する複数のQuarkusアプリケーションが1つのブローカーを共有するための サービスディスカバリー メカニズムを実装しています。

Dev Services for Kafka は、コンテナを識別するために使用される quarkus-dev-service-kafka のラベルでコンテナを開始します。

複数の(共有)ブローカーが必要な場合は、 quarkus.kafka.devservices.service-name 属性を設定し、ブローカー名を示します。同じ値のコンテナを探し、見つからない場合は新しいコンテナを開始します。デフォルトのサービス名は kafka です。

共有は、dev モードではデフォルトで有効ですが、testモードでは無効です。 quarkus.kafka.devservices.shared=false で共有を無効に設定可能です。
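
For example (the service name is illustrative):

# Use a dedicated, named shared broker instead of the default "kafka" one
quarkus.kafka.devservices.service-name=my-shared-kafka
# Or disable sharing entirely so each application gets its own broker
quarkus.kafka.devservices.shared=false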

18.3. ポートの設定

デフォルトでは、Dev Services for Kafka はランダムなポートを選択してアプリケーションを構成します。ポートは、 quarkus.kafka.devservices.port プロパティを構成することで設定できます。

Kafkaのアドバタイズされたアドレスは、選択したポートで自動的に設定されることに注意してください。

18.4. イメージの設定

Dev Services for Kafkaは、 RedpandaStrimziKraft モード)をサポートしています。

RedpandaはKafkaと互換性のあるイベントストリーミングプラットフォームです。 起動時間を高速にするため、Dev Services for Kafka はデフォルトで vectorized/redpanda イメージを使用します。 https://hub.docker.com/r/vectorized/redpanda から任意のバージョンを選択することができます。

Strimziは、Kubernetes上でApache Kafkaを動作させるためのコンテナイメージとOperatorを提供します。StrimziはKubernetesに最適化されていますが、イメージは従来のコンテナ環境でも完全に動作します。Strimziのコンテナイメージは、JVM上で “純正” のKafkaブローカーを動作させますが、起動が遅くなっています。

Strimzi では、Kafka のバージョンが Kraft に対応しているもの(2.8.1 以上)であれば、 https://quay.io/repository/strimzi-test-container/test-container?tab=tags から任意のイメージを選択することができます。

quarkus.kafka.devservices.image-name=quay.io/strimzi-test-container/test-container:0.100.0-kafka-3.1.0

18.5. Kafkaトピックの設定

ブローカーの起動時にトピックを作成するように、Dev Services for Kafka を構成することができます。トピックは、指定された数のパーティションと1つのレプリカで作成されます。

次の例では、 test という名前のトピックを3つのパーティションで作成し、 messages という名前の2つ目のトピックを2つのパーティションで作成しています。

quarkus.kafka.devservices.topic-partitions.test=3
quarkus.kafka.devservices.topic-partitions.messages=2

指定された名前のトピックがすでに存在する場合、既存のトピックを異なる数のパーティションに再分割しようとはせず、作成はスキップされます。

quarkus.kafka.devservices.topic-partitions-timeout を使用して、トピック作成時に使用される Kafka admin クライアントコールのタイムアウトを設定できます。デフォルトは 2 秒です。

18.6. トランザクションの有効化

デフォルトでは、Redpanda ブローカーはトランザクションコーディネーターとしては機能しません。トランザクションを有効にするには、次を設定します:

quarkus.kafka.devservices.redpanda.transaction-enabled=true
これは、プロデューサーの冪等性のサポートも有効化します。

19. Kubernetes Service Bindings

Quarkus Kafka extension supports Service Binding Specification for Kubernetes. You can enable this by adding the quarkus-kubernetes-service-binding extension to your application.

When running in appropriately configured Kubernetes clusters, Kafka extension will pull its Kafka broker connection configuration from the service binding available inside the cluster, without the need for user configuration.

20. 実行モデル

Reactive Messaging invokes user’s methods on an I/O thread. Thus, by default, the methods must not block. As described in ブロッキング処理, you need to add the @Blocking annotation on the method if this method will block the caller thread.

See the Quarkus Reactive Architecture documentation for further details on this topic.
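
For instance, a consumer performing blocking work could be written as follows (a minimal sketch; the channel name and the blocking call are illustrative):

import io.smallrye.common.annotation.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class BlockingPriceConsumer {

    @Incoming("prices")
    @Blocking // executed on a worker thread, so blocking the caller is allowed
    public void consume(double price) {
        // illustrative placeholder for blocking work, e.g. a synchronous database call
        storePrice(price);
    }

    void storePrice(double price) {
        // ...
    }
}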

21. 設定リファレンス

More details about the SmallRye Reactive Messaging configuration can be found in the SmallRye Reactive Messaging - Kafka Connector Documentation.

Each channel can be disabled via configuration using:

mp.messaging.[incoming|outgoing].[channel].enabled=false

The most important attributes are listed in the tables below:

21.1. Incoming チャネル設定 (Kafkaからのポーリング)

以下の属性は以下のように設定します:

mp.messaging.incoming.your-channel-name.attribute=value

一部のプロパティには、グローバルに設定可能なエイリアスがあります。

kafka.bootstrap.servers=...

You can also pass any property supported by the underlying Kafka consumer.

For example, to configure the max.poll.records property, use:

mp.messaging.incoming.[channel].max.poll.records=1000

Some consumer client properties are configured to sensible default values:

If not set, reconnect.backoff.max.ms is set to 10000 to avoid high load on disconnection.

If not set, key.deserializer is set to org.apache.kafka.common.serialization.StringDeserializer.

The consumer client.id is configured according to the number of clients to create using mp.messaging.incoming.[channel].partitions property.

  • If a client.id is provided, it is used as-is or suffixed with client index if partitions property is set.

  • If a client.id is not provided, it is generated as kafka-consumer-[channel][-index].

Table 1. 'smallrye-kafka' connector の Incoming 属性
属性 (alias) 説明 必須 デフォルト

bootstrap.servers

(kafka.bootstrap.servers)

Kafka クラスターへの初期接続を確立するために使用する host:port のコンマ区切りリスト。

Type: string

false

localhost:9092

topic

消費/投入されるKafkaトピック。このプロパティも topics のプロパティも設定されていない場合は、チャネル名が使用されます。

Type: string

false

health-enabled

ヘルスレポートが有効(デフォルト)か無効か

Type: boolean

false

true

health-readiness-enabled

レディネスレポートが有効(デフォルト)か無効か

Type: boolean

false

true

health-readiness-topic-verification

deprecated - レディネスチェックでトピックがブローカーに存在することを確認する必要があるかどうか。デフォルトは false です。有効にするには、管理者接続が必要です。非推奨: 代わりに health-topic-verification-enabled を使用します。

Type: boolean

false

health-readiness-timeout

deprecated - レディネスヘルスチェック中に、コネクターはブローカーに接続し、トピックのリストを取得します。この属性は、取得の最大期間 (ミリ秒単位) を指定します。超過した場合、チャネルは準備ができていないと見なされます。非推奨: 代わりに health-topic-verification-timeout を使用します。

Type: long

false

health-topic-verification-enabled

ブローカーにトピックが存在するかどうかをスタートアップおよび レディネスチェックで確認するかどうか。デフォルトは false です。これを有効にするには、admin 接続が必要です。

Type: boolean

false

false

health-topic-verification-timeout

スタートアップおよびレディネスチェックの間、コネクタはブローカーに接続し、トピックのリストを取得します。この属性では、取得にかける最大時間 (ms) を指定します。これを超えると、チャネルは準備ができていないとみなされます。

Type: long

false

2000

tracing-enabled

トレースを有効(デフォルト)にするか、無効にするか

Type: boolean

false

true

cloud-events

クラウド イベント サポートを有効(デフォルト)または無効にします。 incoming チャネルで有効にすると、コネクタは受信レコードを分析し、Cloud Event メタデータの作成を試みます。 outgoing 側で有効にすると、メッセージに Cloud Event Metadata が含まれている場合、コネクタはoutgoingメッセージを Cloud Event として送信します。

Type: boolean

false

true

kafka-configuration

このチャネルのデフォルトの Kafka コンシューマー/プロデューサー設定を提供する CDI Bean の識別子。チャネル設定は、引き続き任意の属性をオーバーライドできます。Bean は Map<String, Object> 型である必要があり、識別子を設定するには @io.smallrye.common.annotation.Identifier 修飾子を使用する必要があります。

Type: string

false

topics

消費されるトピックのコンマ区切りのリスト。 topic または pattern のプロパティとは併用できません。

Type: string

false

pattern

topic プロパティが正規表現であることを示す。 topic プロパティと併用する必要があります。 topics プロパティとは併用できません。

Type: boolean

false

false

key.deserializer

レコードのキーをデシリアライズするために使用されるデシリアライザのクラス名

Type: string

false

org.apache.kafka.common.serialization.StringDeserializer

value.deserializer

レコードの値のデシリアライズに使用されるデシリアライザのクラス名

Type: string

true

fetch.min.bytes

フェッチ・リクエストに対してサーバーが返すべきデータの最小量。デフォルトの1バイトの設定は、1バイトのデータが利用可能になるか、データの到着を待ってフェッチリクエストがタイムアウトするとすぐにフェッチリクエストに応答することを意味します。

Type: int

false

1

group.id

アプリケーションが所属するコンシューマーグループを識別するための一意の文字列。

設定されていない場合、デフォルトでは、 quarkus.application.name 構成プロパティで設定されたアプリケーション名になります。

それも設定されていない場合は、生成された一意のIDが使用されます。

常に group.id を定義することをお勧めします。自動生成は、開発用の便利機能にすぎません。 このプロパティを ${quarkus.uuid} に設定することで、自動的に生成される一意の ID を明示的に要求することができます。

Type: string

false

enable.auto.commit

この設定を有効にすると、コンシューマーのオフセットは、レコードの実際の処理結果を無視して、基礎となるKafkaクライアントによってバックグラウンドで定期的にコミットされます。この設定を有効にしないで、Reactive Messaging にコミットを任せることをお勧めします。

Type: boolean

false

false

retry

Whether the connection to the broker is re-attempted in case of failure

Type: boolean

false

true

retry-attempts

失敗するまでの最大再接続回数を指定します。-1は無限再試行を意味します。

Type: int

false

-1

retry-max-wait

2回の再接続の間の最大遅延時間(秒)

Type: int

false

30

broadcast

Kafka レコードを複数のコンシューマーにディスパッチする必要があるかどうか

Type: boolean

false

false

auto.offset.reset

Kafka に初期オフセットがない場合の対処方法。受け入れられる値は earliest、latest、none です。

Type: string

false

latest

failure-strategy

レコードから生成されたメッセージが否定的に確認された(nack)場合に適用する失敗戦略を指定します。値は、 fail (デフォルト)、 ignore 、または dead-letter-queue

Type: string

false

fail

commit-strategy

レコードから生成されたメッセージが確認されたときに適用するコミットストラテジーを指定します。値は、 latestignorethrottled のいずれかです。 enable.auto.commit がtrueであれば、デフォルトは ignore です。そうでなければ throttled です。

Type: string

false

throttled.unprocessed-record-max-age.ms

throttled commit-strategy を使用している場合の、コネクタが不健全であるとマークされるまでの未処理メッセージの最大時間をミリ秒単位で指定します。この属性を 0 に設定すると、このモニタリングが無効になります。

Type: int

false

60000

dead-letter-queue.topic

failure-strategydead-letter-queue が設定されている場合、どのトピックにレコードが送信されるかを示します。デフォルトは dead-letter-topic-$channel

Type: string

false

dead-letter-queue.key.serializer

failure-strategydead-letter-queue が設定されている場合、 使用するキーシリアライザを示します。設定されていない場合は、キーデシリアライザに関連付けられたシリアライザが使用されます。

Type: string

false

dead-letter-queue.value.serializer

failure-strategydead-letter-queue が設定されている場合、使用する値のシリアライザを示します。設定されていない場合は、値のデシリアライザに関連付けられたシリアライザが使用されます。

Type: string

false

partitions

同時に消費されるパーティションの数です。コネクタは、指定された数のKafkaコンシューマーを作成します。これは、対象となるトピックのパーティション数と一致する必要があります。

Type: int

false

1

requests

When partitions is greater than 1, this attribute allows configuring how many records are requested by each consumer every time.

Type: int

false

128

consumer-rebalance-listener.name

io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener を実装する Bean の @Identifier で設定された名前です。設定された場合、このリバランスリスナーはコンシューマーに適用されます。

Type: string

false

key-deserialization-failure-handler

io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler を実装する Beanの @Identifier で設定された名前です。設定されている場合、キーをデシリアライズする際に起こるデシリアライズの失敗は、フォールバック値を再試行または提供することができるこのハンドラに委ねられます。

Type: string

false

value-deserialization-failure-handler

io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler を実装する Beanの @Identifier で設定された名前です。設定されている場合、値をデシリアライズする際に起こるデシリアライズの失敗は、フォールバック値を再試行または提供することができるこのハンドラに委ねられます。

Type: string

false

fail-on-deserialization-failure

デシリアライズ失敗ハンドラーが設定されておらず、デシリアライズ失敗が発生した場合は、失敗を報告し、アプリケーションを異常としてマークします。false に設定され、逆シリアル化の失敗が発生した場合、null 値が転送されます。

Type: boolean

false

true

graceful-shutdown

Whether a graceful shutdown should be attempted when the application terminates.

Type: boolean

false

true

poll-timeout

ミリ秒単位のポーリングタイムアウト。レコードをポーリングする場合、ポーリングは最大でその期間待機してからレコードを返します。デフォルトは 1000ms です

Type: int

false

1000

pause-if-no-requests

アプリケーションがアイテムを要求しないときにポーリングを一時停止し、要求したときに再開する必要があるかどうか。これにより、アプリケーションの容量に基づいてバックプレッシャを実装できます。ポーリングは停止されませんが、一時停止されたときにレコードを取得しないことに注意してください。

Type: boolean

false

true

batch

Kafka レコードがバッチで消費されるかどうか。チャネルインジェクションポイントは、List<Payload> または KafkaRecordBatch<Payload> などの互換性のあるタイプを消費する必要があります。

Type: boolean

false

false

max-queue-size-factor

max.poll.records * max-queue-size-factor を使用して、処理のためにキューに入れられるレコードの最大数を決定する乗数係数。デフォルトは 2 です。batch モードでは、max.poll.records1 と見なされます。

Type: int

false

2

21.2. outgoingチャンネルの設定(Kafkaへの書き込み)

以下の属性は以下のように設定します:

mp.messaging.outgoing.your-channel-name.attribute=value

一部のプロパティには、グローバルに設定可能なエイリアスがあります。

kafka.bootstrap.servers=...

You can also pass any property supported by the underlying Kafka producer.

For example, to configure the max.block.ms property, use:

mp.messaging.outgoing.[channel].max.block.ms=10000

Some producer client properties are configured to sensible default values:

If not set, reconnect.backoff.max.ms is set to 10000 to avoid high load on disconnection.

If not set, key.serializer is set to org.apache.kafka.common.serialization.StringSerializer.

If not set, producer client.id is generated as kafka-producer-[channel].

Table 2. Outgoing Attributes of the 'smallrye-kafka' connector
Attribute (alias) Description Mandatory Default

acks

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. Accepted values are: 0, 1, all

Type: string

false

1

bootstrap.servers

(kafka.bootstrap.servers)

A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster.

Type: string

false

localhost:9092

buffer.memory

The total bytes of memory the producer can use to buffer records waiting to be sent to the server.

Type: long

false

33554432

close-timeout

The number of milliseconds to wait for a graceful shutdown of the Kafka producer

Type: int

false

10000

cloud-events

Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an outgoing channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event Metadata.

Type: boolean

false

true

cloud-events-data-content-type

(cloud-events-default-data-content-type)

Configure the default datacontenttype attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the datacontenttype attribute itself

Type: string

false

cloud-events-data-schema

(cloud-events-default-data-schema)

Configure the default dataschema attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the dataschema attribute itself

Type: string

false

cloud-events-insert-timestamp

(cloud-events-default-timestamp)

Whether the connector should insert automatically the time attribute into the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the time attribute itself

Type: boolean

false

true

cloud-events-mode

The Cloud Event mode (structured or binary (default)). Indicates how the Cloud Events are written in the outgoing record

Type: string

false

binary

cloud-events-source

(cloud-events-default-source)

Configure the default source attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the source attribute itself

Type: string

false

cloud-events-subject

(cloud-events-default-subject)

Configure the default subject attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the subject attribute itself

Type: string

false

cloud-events-type

(cloud-events-default-type)

Configure the default type attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the type attribute itself

Type: string

false

health-enabled

Whether health reporting is enabled (default) or disabled

Type: boolean

false

true

health-readiness-enabled

Whether readiness health reporting is enabled (default) or disabled

Type: boolean

false

true

health-readiness-timeout

deprecated - During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. Deprecated: Use 'health-topic-verification-timeout' instead.

Type: long

false

health-readiness-topic-verification

deprecated - Whether the readiness check should verify that topics exist on the broker. Defaults to false. Enabling it requires an admin connection. Deprecated: Use 'health-topic-verification-enabled' instead.

Type: boolean

false

health-topic-verification-enabled

Whether the startup and readiness check should verify that topics exist on the broker. Defaults to false. Enabling it requires an admin client connection.

Type: boolean

false

false

health-topic-verification-timeout

During the startup and readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready.

Type: long

false

2000

kafka-configuration

Identifier of a CDI bean that provides the default Kafka consumer/producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier.

Type: string

false

key

The key to be used when writing the record

Type: string

false

key-serialization-failure-handler

The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.SerializationFailureHandler. If set, serialization failures happening when serializing keys are delegated to this handler, which may provide a fallback value.

Type: string

false

key.serializer

The serializer classname used to serialize the record’s key

Type: string

false

org.apache.kafka.common.serialization.StringSerializer

max-inflight-messages

The maximum number of messages to be written to Kafka concurrently. It limits the number of messages waiting to be written and acknowledged by the broker. You can set this attribute to 0 to remove the limit

Type: long

false

1024

merge

Whether the connector should allow multiple upstreams

Type: boolean

false

false

partition

The target partition id. -1 to let the client determine the partition

Type: int

false

-1

propagate-headers

A comma-separated list of incoming record headers to be propagated to the outgoing record

Type: string

false

propagate-record-key

Propagate incoming record key to the outgoing record

Type: boolean

false

false

retries

If set to a positive number, the connector will try to resend any record that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. If not set, the connector tries to resend any record that failed to be delivered (because of a potentially transient error) during an amount of time configured by delivery.timeout.ms.

Type: long

false

2147483647

topic

The consumed / populated Kafka topic. If neither this property nor the topics properties are set, the channel name is used

Type: string

false

tracing-enabled

Whether tracing is enabled (default) or disabled

Type: boolean

false

true

value-serialization-failure-handler

The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.SerializationFailureHandler. If set, serialization failures happening when serializing values are delegated to this handler, which may provide a fallback value.

Type: string

false

value.serializer

The serializer classname used to serialize the payload

Type: string

true

waitForWriteCompletion

Whether the client waits for Kafka to acknowledge the written record before acknowledging the message

Type: boolean

false

true

21.3. Kafka Configuration Resolution

Quarkus exposes all Kafka related application properties, prefixed with kafka. or KAFKA_ inside a configuration map with default-kafka-broker name. This configuration is used to establish the connection with the Kafka broker.

In addition to this default configuration, you can configure the name of the Map producer using the kafka-configuration attribute:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.kafka-configuration=my-configuration

In this case, the connector looks for the Map associated with the my-configuration name. If kafka-configuration is not set, an optional lookup for a Map exposed with the channel name (my-channel in the previous example) is done.

@Produces
@ApplicationScoped
@Identifier("my-configuration")
Map<String, Object> outgoing() {
    return Map.ofEntries(
            Map.entry("value.serializer", ObjectMapperSerializer.class.getName())
    );
}
If kafka-configuration is set and no Map can be found, the deployment fails.

Attribute values are resolved as follows:

  1. the attribute is set directly on the channel configuration (mp.messaging.incoming.my-channel.attribute=value),

  2. if not set, the connector looks for a Map with the channel name or the configured kafka-configuration (if set) and the value is retrieved from that Map

  3. If the resolved Map does not contain the value, the default Map is used (exposed with the default-kafka-broker name)

22. Kafkaとの連携 - 一般的なパターン

22.1. HTTPエンドポイントからKafkaへの書き込み

To send messages to Kafka from an HTTP endpoint, inject an Emitter (or a MutinyEmitter) in your endpoint:

package org.acme;

import java.util.concurrent.CompletionStage;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<String> emitter;          (1)

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) { (2)
        return emitter.send(payload);                   (3)
    }
}
1 Emitter<String> を注入
2 HTTPメソッドはペイロードを受け取り、メッセージが Kafka に書き込まれたときに完了する CompletionStage を返します。
3 メッセージをKafkaに送信し、 send メソッドは CompletionStage を返却します。

The endpoint sends the passed payload (from a POST HTTP request) to the emitter. The emitter’s channel is mapped to a Kafka topic in the application.properties file:

mp.messaging.outgoing.kafka.connector=smallrye-kafka
mp.messaging.outgoing.kafka.topic=my-topic

The endpoint returns a CompletionStage indicating the asynchronous nature of the method. The emitter.send method returns a CompletionStage<Void>. The returned future is completed when the message has been written to Kafka. If the writing fails, the returned CompletionStage is completed exceptionally.

If the endpoint does not return a CompletionStage, the HTTP response may be written before the message is sent to Kafka, and so failures won’t be reported to the user.

If you need to send a Kafka record, use:

package org.acme;

import java.util.concurrent.CompletionStage;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.reactive.messaging.kafka.Record;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Record<String,String>> emitter;  (1)


    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) {
        return emitter.send(Record.of("my-key", payload));    (2)
    }
}
1 Note the usage of an Emitter<Record<K, V>>
2 Create the record using Record.of(k, v)
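
If you prefer the Mutiny API, the same endpoint can be written with a MutinyEmitter, whose send method returns a Uni<Void>. The following sketch assumes the same kafka channel configuration (the class name is illustrative):

package org.acme;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/")
public class ResourceSendingToKafkaWithMutiny {

    @Channel("kafka") MutinyEmitter<String> emitter;

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public Uni<Void> send(String payload) {
        // The returned Uni completes when the record has been written to Kafka
        return emitter.send(payload);
    }
}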

22.2. Persisting Kafka messages with Hibernate with Panache

To persist objects received from Kafka into a database, you can use Hibernate with Panache.

If you use Hibernate Reactive, look at Persisting Kafka messages with Hibernate Reactive.

Let’s imagine you receive Fruit objects. For simplicity purposes, our Fruit class is pretty simple:

package org.acme;

import javax.persistence.Entity;

import io.quarkus.hibernate.orm.panache.PanacheEntity;

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}

To consume Fruit instances stored on a Kafka topic, and persist them into a database, you can use the following approach:

package org.acme;

import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.common.annotation.Blocking;

@ApplicationScoped
public class FruitConsumer {

    @Incoming("fruits")                                     (1)
    @Transactional                                          (2)
    public void persistFruits(Fruit fruit) {                (3)
        fruit.persist();                                    (4)
    }
}
1 Configuring the incoming channel. This channel reads from Kafka.
2 As we are writing in a database, we must be in a transaction. This annotation starts a new transaction and commits it when the method returns. Quarkus automatically considers the method as blocking. Indeed, writing to a database using classic Hibernate is blocking. So, Quarkus calls the method on a worker thread you can block (and not an I/O thread).
3 The method receives each Fruit. Note that you would need a deserializer to reconstruct the Fruit instances from the Kafka records.
4 Persist the received fruit object.

As mentioned in <4>, you need a deserializer that can create a Fruit from the record. This can be done using a Jackson deserializer:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

The associated configuration would be:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

Check Jackson によるシリアライゼーション for more detail about the usage of Jackson with Kafka. You can also use Avro.

22.3. Persisting Kafka messages with Hibernate Reactive

To persist objects received from Kafka into a database, you can use Hibernate Reactive with Panache.

Let’s imagine you receive Fruit objects. For simplicity purposes, our Fruit class is pretty simple:

package org.acme;

import javax.persistence.Entity;

import io.quarkus.hibernate.reactive.panache.PanacheEntity;  (1)

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}
1 Make sure to use the reactive variant

To consume Fruit instances stored on a Kafka topic, and persist them into a database, you can use the following approach:

package org.acme;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import org.hibernate.reactive.mutiny.Mutiny;

@ApplicationScoped
public class FruitStore {

    @Inject
    Mutiny.Session session;                    (1)

    @Incoming("in")
    public Uni<Void> consume(Fruit fruit) {
        return session.withTransaction(t -> {  (2)
            return fruit.persistAndFlush()    (3)
                    .replaceWithVoid();        (4)
        }).onTermination().call(() -> session.close()); (5)
    }

}
1 Inject the Hibernate Reactive Session
2 Requests a new transaction. The transaction completes when the passed action completes.
3 Persist the entity. It returns a Uni<Fruit>.
4 Switch back to a Uni<Void>.
5 Close the session - this closes the connection with the database. The connection can then be recycled.

Unlike with classic Hibernate, you can’t use @Transactional. Instead, we use session.withTransaction and persist our entity. The replaceWithVoid() call is used to return a Uni<Void> and not a Uni<Fruit>.

You need a deserializer that can create a Fruit from the record. This can be done using a Jackson deserializer:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

The associated configuration would be:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

Check Jackson によるシリアライゼーション for more detail about the usage of Jackson with Kafka. You can also use Avro.

22.4. Writing entities managed by Hibernate to Kafka

Let’s imagine the following process:

  1. You receive an HTTP request with a payload,

  2. You create an Hibernate entity instance from this payload,

  3. You persist that entity into a database,

  4. You send the entity to a Kafka topic

If you use Hibernate Reactive, look at Writing entities managed by Hibernate Reactive to Kafka.

Because we write to a database, we must run this method in a transaction. Yet, sending the entity to Kafka happens asynchronously. The operation returns a CompletionStage (or a Uni if you use a MutinyEmitter) reporting when the operation completes. We must be sure that the transaction is still running until the object is written. Otherwise, you may access the object outside the transaction, which is not allowed.

To implement this process, you need the following approach:

package org.acme;

import java.util.concurrent.CompletionStage;

import javax.transaction.Transactional;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Fruit> emitter;

    @POST
    @Path("/fruits")
    @Transactional                                                      (1)
    public CompletionStage<Void> storeAndSendToKafka(Fruit fruit) {     (2)
        fruit.persist();
        return emitter.send(fruit);                                     (3)
    }
}
1 As we are writing to the database, make sure we run inside a transaction
2 The method receives the fruit instance to persist. It returns a CompletionStage which is used for the transaction demarcation. The transaction is committed when the return CompletionStage completes. In our case, it’s when the message is written to Kafka.
3 Send the managed instance to Kafka. Make sure we wait for the message to complete before closing the transaction.

22.5. Writing entities managed by Hibernate Reactive to Kafka

To send to Kafka entities managed by Hibernate Reactive, we recommend using:

  • RESTEasy Reactive to serve HTTP requests

  • A MutinyEmitter to send message to a channel, so it can be easily integrated with the Mutiny API exposed by Hibernate Reactive or Hibernate Reactive with Panache.

The following example demonstrates how to receive a payload, store it in the database using Hibernate Reactive with Panache, and send the persisted entity to Kafka:

package org.acme;

import javax.ws.rs.POST;
import javax.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/")
public class ReactiveGreetingResource {

    @Channel("kafka") MutinyEmitter<Fruit> emitter;     (1)

    @POST
    @Path("/fruits")
    public Uni<Void> sendToKafka(Fruit fruit) {         (2)
        return Panache.withTransaction(() ->            (3)
            fruit.<Fruit>persist()
        )
            .chain(f -> emitter.send(f));               (4)
    }
}
1 Inject a MutinyEmitter which exposes a Mutiny API. It simplifies the integration with the Mutiny API exposed by Hibernate Reactive with Panache.
2 The HTTP method receiving the payload returns a Uni<Void>. The HTTP response is written when the operation completes (the entity is persisted and written to Kafka).
3 We need to write the entity into the database in a transaction.
4 Once the persist operation completes, we send the entity to Kafka. The send method returns a Uni<Void>.

22.6. Streaming Kafka topics as server-sent events

Streaming a Kafka topic as server-sent events (SSE) is straightforward:

  1. You inject the channel representing the Kafka topic in your HTTP endpoint

  2. You return that channel as a Publisher or a Multi from the HTTP method

以下のコードはその一例です。

@Channel("fruits")
Multi<Fruit> fruits;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<Fruit> stream() {
    return fruits;
}

Some environments cut the SSE connection when there is not enough activity. The workaround consists of sending ping messages (or empty objects) periodically.

@Channel("fruits")
Multi<Fruit> fruits;

@Inject
ObjectMapper mapper;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    return Multi.createBy().merging()
            .streams(
                    fruits.map(this::toJson),
                    emitAPeriodicPing()
            );
}

Multi<String> emitAPeriodicPing() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(10))
            .onItem().transform(x -> "{}");
}

private String toJson(Fruit f) {
    try {
        return mapper.writeValueAsString(f);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

The workaround is a bit more complex: besides sending the fruits coming from Kafka, we need to send pings periodically. To achieve this, we merge the stream coming from Kafka and a periodic stream emitting {} every 10 seconds.

22.7. Chaining Kafka Transactions with Hibernate Reactive transactions

By chaining a Kafka transaction with a Hibernate Reactive transaction you can send records to a Kafka transaction, perform database updates and commit the Kafka transaction only if the database transaction is successful.

The following example demonstrates:

  • Receive a payload by serving HTTP requests using RESTEasy Reactive,

  • Limit concurrency of that HTTP endpoint using Smallrye Fault Tolerance,

  • Start a Kafka transaction and send the payload as a Kafka record,

  • Store the payload in the database using Hibernate Reactive with Panache,

  • Commit the Kafka transaction only if the entity is persisted successfully.

package org.acme;

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1) (2)
    public Uni<Void> post(Fruit fruit) { (3)
        return kafkaTx.withTransaction(emitter -> { (4)
            emitter.send(fruit); (5)
            return Panache.withTransaction(() -> { (6)
                return fruit.<Fruit>persist(); (7)
            });
        }).replaceWithVoid();
    }
}
1 Inject a KafkaTransactions which exposes a Mutiny API. It allows the integration with the Mutiny API exposed by Hibernate Reactive with Panache.
2 Limit the concurrency of the HTTP endpoint to "1", preventing starting multiple transactions at a given time.
3 The HTTP method receiving the payload returns a Uni<Void>. The HTTP response is written when the operation completes (the entity is persisted and Kafka transaction is committed).
4 Begin a Kafka transaction.
5 Send the payload to Kafka inside the Kafka transaction.
6 Persist the entity into the database in a Hibernate Reactive transaction.
7 Once the persist operation completes, and there are no errors, the Kafka transaction is committed. The result is discarded and the HTTP response is written.

In the previous example the database transaction (inner) will commit followed by the Kafka transaction (outer). If you wish to commit the Kafka transaction first and the database transaction second, you need to nest them in the reverse order.

The next example demonstrates that using the Hibernate Reactive API (without Panache):

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.hibernate.reactive.mutiny.Mutiny;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

@Path("/")
public class FruitProducer {

    @Channel("kafka") KafkaTransactions<Fruit> kafkaTx;

    @Inject Mutiny.SessionFactory sf; (1)

    @POST
    @Path("/fruits")
    @Consumes(MediaType.APPLICATION_JSON)
    @Bulkhead(1)
    public Uni<Void> post(Fruit fruit) {
        Context context = Vertx.currentContext(); (2)
        return sf.withTransaction(session -> (3)
                kafkaTx.withTransaction(emitter -> (4)
                        session.persist(fruit).invoke(() -> emitter.send(fruit)) (5)
                ).emitOn(context::runOnContext) (6)
        );
    }
}
1 Inject the Hibernate Reactive SessionFactory.
2 Capture the caller Vert.x context.
3 Begin a Hibernate Reactive transaction.
4 Begin a Kafka transaction.
5 Persist the payload and send the entity to Kafka.
6 The Kafka transaction terminates on the Kafka producer sender thread. We need to switch to the Vert.x context previously captured in order to terminate the Hibernate Reactive transaction on the same context in which it was started.

23. Logging

To reduce the amount of log written by the Kafka client, Quarkus sets the level of the following log categories to WARNING:

  • org.apache.kafka.clients

  • org.apache.kafka.common.utils

  • org.apache.kafka.common.metrics

You can override the configuration by adding the following lines to your application.properties:

quarkus.log.category."org.apache.kafka.clients".level=INFO
quarkus.log.category."org.apache.kafka.common.utils".level=INFO
quarkus.log.category."org.apache.kafka.common.metrics".level=INFO

24. Connecting to Managed Kafka clusters

This section explains how to connect to well-known managed Kafka cloud services.

24.1. Azure Event Hub

Azure Event Hub provides an endpoint compatible with Apache Kafka.

Azure Event Hubs for Kafka is not available in the basic tier. You need at least the standard tier to use Kafka. See Azure Event Hubs Pricing for the other options.

To connect to Azure Event Hub using the Kafka protocol with TLS, you need the following configuration:

kafka.bootstrap.servers=my-event-hub.servicebus.windows.net:9093 (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ (2)
    username="$ConnectionString" \ (3)
    password="<YOUR.EVENTHUBS.CONNECTION.STRING>"; (4)
1 The port is 9093.
2 You need to use the JAAS PlainLoginModule.
3 The username is the $ConnectionString string.
4 The Event Hub connection string given by Azure.

Replace <YOUR.EVENTHUBS.CONNECTION.STRING> with the connection string for your Event Hubs namespace. For instructions on getting the connection string, see Get an Event Hubs connection string. The result would be something like:

kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

This configuration can be global (as above), or set in the channel configuration:

mp.messaging.incoming.$channel.bootstrap.servers=my-event-hub.servicebus.windows.net:9093
mp.messaging.incoming.$channel.security.protocol=SASL_SSL
mp.messaging.incoming.$channel.sasl.mechanism=PLAIN
mp.messaging.incoming.$channel.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://my-event-hub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...";

24.2. Red Hat OpenShift Streams for Apache Kafka

Red Hat OpenShift Streams for Apache Kafka provides managed Kafka brokers. First, follow the instructions from Getting started with the rhoas CLI for Red Hat OpenShift Streams for Apache Kafka to create your Kafka broker instance. Make sure to copy the client id and client secret associated with the ServiceAccount you created.

Then, you can configure the Quarkus application to connect to the broker as follows:

kafka.bootstrap.servers=<connection url> (1)
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="${KAFKA_USERNAME}" \ (2)
  password="${KAFKA_PASSWORD}"; (3)
1 The connection string, given on the admin console, such as demo-c--bjsv-ldd-cvavkc-a.bf2.kafka.rhcloud.com:443
2 The Kafka username (the client id from the service account)
3 The Kafka password (the client secret from the service account)
In general, these properties are prefixed using %prod to enable them only when running in production mode.
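For example, a prod-scoped sketch of the configuration above (the connection URL and environment variable names are placeholders):

%prod.kafka.bootstrap.servers=<connection url>
%prod.kafka.security.protocol=SASL_SSL
%prod.kafka.sasl.mechanism=PLAIN
%prod.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="${KAFKA_USERNAME}" \
  password="${KAFKA_PASSWORD}";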
As explained in Getting started with the rhoas CLI for Red Hat OpenShift Streams for Apache Kafka, to use Red Hat OpenShift Streams for Apache Kafka, you must create the topic beforehand, create a Service Account, and grant that service account permissions to read from and write to your topic. The authentication data (client id and secret) relates to the service account, which means you can implement fine-grained permissions and restrict access to the topic.

When using Kubernetes, it is recommended to set the client id and secret in a Kubernetes secret:

apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
stringData:
  KAFKA_USERNAME: "..."
  KAFKA_PASSWORD: "..."

To allow your Quarkus application to use that secret, add the following line to the application.properties file:

%prod.quarkus.openshift.env.secrets=kafka-credentials

24.2.1. Red Hat OpenShift Service Registry

Red Hat OpenShift Service Registry provides a fully managed service registry for handling Kafka schemas.

You can follow the instructions from Getting started with Red Hat OpenShift Service Registry, or use the rhoas CLI to create a new service registry instance:

rhoas service-registry create --name my-schema-registry

Make sure to note the Registry URL of the created instance. For authentication, you can use the same ServiceAccount you created previously, but you need to make sure that it has the necessary permissions to access the service registry.

For example, using the rhoas CLI, you can grant the MANAGER role to the service account:

rhoas service-registry role add --role manager --service-account [SERVICE_ACCOUNT_CLIENT_ID]

Then, you can configure the Quarkus application to connect to the schema registry as follows:

mp.messaging.connector.smallrye-kafka.apicurio.registry.url=${RHOAS_SERVICE_REGISTRY_URL} (1)
mp.messaging.connector.smallrye-kafka.apicurio.auth.service.token.endpoint=${RHOAS_OAUTH_TOKEN_ENDPOINT} (2)
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.id=${RHOAS_CLIENT_ID} (3)
mp.messaging.connector.smallrye-kafka.apicurio.auth.client.secret=${RHOAS_CLIENT_SECRET} (4)
1 The service registry URL, given on the admin console, such as https://bu98.serviceregistry.rhcloud.com/t/0e95af2c-6e11-475e-82ee-f13bd782df24/apis/registry/v2
2 The OAuth token endpoint URL, such as https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token
3 The client id (from the service account)
4 The client secret (from the service account)

24.2.2. Binding Red Hat OpenShift managed services to Quarkus application using the Service Binding Operator

If your Quarkus application is deployed on a Kubernetes or OpenShift cluster with the Service Binding Operator and OpenShift Application Services operators installed, the configuration necessary to access Red Hat OpenShift Streams for Apache Kafka and Service Registry can be injected into the application using Kubernetes Service Binding.

To set up the Service Binding, you first need to connect the OpenShift managed services to your cluster. For an OpenShift cluster, you can follow the instructions from Connecting a Kafka and Service Registry instance to your OpenShift cluster.

Once you’ve connected your cluster with the RHOAS Kafka and Service Registry instances, make sure you’ve granted the necessary permissions to the newly created service account.

Then, using the Kubernetes Service Binding extension, you can configure the Quarkus application to generate ServiceBinding resources for those services:

quarkus.kubernetes-service-binding.detect-binding-resources=true

quarkus.kubernetes-service-binding.services.kafka.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.kafka.kind=KafkaConnection
quarkus.kubernetes-service-binding.services.kafka.name=my-kafka

quarkus.kubernetes-service-binding.services.serviceregistry.api-version=rhoas.redhat.com/v1alpha1
quarkus.kubernetes-service-binding.services.serviceregistry.kind=ServiceRegistryConnection
quarkus.kubernetes-service-binding.services.serviceregistry.name=my-schema-registry

For this example, the Quarkus build will generate the following ServiceBinding resources:

apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-kafka
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: KafkaConnection
      name: my-kafka
  detectBindingResources: true
  bindAsFiles: true
---
apiVersion: binding.operators.coreos.com/v1alpha1
kind: ServiceBinding
metadata:
  name: my-app-serviceregistry
spec:
  application:
    group: apps.openshift.io
    name: my-app
    version: v1
    kind: DeploymentConfig
  services:
    - group: rhoas.redhat.com
      version: v1alpha1
      kind: ServiceRegistryConnection
      name: my-schema-registry
  detectBindingResources: true
  bindAsFiles: true

You can follow Deploying to OpenShift to deploy your application, including the generated ServiceBinding resources. The configuration properties necessary to access the Kafka and Schema Registry instances will be injected into the application automatically at deployment.

25. Going further

This guide has shown how you can interact with Kafka using Quarkus, leveraging SmallRye Reactive Messaging to build data streaming applications.

For more details, check the documentation of SmallRye Reactive Messaging, the implementation used by Quarkus.