You may have 99 problems, but publish confirmation and message acknowledgment need not be among them!
Messaging applications that use event brokers are distributed by nature and need a mechanism for confirming delivery and processing. It is not just the consumer that needs to confirm processing; the publisher also needs confirmation that a publish operation succeeded. Providing consistent confirmation logic is a shared responsibility between the application and the broker, and its absence results in duplication, out-of-band processing, and other logical issues that are hard to diagnose and correct.
In this blog post, I will explain how the Spring Cloud Stream Binder for Solace PubSub+ supports message publish confirmation and message acknowledgment.
Confirming the Successful Publication of Messages
While in most cases messages are published successfully, occasionally a message may not reach the messaging system or its intended destinations. As such, there needs to be a mechanism that confirms the success of a publish operation to avoid a “war of systems” disputing what went wrong. This mechanism is called a publish confirmation.
There are several reasons a Solace event broker might reject and discard a message, for example when the client username of the sending application does not have the ACL permissions to publish to the defined topic. In such cases, the Solace broker sends a message rejection to the publisher, a.k.a. a negative acknowledgment or “nack”, indicating that the message could not be persisted and was discarded. The publishing application may want to know about the error and act on it. When using the Spring Cloud Stream framework with the Solace binder, two options are available to the developer: one handles the error asynchronously and the other synchronously.
To handle producer errors asynchronously, the framework provides the errorChannelEnabled producer config option. When errorChannelEnabled is set to true, failures are sent to a designated error handler. To accomplish this, make the following changes:
- Set spring.cloud.stream.bindings.<binding-name-out-0>.producer.error-channel-enabled to “true” in the Spring Cloud Stream configuration.

    spring:
      cloud:
        function:
          definition: supplier
        stream:
          bindings:
            supplier-out-0:
              destination: 'community/consume/topic'
              producer:
                error-channel-enabled: true

- Add an error handler function with the @ServiceActivator annotation for the channel.

    @ServiceActivator(inputChannel = "community/consume/topic.errors")
    public void handlePublishError(ErrorMessage message) {
        System.out.println("Message Publish Failed");
        ...
    }
IMPORTANT: In the current version of the Spring Cloud Stream binder for Solace PubSub+, this annotation-based approach to specifying the error handler is valid and fully supported. However, it will be deprecated in future releases in favor of a configuration property at the binding level that specifies the error handler.
The second option is to handle publisher errors synchronously. This is accomplished using the Message Correlation feature. The publisher sends a message with correlation data set in a header. The broker, upon receipt and successful persistence of the message, responds with a publish confirmation that is reported on that correlation data.
How do we go about this?
You can create a new CorrelationData instance for each message and set it as the value of your message’s SolaceBinderHeaders.CONFIRM_CORRELATION header. CorrelationData can be extended to add more details to help in the correlation.
Here is the flow of interaction between the publishing application and the Solace PubSub+ Broker with Spring Cloud Stream binder for Solace PubSub+ mediating the exchange.
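Here is sample code demonstrating publish confirmation in action:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaders;
    import com.solace.spring.cloud.stream.binder.util.CorrelationData;

    @Autowired
    private StreamBridge streamBridge;

    public void send(String payload, long timeout, TimeUnit unit) {
        // One CorrelationData instance per message
        CorrelationData correlationData = new CorrelationData();
        Message<String> message = MessageBuilder
                .withPayload(payload)
                .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, correlationData)
                .build();
        streamBridge.send("publish/topic/fail", message);
        try {
            // Block until the broker responds or the caller's timeout elapses
            correlationData.getFuture().get(timeout, unit);
            // Execute business logic
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            // Handle failure and corrective action as desired
        }
    }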
Note that the SolaceBinderHeaders.CONFIRM_CORRELATION header is not reflected in the actual message published to the broker.
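To illustrate why extending the correlation object with extra details can help, here is a minimal JDK-only sketch. It does not use the binder: OrderCorrelation is a hypothetical stand-in for an extended CorrelationData, the orderId field is an invented example of business context, and the broker's responses are simulated by completing the futures by hand.

```java
import java.util.concurrent.CompletableFuture;

/** JDK-only sketch: a correlation object that carries business context. */
final class TrackedPublishSketch {

    /** Hypothetical stand-in for an extended CorrelationData; not the binder's class. */
    static final class OrderCorrelation {
        final String orderId; // extra detail added to help correlation
        final CompletableFuture<Void> future = new CompletableFuture<>();

        OrderCorrelation(String orderId) {
            this.orderId = orderId;
        }
    }

    /** Builds a report once the confirmation future has completed either way. */
    static CompletableFuture<String> report(OrderCorrelation correlation) {
        return correlation.future.handle((ignored, error) -> error == null
                ? "order " + correlation.orderId + " confirmed"
                : "order " + correlation.orderId + " failed: " + error.getMessage());
    }

    public static void main(String[] args) {
        OrderCorrelation confirmed = new OrderCorrelation("A-1");
        OrderCorrelation rejected = new OrderCorrelation("A-2");

        // Simulate the broker's responses arriving.
        confirmed.future.complete(null);
        rejected.future.completeExceptionally(new IllegalStateException("Publish ACL Denied"));

        System.out.println(report(confirmed).join());
        System.out.println(report(rejected).join());
    }
}
```

Because the context travels with the future, the failure path can name the affected business object without a separate lookup table.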
The synchronous wait on the Java Future may fail with any of the following exceptions:
- InterruptedException indicates that the waiting thread was interrupted.
- TimeoutException indicates that there was no response within the specified timeout period.
- ExecutionException indicates that the task was aborted due to an application error or other reasons. It is a wrapper exception holding the actual exception.
For example, when publish permission is denied on the topic publish/topic/fail, the get() call on the future returned by getFuture() fails with an ExecutionException, which is a wrapper around the Solace JCSMP exception showing the actual ACL Denied error. Here is an example:
java.util.concurrent.ExecutionException: org.springframework.messaging.MessagingException: Producer received error for message 530 (Spring message 1e46821a-d7d1-062f-d36a-c710b38f4ee7) at 1659526110898; nested exception is ((Client name: 115.110.68.169.static-Banglore.v/33435/001d0001/OTN9wMhZyW Local addr: 127.0.0.1 Local port: 59577 Remote addr: localhost Remote port: 55554) - ) com.solacesystems.jcsmp.JCSMPErrorResponseException: 403: Publish ACL Denied - Topic 'publish/topic/fail' [Subcode:28], failedMessage=GenericMessage [payload=byte[10], headers={solace_scst_confirmCorrelation=com.solace.spring.cloud.stream.binder.util.CorrelationData@76915dfc, id=1e46821a-d7d1-062f-d36a-c710b38f4ee7, contentType=application/json, target-protocol=streamBridge, timestamp=1659526110890}]
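The three failure modes can be handled separately rather than in one combined catch. The following is a minimal JDK-only sketch of that idea, not binder code: ConfirmationOutcomeSketch and its labels are hypothetical, a plain CompletableFuture stands in for the future returned by getFuture(), and an IllegalStateException stands in for the nested Solace exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** JDK-only sketch: classify the outcome of waiting on a confirmation future. */
final class ConfirmationOutcomeSketch {

    /** Waits on the future and returns a short label describing what happened. */
    static String await(CompletableFuture<Void> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit);
            return "CONFIRMED";
        } catch (TimeoutException e) {
            // No response arrived in time, so the outcome is unknown.
            return "TIMED_OUT";
        } catch (ExecutionException e) {
            // The wrapper's cause carries the actual failure.
            return "REJECTED: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> nack = new CompletableFuture<>();
        nack.completeExceptionally(new IllegalStateException("403: Publish ACL Denied"));
        CompletableFuture<Void> silent = new CompletableFuture<>(); // never completed

        System.out.println(await(ok, 100, TimeUnit.MILLISECONDS));
        System.out.println(await(nack, 100, TimeUnit.MILLISECONDS));
        System.out.println(await(silent, 100, TimeUnit.MILLISECONDS));
    }
}
```

Separating the branches matters because a timeout leaves the publish outcome unknown, while an ExecutionException means the failure was reported and its cause can be inspected.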
Acknowledging the Delivery/Receipt of Messages
Many event brokers, such as Solace PubSub+, offer a guaranteed quality of service. To guarantee delivery, brokers need the help of consuming applications to acknowledge that they have received and successfully processed the message. This is done via consumer acknowledgments in your application code, commonly called “acks”.
This consumer acknowledgment concept also exists within the Spring Cloud Stream framework, but it may not be obvious at first glance. This is because the default behavior is to automatically acknowledge the received messages, aka auto-acknowledgment, when the function exits successfully. However, a consumer can override the default auto-acknowledgment and engage manual-acknowledgment to handle message acknowledgment programmatically.
Let us look at these two modes closely.
Automatic Acknowledgement
The default functionality of the Spring Cloud Stream binder for Solace PubSub+ is to automatically acknowledge messages received on the Message handler (function).
The following diagram illustrates the flow when using imperative functions, where the function is triggered on each individual event. For reactive functions, refer to the Spring Cloud Stream documentation, specifically the section on implementing consumers.
In automatic acknowledgment mode, a message is acknowledged after the message handler function returns successfully. If an exception is thrown, the message is not acknowledged.
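The rule above can be modeled in a few lines of plain Java. This is only a sketch of the decision, not the binder's implementation: AutoAckSketch, Outcome, and deliver are hypothetical names, and what the binder does with an unacknowledged message is outside the scope of this model.

```java
import java.util.function.Consumer;

/** JDK-only sketch: acknowledge only when the handler returns normally. */
final class AutoAckSketch {

    enum Outcome { ACKNOWLEDGED, NOT_ACKNOWLEDGED }

    /** Runs the handler and reports whether the message would be acknowledged. */
    static <T> Outcome deliver(T message, Consumer<T> handler) {
        try {
            handler.accept(message);
            return Outcome.ACKNOWLEDGED;     // handler exited successfully
        } catch (RuntimeException e) {
            return Outcome.NOT_ACKNOWLEDGED; // handler threw, so no ack is sent
        }
    }

    public static void main(String[] args) {
        System.out.println(deliver("hello", msg -> { /* processed fine */ }));
        System.out.println(deliver("boom", msg -> {
            throw new IllegalArgumentException("cannot process " + msg);
        }));
    }
}
```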
Manual Acknowledgement
Manual acknowledgement, also known as client acknowledgement, can be used with Spring Cloud Stream when using imperative functions. In manual acknowledgment mode, every message is processed and acknowledged individually in the message handler function. The decision to accept, reject, or request redelivery is left to the consumer, based on message content and other logical conditions.
The Spring Cloud Stream binder supports three distinct acknowledgment actions for Solace PubSub+: ACCEPT, REJECT, and REQUEUE.
Acknowledgment Status: ACCEPT
Here is what happens when the consumer acknowledges the message with ACCEPT status.
- Consumer acknowledges the Message.
- The message is removed from the queue.
Acknowledgment Status: REJECT
Here is what happens when the consumer acknowledges the message with REJECT status.
- Consumer rejects the message.
- The message will be removed from the queue, but:
  - If bound to an error queue, the message is REPUBLISHED to the error queue.
  - Otherwise, for a consumer in a defined group or an anonymous group, the message is DISCARDED.
Acknowledgment Status: REQUEUE
Here is what happens when the consumer acknowledges the message with REQUEUE status.
- Consumer rejects the message and requests a REQUEUE.
- The message is untouched and left in the queue, but will be REDELIVERED to the next available consumer
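The three acknowledgment actions and their outcomes, as described above, can be summarized as a small decision function. This is a JDK-only sketch of the described behavior rather than binder code: AckOutcomeSketch, Action, Result, and resolve are hypothetical names, and the REJECT branch encodes one reading of the description, namely that the message is discarded when no error queue is bound.

```java
/** JDK-only sketch: map an acknowledgment action to what happens to the message. */
final class AckOutcomeSketch {

    enum Action { ACCEPT, REJECT, REQUEUE }

    enum Result {
        REMOVED_FROM_QUEUE,
        REPUBLISHED_TO_ERROR_QUEUE,
        DISCARDED,
        LEFT_IN_QUEUE_FOR_REDELIVERY
    }

    /** Resolves the outcome for an action, given whether an error queue is bound. */
    static Result resolve(Action action, boolean errorQueueBound) {
        switch (action) {
            case ACCEPT:
                return Result.REMOVED_FROM_QUEUE;
            case REJECT:
                // Removed from the consumer's queue either way; the error
                // queue decides whether a copy survives.
                return errorQueueBound
                        ? Result.REPUBLISHED_TO_ERROR_QUEUE
                        : Result.DISCARDED;
            case REQUEUE:
                return Result.LEFT_IN_QUEUE_FOR_REDELIVERY;
            default:
                throw new IllegalArgumentException("Unknown action: " + action);
        }
    }

    public static void main(String[] args) {
        for (Action action : Action.values()) {
            System.out.println(action + " with error queue: " + resolve(action, true));
            System.out.println(action + " without error queue: " + resolve(action, false));
        }
    }
}
```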
Here is sample code demonstrating manual acknowledgment in action:

    import java.util.function.Function;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.StaticMessageHeaderAccessor;
    import org.springframework.integration.acks.AckUtils;
    import org.springframework.integration.acks.AcknowledgmentCallback;
    import org.springframework.messaging.Message;
    import com.solace.spring.cloud.stream.binder.util.SolaceAcknowledgmentException;

    @Bean
    public Function<Message<String>, String> myFunction() {
        return v -> {
            // Disable Auto-Ack
            AcknowledgmentCallback ackCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(v);
            ackCallback.noAutoAck();

            // The acknowledgement action is chosen based on the value
            // of the solace_correlationId field in the message header
            String cid = (String) v.getHeaders().get("solace_correlationId");
            if (cid == null) {
                cid = "none";
            }

            // Acknowledge action!
            try {
                if (cid.equals("accept")) {
                    System.out.println("Accepting the Message");
                    AckUtils.accept(ackCallback);
                } else if (cid.equals("requeue")) {
                    System.out.println("Requeuing the Message");
                    AckUtils.requeue(ackCallback);
                } else {
                    System.out.println("Rejecting the Message");
                    AckUtils.reject(ackCallback);
                }
            } catch (SolaceAcknowledgmentException e) {
                // Nothing in the try block throws the checked InterruptedException,
                // so only the acknowledgment failure is caught here
                System.out.println("Warning, exception occurred but message will be re-queued on broker and re-delivered\n" + e);
                return null; // Don't send an output message
            }
            return "My Payload";
        };
    }
The happy path, where you “accept” the message, is straightforward, but let us examine two critical features that come into play in the case of a REJECT or REQUEUE acknowledgment action.
Message Redelivery
What happens when a message is acknowledged with REQUEUE status? It is placed back at the front of the queue on the broker and can be delivered again.
When it comes to message redelivery, the broker may or may not redeliver the message, depending on the queue configuration. There are two questions to consider. First, has the message already reached its maximum number of redeliveries, configurable via queueMaxMsgRedelivery? Second, if the queue respects time to live (TTL), has the message expired? If the max redeliveries have not been reached and the message has not expired, the broker redelivers the message to the next available consumer. Note that if you have multiple consumers, the message may or may not be redelivered to the same consumer.
Refer to the message flow diagram for the REQUEUE acknowledgment status when there is a single consumer.
When the redelivery attempts on the queue are exhausted, or the message expires, the message is delivered to the dead message queue (DMQ) if one is configured for the Solace queue.
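The redelivery checks described in this section can be expressed as a small decision function. This is a simplified JDK-only model, not broker behavior verified in detail: RedeliverySketch, Decision, and onRequeue are hypothetical names, maxRedeliveries stands for the value configured via queueMaxMsgRedelivery, and the NOT_REDELIVERED outcome for a queue without a DMQ is an assumption, since the text only covers the case where a DMQ is configured.

```java
/** JDK-only sketch: decide what happens to a requeued message. */
final class RedeliverySketch {

    enum Decision { REDELIVER, MOVE_TO_DMQ, NOT_REDELIVERED }

    /**
     * @param redeliveries     redelivery attempts already made
     * @param maxRedeliveries  limit configured on the queue
     * @param queueRespectsTtl whether the queue honors message TTL
     * @param ttlElapsed       whether the message's TTL has passed
     * @param dmqConfigured    whether a dead message queue is configured
     */
    static Decision onRequeue(int redeliveries, int maxRedeliveries,
                              boolean queueRespectsTtl, boolean ttlElapsed,
                              boolean dmqConfigured) {
        boolean exhausted = redeliveries >= maxRedeliveries;
        // Expiry only counts when the queue respects TTL.
        boolean expired = queueRespectsTtl && ttlElapsed;

        if (!exhausted && !expired) {
            return Decision.REDELIVER;
        }
        // Assumption: without a DMQ the message is simply not redelivered.
        return dmqConfigured ? Decision.MOVE_TO_DMQ : Decision.NOT_REDELIVERED;
    }

    public static void main(String[] args) {
        System.out.println(onRequeue(1, 3, true, false, true));  // attempts remain
        System.out.println(onRequeue(3, 3, true, false, true));  // attempts exhausted
        System.out.println(onRequeue(0, 3, true, true, false));  // expired, no DMQ
        System.out.println(onRequeue(0, 3, false, true, false)); // TTL not respected
    }
}
```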
Error Queue Republishing
How do we capture the messages successfully consumed from the message broker but rejected by the application? The following conditions qualify for a message to be republished to an error queue:
- A message was rejected
- An error occurred in the application
Spring Cloud Stream binder for Solace PubSub+ supports configuring an error queue to receive error messages at the binding level by setting spring.cloud.stream.solace.bindings.<binding-name-in-0>.consumer.autoBindErrorQueue to “true” in the application configuration file. An error queue is durable in nature.
For more information on Spring Cloud Stream error channel support, refer to their documentation.
An error queue is different from a DMQ, which captures messages that failed due to expiry or exceeding the max redelivery count. In contrast, the error queue captures messages successfully consumed from the message broker but rejected by the application.
NOTE: Since the names of anonymous consumer groups are randomly generated at runtime, those generated names are applied to their error queues as well. You can use a well-known queue by specifying the error queue name in the configuration file wherever applicable. It is possible to have multiple anonymous groups configured to send errors to a single error queue.
You can check out the manual message acknowledgment sample in our samples Git repository:
Manual Message Acknowledgement by SolaceSamples
To demonstrate all the acknowledgement options in action, we will work with correlationId, which carries the intent of the acknowledgement action. In our code, we will ACCEPT, REJECT, or REQUEUE as appropriate based on the set correlationId and observe the outcome.
Summary
As a developer, publish confirmations and consumer message acknowledgments empower you to take control over messaging error scenarios in a programmatic manner when using Spring Cloud Stream binder for Solace PubSub+.
If you’re new to Spring Cloud Stream and want more help getting up and running, these are great places to check out for more tutorials.
- Spring Cloud Stream binder for Solace PubSub+
- Getting Started with Spring Cloud Stream using Spring Initializr
- Getting Started with Spring Cloud Stream using start.spring.io and a PubSub+ Event Broker
- Samples for Spring Cloud Stream binder for Solace PubSub+
- Tutorials & Codelabs for Spring Cloud Stream binder for Solace PubSub+
And last but not least, there is the Solace Community, where developers and experts exchange ideas, share thoughts, and discuss topics related to event-driven architecture.