Solace Integration with Apache Spark MQTT Utilities

Previously I published an integration guide showing how to integrate Spark Streaming with the Solace message bus via JMS.  That method provides a rich set of integration options, including Topic and Queue messaging patterns as well as enhanced security with TLS.  However, integration with Spark Streaming via JMS does require extending the Spark custom receiver to support JMS.

If you are looking for a simple integration without having to write any code, then MQTT might be a better fit for your needs, as the Apache Spark that is distributed with Hortonworks, Cloudera and other Hadoop releases comes with a streaming MQTT interface built in.  As of SolOS 7.1.1, Solace makes it easy to integrate MQTT clients into the Solace message bus.  Spark provides the MQTTUtils class, a simple interface that receives messages from an MQTT broker and maps them into a Spark context.  The MQTTWordCount example that comes with Spark 1.4 and above shows how this is done.

In this blog I will show you what you need to do to configure the Solace Message Router to use this example and therefore map Solace messages into Spark with only Spark-provided code.

The following resources on the Solace Message Router will be required to complete this integration.

Resource | Value | Description
Solace Message Router IP | __HOST__ | The IP address of the Solace Message Router message backbone. This is the address that clients use when connecting to the Solace Message Router to send and receive messages. This document uses a value of __HOST__.
Message VPN | Solace_Spark_VPN | A Message VPN, or virtual message broker, to scope the integration on the Solace Message Router.
Message VPN MQTT port | __PORT__ | An unused port must be selected for the Solace Message Router to listen on for this Message VPN's MQTT client connections.  “show service” on the Solace Message Router will show all ports currently in use. This document uses a value of __PORT__.

 

First we need to configure the Solace Message Router.  We will use the same basic setup as the JMS integration guide.  For convenience I have reposted the relevant configuration here; if you need more detail, it is best to go back and read the original integration guide, linked above:

(configure)# create message-vpn Solace_Spark_VPN
(configure/message-vpn)# authentication
(configure/message-vpn/authentication)# user-class client
(...message-vpn/authentication/user-class)# basic auth-type none
(...message-vpn/authentication/user-class)# exit
(configure/message-vpn/authentication)# exit
(configure/message-vpn)# no shutdown
(configure/message-vpn)# exit
(configure)#
(configure)# client-username default message-vpn Solace_Spark_VPN
(configure/client-username)# no shutdown
(configure/client-username)# exit

Now we need to add the MQTT configuration.  First validate that the MQTT feature is licensed.  For the Solace VMR, MQTT is licensed by default.  For the Solace Message Router appliance you may need to contact support_request@solace.com for steps to obtain an MQTT licence key.  The product key output should look like this:

solace# show product-key
Product Key : xxxxxxxxxx-xxxxxxxxxxx-xxxxxxxxxx-MQTT-G-<HOST>
Unlocked Features : 1
Message Queuing Telemetry Transport

Next configure the MQTT feature:

(configure)# service mqtt
(configure/service/mqtt)# no shut
(configure/service/mqtt)# exit
(configure/service)# exit
(configure)#
(configure)# message-vpn Solace_Spark_VPN
(configure/message-vpn)# service mqtt
(configure/message-vpn/service/mqtt)# plain-text shutdown
(configure/message-vpn/service/mqtt)# listen-port __PORT__
(configure/message-vpn/service/mqtt)# no plain-text shutdown
(configure/message-vpn/service/mqtt)# end

That’s it; we’re done with the configuration.  Let’s test.
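Before starting the Spark examples, it can be handy to confirm that the new listen port really answers MQTT.  The following is a minimal sketch of such a smoke test, not part of the Spark or Solace tooling.  It assumes the router accepts MQTT 3.1.1 and, as with the configuration above, that no credentials are needed; the function names and the "smoke-test" client ID are made up for illustration.

```python
import socket
import struct
from urllib.parse import urlparse


def encode_remaining_length(n):
    """Encode a length using MQTT's variable-length scheme (7 bits per byte)."""
    out = bytearray()
    while True:
        n, digit = divmod(n, 128)
        if n > 0:
            digit |= 0x80  # continuation bit: more length bytes follow
        out.append(digit)
        if n == 0:
            return bytes(out)


def build_connect_packet(client_id, keep_alive=60):
    """Build an MQTT 3.1.1 CONNECT packet: clean session, no username/password."""
    cid = client_id.encode("utf-8")
    variable_header = (
        struct.pack("!H", 4) + b"MQTT"   # protocol name, length-prefixed
        + bytes([0x04])                  # protocol level 4 = MQTT 3.1.1 (assumed)
        + bytes([0x02])                  # connect flags: clean session only
        + struct.pack("!H", keep_alive)  # keep-alive in seconds
    )
    payload = struct.pack("!H", len(cid)) + cid
    body = variable_header + payload
    return bytes([0x10]) + encode_remaining_length(len(body)) + body


def parse_connack(data):
    """Return the CONNACK return code (0 = accepted); raise if not a CONNACK."""
    if len(data) < 4 or data[0] != 0x20 or data[1] != 0x02:
        raise ValueError("not a CONNACK packet: %r" % (data,))
    return data[3]


def check_broker(url, client_id="smoke-test", timeout=5.0):
    """Connect to a tcp://host:port broker URL and return the CONNACK code."""
    parsed = urlparse(url)
    address = (parsed.hostname, parsed.port)
    with socket.create_connection(address, timeout=timeout) as sock:
        sock.sendall(build_connect_packet(client_id))
        data = b""
        while len(data) < 4:  # a CONNACK is exactly 4 bytes
            chunk = sock.recv(4 - len(data))
            if not chunk:
                break
            data += chunk
        return parse_connack(data)
```

Calling check_broker with the same tcp://__HOST__:__PORT__ URL used in the commands below should return 0 if the router accepted the connection; a refused connection or a non-zero code suggests the MQTT service or the port configuration needs another look.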

Here I am using an example that comes with Hortonworks 2.3.2; the same examples ship with Cloudera 5.4.

At the command prompt on the Hadoop name node:

cd /usr/hdp/2.3.2.0-2950/spark/bin

Start a simple MQTT publisher publishing to the Solace message router (let’s ignore the logs for now):

./run-example streaming.MQTTPublisher tcp://__HOST__:__PORT__ testTopic > /dev/null &

Start a Spark job running the MQTTWordCount example, connected to the Solace message router:

./run-example streaming.MQTTWordCount tcp://__HOST__:__PORT__ testTopic 2> /dev/null

Output will look like this:

-------------------------------------------
Time: 1454455924000 ms
-------------------------------------------

-------------------------------------------
Time: 1454455926000 ms
-------------------------------------------
(mqtt, 1166)
(spark, 1166)
(for, 1166)
(hello, 1166)
(streaming, 1166)
(demo, 1166)

-------------------------------------------
Time: 1454455928000 ms
-------------------------------------------
(mqtt, 2035)
(spark, 2035)
(for, 2035)
(hello, 2035)
(streaming, 2035)
(demo, 2035)

From the Spark Web UI you can see that the map function splits off 10 tasks on each message and the print splits off one.
[Screenshot: apache-mqtt-utilities-1]

Looking at the print task shows a single execution that prints out the results shown above, while the map function takes the message, makes it serializable in the RDD and splits off 10 tasks, as shown below, to map and reduce the message body string into word counts.

[Screenshot: apache-mqtt-utilities-2]
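To make the map and reduce steps concrete, here is a rough plain-Python sketch of what the word count does to one batch of messages.  It is not the Spark code itself: it runs on a single machine with no RDDs or tasks, and the payload string is an assumption inferred from the six words in the output above.

```python
from collections import Counter

# Assumption: the payload sent by the publisher, inferred from the words
# that appear in the example output.
MESSAGE = "hello mqtt demo for spark streaming"


def word_count(batch):
    """Count words across all message payloads received in one batch interval."""
    counts = Counter()
    for payload in batch:
        # "map" step: split the message body into words;
        # "reduce" step: Counter.update adds one per occurrence of each word.
        counts.update(payload.split(" "))
    return counts


# A batch holding 1166 copies of the message, matching the first
# non-empty batch in the output above.
batch = [MESSAGE] * 1166
for word, n in word_count(batch).items():
    print((word, n))
```

Because every message carries the same six words, each word ends up with the same count, which is why all the tuples in a given batch above share one number.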

The Solace message bus takes in messages from a variety of sources (enterprise, web, IoT, REST) and delivers them on a uniform interface, so you now have a simple, robust mechanism to ingest all these message types into your Spark context for analysis.