Technical Look at Using Solace as a Channel for Apache Flume

In a previous blog I explained what the Solace channel for Apache Flume is and how it can be used to streamline the data workflow from an enterprise message bus to big data infrastructure and vice versa. In this blog I will describe how the Solace channel works and what technical advantages it has compared to the memory and file channels.

Solace Flume channel receiving Flume Events from Source.

The Flume Source interacts with the Flume channel through four method calls: createTransaction(), doPut(Event), and finally doCommit() or doRollback().
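To make that call order concrete, here is a minimal, self-contained sketch. RecordingChannel, RecordingTransaction, and deliverBatch are hypothetical names that only record which methods are called; they are not Flume classes. In real Flume code, as we understand it, a Source calls Channel.getTransaction(), Transaction.begin(), Channel.put(), and Transaction.commit() or rollback(), and Flume's BasicChannelSemantics and BasicTransactionSemantics base classes route those calls to createTransaction(), doPut(), doCommit(), and doRollback().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SourceCallOrderDemo {
    // Hypothetical source-side helper: one transaction per batch,
    // one doPut() per event, then doCommit() or doRollback().
    static void deliverBatch(RecordingChannel channel, List<String> batch) {
        RecordingTransaction tx = channel.createTransaction();
        try {
            for (String event : batch) {
                tx.doPut(event);
            }
            tx.doCommit();
        } catch (RuntimeException e) {
            // Any failed put aborts the whole batch.
            tx.doRollback();
        }
    }

    public static void main(String[] args) {
        RecordingChannel ok = new RecordingChannel();
        deliverBatch(ok, Arrays.asList("e1", "e2"));
        System.out.println(ok.calls); // [createTransaction, doPut(e1), doPut(e2), doCommit]

        RecordingChannel bad = new RecordingChannel();
        deliverBatch(bad, Arrays.asList("e1", null));
        System.out.println(bad.calls); // [createTransaction, doPut(e1), doRollback]
    }
}

// Hypothetical stand-ins that only record which methods were called.
class RecordingChannel {
    final List<String> calls = new ArrayList<>();

    RecordingTransaction createTransaction() {
        calls.add("createTransaction");
        return new RecordingTransaction(calls);
    }
}

class RecordingTransaction {
    private final List<String> calls;

    RecordingTransaction(List<String> calls) {
        this.calls = calls;
    }

    void doPut(String event) {
        // Simulate a failed write: a null event raises an exception from doPut().
        if (event == null) {
            throw new IllegalStateException("put failed");
        }
        calls.add("doPut(" + event + ")");
    }

    void doCommit() {
        calls.add("doCommit");
    }

    void doRollback() {
        calls.add("doRollback");
    }
}
```

The design point is that a single failed put aborts the whole batch, so nothing from that batch is committed.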

The Flume channel uses Solace message router session-based transactions to implement the channel transactions. For more information on Solace session-based transactions, see Using Local Transactions.

When the channel receives a doPut(Event) call, it transposes the Event into a Solace message: it copies the Event's header name/value pairs into the Solace message's header map, then writes the Event's body as a byte buffer into the body of the Solace message. The channel then sends the message to a Solace queue inside a session-based transaction. If a message fails to write to Solace, the doPut() call raises an exception.
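The transposition described above can be sketched as a pair of pure functions. SimpleEvent, SimpleMessage, and EventMessageConverter are hypothetical stand-ins for a Flume Event and a Solace message; the real channel uses the Solace JCSMP API, which is not reproduced here. The sketch only shows the mapping: headers to a header map, body to a byte buffer, and back again.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class ConversionDemo {
    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "web01");
        SimpleEvent in = new SimpleEvent(headers, "hello".getBytes(StandardCharsets.UTF_8));
        SimpleEvent out = EventMessageConverter.toEvent(EventMessageConverter.toMessage(in));
        System.out.println(out.headers + " " + new String(out.body, StandardCharsets.UTF_8));
        // prints "{host=web01} hello"
    }
}

// Hypothetical stand-in for a Flume Event: string headers plus a byte[] body.
final class SimpleEvent {
    final Map<String, String> headers;
    final byte[] body;

    SimpleEvent(Map<String, String> headers, byte[] body) {
        this.headers = headers;
        this.body = body;
    }
}

// Hypothetical stand-in for a Solace message: a header map plus a binary payload.
final class SimpleMessage {
    final Map<String, Object> headerMap = new HashMap<>();
    ByteBuffer payload;
}

final class EventMessageConverter {
    static SimpleMessage toMessage(SimpleEvent event) {
        SimpleMessage msg = new SimpleMessage();
        // Each Event header name/value pair becomes an entry in the message header map.
        msg.headerMap.putAll(event.headers);
        // The Event body is copied into the message body as a byte buffer.
        msg.payload = ByteBuffer.wrap(event.body.clone());
        return msg;
    }

    static SimpleEvent toEvent(SimpleMessage msg) {
        // Header map entries become Event headers again (values as strings).
        Map<String, String> headers = new HashMap<>();
        for (Map.Entry<String, Object> e : msg.headerMap.entrySet()) {
            headers.put(e.getKey(), String.valueOf(e.getValue()));
        }
        // Read the payload through a duplicate so the original buffer position is untouched.
        ByteBuffer buf = msg.payload.duplicate();
        byte[] body = new byte[buf.remaining()];
        buf.get(body);
        return new SimpleEvent(headers, body);
    }
}
```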

When the channel receives a doCommit() or a stop(), it sends a commit message to the Solace message router. This makes the messages available to any message consumer, based on message routing rules.

When the channel receives a doRollback(), it sends a rollback message to the Solace message router. This causes the messages sent in that transaction to be deleted from the Solace message router.
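The commit and rollback behaviour above can be modelled with a small in-memory class. SimulatedTransactedSession is a hypothetical stand-in, not the Solace API: it holds sent messages back until commit() and discards them on rollback(), which is the visibility rule the channel relies on.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TransactedPublishDemo {
    public static void main(String[] args) {
        SimulatedTransactedSession session = new SimulatedTransactedSession();
        session.send("m1");
        session.send("m2");
        System.out.println(session.queueDepth()); // 0: nothing visible before commit
        session.commit();
        System.out.println(session.queueDepth()); // 2
        session.send("m3");
        session.rollback();
        System.out.println(session.queueDepth()); // 2: m3 was discarded
    }
}

// Hypothetical in-memory model of a transacted publishing session
// (not the Solace API): sends are held back until commit.
class SimulatedTransactedSession {
    private final Deque<String> queue = new ArrayDeque<>();  // what consumers can see
    private final List<String> pending = new ArrayList<>();  // sent inside the open transaction

    void send(String message) {
        pending.add(message);
    }

    // Commit: every message sent in this transaction becomes available at once.
    void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    // Rollback: messages sent in this transaction are discarded.
    void rollback() {
        pending.clear();
    }

    int queueDepth() {
        return queue.size();
    }

    String poll() {
        return queue.poll();
    }
}
```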

Some things to note:

  1. The persistent messaging pattern used here offers reliability equivalent to multiple disk writes without incurring the overhead of disk I/O.
  2. Each event is written as a Solace message. This means the event is accessible to interested consumers on the Solace message bus, not just Flume Sinks, which can unlock other uses for the data without impacting the overall performance of the Flume data flow.

The important code sections for this exchange are doPut(event), which sends the event to the Solace message router after checking capacity limits, and doCommit(), which commits the entire transaction to the Solace message router. The code is as follows:

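protected void doPut(Event event) throws InterruptedException {
    transType = TransactionType.PUT;
    // If the Source has already put a full transaction's worth of events,
    // commit what we have before accepting more.
    if (txCurrentTransactionCount.get() >= transactionCapacity) {
        LOG.warn("Source has exceeded channel transaction size of: " + transactionCapacity
            + ". Calling commit to keep event flow functioning. Consider lowering source"
            + " transaction size");
        this.doCommit();
    }
    try {
        // Convert the Event and send it to the Solace queue inside the session transaction.
        solaceSession.publishToQueue(event);
        txCurrentTransactionCount.incrementAndGet();
    } catch (JCSMPException e) {
        LOG.error("PUT attempt failed: " + e.getMessage());
        // Surface the failure to the Source so it can roll back its transaction.
        throw new ChannelException("Put to Solace failed " + channelNameDescriptor, e);
    }
}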

protected void doCommit() throws InterruptedException {
    if (transType.equals(TransactionType.PUT)) {
        try {
            // Commit the session transaction: all messages sent since the
            // last commit become visible to consumers at once.
            solaceSession.commitPublish();
            channelCounter.addToEventPutSuccessCount(txCurrentTransactionCount.get());
            txCurrentTransactionCount.set(0);
        } catch (Exception e) {
            LOG.error("Commit for PUT tx failed: " + e.getMessage());
            throw new ChannelException(
                "Put commit to Solace failed due to error "
                + channelNameDescriptor, e);
        }
    }
}
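The capacity guard in doPut() can be exercised without a Solace router by replacing publishing with a counter. CapacityGuardedPuts is a hypothetical simulation, not the channel's code: it counts an event only after it is "published" and commits a full transaction before adding to it, so every event lands in exactly one commit.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CapacityGuardDemo {
    public static void main(String[] args) {
        CapacityGuardedPuts puts = new CapacityGuardedPuts(3);
        for (int i = 1; i <= 7; i++) {
            puts.doPut("event-" + i);
        }
        puts.doCommit(); // the Source's own commit at the end of its batch
        System.out.println(puts.commits + " commits, " + puts.committedEvents + " events");
        // prints "3 commits, 7 events"
    }
}

// Hypothetical simulation: "publishing" just counts events in the open transaction.
class CapacityGuardedPuts {
    private final int transactionCapacity;
    private final AtomicInteger txCurrentTransactionCount = new AtomicInteger();
    int commits = 0;
    int committedEvents = 0;

    CapacityGuardedPuts(int transactionCapacity) {
        this.transactionCapacity = transactionCapacity;
    }

    void doPut(String event) {
        // If the open transaction is already full, commit it before adding more.
        if (txCurrentTransactionCount.get() >= transactionCapacity) {
            doCommit();
        }
        // Stand-in for publishing the event inside the transaction.
        txCurrentTransactionCount.incrementAndGet();
    }

    void doCommit() {
        int n = txCurrentTransactionCount.getAndSet(0);
        if (n > 0) { // skip empty commits
            commits++;
            committedEvents += n;
        }
    }
}
```

With a capacity of 3 and a Source batch of 7, the channel forces two early commits and the Source's own commit covers the last event.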

Solace Flume channel sending Flume Events to Sink.

Similar to the Source, the Flume Sink interacts with the Flume channel through four method calls: createTransaction(), doTake(), and finally doCommit() or doRollback().
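A sketch of the Sink side of that exchange, under the same caveats: SinkSideChannel, SinkSideTransaction, and drainOnce are hypothetical in-memory stand-ins. The Sink takes up to a batch of events in one transaction, stops early when doTake() returns null (no event available), hands the batch to its writer, and commits; if the writer fails, the transaction is rolled back and the taken events return to the channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

class SinkBatchDemo {
    // Hypothetical sink loop: one transaction per batch of at most batchSize events.
    static int drainOnce(SinkSideChannel channel, int batchSize, Consumer<List<String>> writer) {
        SinkSideTransaction tx = channel.createTransaction();
        List<String> batch = new ArrayList<>();
        try {
            for (int i = 0; i < batchSize; i++) {
                String event = tx.doTake();
                if (event == null) {
                    break; // nothing available right now
                }
                batch.add(event);
            }
            writer.accept(batch); // e.g. write to HDFS in a real sink
            tx.doCommit();
            return batch.size();
        } catch (RuntimeException e) {
            tx.doRollback(); // taken events go back to the channel
            throw e;
        }
    }

    public static void main(String[] args) {
        SinkSideChannel ch = new SinkSideChannel(List.of("a", "b", "c", "d", "e"));
        List<String> written = new ArrayList<>();
        int n;
        while ((n = drainOnce(ch, 2, written::addAll)) > 0) {
            System.out.println("committed batch of " + n);
        }
        System.out.println(written); // [a, b, c, d, e]
    }
}

// Hypothetical in-memory channel: a deque of events available to take.
class SinkSideChannel {
    private final Deque<String> available;

    SinkSideChannel(List<String> initial) {
        available = new ArrayDeque<>(initial);
    }

    SinkSideTransaction createTransaction() {
        return new SinkSideTransaction(available);
    }

    int size() {
        return available.size();
    }
}

class SinkSideTransaction {
    private final Deque<String> available;
    private final List<String> taken = new ArrayList<>();

    SinkSideTransaction(Deque<String> available) {
        this.available = available;
    }

    String doTake() {
        String e = available.poll(); // null when empty
        if (e != null) {
            taken.add(e);
        }
        return e;
    }

    void doCommit() {
        taken.clear(); // taken events are now gone for good
    }

    void doRollback() {
        // Put taken events back at the head, preserving their original order.
        for (int i = taken.size() - 1; i >= 0; i--) {
            available.addFirst(taken.get(i));
        }
        taken.clear();
    }
}
```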

For each doTake(), the Flume channel makes a synchronous receive call into the Solace API and returns the resulting message transposed back into a Flume Event. This operation does not incur a request/reply round-trip latency hit because the Solace API pre-fetches messages into its internal queue.
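The pre-fetch idea can be illustrated with a bounded local buffer filled by a background thread, so that each blocking receive waits only on local memory rather than on a round trip to the router. PrefetchingReceiver and its windowSize parameter are hypothetical; this models the concept and is not how the Solace API is implemented internally.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PrefetchDemo {
    public static void main(String[] args) {
        PrefetchingReceiver rx = new PrefetchingReceiver(List.of("m1", "m2", "m3").iterator(), 2);
        String msg;
        // Each receive() blocks, but only on the local buffer; null means it timed out.
        while ((msg = rx.receive(500)) != null) {
            System.out.println("took " + msg);
        }
    }
}

// Hypothetical sketch of client-side prefetch (not the Solace API's internals):
// a background thread pulls from the "router" into a bounded local buffer.
class PrefetchingReceiver {
    private final BlockingQueue<String> buffer;

    PrefetchingReceiver(Iterator<String> router, int windowSize) {
        buffer = new LinkedBlockingQueue<>(windowSize); // bounded, like a flow-control window
        Thread fetcher = new Thread(() -> {
            try {
                while (router.hasNext()) {
                    buffer.put(router.next()); // blocks while the window is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.setDaemon(true); // do not keep the JVM alive
        fetcher.start();
    }

    // Returns the next buffered message, or null if none arrives within timeoutMs.
    String receive(long timeoutMs) {
        try {
            return buffer.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

The timeout plays the same role as TIMEOUT in doTake(): when it expires with nothing buffered, the caller gets null rather than blocking indefinitely.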

When the channel receives a doCommit(), it sends a commit message back to the Solace message router, which allows the router to delete its copy of the messages.

When the channel receives a doRollback(), it sends a rollback message back to the Solace message router, which makes all messages taken in that transaction available for redelivery.

On the topic of redelivery: the Solace API tracks delivered messages and will de-duplicate any message that has already been delivered and not re-requested. In addition, the Solace message router tracks delivered messages and marks each message it redelivers as a possible duplicate, so the application can tell when redelivery has occurred.
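On the application side, a possible-duplicate flag supports a simple idempotent-receiver pattern. Delivery and IdempotentHandler are hypothetical names, and the sketch assumes each message carries an application-level ID; in JCSMP the router's flag is, to our knowledge, read with getRedelivered() on the received message. Only messages flagged as redelivered are checked against the set of IDs already processed.

```java
import java.util.HashSet;
import java.util.Set;

class RedeliveryDemo {
    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        handler.handle(new Delivery("id-1", false)); // first delivery: processed
        handler.handle(new Delivery("id-1", true));  // redelivered and already seen: skipped
        handler.handle(new Delivery("id-2", true));  // flagged, but never processed: processed
        System.out.println(handler.processedCount()); // 2
    }
}

// Hypothetical message view: an application-level ID plus a "redelivered" flag.
final class Delivery {
    final String id;
    final boolean redelivered;

    Delivery(String id, boolean redelivered) {
        this.id = id;
        this.redelivered = redelivered;
    }
}

class IdempotentHandler {
    private final Set<String> processedIds = new HashSet<>();

    // Returns true if the message was processed, false if skipped as a duplicate.
    boolean handle(Delivery d) {
        // Assumes the redelivered flag is reliable: first deliveries skip the lookup.
        if (d.redelivered && processedIds.contains(d.id)) {
            return false;
        }
        processedIds.add(d.id);
        // ... real processing would go here ...
        return true;
    }

    int processedCount() {
        return processedIds.size();
    }
}
```

In a long-running sink the set of processed IDs would need to be bounded or persisted; an in-memory HashSet is used here only to keep the sketch short.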

The important code section for this exchange is doTake(), which takes the Solace message, transposes it into a Flume Event, and delivers it to the Sink. The doTake() and doCommit() code is as follows:

protected Event doTake() throws InterruptedException {
    takeThread = Thread.currentThread().getId();
    LOG.debug("SolaceBackedTransaction doTake called from thread: " + takeThread);
    channelCounter.incrementEventTakeAttemptCount();
    transType = TransactionType.TAKE;

    // If the Sink has already taken a full transaction's worth of messages,
    // commit them before taking more.
    if (rxCurrentTransactionCount.get() >= transactionCapacity) {
        LOG.warn("Sink has exceeded channel transaction size of: " + transactionCapacity
            + ". Calling commit to keep event flow functioning. Consider lowering sink"
            + " transaction size");
        this.doCommit();
    }

    Event flumeEvent = null;
    // Remember the last received message; when commit is called,
    // all messages taken in this transaction are committed together.
    LOG.debug("About to do a blocking receive");
    lastMessage = (BytesMessage) solaceSession.getMessageReceiver().receive(TIMEOUT);
    if (lastMessage == null) {
        // The receive timed out; returning null tells the Sink no event is available.
        LOG.debug("doTake did not return a message");
    } else {
        rxCurrentTransactionCount.incrementAndGet();
        flumeEvent = FlumeEventToSolaceMessageConverter.solaceToFlume(lastMessage);
    }
    return flumeEvent;
}

protected void doCommit() throws InterruptedException {
    if (transType.equals(TransactionType.TAKE)) {
        try {
            // Commit the consumer transaction so the router can delete its copies.
            solaceSession.commitConsumer();
            channelCounter.addToEventTakeSuccessCount(rxCurrentTransactionCount.get());
            rxCurrentTransactionCount.set(0);
        } catch (Exception e) {
            LOG.error("Commit for TAKE tx failed: " + e.getMessage());
            throw new ChannelException("Take commit to Solace failed due to error "
                + channelNameDescriptor, e);
        }
    }
}

Not shown in the code sections above are the actual connection interactions with the Solace message router, which can be seen in the complete code attached. These sections are very lightweight because they take advantage of many default settings within the Solace session, but they are very important because they provide the fault tolerance and resiliency of this solution. If you would like to learn more about Solace sessions and how to make fault-tolerant connections from your application to a Solace message router, see the getting started guide Java persistence with queues. It shows that much of the high-availability functionality is held within the Solace message router and APIs rather than coded at the application level. The Solace Message Router fault tolerance section of the Solace documentation describes how clients can leverage the high availability of the Solace message router via the Solace APIs without any client code or state knowledge. The Solace Flume channel is a Java client that takes advantage of these Solace message router features.

This blog should have given you a basic understanding of the inner workings of the Solace Flume channel, as well as pointers to Solace documentation and examples that illustrate the fault tolerance and high availability of Java clients using persistent messaging.

Resources