
Quarkus Messaging Extensions

Event-driven messaging systems have become the backbone of most modern applications, enabling the building of message-driven microservices or complex data streaming pipelines.

Quarkus offers a comprehensive suite of messaging extensions that integrate with leading messaging technologies. This lets developers concentrate on the core application logic instead of the details of individual APIs and messaging infrastructures.

Quarkus Messaging

This page focuses on common features and the development model for all messaging extensions.

Some of these extensions are maintained in the core Quarkus repository:

Some extensions are contributed and maintained by the community:

Other connectors, such as the JMS Connector or the Google PubSub Connector, do not benefit from the same level of integration and require more manual configuration to set up.

On the other hand, some messaging-related extensions provide low-level, provider-specific integrations. The level of support covered on this page DOES NOT apply to these low-level extensions. The following is a non-exhaustive list of such extensions:

Quarkus Messaging Development Model

Quarkus simplifies message-driven application development by establishing a uniform model for publishing, consuming, and processing messages, regardless of whether the underlying broker technology uses message queuing or event streaming. Built upon the MicroProfile Reactive Messaging specification, Quarkus Messaging extensions ensure seamless integration with these technologies. Importantly, proficiency in reactive programming is NOT a prerequisite for leveraging these capabilities.

The Reactive Messaging specification defines a CDI-based programming model for implementing event-driven and message-driven applications. Using a small set of annotations, CDI beans become building blocks for implementing interactions with message brokers. These interactions happen through channels where application components read and write messages.

Channels are identified by a unique name and declared using a set of annotations.

@Incoming and @Outgoing annotations

The @Incoming and @Outgoing method annotations define channels that let you consume messages from and produce messages to the message broker:

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MessageProcessingBean {

   @Incoming("source")
   @Outgoing("sink")
   public String process(String consumedPayload) {
       // Process the incoming message payload and return an updated payload
       return consumedPayload.toUpperCase();
   }

}

@Outgoing can be used by itself on a method to generate messages:

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MessageGeneratorBean {

   @Outgoing("sink")
   public Multi<String> generate() {
       return Multi.createFrom().items("a", "b", "c");
   }

}

@Incoming can be used by itself to consume messages:

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class MessageProcessingBean {

   @Incoming("source")
   public void process(String consumedPayload) {
       // process the payload
       consumedPayload.toUpperCase();
   }

}

Note that you should not call methods annotated with @Incoming and/or @Outgoing directly from your code. They are invoked by the framework. Having user code invoking them would not have the expected outcome.

You can read more on supported method signatures in the SmallRye Reactive Messaging – Supported signatures.

Emitters and @Channel annotation

An application often needs to combine messaging with other parts of the application, for example to produce messages from HTTP endpoints or to stream consumed messages as a response.

To send messages from imperative code to a specific channel, you need to inject an Emitter object identified by the @Channel annotation:

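import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;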

@ApplicationScoped
@Path("/")
public class MyImperativeBean {

   @Channel("prices")
   Emitter<Double> emitter;

   @GET
   @Path("/send")
   public CompletionStage<Void> send(double d) {
       return emitter.send(d);
   }
}

The @Channel annotation lets you indicate to which channel you will send your payloads or messages. The Emitter allows buffering messages sent to the channel.

For more control, you can use the Mutiny-based MutinyEmitter interface:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.reactive.messaging.MutinyEmitter;

@ApplicationScoped
@Path("/")
public class MyImperativeBean {

   @Channel("prices")
   MutinyEmitter<Double> emitter;

   @GET
   @Path("/send")
   public void send(double d) {
       emitter.sendAndAwait(d);
   }

}

The @Channel annotation can also be used to inject the stream of messages from an incoming channel:

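import io.smallrye.mutiny.Multi;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;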

@ApplicationScoped
@Path("/")
public class SseResource {

    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }

}

When consuming messages with @Channel, the application code is responsible for subscribing to the stream. In the example above, the Quarkus REST (formerly RESTEasy Reactive) endpoint handles that for you.

You can read more on the emitters and channels in the SmallRye Reactive Messaging – Emitter and Channels documentation.

Messages and Metadata

A Message is an envelope around a payload. The examples above only use payloads, but internally Quarkus Messaging wraps every payload in a Message.

The Message<T> interface associates a payload of type <T> with Metadata (a set of arbitrary objects) and with asynchronous actions for acknowledgement (ack) and negative acknowledgement (nack).

import org.eclipse.microprofile.reactive.messaging.Message;

@Incoming("source")
@Outgoing("sink")
public Message<String> process(Message<String> consumed) {
    // Access the metadata
    MyMetadata my = consumed.getMetadata(MyMetadata.class).get();
    // Process the incoming message and return an updated message
    return consumed.withPayload(consumed.getPayload().toUpperCase());
}

A message is acknowledged back to the broker when its processing or reception has been successful. Acknowledgements between messages are chained, meaning that when processing a message, the acknowledgement of an outgoing message triggers the acknowledgement of incoming message(s). In most cases, acks and nacks are managed for you and connectors allow you to configure different strategies per channel. So, you usually don’t need to interact with the Message interface directly. Only advanced use cases require dealing with the Message directly.
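The chaining described above can be modelled in a few lines of plain Java. This is only an illustrative sketch, not the SmallRye implementation: Envelope is a hypothetical stand-in for Message<T>, and it assumes that deriving a message with withPayload keeps the original acknowledgement action, as the processing example above relies on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

class AckChainSketch {

    // Hypothetical stand-in for Message<T>: a payload plus an asynchronous ack action.
    record Envelope<T>(T payload, Supplier<CompletionStage<Void>> ackAction) {

        CompletionStage<Void> ack() {
            return ackAction.get();
        }

        // Replaces the payload but keeps the ack action, so acknowledging the
        // derived envelope also acknowledges the envelope it was derived from.
        <R> Envelope<R> withPayload(R newPayload) {
            return new Envelope<>(newPayload, ackAction);
        }
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        // The incoming message records when the (imaginary) broker is acknowledged.
        Envelope<String> incoming = new Envelope<String>("hello", () -> {
            events.add("incoming acked: hello");
            return CompletableFuture.completedFuture(null);
        });
        // Processing step: derive the outgoing message from the incoming one.
        Envelope<String> outgoing = incoming.withPayload(incoming.payload().toUpperCase());
        events.add("outgoing sent: " + outgoing.payload());
        // The downstream side confirms the outgoing message, which triggers the incoming ack.
        outgoing.ack();
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The incoming message is only acknowledged after the outgoing one has been confirmed, which is the ordering the chained acknowledgement is meant to guarantee.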

Accessing the Metadata, on the other hand, can be practical in many cases. Connectors add specific metadata objects to the message to give access to the message headers, properties, and other connector-specific information. You do not need to interact with the Message interface to access connector-specific metadata. You can simply inject the metadata object as a method parameter after the payload parameter:

import java.util.Map;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@Incoming("source")
@Outgoing("sink")
public String process(String payload, MyMetadata my) {
    // Access the metadata
    Map<String, Object> props = my.getProperties();
    // Process the payload and return an updated payload
    return payload.toUpperCase();
}

Depending on the connector, payload types available to consume in processing methods differ. You can implement a custom MessageConverter to transform the payload to a type that is accepted by your application.

Channel configuration

Channel attributes can be configured using the mp.messaging.incoming.<channel-name> and mp.messaging.outgoing.<channel-name> configuration properties.

For example, to configure the Kafka connector to consume messages from the my-topic topic with a custom deserializer:

mp.messaging.incoming.source.connector=smallrye-kafka
mp.messaging.incoming.source.topic=my-topic
mp.messaging.incoming.source.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.source.auto.offset.reset=earliest

The connector attribute specifies which connector manages the channel. It is required for every channel mapped to a broker, unless only one connector is on your classpath, in which case Quarkus selects it automatically.

Global channel attributes can be configured using the connector name:

mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092

Connector-specific attributes are listed in connector documentation.
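To make the relationship between the two property prefixes concrete, here is a minimal plain-Java sketch of how an attribute could be resolved. ChannelConfigLookup is a hypothetical helper, not a Quarkus API, and it assumes that a channel-level value takes precedence over the connector-level one.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of Quarkus: models the resolution of a channel attribute.
class ChannelConfigLookup {

    static Optional<String> resolve(Map<String, String> config, String direction,
                                    String channel, String attribute) {
        String channelPrefix = "mp.messaging." + direction + "." + channel + ".";
        // 1. Look for the attribute on the channel itself.
        String value = config.get(channelPrefix + attribute);
        if (value != null) {
            return Optional.of(value);
        }
        // 2. Otherwise fall back to the attribute declared for the channel's connector.
        String connector = config.get(channelPrefix + "connector");
        if (connector == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(config.get("mp.messaging.connector." + connector + "." + attribute));
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(
                "mp.messaging.incoming.source.connector", "smallrye-kafka",
                "mp.messaging.incoming.source.topic", "my-topic",
                "mp.messaging.connector.smallrye-kafka.bootstrap.servers", "localhost:9092");
        System.out.println(resolve(config, "incoming", "source", "topic").orElse("<unset>"));
        System.out.println(resolve(config, "incoming", "source", "bootstrap.servers").orElse("<unset>"));
    }
}
```

With the properties shown earlier, the topic is found on the channel, while bootstrap.servers is picked up from the connector-wide setting.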

Channel wiring and Messaging patterns

At startup time, Quarkus analyzes declared channels to wire them together and verify that all channels are connected. Concretely, each channel creates a reactive stream of messages connected to another channel’s reactive stream of messages. Following the reactive streams protocol, back-pressure is enforced between channels, which keeps application resource usage under control and avoids overloading any part of the system.

On the flip side it is NOT possible to create new channels programmatically at runtime. There are, however, many patterns that let you implement most, if not all, messaging and integration use cases:

Some messaging technologies allow consumers to subscribe to a set of topics or queues, and producers to send messages to a specific topic on a per-message basis. If you are sure you need to configure and create clients dynamically at runtime, you should consider using the low-level clients directly.
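The back-pressure mechanism mentioned above can be illustrated with the JDK's java.util.concurrent.Flow interfaces. The sketch below is a simplified, synchronous model rather than what Quarkus does internally; ListPublisher and CollectingSubscriber are hypothetical names. The point is that the upstream side only emits as many items as the downstream side has requested.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class BackPressureSketch {

    // Publisher that emits items only when the subscriber has requested them.
    static class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;

        ListPublisher(List<String> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            Iterator<String> it = items.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items, never more than the demand.
                    for (long i = 0; i < n && !done && it.hasNext(); i++) {
                        subscriber.onNext(it.next());
                    }
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Subscriber that signals no demand on its own; the caller decides when to request.
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        boolean completed;
        Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
        }

        @Override
        public void onNext(String item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable failure) {
            throw new IllegalStateException(failure);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new ListPublisher(List.of("a", "b", "c", "d")).subscribe(subscriber);
        System.out.println(subscriber.received); // nothing requested yet
        subscriber.subscription.request(2);
        System.out.println(subscriber.received); // two items delivered
        subscriber.subscription.request(5);
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```

A slow consumer simply requests less, so the producer cannot flood it; this is the property that chained channels inherit.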

Internal Channels

In some use cases, it is convenient to use messaging patterns to transfer messages inside the same application. When you don’t connect a channel to a messaging backend, i.e. a connector, everything happens internally to the application, and the streams are created by chaining methods together. Each chain is still a reactive stream and enforces the back-pressure protocol.

The framework verifies that the producer/consumer chain is complete, meaning that if the application writes messages into an internal channel (using a method with only @Outgoing, or an Emitter), it must also consume the messages from within the application (using a method with only @Incoming or using an unmanaged stream).
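The completeness rule can be pictured as a simple set comparison. The following sketch is a hypothetical plain-Java model, not the actual Quarkus wiring code: it takes the names of internal channels that the application writes to and reads from, and reports those that are only connected on one side.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the startup check for internal channels.
class InternalChannelCheck {

    // Returns the channels that are written but never read, or read but never written.
    static Set<String> dangling(Set<String> produced, Set<String> consumed) {
        Set<String> all = new TreeSet<>(produced);
        all.addAll(consumed);
        Set<String> connected = new TreeSet<>(produced);
        connected.retainAll(consumed);
        all.removeAll(connected);
        return all;
    }

    public static void main(String[] args) {
        // "prices" is written and read; "audit" is written only; "orders" is read only.
        Set<String> produced = Set.of("prices", "audit");
        Set<String> consumed = Set.of("prices", "orders");
        System.out.println("Incomplete channels: " + dangling(produced, consumed));
    }
}
```

An empty result means every internal channel has both a producing and a consuming side.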

Enable/Disable channels

All defined channels are enabled by default, but it is possible to disable a channel with the configuration:

mp.messaging.incoming.my-channel.enabled=false

This can be used alongside Quarkus build profiles to enable/disable channels based on some build-time condition, such as the target environment. You need to make sure of two things when disabling a channel:

  • the disabled channel is only used in a bean that can be filtered out at build time,

  • the remaining channels still work correctly without it.

@ApplicationScoped
@IfBuildProfile("my-profile")
public class MyProfileBean {

    @Outgoing("my-channel")
    public Multi<String> generate() {
        return Multi.createFrom().items("a", "b", "c");
    }

}

Multiple Outgoings and @Broadcast

By default, messages transmitted in a channel are only dispatched to a single consumer. Having multiple consumers is considered an error and is reported at deployment time.

The @Broadcast annotation changes this behavior and indicates that messages transiting in the channel are dispatched to all the consumers. @Broadcast must be used with the @Outgoing annotation:

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@Incoming("in")
@Outgoing("out")
@Broadcast
public int increment(int i) {
    return i + 1;
}

@Incoming("out")
public void consume1(int i) {
    //...
}

@Incoming("out")
public void consume2(int i) {
    //...
}

Similarly to @Broadcast, you can use the @Outgoing annotation multiple times on the same method to indicate that the method produces messages to multiple channels:

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
public String process(String s) {
    // send messages from channel in to both channels out1 and out2
    return s.toUpperCase();
}

Using Multiple Outgoings can be useful for implementing fan-out patterns, in which a single message is processed by multiple target channels.

You can selectively dispatch messages to multiple outgoings by returning Targeted from the processing method:

@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
@Outgoing("out3")
public Targeted process(double price) {
    // always send to out1 and out2; additionally send to out3 when the price exceeds 90
    Targeted targeted = Targeted.of("out1", "Price: " + price, "out2", "Quote: " + price);
    if (price > 90.0) {
        return targeted.with("out3", price);
    }
    return targeted;
}

Multiple Incomings and @Merge

By default, only a single producer can transmit messages in a channel. Having multiple producers is considered an error and is reported at deployment time. The @Merge annotation changes this behavior and indicates that a channel can have multiple producers. @Merge must be used with the @Incoming annotation:

@Incoming("in1")
@Outgoing("out")
public int increment(int i) {
    return i + 1;
}

@Incoming("in2")
@Outgoing("out")
public int multiply(int i) {
    return i * 2;
}

@Incoming("out")
@Merge
public void getAll(int i) {
    //...
}

Similarly to @Merge, you can use the @Incoming annotation multiple times on the same method to indicate that the method consumes messages from multiple channels:

@Incoming("in1")
@Incoming("in2")
public void process(String s) {
    // receives messages from both channels in1 and in2
    s.toUpperCase();
}

Stream Processing

In some advanced scenarios, you can directly manipulate the stream of messages instead of each individual message.

Using Mutiny APIs in incoming and outgoing signatures allows you to process the stream of messages:

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class StreamProcessor {

    @Incoming("source")
    @Outgoing("sink")
    public Multi<String> process(Multi<String> in) {
        return in.map(String::toUpperCase);
    }

}

Execution Model

Quarkus Messaging sits on top of the reactive engine of Quarkus and leverages Eclipse Vert.x to dispatch messages for processing. It supports three execution modes:

  • Event-loop, where messages are dispatched on the Vert.x I/O thread. Remember that you should not perform blocking operations on the event loop.

  • Worker-threads, where messages are dispatched on a worker thread pool.

  • Virtual-threads, where messages are dispatched on a virtual thread (requires Java 21+). As virtual threads are not pooled, a new virtual thread is created for each message. Please refer to the dedicated Quarkus Virtual Thread support guide for more information.

Quarkus chooses the default execution mode based on the method signature. If the method signature is synchronous, messages are dispatched on worker threads; otherwise, dispatching defaults to the event loop:

Method signature                                                             Default execution mode
@Incoming("source") void process(String payload)                             Worker-threads
@Incoming("source") Uni<Void> process(String payload)                        Event-loop
@Incoming("source") CompletionStage<Void> process(Message<String> message)   Event-loop
@Incoming("source") @Outgoing("sink") Multi<R> process(Multi<T> in)          Stream-processing methods are executed at startup, then each message is dispatched on the event loop.

Fine-grained control over the execution model is possible using annotations:

  • @Blocking will force the method to be executed on a worker thread pool. The default pool of worker threads is shared between all channels. Using @Blocking("my-custom-pool") you can configure channels with a custom thread pool. The configuration property smallrye.messaging.worker.my-custom-pool.max-concurrency specifies the maximum number of threads in the pool. You can read more on the blocking processing in SmallRye Reactive Messaging documentation.

  • @NonBlocking will force the method to be executed on the event-loop thread.

  • @RunOnVirtualThread will force the method to be executed on a virtual thread. To leverage the lightweight nature of virtual threads, the default maximum concurrency for methods annotated with @RunOnVirtualThread is 1024. This can be changed by setting the smallrye.messaging.worker.<virtual-thread>.max-concurrency configuration property or by using it together with the @Blocking("my-custom-pool") annotation.

The presence of @Transactional annotation implies blocking execution.

In messaging applications, produced and consumed messages constitute an ordered stream of events, either enforced by the broker (inside a topic or a queue) or by the order of reception and emission in the application. To preserve this order, Quarkus Messaging dispatches messages sequentially by default. You can override this behavior by using the @Blocking(ordered = false) or @RunOnVirtualThread annotations.

Incoming Channel Concurrency

Some connectors support configuring the concurrency level of incoming channels.

mp.messaging.incoming.my-channel.concurrency=4

This creates four copies of the incoming channel under the hood, wiring them to the same processing method. Depending on the broker technology, this can be useful to increase the application’s throughput by processing multiple messages concurrently while still preserving the partial order of messages received in different copies. This is the case, for example, for Kafka, where multiple consumers can consume different topic partitions.
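The partial-ordering guarantee can be illustrated with a plain-Java model. This is a sketch under stated assumptions, not the Kafka connector's implementation: PartitionedRecord is a hypothetical type, each channel copy is modelled as a single-threaded executor, and partitions are assigned to copies by a simple modulo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ChannelConcurrencySketch {

    // Hypothetical record type: a partition number and the offset inside that partition.
    record PartitionedRecord(int partition, int offset) {}

    // Processes records on `concurrency` channel copies and returns,
    // per partition, the offsets in the order they were processed.
    static Map<Integer, List<Integer>> process(List<PartitionedRecord> records, int concurrency) {
        List<ExecutorService> copies = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            // Each copy handles its records sequentially.
            copies.add(Executors.newSingleThreadExecutor());
        }
        Map<Integer, List<Integer>> seen = new ConcurrentHashMap<>();
        for (PartitionedRecord r : records) {
            // Assumption: a partition is always handled by the same copy.
            ExecutorService copy = copies.get(r.partition() % concurrency);
            copy.execute(() -> seen.computeIfAbsent(r.partition(), p -> new ArrayList<>()).add(r.offset()));
        }
        try {
            for (ExecutorService copy : copies) {
                copy.shutdown();
                if (!copy.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("processing did not finish in time");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<PartitionedRecord> records = List.of(
                new PartitionedRecord(0, 0), new PartitionedRecord(1, 0),
                new PartitionedRecord(0, 1), new PartitionedRecord(2, 0),
                new PartitionedRecord(1, 1), new PartitionedRecord(0, 2));
        System.out.println(process(records, 2));
    }
}
```

Records from different partitions may be processed in parallel, but the offsets within each partition are always seen in order because a single copy owns that partition.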

Health Checks

Together with the SmallRye Health extension, Quarkus Messaging extensions provide health check support per channel. The implementation of startup, readiness and liveness checks depends on the connector. Some connectors allow configuring the health check behavior, or disabling health checks completely or per channel.

Channel health checks can be disabled using quarkus.messaging.health.<channel-name>.enabled or per health check type, for example quarkus.messaging.health.<channel-name>.liveness.enabled.

Setting the quarkus.messaging.health.enabled configuration property to false completely disables the messaging health checks.

Observability

Micrometer Metrics

Quarkus Messaging extensions provide simple but useful metrics to monitor the health of the messaging system. The Micrometer extension exposes these metrics.

The following metrics can be gathered per channel, identified with the channel tag:

  • quarkus.messaging.message.count : The number of messages produced or received

  • quarkus.messaging.message.acks : The number of messages processed successfully

  • quarkus.messaging.message.failures : The number of messages processed with failures

  • quarkus.messaging.message.duration : The duration of the message processing

For backwards compatibility reasons, channel metrics are not enabled by default and can be enabled with: smallrye.messaging.observation.enabled=true.

OpenTelemetry Tracing

Some Quarkus Messaging connectors integrate out-of-the-box with OpenTelemetry Tracing. When the OpenTelemetry extension is present, outgoing messages propagate the current tracing span. On incoming channels, if a received message contains tracing information, the message processing inherits the message span as parent.

You can disable tracing for a specific channel using the following configuration:

mp.messaging.incoming.data.tracing-enabled=false

Testing

Testing with Dev Services

Most Quarkus Messaging extensions provide a Dev Service to simplify the development and testing of applications. The Dev Service creates a broker instance configured to work out-of-the-box with the Quarkus Messaging extension.

During testing, Quarkus creates a separate broker instance to run the tests against.

You can read more about Dev Services in the Dev Services guide, including a list of Dev Services provided by platform extensions.

Testing with InMemoryConnector

It can be useful to test the application without starting a broker. To achieve this, you can switch the channels managed by a connector to in-memory.

This approach only works for JVM tests. It cannot be used for native tests, because they do not support injection.

Let’s say we want to test the following sample application:

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

@ApplicationScoped
public class MyMessagingApplication {

    @Inject
    @Channel("words-out")
    Emitter<String> emitter;

    public void sendMessage(String out) {
        emitter.send(out);
    }

    @Incoming("words-in")
    @Outgoing("uppercase")
    public Message<String> toUpperCase(Message<String> message) {
        return message.withPayload(message.getPayload().toUpperCase());
    }

}

First, add the following test dependency to your application:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>
build.gradle
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")

Then, create a Quarkus Test Resource as follows:

import java.util.HashMap;
import java.util.Map;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;

public class InMemoryConnectorLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("words-in");   // (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("uppercase");  // (2)
        Map<String, String> props3 = InMemoryConnector.switchOutgoingChannelsToInMemory("words-out");  // (3)
        env.putAll(props1);
        env.putAll(props2);
        env.putAll(props3);
        return env;  // (4)
    }

    @Override
    public void stop() {
        InMemoryConnector.clear();  // (5)
    }
}
1 Switch the incoming channel words-in (consumed messages) to in-memory.
2 Switch the outgoing channel uppercase (processed messages) to in-memory.
3 Switch the outgoing channel words-out (produced messages) to in-memory.
4 Build and return a Map containing all the properties required to configure the application to use in-memory channels.
5 When the test stops, clear the InMemoryConnector (discard all the received and sent messages).

Create a @QuarkusTest using the test resource created above:

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;

import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.junit.jupiter.api.Test;

import jakarta.inject.Inject;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.awaitility.Awaitility.await;

@QuarkusTest
@QuarkusTestResource(InMemoryConnectorLifecycleManager.class)
class MyMessagingApplicationTest {

    @Inject
    @Connector("smallrye-in-memory")
    InMemoryConnector connector; // (1)

    @Inject
    MyMessagingApplication app;

    @Test
    void test() {
        InMemorySink<String> wordsOut = connector.sink("words-out"); // (2)
        InMemorySource<String> wordsIn = connector.source("words-in"); // (3)
        InMemorySink<String> uppercaseOut = connector.sink("uppercase"); // (4)

        app.sendMessage("Hello"); // (5)
        assertEquals("Hello", wordsOut.received().get(0).getPayload()); // (6)

        wordsIn.send("Bonjour"); // (7)
        await().untilAsserted(() -> assertEquals("BONJOUR", uppercaseOut.received().get(0).getPayload())); // (8)
    }
}
1 Inject the in-memory connector in your test class, using the @Connector or @Any qualifier.
2 Retrieve the outgoing channel (words-out) - the channel must have been switched to in-memory in the test resource.
3 Retrieve the incoming channel (words-in)
4 Retrieve the outgoing channel (uppercase)
5 Use the injected application bean to call the sendMessage method, which sends a message through the emitter to the words-out channel.
6 Use the received method on the words-out in-memory channel to check the message produced by the application.
7 Use the send method on the words-in in-memory channel to send a message. The application processes this message and sends a message to the uppercase channel.
8 Use the received method on the uppercase channel to check the messages produced by the application.

The in-memory connector is solely intended for testing purposes. There are some caveats to consider when using it:

  • The in-memory connector only transmits objects (payloads or configured messages) sent using the InMemorySource#send method. Messages received by the application methods won’t contain connector-specific metadata.

  • By default, in-memory channels dispatch messages on the caller thread of the InMemorySource#send method, which would be the main thread in unit tests. However, most other connectors handle context propagation by dispatching messages on separate duplicated Vert.x contexts.

The quarkus-test-vertx dependency provides the @io.quarkus.test.vertx.RunOnVertxContext annotation, which when used on a test method, executes the test on a Vert.x context.

If your tests are dependent on context propagation, you can configure the in-memory connector channels with run-on-vertx-context attribute to dispatch events, including messages and acknowledgements, on a Vert.x context. Alternatively you can switch this behaviour using the InMemorySource#runOnVertxContext method.

Going further

This guide shows the general principles of Quarkus Messaging extensions.

If you want to go further, you can check the SmallRye Reactive Messaging documentation, which has in-depth documentation for each of these concepts and more.
