In this post, I’d like to introduce an open source project I developed to delay redelivery attempts when messages aren’t successfully delivered. I’ll introduce its internal design and then dive into how I took advantage of an interesting Java construct and Java Streams to build out the functionality I was looking for more quickly and efficiently.
About Message Queues
Message queues are a fundamental and important construct of Solace PubSub+ Event Broker. They keep track of which events are consumed by microservices and can detect redeliveries to a consumer. In addition, they can be used to implement a store-and-forward architecture for RESTful endpoints via REST delivery points.
Here are the four queue delivery patterns you can choose from:
Message Processing Failures
When a message fails to be processed by an RDP, or a transaction that involves consuming a message is rolled back, PubSub+ immediately attempts to redeliver it. You can configure the number of times it will try to deliver the messages before it is either discarded or sent to a dead message queue .
But what if the endpoint was suffering from load or just slow to respond? By immediately retrying, you may be exacerbating the problem. You could always implement a back-off algorithm in your queue consumer code – but that would hold up messages in your queue for further processing, and complicate your microservice code base. Redelivery services aim to solve this problem by abstracting that complexity into a separate code base.
Application Design
The stack I chose for this application was:
- Java 11
- Spring Boot
- Solace PubSub+ New Java API
When designing the application, here the logical steps it would need to take:
- Consume a message from a Dead Message Queue
- Calculate a delay factor with the following formula:
calculatedDelay = configuredDelay + pow(backOffFactor,number of redeliveries)
- If the calculated delay is greater than the max allowable delay, discard the message or send to an error queue if required
- Otherwise, send it back to the original queue after the calculatedDelay
Satisfying point #4 presented three interesting challenges to overcome:
- A construct that allows for delayed processing of tasks
- Ensuring that this construct does not block tasks that are ready for processing by tasks that are not ready for processing
- Implement parallel processing of tasks to increase throughput
There are many ways I could have implemented the above construct – some form of a Queue, an iterator, and a ThreadPool that would be used to consume from the queue – but it’s always good to understand what Java gives you before building something yourself…
DelayQueue
A DelayQueue in Java is an extension of a BlockingQueue that adds the ability to set an expiry on an item in the queue when you submit it. When you attempt to take an item from the DelayQueue, only items that have expired will be eligible.
This fit the use case I was looking for perfectly. I would add an item to the DelayQueue and an iterator would constantly iterate over the queue and simply attempt to pull a message off of it. The DelayQueue would take care of only returning messages that exceeded the set Delay Time.
Java Streams
Java Streams is an often overlooked part of the Java programming language that simplifies the rather mundane task of iterating over a collection, processing them in parallel and emitting the result achievable with a few lines of code, like this:
Stream.generate(() -> { ... return delayQueue.take(); ... }).parallel().forEach(d -> { ... process(d) ... }); });
Combining Java Streams with the DelayQueue allowed me to implement the required architecture:
Conclusion
I enjoyed building this application by breaking it down into a series of steps that could be independently solved. Furthermore, it always helps to figure out what a language/framework gives you before attempting to implement it yourself.
An important point to note here is that FIFO (First-in-First-Out) ordering is not preserved on both the SOURCE_QUEUE and the processing of the DMQ with this architecture. Since the reason for implementing this microservice was to “unblock” the SOURCE_QUEUE and to also introduce an exponential delay to the events that fail processing, losing FIFO is an expected outcome.
I hope you found this post informative. If you want to see the associated code base, it’s at https://github.com/solacecommunity/solace-redelivery-delayer
Explore other posts from category: For Developers