Batch processing in messaging means that code is designed or optimized to handle messages in groups rather than one at a time. For this to work, the underlying messaging system and its interfaces need to support producing and consuming batches of messages. The result is a less chatty system: data moves in chunks, so fewer calls are needed to move the same number of messages.

What does a “batch” mean in messaging, specifically for publishing and consuming? On the publishing side, it is the ability to accumulate multiple messages and publish them together instead of calling a separate publish method for each message. On the consuming side, the messaging provider can accumulate multiple messages and deliver them to your application all at once for processing.
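To make the publishing side concrete, here is a minimal plain-Java sketch of publish-side batching. It does not use Solace or Spring APIs: CountingTransport and BatchingPublisher are hypothetical classes standing in for a messaging client, and the transport simply counts how many publish calls it receives. Messages accumulate in a buffer and go out in a single call once batchSize of them have been collected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transport: counts publish calls ("round trips") and records what it delivered.
class CountingTransport {
    int calls = 0;
    final List<String> delivered = new ArrayList<>();

    void publish(List<String> messages) {
        calls++;                          // one call, however many messages it carries
        delivered.addAll(messages);
    }
}

// Hypothetical batching publisher: buffers messages and publishes them together
// once the buffer holds batchSize messages.
class BatchingPublisher {
    private final CountingTransport transport;
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();

    BatchingPublisher(CountingTransport transport, int batchSize) {
        this.transport = transport;
        this.batchSize = batchSize;
    }

    void send(String message) {
        buffer.add(message);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Publishes whatever is buffered as one batch; does nothing if the buffer is empty.
    void flush() {
        if (!buffer.isEmpty()) {
            transport.publish(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

Sending 10 messages with a batchSize of 4, followed by a final flush(), takes 3 publish calls instead of the 10 needed when every message is published on its own.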

In this blog post, I will explain how the Spring Cloud Stream binder for Solace PubSub+ supports batch publishing and batch consumption.

Batch Publishing

Sometimes when following the Supplier or the Function pattern of Spring Cloud Stream, you may need to send more than one output message for each one you process. You can use StreamBridge to send messages whenever you’d like, but there is also another option. That option is to return a Collection<Message<?>> object in your Function. When doing this, Spring Cloud Stream will send each member of the collection as its own message.

Batch Publish to Default Binding Destination

Let’s check it out! Review the following code snippet – the batchPublish function takes in a String and returns a Collection<Message<String>>.

@Bean
public Function<String, Collection<Message<String>>> batchPublish() {
    return v -> {
        logger.info("Received: " + v);

        // Each Message in the returned collection is sent as its own message
        // to the output binding's destination
        ArrayList<Message<String>> msgList = new ArrayList<>();
        msgList.add(MessageBuilder.withPayload("Payload 1").build());
        msgList.add(MessageBuilder.withPayload("Payload 2").build());
        msgList.add(MessageBuilder.withPayload("Payload 3").build());

        return msgList;
    };
}

This code will result in 3 messages being sent to the output binding destination each time a message is received on the input binding.
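Conceptually, the framework invokes your function once per input and then sends each element of the returned collection separately. The plain-Java sketch below models that behavior; CollectionOutputDispatcher is a hypothetical helper and does not reflect how Spring Cloud Stream is implemented internally.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the framework side: apply the user function once,
// then hand every element of the returned collection to the sender separately.
class CollectionOutputDispatcher {
    static <I, O> void dispatch(I input,
                                Function<I, ? extends Collection<O>> function,
                                Consumer<O> sender) {
        Collection<O> outputs = function.apply(input);
        for (O out : outputs) {
            sender.accept(out);   // one send per collection member
        }
    }
}
```

Passing a function that returns three payloads for one input results in three separate calls to the sender, mirroring the three messages produced by batchPublish.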

That’s cool! But what’s even cooler? You can combine batch publishing with dynamic publishing, in which each message in the collection can be given its own destination.

Batch Publish to Dynamic Destinations

[Figure: batch publishing used to fan out messages to multiple topics (inventory check, address validation, payment mode validation)]
The following code will produce 3 messages, each being sent to a distinct destination whenever a message is received on the input binding.

@Bean
public Function<String, Collection<Message<String>>> batchPublishDynamicDestination() {
    return v -> {
        logger.info("Received: " + v);

        // Do some more processing and create a list of messages to send upon returning
        ArrayList<Message<String>> msgList = new ArrayList<>();
        // No target header: sent to the output binding's default destination
        msgList.add(MessageBuilder.withPayload("Payload 1").build());
        // Sent to dynamic topics using BinderHeaders.TARGET_DESTINATION
        msgList.add(MessageBuilder.withPayload("Payload 2")
                .setHeader(BinderHeaders.TARGET_DESTINATION, "some/other/topic/2").build());
        msgList.add(MessageBuilder.withPayload("Payload 3")
                .setHeader(BinderHeaders.TARGET_DESTINATION, "some/other/topic/3").build());

        return msgList;
    };
}

You can use batch publishing to fan out messages to multiple topics. This is useful when a single incoming event needs to trigger several different types of work, each handled by its own consumer.
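The routing rule behind this fan-out can be modeled in plain Java: a message that carries a target-destination header goes to that topic, and every other message falls back to the binding's default destination. OutMessage, FanOutRouter, and the header key "targetDestination" are hypothetical stand-ins for Spring's Message and BinderHeaders.TARGET_DESTINATION; the sketch only illustrates the grouping, not the binder's actual send path.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical outgoing message: a payload plus string headers.
class OutMessage {
    final String payload;
    final Map<String, String> headers;

    OutMessage(String payload, Map<String, String> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

class FanOutRouter {
    // Hypothetical header key standing in for BinderHeaders.TARGET_DESTINATION.
    static final String TARGET_DESTINATION = "targetDestination";

    // Groups payloads by destination, preserving first-seen order of destinations.
    // Messages without the header go to the default destination.
    static Map<String, List<String>> route(List<OutMessage> batch, String defaultDestination) {
        Map<String, List<String>> byDestination = new LinkedHashMap<>();
        for (OutMessage m : batch) {
            String dest = m.headers.getOrDefault(TARGET_DESTINATION, defaultDestination);
            byDestination.computeIfAbsent(dest, k -> new ArrayList<>()).add(m.payload);
        }
        return byDestination;
    }
}
```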

You can check out the batch publisher sample in our samples Git repository.


Batch Consumers

Depending on the type of processing you’re doing, batch processing can be ideal for consumers when dealing with a large volume of messages. It can increase efficiency by allowing your application to process messages in bulk rather than one at a time, potentially improving throughput when doing things like inserting into a database.

[Figure: the relationship between Spring Cloud Stream applications, the Spring Cloud Stream Binder, and the Solace PubSub+ event broker]

In Spring Cloud Stream, messages are batched only when spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode is set to true. The number of messages in each batch then depends on how many messages are available in the queue and how long the binder waits to receive them, within the limits set by the following Solace binder consumer properties (configured under spring.cloud.stream.solace.bindings.<binding-name>.consumer):

batchMaxSize
The maximum number of messages per batch.
Only applicable when batchMode is true.
Default: 255

batchTimeout
The maximum wait time in milliseconds to receive a batch of messages. If this timeout is reached, then the messages that have already been received will be used to create the batch. A value of 0 means wait forever.
Only applicable when batchMode is true.
Default: 5000
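The interplay of batchMaxSize and batchTimeout can be sketched with a java.util.concurrent.BlockingQueue standing in for the Solace queue. BatchCollector is a hypothetical helper, not the binder's implementation: it stops collecting when the batch is full or when the timeout elapses, returns whatever has arrived so far, and treats a timeout of 0 as "wait until the batch is full". This sketch starts the timeout clock when collection begins; the binder may measure it differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BatchCollector {
    // Collects up to batchMaxSize messages, waiting at most batchTimeoutMs in total.
    // A batchTimeoutMs of 0 blocks until the batch is full.
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int batchMaxSize, long batchTimeoutMs) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(batchTimeoutMs);
        try {
            while (batch.size() < batchMaxSize) {
                T msg;
                if (batchTimeoutMs == 0) {
                    msg = queue.take();                  // 0 means wait forever for the next message
                } else {
                    long remaining = deadline - System.nanoTime();
                    if (remaining <= 0) {
                        break;                           // timeout reached: use what has arrived
                    }
                    msg = queue.poll(remaining, TimeUnit.NANOSECONDS);
                    if (msg == null) {
                        break;                           // nothing more arrived before the deadline
                    }
                }
                batch.add(msg);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();          // keep interrupt status, return the partial batch
        }
        return batch;
    }
}
```

With 10 messages waiting and batchMaxSize set to 4, the first two calls return full batches of 4; the third returns the remaining 2 once the timeout expires.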

A batch of messages is just a single Spring Message whose payload is a list of individual message payloads. The solace_scst_batchedHeaders message header contains the consolidated list of message headers of each of the individual messages in the batch.

Batched messages may be consumed as follows:

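@Bean
Consumer<Message<List<Payload>>> batchConsume() {
    // A batch arrives as a single Message whose payload is a list of individual payloads
    // (Payload is a placeholder for your application's payload type)
    return batchMsg -> {
        // Headers of the individual messages: one map per message, in the same order as the payloads
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> batchedHeaders =
                (List<Map<String, Object>>) batchMsg.getHeaders()
                        .get(SolaceBinderHeaders.BATCHED_HEADERS);

        List<Payload> batchedPayloads = batchMsg.getPayload();

        // Process each individual message's headers and payload
        for (int i = 0; i < batchedPayloads.size(); i++) {
            Map<String, Object> headers = batchedHeaders.get(i);
            Payload payload = batchedPayloads.get(i);
            // ... application-specific processing of headers and payload
        }
    };
}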

NOTE: Message batches are non-transacted. A batch is a collection of individual messages and must not be treated as a single consistent unit.
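Because a batch is not a transaction, a failure on one message says nothing about the others. One way to respect that is to process each entry independently and record failures individually, as in this plain-Java sketch. PerMessageProcessor is a hypothetical helper; how the Solace binder acknowledges or redelivers individual messages after a failure is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PerMessageProcessor {
    // Runs the handler on every payload. A failure is recorded by index and does not
    // stop the rest of the batch, since the batch is not an atomic unit.
    static <P> List<Integer> processEach(List<P> payloads, Consumer<P> handler) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < payloads.size(); i++) {
            try {
                handler.accept(payloads.get(i));
            } catch (RuntimeException e) {
                failed.add(i);   // remember which message failed, then keep going
            }
        }
        return failed;
    }
}
```

The returned indices can then drive whatever follow-up the application needs, such as logging, retrying, or routing those messages elsewhere.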

A sample application configuration for a batch consumer:

spring: 
  cloud: 
    function: 
      definition: batchConsume 
    stream: 
      bindings: 
        batchConsume-in-0: 
          destination: 'batch/consume/topic' 
          group: batch 
          consumer: 
            batch-mode: true 
      binders: 
        solace-broker: 
          type: solace 
          environment: 
            solace: # Solace PubSub+ connection properties
              java: 
                host: tcp://localhost:55555 
                msgVpn: default 
                clientUsername: default 
                clientPassword: default 

You can check out the batch consumer sample in our samples Git repository.

Conclusion

With this feature, you can produce and consume messages in batches using the Spring Cloud Stream binder for Solace PubSub+.

If you’re new to Spring Cloud Stream and would like more help getting up and running, these are great places to check out for more tutorials.

Last but not least, there is the Solace Community, where developers and experts hang out to exchange notes, share thoughts, and discuss related topics. Join the community and enjoy the perks of knowledge sharing and exchange.

Giri Venkatesan

Giri is a developer advocate with extensive experience in various technical domains, including integration and master data management. He started his engineering journey in the classic EAI & B2B Integration space and has been a part of the integration evolution culminating in modern EDA, microservices, Spring, and other low-code/no-code frameworks. He has a keen interest in building and promoting creative solutions and is a huge fan of open-source standards and applications. He is excited to identify and explore tools and frameworks to aid businesses in their quest for achieving efficiency and increased productivity.