
How to Consume Persistent Events: A Guide for Boomi Process Developers

    Introduction

    The Solace PubSub+ Connector for Boomi allows Boomi Process developers to easily build integrations that are asynchronously triggered by events, creating a very scalable event-driven architecture. One common way of consuming those events is the ‘Persistent Transacted’ mode of the Connector’s Listen operation. This mode delivers events to the Boomi process through a queue, and each event is only removed from the queue once the Boomi process has executed successfully and thereby ‘acknowledged’ its consumption. For high-value events that need to be processed even if a Boomi process or Atom fails unexpectedly, this is the safest form of event consumption.

    When operating in this extra safe and cautious mode, developers have to consider what to do in the following two scenarios:

    1. An event has been taken off the queue, some processing has taken place, but a failure prevented the acknowledgement from being generated (e.g. the Atom terminated unexpectedly).
    2. An event has been taken off the queue, the payload is found to be invalid and needs to be rejected/removed.

    Fortunately, there are various PubSub+ Event Broker features exposed through the connector that Boomi developers can leverage when building integrations to handle both scenarios. In this blog post I will demonstrate these features through a worked example.

    Scenario 1: Handling the “in doubt” events. (De-duplication logic.)

    To favour processing throughput, the PubSub+ Event Broker delivers multiple events at a time to connected Boomi queue consumers. This keeps the next event ready locally in the Connector, so that it can be dispatched to the process as soon as the event currently being worked on is acknowledged. (The exact count is determined by the “Batch Size” setting, which has a minimum allowed value of 1.)

    If a process ends with an exception, or the active connection suddenly goes away (network or Atom failure), the Event Broker will consider that whole batch of messages as failed and will redeliver them to the next available (or returning) consumer of the queue. It is then the responsibility of the consuming process to determine what processing is still pending for those redelivered events.

    Consider the use-case of each event representing a new order to be processed, and in that workflow stock levels are decremented in an inventory system. If an earlier ‘new order’ event was partially processed to have updated the inventory, but the Atom failed before acknowledging it, repeat processing will result in incorrect inventory should it be decremented again.

    To signal this rare condition, the broker marks that batch of events with a ‘redelivered’ flag that a Boomi process can test for (e.g. using a Route Shape in the flow). Appropriate de-duplication logic should be implemented and applied specifically to events with this flag set.
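
    The inventory example above can be sketched in a few lines of Python. This is only an illustration of the idea, not how the Connector or a Boomi process is implemented: the Event and Inventory classes, and the in-memory set of applied message IDs, are hypothetical stand-ins for whatever business data a real process would consult.

```python
from dataclasses import dataclass


@dataclass
class Event:
    message_id: int      # hypothetical identifier for the order event
    redelivered: bool    # stands in for the broker's 'redelivered' flag
    order_qty: int


class Inventory:
    def __init__(self, stock):
        self.stock = stock
        self.applied = set()  # IDs of events whose decrement already happened

    def apply_order(self, event):
        # Only events flagged as redelivered need the duplicate check.
        if event.redelivered and event.message_id in self.applied:
            return False  # already applied before the failure, skip it
        self.stock -= event.order_qty
        self.applied.add(event.message_id)
        return True
```

    A first delivery always decrements the stock. A redelivered event is only applied if its ID has not been recorded, so a redelivery of an already applied order leaves the stock unchanged.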

    Scenario 2: Handling the “invalid” event (Poison message exclusion.)

    If an event on a queue causes the Boomi process to end with an exception, say because the payload was malformed and did not match the expected profile, the message will not be acknowledged to the Event Broker. Instead it returns to the queue and is delivered again for another processing attempt. Such an event is often described as a “poison message” that needs to be taken out of the processing queue.

    An ideal Boomi process design would catch the exceptions that can be anticipated and explicitly drop or re-route the offending messages. For all other exceptions that cannot be handled within the process, settings can be applied on the queue so that the event broker helps to mitigate the impact:

    1. Max Redelivery Count: Cap the number of delivery attempts made before giving up on the event.
    2. Dead Message Queue: Move the event to another named queue so that it can be inspected and processed by a dedicated process.
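
    How these two settings interact can be illustrated with a toy in-memory model. It is not the broker's implementation: the consume function and its arguments are hypothetical, and it assumes that the cap counts redeliveries after the first delivery attempt.

```python
from collections import deque


def consume(events, handler, max_redelivery):
    """Deliver events in order; return those that ended up 'dead'."""
    pending = deque((event, 0) for event in events)
    dead_message_queue = []
    while pending:
        event, redeliveries = pending.popleft()
        try:
            # The second argument mimics the 'redelivered' flag.
            handler(event, redeliveries > 0)
        except Exception:
            if redeliveries < max_redelivery:
                # No acknowledgement: the event returns to the head of the queue.
                pending.appendleft((event, redeliveries + 1))
            else:
                # Cap reached: move the event aside for later inspection.
                dead_message_queue.append(event)
    return dead_message_queue
```

    With max_redelivery set to 2, a handler that always fails on one particular event sees that event three times (once as a first delivery, twice flagged as redelivered) before it is moved aside and the events behind it can flow again.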

    The worked example below combines both of these approaches to create a robust solution for handling invalid events.

    Example Boomi Process: Test Message Validation and Echo Service

    In a ‘pipeline processing’ fashion, this simple Boomi process begins with a Listen operation for a particular input event and is responsible for creating an output event at the end of its execution. The idea is that multiple such processes can be daisy-chained so that the output of one process is the input of another, choreographed together to achieve a larger goal.

    The service has a simple function: to receive a JSON-formatted ‘test’ message as an event, validate that the right fields are present, then create a new ‘echo’ event based on the received contents as the output.

    If the test message is found to be invalid then a new ‘error’ event is created as the output and both the original payload and validation failure message are attached within it.
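
    The behaviour described so far can be sketched as a single function, purely as an illustration of the logic rather than of the Boomi shapes involved. The topic names and the shape of the output bodies are assumptions: the post only states that msgId is required, that the echo topic is built from payload elements, and that error events go to a different topic.

```python
import json

REQUIRED_FIELDS = ("msgId",)  # only msgId is named in the post
ERROR_TOPIC = "boomi/errors/echo-service/validation"  # hypothetical topic


def process_test_message(payload):
    """Return (topic, body) for the output event of one input payload."""
    # Malformed JSON raises json.JSONDecodeError here, uncaught on purpose:
    # this mirrors the first iteration of the design.
    message = json.loads(payload)
    missing = [
        field for field in REQUIRED_FIELDS
        if not isinstance(message, dict) or field not in message
    ]
    if missing:
        reason = "missing field(s): " + ", ".join(missing)
        return ERROR_TOPIC, {"original": payload, "reason": reason}
    # Hypothetical echo topic built from an element of the payload.
    return f"boomi/echo/{message['msgId']}", {"echo": message}
```

    A valid message yields an echo event, a message without msgId yields an error event carrying the original payload and the reason, and a payload that is not JSON at all raises an exception.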

    The first iteration of the design looks like this:

    For a properly formatted test message as input, an echo event is duly created on an output topic that is also constructed using elements from the processed payload:

    Likewise, if the payload is valid JSON but has the msgId field missing, the Business Rules Shape directs it down the error path and a different output event is created on a different topic:


    That output event can be attracted to a dedicated queue for another error handling service to investigate what went wrong.

    As it stands, the process design has a couple of issues:

    1. When the input message fails to parse as valid JSON for the Business Rules shape, an exception is raised and the process ends abruptly.
    2. That invalid message returns as a redelivered message to trigger the exception again.

    For the next iteration of the design the following need to be added:

    1. A Try-Catch Shape to catch the parse error and send that exception down the existing ‘Invalid payload’ execution path
    2. A check for the ‘Redelivered’ flag on the received message
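
    In code terms, these two additions amount to wrapping the parse step in an exception handler and carrying the flag along with the document. The sketch below is a hypothetical illustration only: route_document and the keys of the returned dictionary are invented names, not Connector properties.

```python
import json


def route_document(payload, redelivered):
    """Decide which execution path a received document should take."""
    result = {"redelivered": redelivered}  # reported for logging
    try:
        message = json.loads(payload)
    except json.JSONDecodeError as exc:
        # The parse failure no longer ends the process; it is routed down
        # the existing 'invalid payload' path with the reason attached.
        result.update(path="invalid", reason=f"parse error: {exc.msg}")
        return result
    if not isinstance(message, dict) or "msgId" not in message:
        result.update(path="invalid", reason="msgId field missing")
        return result
    result.update(path="echo", reason=None)
    return result
```

    Because the malformed payload now produces an ordinary error result instead of an exception, the execution can complete and the message can be acknowledged rather than returning to the queue.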

    Improving the Test Message Validation and Echo Service, Version 2.

    With the changes added, the next version of the service design looks like this:

    This improved process handles parsing errors better: when one throws an exception, the reason is captured and embedded in the error event that is generated. Secondly, a Route Shape is used to inspect whether the ‘Redelivered’ Document Property is set to true, with additional logging to report this. For messages flagged as redelivered there are two possible outcomes:

    1. The message has been seen before and can be safely acknowledged with no further processing required, or
    2. The message may have been in a previous ‘delivery batch’ to the Connector, but it has not been seen within any process execution itself.

    For outcome (1), the execution path goes straight to a Stop Shape, in effect acknowledging the message so it can be removed from the queue. For outcome (2), additional information is required to effectively de-duplicate the message. A real world process may inspect the message contents for business data that can help determine this. If that is not possible, metadata from the event broker can be used to create a more general solution.

    A “Message ID” document property is available on each received event that identifies a message and also any subsequent redeliveries of it. Saving this ID at the end of each successful process execution for comparison would appear to offer a simple de-duplication solution.

    For example, suppose the Message ID ‘1082414’ is saved as a Boomi Process Property at the end of the execution. If a message then arrives from the queue with the same ID and the Redelivered flag set, it can safely be treated as a duplicate and ignored. Operationally this has the added advantage of showing clearly, in the Process Runtime view, the last Message ID processed by a particular Process-Atom combination. (This is useful if the process is deployed on multiple Atoms and the event consumption is moving between them.)

    The limitation with this approach is that while a Process Property persists across multiple executions, it will not be consistent across Processes on different Atoms. Consider the scenario of the network connection between the Atom and the Event Broker getting interrupted. If there is only one Atom running, redelivered messages will come back to that Process when the network resumes, and the saved Process Property can help with de-duplication. If however queue delivery moves to another connected Atom, a read of the Process Property there will either be empty or stale with a much older value.
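
    Both the approach and its limitation can be seen in a small sketch, in which an instance attribute stands in for the persisted Process Property. The AtomProcess class is a hypothetical model for illustration only.

```python
class AtomProcess:
    """One deployment of the process on one Atom."""

    def __init__(self):
        # Stand-in for a persisted Process Property, local to this Atom.
        self.last_message_id = None

    def handle(self, message_id, redelivered):
        if redelivered and message_id == self.last_message_id:
            return "skipped-duplicate"
        # ... the real processing would happen here ...
        self.last_message_id = message_id  # saved at the end of the execution
        return "processed"
```

    On a single Atom, a redelivery of message 1082414 after it was processed is skipped. A second AtomProcess instance has no knowledge of that ID, so the same redelivered message would be processed again there.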

    A proposed solution to this synchronisation of the ‘last processed Message ID’ is to create an additional output event as the ‘checkpoint’ and let that coordinate de-duplication across Processes on multiple Atoms. This event can be attracted to a special ‘Last Value’ queue that only holds the single most recent event seen. Additionally all deployments of this Process can ‘browse’ the queue for this checkpoint event in a non-destructive manner. (i.e. The checkpoint is never taken off the queue, only ever replaced with a newer one.)

    Important Note: This Message ID based de-duplication approach is only usable if the queue is in ‘Exclusive’ mode, where only one consumer is active at any one moment. If the queue is in ‘Non-Exclusive’ mode for round-robin message consumption across many active processes, this ID no longer identifies a particular message with the same value across redelivery attempts.

    Adding State Synchronization to the Test Message Validation and Echo Service, Version 3.

    This third and final version of the design adds a new event output operation on the ‘default’ execution path. It is an event on a topic that can be considered ‘private’ in that it is only useful for deployed instances of this particular process.

    Similarly, for the de-duplication logic, a Process Call shape is used to first perform a Browse-Only Get Operation with the Connector on the Last-Value-Queue.

    The checkpoint message is then read to update a Process Property. Finally, a Business Rules Shape compares the Message ID from the checkpoint event with the Message ID reported by the Document Property on the ‘Redelivered’-flagged event, and routes the document appropriately.
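
    The checkpoint mechanism can be modelled as follows. This is a simplified sketch under stated assumptions: LastValueQueue and EchoProcess are hypothetical classes, the checkpoint body is assumed to carry a single messageId field, and the comparison is a plain equality test as in the example earlier in the post.

```python
class LastValueQueue:
    """Holds only the single most recent event published to it."""

    def __init__(self):
        self._latest = None

    def publish(self, event):
        self._latest = event  # a newer checkpoint replaces the older one

    def browse(self):
        return self._latest  # non-destructive read: nothing is removed


class EchoProcess:
    """One deployment of the process; all deployments share the same queue."""

    def __init__(self, checkpoint_queue):
        self.checkpoints = checkpoint_queue

    def handle(self, message_id, redelivered):
        if redelivered:
            checkpoint = self.checkpoints.browse()
            if checkpoint is not None and checkpoint["messageId"] == message_id:
                return "skipped-duplicate"
        # ... the real processing would happen here ...
        self.checkpoints.publish({"messageId": message_id})
        return "processed"
```

    Because every deployment browses the same checkpoint, a message processed on one Atom and then redelivered to another is recognised as a duplicate, which the per-Atom Process Property alone could not guarantee.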

    For completeness, the final set of queues involved in this service looks like the following:

    • The primary input queue: q.Boomi-Message-Echo-Service
      • Subscribed to topic: boomi/test/>
    • The associated dead message queue for any events not gracefully handled: q.Boomi-Message-Echo-Service
      • No topic subscription, events only get moved in by the broker
    • The last value queue to hold the single most recent checkpoint event: lvq.Boomi-Message-Echo-Service
      • Subscribed to topic: boomi/checkpoint/echo-service
    • Finally an errors queue to hold output from gracefully handled errors: q.Boomi-Message-Echo-Service.Errors
      • Subscribed to topic: boomi/errors/echo-service/>
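
    To see which published topics each queue would attract, the subscriptions above can be checked with a simplified matcher. It is a sketch that only handles exact levels and a trailing ‘>’ wildcard (taken here to mean one or more further levels); other wildcard forms are deliberately left out. The dead message queue is omitted because it has no topic subscription.

```python
QUEUE_SUBSCRIPTIONS = {
    "q.Boomi-Message-Echo-Service": "boomi/test/>",
    "lvq.Boomi-Message-Echo-Service": "boomi/checkpoint/echo-service",
    "q.Boomi-Message-Echo-Service.Errors": "boomi/errors/echo-service/>",
}


def matches(subscription, topic):
    """Simplified match: exact levels, plus '>' as the final level."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(sub_levels):
        if level == ">" and i == len(sub_levels) - 1:
            # Assumed meaning: at least one more level must follow.
            return len(topic_levels) > i
        if i >= len(topic_levels) or level != topic_levels[i]:
            return False
    return len(sub_levels) == len(topic_levels)


def queues_for(topic):
    """Names of the queues whose subscription attracts this topic."""
    return [name for name, sub in QUEUE_SUBSCRIPTIONS.items() if matches(sub, topic)]
```

    For instance, a hypothetical error topic such as boomi/errors/echo-service/validation would land only on the errors queue, while boomi/checkpoint/echo-service lands only on the last value queue.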

    Conclusion

    And that’s it! If you have any questions or comments feel free to start a discussion at solace.community.

    The post How to Consume Persistent Events: A Guide for Boomi Process Developers appeared first on Solace.