How to implement a job queue with Redis
In "How to cache with Redis", we implemented a simple cache backed by Redis.
That’s just one use case of Redis.
Redis is also used as a messaging server to implement the processing of background jobs or other kinds of messaging tasks.
This post explores implementing this pattern with Quarkus and the new Redis data source API.
Job Queues and Supes!
A job queue is a data structure storing execution requests. Job dispatchers submit the tasks they want to execute in that data structure. On the other side, job consumers poll the requests and execute them.
There are plenty of variants of that pattern, so let’s focus on the following application. We have an application managing heroes and villains. The application offers the possibility to simulate a fight between a random hero and a random villain. The fight simulation is delegated to fight simulators, applications dedicated to that task.
In this context, the main application submits the fight requests to the job queue. Then, the fight simulators poll the submitted fight requests and execute them.
The fight outcomes are communicated using another Redis feature: pub/sub communication. The simulators send the outcome to a channel consumed by the application. The application then broadcasts these outcomes to a web page.
This post only discusses the interaction with Redis. The rest of the application is straightforward and just uses RESTEasy Reactive and Hibernate ORM with Panache. You can find the full code of the application on https://github.com/cescoffier/quarkus-redis-job-queue-demo.
Submitting jobs
The first task is to model the job queue. We are using a Redis list to store the FightRequest.
package me.escoffier.quarkus.redis.fight;
public record FightRequest(String id, Hero hero, Villain villain) {
}
Redis lists distinguish the left side of the list from the right side of the list. This distinction allows implementing a FIFO queue where we write on the left side and consume from the right side.
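To make the left/right distinction concrete, here is a minimal in-memory sketch. It does not talk to Redis: a plain java.util.Deque stands in for the Redis list, and the class and method names (ListAsQueueSketch, lpush, rpop) are made up for the illustration. It only shows why pushing on the left and popping on the right yields FIFO ordering.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// In-memory stand-in for a Redis list (assumption: this only mimics LPUSH/RPOP ordering).
class ListAsQueueSketch {
    private final Deque<String> list = new ArrayDeque<>();

    // Like LPUSH: insert the item on the left side (head) of the list.
    void lpush(String item) {
        list.addFirst(item);
    }

    // Like RPOP: remove the item on the right side (tail); null when the list is empty.
    String rpop() {
        return list.pollLast();
    }

    public static void main(String[] args) {
        var queue = new ListAsQueueSketch();
        queue.lpush("fight-1");
        queue.lpush("fight-2");
        queue.lpush("fight-3");
        // The oldest request comes out first.
        System.out.println(queue.rpop()); // fight-1
        System.out.println(queue.rpop()); // fight-2
        System.out.println(queue.rpop()); // fight-3
        System.out.println(queue.rpop()); // null
    }
}
```

Writing and reading on the same side would give a stack (LIFO) instead, which is rarely what you want for job processing.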
To manipulate a Redis list, we need the group of commands associated with this data structure.
In the SupesService class, we inject the RedisDataSource and retrieve the group of commands:
public SupesService(RedisDataSource dataSource, ...) {
    commands = dataSource.list(FightRequest.class);
    // ...
}
Let’s now look at the submitAFight method:
public FightRequest submitAFight() {
    var hero = Hero.getRandomHero();
    var villain = Villain.getRandomVillain();
    var id = UUID.randomUUID().toString();
    var request = new FightRequest(id, hero, villain);
    commands.lpush("fight-requests", request);
    return request;
}
The submitAFight method retrieves the random fighters, computes an id, builds the FightRequest instance, and executes the LPUSH command.
The LPUSH command writes the given item to the left side of the list stored at the given key (fight-requests).
Receiving the job requests
Let’s now look at the other side: the fight simulators.
The simulators poll the FightRequests from the Redis list representing our job queue and simulate the fight.
The simulator is implemented in me.escoffier.quarkus.redis.fight.FightSimulator.
The constructor receives a configured name (to distinguish multiple simulators) and the Redis data source.
It creates the objects to emit the Redis commands to read from a Redis list:
public FightSimulator(@ConfigProperty(name = "simulator-name") String name, RedisDataSource ds) {
    this.name = name;
    this.queue = ds.list(FightRequest.class);
    // ...
}
The simulator polls the fight requests and simulates the fight for each of them.
The implementation is an infinite loop (it only stops when the application is shut down).
In each iteration, it reads the pending FightRequest from the right side of the queue with the BRPOP command.
If there is no pending request, it restarts from the beginning of the loop.
If it has a request, it simulates the fight:
@Override
public void run() {
    logger.infof("Simulator %s starting", name);
    while (!stopped) {
        KeyValue<String, FightRequest> item =
                queue.brpop(Duration.ofSeconds(1), "fight-requests");
        if (item != null) {
            var request = item.value();
            var result = simulate(request);
            //...
        }
    }
}
The BRPOP command retrieves and removes the last (right) element of the list.
Unlike RPOP, it waits for a given amount of time (1 second in the code above) if there are no elements in the list.
So, if the list contains an element, it gets it.
Otherwise, it waits up to one second before giving up.
It returns null in this case.
The BRPOP command returns a KeyValue composed of the key of the list and the FightRequest.
It uses that structure because you can pass multiple keys, which is convenient when you have lists with priorities.
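The multi-key behavior can be sketched in memory as follows. Redis checks the keys in the order they are given and pops from the first non-empty list, which is why the returned pair includes the key. The sketch below is not the Quarkus API: PriorityPopSketch and its nested KeyValue record are illustrative names, the fight-requests-urgent key is hypothetical, and the pop is non-blocking to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for several Redis lists (assumption: no blocking, no Redis connection).
class PriorityPopSketch {
    // Illustrative pair type: which list the value came from, and the value itself.
    record KeyValue(String key, String value) {}

    private final Map<String, Deque<String>> lists = new HashMap<>();

    // Like LPUSH: insert on the left side of the list stored at the given key.
    void lpush(String key, String item) {
        lists.computeIfAbsent(key, k -> new ArrayDeque<>()).addFirst(item);
    }

    // Pops from the right side of the first non-empty list, checking keys in the given order.
    KeyValue rpop(String... keys) {
        for (String key : keys) {
            Deque<String> list = lists.get(key);
            if (list != null && !list.isEmpty()) {
                return new KeyValue(key, list.pollLast());
            }
        }
        return null; // every list was empty
    }

    public static void main(String[] args) {
        var redis = new PriorityPopSketch();
        redis.lpush("fight-requests", "regular-fight");
        redis.lpush("fight-requests-urgent", "urgent-fight"); // hypothetical high-priority list

        // The urgent list is named first, so it is drained first.
        KeyValue first = redis.rpop("fight-requests-urgent", "fight-requests");
        System.out.println(first.key() + " -> " + first.value()); // fight-requests-urgent -> urgent-fight
        KeyValue second = redis.rpop("fight-requests-urgent", "fight-requests");
        System.out.println(second.key() + " -> " + second.value()); // fight-requests -> regular-fight
    }
}
```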
The BRPOP command also avoids spinning indefinitely if the list is empty, as it waits for 1 second during each iteration.
Finally, the BRPOP command is atomic.
It means that if you have multiple simulators, they cannot retrieve the same item.
Each item is dispatched only once.
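The "dispatched only once" property can be illustrated with an in-memory analogy. In the sketch below, a java.util.concurrent.LinkedBlockingDeque plays the role of the Redis list, and its timed pollLast plays the role of BRPOP with a timeout. This is an assumption-laden stand-in, not the Redis implementation, and CompetingConsumersSketch and drain are made-up names. Unlike the simulator loop above, each consumer here stops once the queue stays empty, so the demo terminates.

```java
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// In-memory analogy for several simulators reading the same job queue.
class CompetingConsumersSketch {

    // Starts `consumers` threads on one shared queue and returns "consumer:item" for every handled item.
    static List<String> drain(BlockingDeque<String> queue, int consumers) {
        List<String> handled = new CopyOnWriteArrayList<>();
        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            String name = "simulator-" + i;
            workers[i] = new Thread(() -> {
                try {
                    String item;
                    // Timed pop from the right side: waits up to 100 ms, yields null if nothing arrived.
                    while ((item = queue.pollLast(100, TimeUnit.MILLISECONDS)) != null) {
                        handled.add(name + ":" + item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingDeque<String> queue = new LinkedBlockingDeque<>();
        for (int i = 1; i <= 10; i++) {
            queue.addFirst("fight-" + i);
        }
        List<String> handled = drain(queue, 2);
        // Each request shows up once; how they are split between the two consumers varies per run.
        System.out.println(handled.size() + " fights handled"); // 10 fights handled
        handled.forEach(System.out::println);
    }
}
```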
Sending the fight outcome
The polling loop retrieves the FightRequests from the queue and simulates the fights, but how do we communicate the results?
For this, we use another Redis feature: pub/sub communication.
In simple words, we are going to send the FightResult to a channel.
Applications subscribing to that channel will receive the emitted FightResult.
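The difference between a channel and the job queue can be sketched in memory. With pub/sub, every current subscriber receives each message, and a message published while nobody is subscribed is simply dropped. The PUBLISH command reports how many clients received the message. The sketch below mimics only these semantics; PubSubSketch is an illustrative name and nothing here uses the Quarkus or Redis APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// In-memory stand-in for pub/sub channels (assumption: synchronous delivery, single process).
class PubSubSketch {
    private final Map<String, List<Consumer<String>>> channels = new ConcurrentHashMap<>();

    // Registers a subscriber on the given channel.
    void subscribe(String channel, Consumer<String> subscriber) {
        channels.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(subscriber);
    }

    // Delivers the message to every current subscriber and returns how many received it.
    int publish(String channel, String message) {
        List<Consumer<String>> subscribers = channels.getOrDefault(channel, List.of());
        subscribers.forEach(subscriber -> subscriber.accept(message));
        return subscribers.size();
    }

    public static void main(String[] args) {
        var redis = new PubSubSketch();

        // Nobody listens yet: the message is not stored anywhere.
        System.out.println(redis.publish("fight-results", "early result")); // 0

        List<String> pageA = new ArrayList<>();
        List<String> pageB = new ArrayList<>();
        redis.subscribe("fight-results", pageA::add);
        redis.subscribe("fight-results", pageB::add);

        // Both subscribers get the same message, unlike the queue where one consumer wins.
        System.out.println(redis.publish("fight-results", "Pakku wins")); // 2
        System.out.println(pageA); // [Pakku wins]
        System.out.println(pageB); // [Pakku wins]
    }
}
```

This is why the fight requests go through a list (they must survive until a simulator is available) while the outcomes go through a channel (they only matter to whoever is watching right now).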
A FightResult contains the request id, the two fighters, and the name of the winner:
package me.escoffier.quarkus.redis.fight;
public record FightResult(String id, Hero hero, Villain villain, String winner) {
}
To use Redis pub/sub commands, we need the object associated with this group.
In the FightSimulator, we also use the pubsub method to get that object:
public FightSimulator(@ConfigProperty(name = "simulator-name") String name, Logger logger, RedisDataSource ds) {
    this.name = name;
    this.logger = logger;
    this.queue = ds.list(FightRequest.class);
    this.publisher = ds.pubsub(FightResult.class); // <--- this is it!
}
Now, we can use this publisher to send the FightResults.
After each fight, we call publisher.publish to send the FightResult instance to the fight-results channel:
@Override
public void run() {
    logger.infof("Simulator %s starting", name);
    while (!stopped) {
        KeyValue<String, FightRequest> item = queue.brpop(Duration.ofSeconds(1), "fight-requests");
        if (item != null) {
            var request = item.value();
            var result = simulate(request);
            publisher.publish("fight-results", result); // Send the outcome
        }
    }
}
Receiving the fight outcome
At that point:
- we submit the fight request into the job queue,
- we consume that queue and simulate the fight,
- we send the outcome to the fight-results channel.
So, the only missing piece is the consumption of that channel.
Let’s return to the me.escoffier.quarkus.redis.supes.SupesService class.
In the constructor, we also inject the ReactiveRedisDataSource, the reactive variant of the Redis data source.
Then, in the constructor code, we subscribe to the fight-results channel:
public SupesService(RedisDataSource dataSource, ReactiveRedisDataSource reactiveRedisDataSource) {
    commands = dataSource.list(FightRequest.class);
    stream = reactiveRedisDataSource.pubsub(FightResult.class).subscribe("fight-results")
            .broadcast().toAllSubscribers();
}
Because we use the reactive data source, this subscription returns a Multi<FightResult>, ready to be served by Quarkus as server-sent events (SSE) (see SupesResource.java):
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<FightResult> fights() {
    return supes.getFightResults();
}
.broadcast().toAllSubscribers() instructs Quarkus to broadcast every received FightResult to all the connected SSE clients.
So, each browser receives every result and filters out the ones it did not request.
Running the system
The circle is complete! The full source code is available from https://github.com/cescoffier/quarkus-redis-job-queue-demo. To run the system, open three terminals.
First, we start the supes-application.
In the first terminal, navigate to the supes-application directory and run mvn quarkus:dev.
Quarkus automatically starts the PostgreSQL and Redis instances (if your machine can run containers).
In the console, hit h and then c.
It displays the running dev services.
Look for the redis one, and copy the quarkus.redis.hosts injected configuration:
redis-client - Up About a minute
Container: 348edec50f80/trusting_jennings docker.io/redis:7-alpine
Network: bridge - 0.0.0.0:53853->6379/tcp
Exec command: docker exec -it 348edec50f80 /bin/bash
Injected Config: quarkus.redis.hosts=redis://localhost:53853
In the previous snippet, copy: quarkus.redis.hosts=redis://localhost:53853.
This is the address of the Redis server.
We need to configure the simulators with that address.
If you go to http://localhost:8080, the web page is served.
You can hit the fights! button a few times.
The fight won’t happen as we have no simulator. However, the fight requests have been submitted and stored in the list. So they are not lost.
Now, in the second terminal, navigate to the fight-simulator directory, and run:
mvn package
java -Dsimulator-name=A -Dquarkus.redis.hosts=redis://localhost:53853 -jar target/quarkus-app/quarkus-run.jar
IMPORTANT: update the quarkus.redis.hosts value with the one copied above.
As soon as you start it, it processes the pending fight requests:
2022-09-11 15:31:58,914 INFO [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Pakku and Tulon Voidgazer
2022-09-11 15:31:59,786 INFO [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Comet Zuko and Arishem The Judge (Knullified)
2022-09-11 15:32:01,809 INFO [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Ms. America and Kazumi (Devil Form)
If you go back to the web page, the winners get a halo.
Now, in the third terminal, navigate to the fight-simulator directory, and run:
java -Dsimulator-name=B -Dquarkus.redis.hosts=redis://localhost:53853 -jar target/quarkus-app/quarkus-run.jar
IMPORTANT: as in the previous command, update the quarkus.redis.hosts value with the one copied above.
Go back to the web page and click on the fight! button a few times.
Check the logs of both simulators to see that the fight requests are now dispatched between the two simulators.
Summary
This post explains how you can implement a job queue with Redis and the Quarkus Redis data source API.
Learn more about the Redis data source API from the Quarkus documentation. We will publish more content about Redis patterns, so stay tuned!