Mutiny - Flow control and Back-pressure

Reactive Programming offers an elegant, flexible, and powerful way to write asynchronous code. It supports failure handling, both sequential and parallel composition of operations, and plenty of operators. Reactive programming promotes data streams as a primary construct: your code observes streams and reacts to signals.

However, using data streams as primary constructs does not come without some issues. One of the main problems is the need for flow control.

The producer/consumer problem

Let’s imagine a fast producer and a slow consumer. The producer sends events too quickly for the consumer, which can’t keep up. That phenomenon is remarkably well illustrated in the following picture.

Let’s see some code. Imagine a producer emitting an event every ten milliseconds, while the consumer can only consume one per second.

Multi.createFrom().ticks().every(Duration.ofMillis(10))
    .emitOn(Infrastructure.getDefaultExecutor())
    .onItem().transform(BackPressureExample::canOnlyConsumeOneItemPerSecond)
    .subscribe().with(
        item -> System.out.println("Got item: " + item),
        failure -> System.out.println("Got failure: " + failure)
);

If you run that code, you will see that the subscriber gets a BackPressureFailure, indicating that the downstream cannot keep up:

Got item: 0
Got failure: io.smallrye.mutiny.subscription.BackPressureFailure: Could not emit tick 16 due to lack of requests

emitOn

In the previous code, you may wonder about emitOn. It’s a Mutiny operator that controls the thread on which the Subscriber receives events. Back-pressure typically matters when multiple threads are involved: in a single-threaded approach, blocking the thread would also block the source, which may have dramatic consequences.

So, what can we do to handle this overflow?

Buffering items

The first natural solution uses buffers. The consumer can buffer the events, so it does not fail:

Figure 3. Buffering to avoid overwhelming downstream consumers

Using a buffer absorbs small bumps, but it’s not a long-term solution. If we update the code to use a large buffer, the consumer can handle more events but eventually fails:

Multi.createFrom().ticks().every(Duration.ofMillis(10))
    .onOverflow().buffer(250)
    .emitOn(Infrastructure.getDefaultExecutor())
    .onItem().transform(BufferingExample::canOnlyConsumeOneItemPerSecond)
    .subscribe().with(
        item -> System.out.println("Got item: " + item),
        failure -> System.out.println("Got failure: " + failure)
);

Got item: 0
Got item: 1
Got item: 2
Got failure: io.smallrye.mutiny.subscription.BackPressureFailure: Buffer is full due to lack of downstream consumption

You can imagine increasing the buffer’s size, but it’s hard to anticipate the optimal value. What about unbounded buffers? In general, that’s a terrible idea as you may run out of memory.
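To get a feel for why a bounded buffer only postpones the failure, here is a rough sketch that replays the numbers of the example above with plain arithmetic instead of Mutiny: one item arrives every 10 ms, one item leaves every 1000 ms, and the buffer holds 250 items. The class and method names are hypothetical, and the model ignores threads and the operators' internal queues, so treat the result as an order of magnitude (about two and a half seconds here), not as what Mutiny would report.

```java
public class BufferFillSketch {

    /**
     * Simulates a producer adding one item every producePeriodMs and a consumer removing
     * one item every consumePeriodMs. Returns the simulated time (in ms) at which a new
     * item finds the buffer full, or -1 if that does not happen before maxMillis.
     */
    static long millisUntilOverflow(int capacity, long producePeriodMs, long consumePeriodMs, long maxMillis) {
        int buffered = 0;
        for (long now = 1; now <= maxMillis; now++) {
            if (now % consumePeriodMs == 0 && buffered > 0) {
                buffered--; // the consumer frees one slot
            }
            if (now % producePeriodMs == 0) {
                if (buffered == capacity) {
                    return now; // no room left: this is where the failure would surface
                }
                buffered++; // the producer fills one slot
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("Overflow after ~" + millisUntilOverflow(250, 10, 1000, 60_000) + " ms");
    }
}
```

Doubling the capacity roughly doubles the delay before the overflow, which is why picking a "big enough" size does not really solve the problem.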

Dropping items

Another solution consists of dropping items:

Figure 4. Dropping items to avoid overwhelming downstream consumers

You can drop the newest received items or oldest ones:

Multi.createFrom().ticks().every(Duration.ofMillis(10))
    .onOverflow().drop(x -> System.out.println("Dropping item " + x))
    .emitOn(Infrastructure.getDefaultExecutor())
    .onItem().transform(DropExample::canOnlyConsumeOneItemPerSecond)
    .select().first(10) // replaces the older transform().byTakingFirstItems(10)
    .subscribe().with(
        item -> System.out.println("Got item: " + item),
        failure -> System.out.println("Got failure: " + failure)
);

// ....
Dropping item 997
Dropping item 998
Dropping item 999
Dropping item 1000
Dropping item 1001
Dropping item 1002
Dropping item 1003
Got item: 9

Dropping items provides a sustainable solution to our problem, but we are losing events! As we can see in the previous output, we may drop the vast majority of items. In many cases, this is not acceptable.
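The difference between dropping the newest and the oldest items is easier to see on a tiny example. The following sketch is plain Java, not Mutiny: it pushes ten items into a queue of three slots that nobody drains and shows what survives under each policy. The names (DropPolicySketch, Policy, survivors) are made up for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DropPolicySketch {

    enum Policy { DROP_NEWEST, DROP_OLDEST }

    /** Offers items 0..count-1 to a bounded queue that is never drained and returns what is left. */
    static List<Integer> survivors(int count, int capacity, Policy policy) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        Deque<Integer> queue = new ArrayDeque<>();
        for (int item = 0; item < count; item++) {
            if (queue.size() < capacity) {
                queue.addLast(item);
            } else if (policy == Policy.DROP_OLDEST) {
                queue.removeFirst(); // evict the oldest item to make room
                queue.addLast(item);
            }
            // DROP_NEWEST: the queue is full, so the incoming item is simply discarded
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(survivors(10, 3, Policy.DROP_NEWEST)); // [0, 1, 2]
        System.out.println(survivors(10, 3, Policy.DROP_OLDEST)); // [7, 8, 9]
    }
}
```

Which policy fits depends on the data: stale sensor readings are usually better dropped in favor of fresh ones, while for a log of commands the first ones may matter more.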

So, we need another solution, a solution where the overall pace is adjusted to satisfy the pipeline’s slowest element. We need flow control.

What the heck is Back-Pressure?

You may have seen this term many times, often associated with Reactive. In mechanics, back-pressure is a way to control the flow of fluid through pipes, leading to a pressure drop. That control can use reducers or bends. While very interesting if you are a plumber, it’s not clear how it can help us here.

We can see our streams as a flow of fluid, and the set of stages (operators or subscribers) forms a pipe. We want the fluid to flow with as little friction as possible, without swirls and waves.

An interesting characteristic of fluid mechanics is how a downstream reduction of the throughput affects the upstream. Essentially, that’s what we need: a way for the downstream operators and subscribers to reduce the throughput, not only locally but also upstream.

Don’t be mistaken; back-pressure is not something new in the IT world and is not limited to Reactive. One of the most brilliant usages of back-pressure is in TCP. A reader, receiving data, can block the writer, on the other side of the wire, if it does not read the sent data. That way, the reader is never overwhelmed. But the consequences need to be understood: blocking the writer may not be without side effects.

There are other back-pressure implementations. AMQP uses a credit-based flow control, where you can only send if you have enough credit. Eclipse Vert.x back-pressure is based on pause/resume, where a consumer can pause the upstream until it catches up and resume it once back on track.
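To make the credit idea concrete, here is a deliberately naive sketch of a sender that may only send while it holds credit granted by the receiver. It is not AMQP code and not a real client API: CreditedSender, grant and trySend are hypothetical names, and the class is not thread-safe.

```java
public class CreditFlowSketch {

    /** A sender that can only send while the receiver has granted it credit. */
    static final class CreditedSender {
        private long credit; // how many messages the receiver is currently willing to accept
        private long sent;

        /** Called on behalf of the receiver once it has room for n more messages. */
        void grant(long n) {
            if (n <= 0) {
                throw new IllegalArgumentException("credit must be positive");
            }
            credit += n;
        }

        /** Sends the message if credit is available; returns false when the sender must wait. */
        boolean trySend(String message) {
            if (credit == 0) {
                return false; // no credit: sending now would overwhelm the receiver
            }
            credit--;
            sent++;
            return true;
        }

        long sent() {
            return sent;
        }
    }

    public static void main(String[] args) {
        CreditedSender sender = new CreditedSender();
        System.out.println(sender.trySend("a")); // false, no credit yet
        sender.grant(2);
        System.out.println(sender.trySend("a")); // true
        System.out.println(sender.trySend("b")); // true
        System.out.println(sender.trySend("c")); // false, credit exhausted
    }
}
```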

Introducing Reactive Streams

Let’s now focus on another back-pressure protocol: Reactive Streams. It defines an asynchronous, back-pressured protocol suited to our fast producer/slow consumer problem. With Reactive Streams, the consumer, named Subscriber, requests items from the producer, named Publisher. The Publisher cannot send more than the requested number of items.

When the items are received and processed, the consumer can request more items, and so on. Thus, the consumer controls the flow.

Figure 6. Using flow control to avoid overwhelming consumers

Reactive Streams entities

To implement that protocol, Reactive Streams defines a set of entities. First, a Subscriber is a consumer. It subscribes to a Publisher, which produces items. Then, the Publisher sends, asynchronously, a Subscription object to the Subscriber. This Subscription is a contract. With this Subscription, the Subscriber can request items and cancel the subscription when it does not want any more items.

Figure 7. Example of interactions between a Subscriber and a Publisher

A Publisher cannot send more items than requested to the Subscriber, and the Subscriber can request more items at any time.

It is essential to understand that requests and emissions do not necessarily happen synchronously. A Subscriber can request three items, and the Publisher will send them one by one as they become available.
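Here is a small sketch of that request/emit dance. It does not use Mutiny: it relies on the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones, and on SubmissionPublisher as a ready-made Publisher. The BatchSubscriber class and the run helper are hypothetical names. The subscriber asks for three items, waits until it has received them, and only then asks for three more.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class RequestBatchSketch {

    /** Requests items in batches and records every interaction it takes part in. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        private final int batchSize;
        private final List<String> events = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int remainingInBatch;

        BatchSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        private void requestBatch() {
            remainingInBatch = batchSize;
            events.add("request(" + batchSize + ")");
            subscription.request(batchSize);
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            requestBatch();
        }

        @Override
        public void onNext(Integer item) {
            events.add("onNext(" + item + ")");
            if (--remainingInBatch == 0) {
                requestBatch(); // the whole batch arrived, ask for the next one
            }
        }

        @Override
        public void onError(Throwable t) {
            events.add("onError");
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            done.countDown();
        }
    }

    /** Publishes 0..itemCount-1 and returns the interactions seen by the subscriber. */
    static List<String> run(int itemCount, int batchSize) {
        BatchSubscriber subscriber = new BatchSubscriber(batchSize);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < itemCount; i++) {
                publisher.submit(i); // items wait in the publisher's buffer until requested
            }
        } // closing the publisher signals completion once the buffer has been drained
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(subscriber.events);
    }

    public static void main(String[] args) {
        run(10, 3).forEach(System.out::println);
    }
}
```

The items are submitted right away, yet the subscriber never sees more than it asked for: each request(3) is followed by at most three onNext calls.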

Reactive Streams introduces another entity named Processor. A Processor is a Subscriber and a Publisher simultaneously. In other words, it’s a link in our pipeline:

Figure 8. Example of interactions between a Subscriber, a Processor and a Publisher

A Subscriber calls subscribe on a Processor. Before receiving a Subscription, the Processor subscribes to its own upstream (the Publisher in the picture above). When that upstream provides a Subscription to our Processor, it can give a Subscription to the Subscriber. All these interactions are asynchronous. When this handshake completes, the Subscriber can start requesting items. The Processor is responsible for mediating the Subscriber requests with its upstream. For example, as illustrated in the picture above, if the Subscriber requests two items, the Processor also requests two items from its own upstream. Of course, depending on the Processor code, it may not be that simple. What’s fundamental is that each Publisher and Processor honors the requests flowing upstream, so that downstream Subscribers are never overloaded.
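As an illustration, here is a minimal Processor sketch built on the JDK's java.util.concurrent.Flow and SubmissionPublisher rather than on Mutiny, loosely following the transform example from the SubmissionPublisher Javadoc. MapProcessor, Collector and run are hypothetical names. Note that this is one of the "not that simple" cases: the processor does not mirror the downstream demand. It requests one upstream item at a time and relies on submit blocking when its downstream buffer is full to slow the upstream down.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ProcessorSketch {

    /** A Subscriber towards its upstream and a Publisher towards its downstream. */
    static final class MapProcessor<S, T> extends SubmissionPublisher<T> implements Flow.Processor<S, T> {
        private final Function<? super S, ? extends T> mapper;
        private Flow.Subscription upstream;

        MapProcessor(Function<? super S, ? extends T> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            s.request(1);
        }

        @Override
        public void onNext(S item) {
            submit(mapper.apply(item)); // blocks while the downstream buffer is full
            upstream.request(1);        // only then ask upstream for the next item
        }

        @Override
        public void onError(Throwable t) {
            closeExceptionally(t);
        }

        @Override
        public void onComplete() {
            close();
        }
    }

    /** Terminal subscriber collecting items, one request at a time. */
    static final class Collector<T> implements Flow.Subscriber<T> {
        final List<T> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override
        public void onNext(T item) {
            items.add(item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Sends 0..count-1 through the processor and returns what the final subscriber received. */
    static List<Integer> run(int count) {
        Collector<Integer> sink = new Collector<>();
        MapProcessor<Integer, Integer> processor = new MapProcessor<Integer, Integer>(i -> i * 100);
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            processor.subscribe(sink);   // wire the downstream first so no item is emitted to nobody
            source.subscribe(processor); // then connect the processor to its upstream
            for (int i = 0; i < count; i++) {
                source.submit(i);
            }
        } // closing the source completes the processor, which completes the sink
        try {
            if (!sink.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(sink.items);
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // [0, 100, 200, 300, 400]
    }
}
```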

Be warned; it’s a trap!

If you look at the Reactive Streams API, you will find it straightforward. Be warned; it’s a trap! Behind this apparent simplicity, implementing Reactive Streams entities yourself is a nightmare. Reactive Streams comes with a broad set of rules and a strict TCK (Technology Compatibility Kit).

But, no worries, Mutiny implements the Reactive Streams protocol for you. In other words, when using Multi, you are using a Publisher following the Reactive Streams protocol. All the subscription handshakes and request negotiations are done for you.

Looking under the hood

I knew it! You are curious! You can use onSubscribe().invoke() and onRequest().invoke() to check what’s going on:

Multi.createFrom().range(0, 10)
    .onSubscribe().invoke(sub -> System.out.println("Received subscription: " + sub))
    .onRequest().invoke(req -> System.out.println("Got a request: " + req))
    .select().where(i -> i % 2 == 0) // replaces the older transform().byFilteringItemsWith(...)
    .onItem().transform(i -> i * 100)
    .subscribe().with(
            i -> System.out.println("i: " + i)
);

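You can also go a step further and not only observe but also control the flow yourself:

// Holds the Subscription so the item callback can request more items.
// Subscription is the Reactive Streams (or Flow) type used by your Mutiny version.
AtomicReference<Subscription> upstream = new AtomicReference<>();
Multi.createFrom().range(0, 10)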
    .onSubscribe().invoke(sub -> System.out.println("Received subscription: " + sub))
    .onRequest().invoke(req -> System.out.println("Got a request: " + req))
    .onItem().transform(i -> i * 100)
    .subscribe().with(
            subscription -> {
                // Got the subscription
                upstream.set(subscription);
                subscription.request(1);
            },
            i -> {
                System.out.println("i: " + i);
                upstream.get().request(1);
            },
            f -> System.out.println("Failed with " + f),
            () -> System.out.println("Completed")
);

And, if you are a bit insane, you can even implement a Subscriber directly:

Multi.createFrom().range(0, 10)
    .onSubscribe().invoke(sub -> System.out.println("Received subscription: " + sub))
    .onRequest().invoke(req -> System.out.println("Got a request: " + req))
    .onItem().transform(i -> i * 100)
    .subscribe().withSubscriber(new Subscriber<Integer>() {

        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            this.subscription = s;
            s.request(1);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println("Got item " + item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            // ...
        }

        @Override
        public void onComplete() {
            // ...
        }
    }
);

Summary

This post described the different approaches offered by Mutiny to handle back-pressure. The Reactive Streams protocol works well when you can control the pace of the upstream. But that’s not always the case. Streams representing physical entities are rarely controllable. Imagine a stream emitting a user’s keystrokes. You cannot ask users to slow down. That would be a terrible user experience. As we have seen above, time is also not something we can slow down, unfortunately. In these cases, the onOverflow() group lets you decide on the mitigation, such as using buffers or dropping items.

It’s critical to avoid overwhelming downstream subscribers. It is the small crack that ripples through your system with dreadful consequences.