The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Concurrent asynchronous actions with Mutiny

This week, I’ve been asked about a widespread use case around concurrency. This user wanted to call two microservices in parallel, and when both results are received, join them and continue the processing. Basically, the following pattern:

pattern

In a non-reactive approach, both calls would block the caller thread, and, except if you use a worker thread pool, the calls are not concurrent. Even if you use a worker thread pool, these threads are likely blocked, consuming resources for nothing.

But no worries, Quarkus reactive nature and Mutiny have everything to handle this scenario.

Let’s call two services

In this post, I’m going to use the Vert.x Web Client, a reactive HTTP client. It leverages non-blocking I/O to be highly performant and truly non-blocking. It does not rely on a hidden thread pool. You can also use the Quarkus Rest Client, but, at the moment, it still uses worker threads.

No matter which client we use, we need some remote services to call. Let’s use:

First thing first, let’s see the code required to retrieve our quotes: While both services are similar, the structure of the response differs a little bit. So we end up with:

private static Uni<String> getProgrammingQuote(WebClient client) {
    return client.getAbs(PROGRAMMING_QUOTE)
            .as(BodyCodec.jsonObject())
            .send()
            .onItem().transform(r -> r.body().getString("en") + " (" + r.body().getString("author") + ")");
}

private static Uni<String> getChuckNorrisQuote(WebClient client) {
    return client.getAbs(CHUCK_NORRIS_QUOTE)
            .as(BodyCodec.jsonObject())
            .send()
            .onItem().transform(r -> r.body().getString("value"));
}

These two methods receive a WebClient, invoke the services, retrieve the JSON responses, and extract them. They both return a Uni. So they are asynchronous. The result (the quote) is provided later when available. Also, returning a Uni means that the services will only be invoked when someone subscribes to the returned Uni. If you subscribe multiple times, you will call the service multiple times.

Combining Unis

So far, we have two methods to call our services. But we want to call them concurrently, as depicted above.

Mutiny provides a way to combine items produced by Unis:

Uni<Tuple2<String, String>> tuple = Uni.combine().all()
    .unis(getProgrammingQuote(client), getChuckNorrisQuote(client))
    .asTuple();

When someone subscribes to the Uni tuple, it subscribes to the getProgrammingQuote(client) and getChuckNorrisQuote(client) Unis, which invoke the services. So the requests are emitted, and the services are invoked concurrently.

When both responses are available, it combines them into a Tuple, a simple structure carrying multiple items.

In other words, concurrently calling our services is pretty straightforward. Just create the Unis representing the services or the asynchronous action you want to achieve and combine them using Uni.combine().all() You can decide to combine the results using tuples or to use a combinator function.

Putting everything together

// Create a Web Client
WebClient client = WebClient.create(vertx);

// Combine the result of our 2 Unis in a tuple
Uni.combine().all()
        .unis(getProgrammingQuote(client), getChuckNorrisQuote(client))
        .asTuple()

        // Subscribe (which will trigger the calls)
        .subscribe().with(tuple -> {
    System.out.println("Programming Quote: " + tuple.getItem1());
    System.out.println("Chuck Norris Quote: " + tuple.getItem2());
});

That’s it! If you want to see this code in action, check this gist. You even can run it directly with JBang:

jbang https://gist.github.com/cescoffier/1ed68bef12b798529e10350f77686e9a

楽しんで!