The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

How to implement a job queue with Redis

In how to cache with Redis, we implemented a simple cache backed by Redis.
That’s just one use case of Redis. Redis is also used as a messaging server to implement the processing of background jobs or other kinds of messaging tasks. This post explores implementing this pattern with Quarkus and the new Redis data source API.

Job Queues and Supes!

A job queue is a data structure storing execution requests. Job dispatchers submit the tasks they want to execute in that data structure. On the other side, job consumers poll the requests and execute them.

pattern

There are plenty of variants of that pattern, so let’s focus on the following application. We have an application managing heroes and villains. The application offers the possibility to simulate a fight between a random hero and a random villain. The fight simulation is delegated to fight simulators, applications dedicated to that task.

application

In this context, the main application submits the fight request to the job queue. Then, the fight simulators poll the submitted fight request and execute them.

The fight outcomes are communicated using another Redis feature: pub/sub communication. The simulators send the outcome to a channel consumed by the application. The application then broadcasts these outcomes to a web page.

This post only discusses the interaction with Redis. The rest of the application is straightforward and just uses RESTEasy Reactive and Hibernate ORM with Panache. You can find the full code of the application on https://github.com/cescoffier/quarkus-redis-job-queue-demo.

Submitting jobs

The first task is to model the job queue. We are using a Redis list to store the FightRequest.

package me.escoffier.quarkus.redis.fight;

public record FightRequest(String id, Hero hero, Villain villain) {

}

Redis lists distinguish the left side of the list from the right side of the list. This distinction allows implementing a FIFO queue where we write on the left side and consume from the right side.

To manipulate a Redis list, we need the group of commands associated with this data structure. In the SupesService class, we inject the RedisDataSource and retrieve the group of commands:

public SupesService(RedisDataSource dataSource, ...) {
    commands = dataSource.list(FightRequest.class);
  // ...
}

Let’s now look at the submitAFight method:

public FightRequest submitAFight() {
    var hero = Hero.getRandomHero();
    var villain = Villain.getRandomVillain();
    var id = UUID.randomUUID().toString();
    var request = new FightRequest(id, hero, villain);
    commands.lpush("fight-requests", request);
    return request;
}

The submitAFight method retrieves the random fighters, computes an id, builds the FightRequest instance, and executes the LPUSH command. The LPUSH command writes the given item to the left side of the list stored at the given key (fight-requests).

Receiving the job requests

Let’s now look at the other side: the fight simulators. The simulators poll the FightRequests from the Redis list representing our job queue and simulate the fight.

The simulator is implemented in me.escoffier.quarkus.redis.fight.FightSimulator. The constructor receives a configured name (to distinguish multiple simulators) and the Redis data source. It creates the objects to emit the Redis commands to read from a Redis list:

public FightSimulator(@ConfigProperty(name = "simulator-name") String name, RedisDataSource ds) {
    this.name = name;
    this.queue = ds.list(FightRequest.class);
    // ...
}

The simulator polls the fight requests and for each of them simulate the fight. The implementation is an infinite loop (it only stops when the application is shut down). In each iteration, it reads the pending FightRequest from the right side of the queue with the BRPOP command. If there is no pending request, it restarts from the beginning of the loop. If it has a request, it simulates the fight:

@Override
public void run() {
    logger.infof("Simulator %s starting", name);
    while ((!stopped)) {
        KeyValue<String, FightRequest> item =
            queue.brpop(Duration.ofSeconds(1), "fight-requests");
        if (item != null) {
            var request = item.value();
            var result = simulate(request);
            //...
        }
    }
}

The BRPOP command retrieves and removes the last (right) element of the list. Unlike the RPOP, it waits for a given amount of time (1 second in the code above) if there are no elements in the list. So, if the list contains an element, it gets it. Otherwise, it waits up to one second before giving up. It returns null in this case. The BRPOP command returns a KeyValue composed of the key of the list and the FightRequest. It uses that structure because you can pass multiple keys, which is convenient when you have lists with priorities.

The BRPOP command also avoids spinning indefinitely if the list is empty, as it waits for 1 second during each iteration. Finally, the BRPOP command is atomic. It means that if you have multiple simulators, they cannot retrieve the same item. It dispatches each item once.

Sending the fight outcome

The pool loop retrieves the FightRequests from the queue and simulates the fights, but how to communicate the results? For this, we use another Redis feature: pub/sub communication.

In simple words, we are going to send the FightResult to a channel. Applications subscribing to that channel will receive the emitted FightResult.

A FightResult contains the request id, the two fighters, and the name of the winner:

package me.escoffier.quarkus.redis.fight;

public record FightResult(String id, Hero hero, Villain villain, String winner) {

}

To use Redis pub/sub commands, we need the object associated with this group. In the FightSimulator, we also uses the pubsub method to get that object:

public FightSimulator(@ConfigProperty(name = "simulator-name") String name, Logger logger, RedisDataSource ds) {
    this.name = name;
    this.logger = logger;
    this.queue = ds.list(FightRequest.class);
    this.publisher = ds.pubsub(FightResult.class);  // <--- this is it!
}

Now, we can use this publisher to send the FightResults. After each fight, we call publisher.publish to send the FightResult instance to the fight-results channel:

@Override
public void run() {
    logger.infof("Simulator %s starting", name);
    while ((!stopped)) {
        KeyValue<String, FightRequest> item = queue.brpop(Duration.ofSeconds(1), "fight-requests");
        if (item != null) {
            var request = item.value();
            var result = simulate(request);
            publisher.publish("fight-results", result);  // Send the outcome
           }
    }
}

Receiving the fight outcome

At that point:

  • we submit the fight request into the job queue,

  • we consume that queue and simulate the fight,

  • we send the outcome to the fight-results channel.

So, the only missing piece is the consumption of that channel. Let’s return to the me.escoffier.quarkus.redis.supes.SupesService class. In the constructor, we also inject the ReactiveRedisDataSource, the reactive variant of the Redis data source. Then, in the constructor code, we subscribe to the fight-results.

public SupesService(RedisDataSource dataSource, ReactiveRedisDataSource reactiveRedisDataSource) {
    commands = dataSource.list(FightRequest.class);
    stream = reactiveRedisDataSource.pubsub(FightResult.class).subscribe("fight-results")
            .broadcast().toAllSubscribers();
}

Because we use the reactive data source, this subscription returns a Multi<FightResult>, ready to be served by Quarkus and an SSE (see SupesResource.java):

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<FightResult> fights() {
    return supes.getFightResults();
}
.broadcast().toAllSubscribers() instructs Quarkus to broadcast all the received FightResult to all the connected SSE. So, the browser filters out unrequested results.

Running the system

The circle is complete! The full code source is available from https://github.com/cescoffier/quarkus-redis-job-queue-demo. To run the system, open three terminals.

First, we start the supes-application. In the first terminal, navigate to the supes-application and run mvn quarkus:dev Quarkus automatically starts the PostgreSQL and Redis instances (if your machine can run containers). In the console, hit h and then c. It displays the running dev services. Look for the redis one, and copy the quarkus.redis.hosts injected configuration:

redis-client - Up About a minute
  Container:        348edec50f80/trusting_jennings  docker.io/redis:7-alpine
  Network:          bridge - 0.0.0.0:53853->6379/tcp
  Exec command:     docker exec -it 348edec50f80 /bin/bash
  Injected Config:  quarkus.redis.hosts=redis://localhost:53853

In the previous snippet, copy: quarkus.redis.hosts=redis://localhost:53853. This is the address of the redis server. We need to configure to the simulators with that address.

If you go to http://localhost:8080, the web page is served. You can hit the fights! button a few times.

screenshot

The fight won’t happen as we have no simulator. However, the fight requests have been submitted and stored in the list. So they are not lost.

Now, in the second terminal, navigate to the fight-simulator directory, and run:

mvn package
java -Dsimulator-name=A -Dquarkus.redis.hosts=redis://localhost:53853 -jar target/quarkus-app/quarkus-run.jar

IMPORTANT: update the quarkus.redis-hosts with the one copied above.

As soon as you start it, it processes the pending fight requests:

2022-09-11 15:31:58,914 INFO  [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Pakku and Tulon Voidgazer
2022-09-11 15:31:59,786 INFO  [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Comet Zuko and Arishem The Judge (Knullified)
2022-09-11 15:32:01,809 INFO  [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Ms. America and Kazumi (Devil Form)

If you go back to the web page, the winners get a halo:

screenshot winner

Now, in the third terminal, navigate to the fight-simulator directory, and run:

java -Dsimulator-name=B -Dquarkus.redis.hosts=redis://localhost:53853 -jar target/quarkus-app/quarkus-run.jar

IMPORTANT: as in the previous command, update the quarkus.redis-hosts with the one copied above.

Go back to the web page and click on the fight! button a few times. Check the logs of both simulators to see that the fight requests are now dispatched beween the two simulators.

まとめ

This posts explains how you can implement a job queue with Redis and the Quarkus Redis datasource API.

Learn more about the Redis data source API from the Quarkus documentation. We will publish more content about Redis patterns, so stay tuned!