Batch processing in messaging usually means that the code is designed or optimized to process messages in batches instead of one at a time. Usually, the underlying messaging system and the interfaces need to support producing and consuming batches of messages. This makes the system less chatty because data is coming in chunks.
What does a “batch” mean in messaging, especially publishing and consuming? It is the ability to accumulate multiple messages and publish them at the same time instead of having to call a separate publish method for each message. In the case of message consumption, the messaging provider can accumulate multiple messages and deliver them to your application all at once for processing.
In this blog post, I will explain how Spring Cloud Stream binder for Solace PubSub+ supports batch publishing and batch consumption.
Batch Publishing
Sometimes when following the Supplier or the Function pattern of Spring Cloud Stream, you may need to send more than one output message for each one you process. You can use StreamBridge to send messages whenever you’d like, but there is also another option. That option is to return a Collection<Message<?>>
object in your Function. When doing this, Spring Cloud Stream will send each member of the collection as its own message.
Batch Publish to Default Binding Destination
Let’s check it out! Review the following code snippet – the batchPublish
function takes in a String and returns a Collection<Message<String>>
.
@Bean public Function<String, Collection<Message<String>>> batchPublish() { return v -> { logger.info("Received: " + v); ArrayList<Message<String>> msgList = new ArrayList<Message<String>>(); msgList.add(MessageBuilder.withPayload("Payload 1").build()); msgList.add(MessageBuilder.withPayload("Payload 2").build()); msgList.add(MessageBuilder.withPayload("Payload 3").build()); return msgList; }; }
This code will result in 3 messages being sent to the output binding destination each time a message is received on the input binding.
That’s cool! But what’s even cooler? You can combine this batch publishing functionality with dynamic publishing, where individual messages can be set with a dynamic destination.
Batch Publish to Dynamic Destinations
The following code will produce 3 messages, each being sent to a distinct destination whenever a message is received on the input binding.
@Bean public Function<String, Collection<Message<String>>> batchPublishDynamicDestination(StreamBridge sb) { return v -> { logger.info("Received: " + v); // Do some more processing and create a list of messages to send upon returning ArrayList<Message<String>> msgList = new ArrayList<Message<String>>(); // Send to default topic msgList.add(MessageBuilder.withPayload("Payload 1").build()); // Send to dynamic topics using BinderHeaders.TARGET_DESTINATION msgList.add(MessageBuilder.withPayload("Payload 2").setHeader(BinderHeaders.TARGET_DESTINATION, "some/other/topic/2").build()); msgList.add(MessageBuilder.withPayload("Payload 3").setHeader(BinderHeaders.TARGET_DESTINATION, "some/other/topic/3").build()); return msgList; }; }
You can use the batch publishing feature to implement fanning out messages to multiple topics. This is useful when there are different types of workloads to be triggered from a single producer.
You can check out the batch publisher sample in our samples Git repository.
Batch Publishersby SolaceSamplesBatch Publishers publish messages in batches. A batch of messages is just a single Spring Message whose payload is a list of individual message payloads. Batch publishing does not require any additional configuration – it is just how the messages are bundled and published as a collection.Batch Consumers
Depending on the type of processing you’re doing, batch processing can be ideal for consumers when dealing with a large volume of messages. It can increase efficiency by allowing your application to process messages in bulk rather than one at a time, potentially improving throughput when doing things like inserting into a database.
In Spring Cloud Stream batching of messages is applied only when the spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode is set to true. The number of messages in a batch is dependent on the availability of messages in the queue and the timeout to receive messages from the queue.
batchMaxSize
The maximum number of messages per batch.
Only applicable when batchMode is true.
Default: 255
batchTimeout
The maximum wait time in milliseconds to receive a batch of messages. If this timeout is reached, then the messages that have already been received will be used to create the batch. A value of 0 means wait forever.
Only applicable when batchMode is true.
Default: 5000
A batch of messages is just a single Spring Message whose payload is a list of individual message payloads. The solace_scst_batchedHeaders message header contains the consolidated list of message headers of each of the individual messages in the batch.
Batched messages may be consumed as follows:
@Bean Consumer<Message<List<Payload>>> batchConsume() { return batchMsg -> { // (1) List<Map<String, Object>> batchedHeaders = (List<Map<String, Object>>)batchMsg.getHeaders() .get(SolaceBinderHeaders.BATCHED_HEADERS); // (2) List<Payload> batchedPayloads = batchMsg.getPayload(); // Process individual message header and payload // iterating through the list }; }
NOTE: Message batches are non-transacted. A batch is a collection of individual messages and must not be treated as a single consistent unit.
A sample application configuration for batch consumer:
spring: cloud: function: definition: batchConsume stream: bindings: batchConsume-in-0: destination: 'batch/consume/topic' group: batch consumer: batch-mode: true binders: solace-broker: type: solace environment: solace: # (1) java: host: tcp://localhost:55555 msgVpn: default clientUsername: default clientPassword: default
You can check out the batch consumer sample in our samples Git repository.
Batch Consumersby SolaceSamplesBatch consumers handle a large number of messages as a batch. It can process data quickly, minimize or eliminate the need for multiple round-trips, and improve the efficiency of message processing. It can be ideal for bulk processing involving high-volume activities such as managing database updates, sensor data, etc.
Conclusion
As a developer, this feature empowers you to produce and consume messages in batches using Spring Cloud Stream binder for Solace PubSub+.
If you’re new to Spring Cloud Stream and would like more help getting up and running these are great places to check out for more tutorials.
- Docs: Spring Cloud Stream binder for Solace PubSub+
- Samples: Samples for Spring Cloud Stream binder for Solace PubSub+
- Tutorials: Tutorials & Codelabs for Spring Cloud Stream binder for Solace PubSub+
And last but not least Solace Community, where developers and experts hang out exchanging notes, sharing thoughts, and discussing related topics. Join the community and enjoy the perks of knowledge sharing and exchange.
Explore other posts from category: For Developers