Previously I published an integration guide for connecting Spark Streaming to the Solace message bus via JMS. That approach offers a rich set of integration options, including topic and queue messaging patterns, as well as enhanced security with TLS. However, integrating Spark Streaming via JMS does require extending the Spark custom receiver to support JMS.

If you are looking for a simple integration that requires no code, MQTT might be a better fit for your needs: the Apache Spark distributed with Hortonworks, Cloudera and other Hadoop releases comes with a streaming MQTT interface built in. As of SolOS 7.1.1, Solace makes it easy to connect MQTT clients to the Solace message bus. Spark provides the MQTTUtils class, which wraps up the work of receiving messages from an MQTT broker behind a simple interface and turns the received messages into a Spark input stream. The MQTTWordCount example that comes with Spark 1.4 and above shows how this is done.

In this blog I will show you how to configure the Solace Message Router for this example, so that Solace messages flow into Spark using only Spark-provided code.

The following resources on the Solace Message Router will be required to complete this integration.

| Resource | Value | Description |
| --- | --- | --- |
| Solace Message Router IP | __HOST__ | The IP address of the Solace Message Router message backbone. This is the address that clients use when connecting to the Solace Message Router to send and receive messages. |
| Message VPN | Solace_Spark_VPN | A Message VPN, or virtual message broker, that scopes the integration on the Solace Message Router. |
| Message VPN MQTT port | __PORT__ | An unused port on which the Solace Message Router listens for MQTT client connections to this Message VPN. Running "show service" on the Solace Message Router lists all ports currently in use. |

First we need to configure the Solace message router. We will use the same basic setup as in the JMS integration guide; for convenience I have reposted the relevant configuration here. If you need more details, it is best to go back and read the original integration guide, linked above:

(configure)# create message-vpn Solace_Spark_VPN
(configure/message-vpn)# authentication
(configure/message-vpn/authentication)# user-class client
(...message-vpn/authentication/user-class)# basic auth-type none
(...message-vpn/authentication/user-class)# exit
(configure/message-vpn/authentication)# exit
(configure/message-vpn)# no shutdown
(configure/message-vpn)# exit
(configure)#
(configure)# client-username default message-vpn Solace_Spark_VPN
(configure/client-username)# no shutdown
(configure/client-username)# exit

Now we need to add the MQTT configuration. First, confirm that the MQTT feature is licensed. On the Solace VMR, MQTT is licensed by default. On the Solace message router appliance, you may need to contact support_request@solace.com for steps to obtain an MQTT licence key. The licence should look like this:

solace# show product-key
Product Key : xxxxxxxxxx-xxxxxxxxxxx-xxxxxxxxxx-MQTT-G-<HOST>
Unlocked Features : 1
Message Queuing Telemetry Transport
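If you want to script this check, a small helper can scan the captured `show product-key` output for the MQTT feature. This is a minimal sketch that assumes the output format shown above; `mqtt_unlocked` is a hypothetical name, and the VMR (where MQTT is licensed by default) or other SolOS releases may print this output differently.

```python
def mqtt_unlocked(show_product_key_output):
    """Return True if captured `show product-key` output lists the MQTT feature.

    Assumes the layout shown in this post: the unlocked feature is listed as
    "Message Queuing Telemetry Transport" and/or the key contains "-MQTT-".
    """
    for line in show_product_key_output.splitlines():
        text = line.strip()
        if text == "Message Queuing Telemetry Transport":
            return True
        if text.startswith("Product Key") and "-MQTT-" in text:
            return True
    return False


captured = """solace# show product-key
Product Key : xxxxxxxxxx-xxxxxxxxxxx-xxxxxxxxxx-MQTT-G-<HOST>
Unlocked Features : 1
Message Queuing Telemetry Transport
"""
print(mqtt_unlocked(captured))
```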

Next configure the MQTT feature:

(configure)# service mqtt
(configure/service/mqtt)# no shut
(configure/service/mqtt)# exit
(configure/service)# exit
(configure)#
(configure)# message-vpn Solace_Spark_VPN
(configure/message-vpn)# service mqtt
(configure/message-vpn/service/mqtt)# plain-text shutdown
(configure/message-vpn/service/mqtt)# listen-port __PORT__
(configure/message-vpn/service/mqtt)# no plain-text shutdown
(configure/message-vpn/service/mqtt)# end
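If you set this up on more than one router or VPN, it can help to render the per-VPN commands above with a real port in place of __PORT__. The sketch below only substitutes values into the commands copied from this post (the `(configure…)#` prompts are left out because the CLI prints them, and the global `service mqtt` step is not included). `mqtt_vpn_commands` and the port 1884 are hypothetical choices; pick a port that "show service" reports as unused.

```python
def mqtt_vpn_commands(vpn, port):
    """Return the per-VPN MQTT CLI commands from this post with placeholders filled in."""
    if not 1 <= port <= 65535:
        raise ValueError(f"invalid TCP port: {port}")
    return [
        f"message-vpn {vpn}",
        "service mqtt",
        "plain-text shutdown",      # stop the plain-text listener before changing its port
        f"listen-port {port}",
        "no plain-text shutdown",   # re-enable the listener on the new port
        "end",
    ]


# 1884 is an arbitrary example port, not a recommendation.
for command in mqtt_vpn_commands("Solace_Spark_VPN", 1884):
    print(command)
```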

That’s it; we’re done with the configuration.  Let’s test.
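Before starting any Spark jobs, a quick sanity check is to confirm that the router accepts TCP connections on the MQTT port you configured. Here is a minimal sketch using only Python's standard library; `port_reachable` is a hypothetical helper, and a successful connection only shows that something is listening on that port, not that MQTT is enabled for the VPN.

```python
import socket


def port_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    No MQTT handshake is performed; refused connections and timeouts
    (both OSError subclasses) are reported as False.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Call it with your router's IP address in place of __HOST__ and your chosen port in place of __PORT__.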

Here I am using an example that comes with Hortonworks 2.3.2; the same examples ship with Cloudera 5.4.

At the command prompt on the Hadoop name node:

cd /usr/hdp/2.3.2.0-2950/spark/bin

Start a simple MQTT publisher that publishes to the Solace message router (its output is discarded so we can ignore the logs for now):

./run-example streaming.MQTTPublisher tcp://__HOST__:__PORT__ testTopic > /dev/null &
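MQTTPublisher is just a convenient test source; any MQTT client can publish to the same topic. As a hedged sketch, assuming the router accepts MQTT 3.1.1 clients on the plain-text port configured above, the following standard-library Python builds a CONNECT packet and QoS 0 PUBLISH packets following the MQTT 3.1.1 wire format, checks the CONNACK, and disconnects. `publish_messages`, the client ID `spark-test-pub` and the message text (the same six words that appear in the word counts below) are illustrative choices, not part of the Spark example.

```python
import socket
import struct


def encode_remaining_length(n):
    """Encode MQTT's variable-length 'remaining length' (7 bits per byte)."""
    if not 0 <= n <= 268_435_455:
        raise ValueError("remaining length out of range")
    out = bytearray()
    while True:
        byte, n = n % 128, n // 128
        if n:
            byte |= 0x80  # continuation bit: more length bytes follow
        out.append(byte)
        if not n:
            return bytes(out)


def encode_string(s):
    """UTF-8 string prefixed with its 2-byte big-endian length."""
    data = s.encode("utf-8")
    return struct.pack("!H", len(data)) + data


def connect_packet(client_id, keep_alive=60):
    # Variable header: protocol name "MQTT", level 4 (MQTT 3.1.1),
    # connect flags 0x02 (clean session, no credentials), keep-alive seconds.
    variable = encode_string("MQTT") + bytes([4, 0x02]) + struct.pack("!H", keep_alive)
    body = variable + encode_string(client_id)
    return bytes([0x10]) + encode_remaining_length(len(body)) + body


def publish_packet(topic, payload):
    # QoS 0 without DUP/RETAIN flags, so no packet identifier is included.
    body = encode_string(topic) + payload
    return bytes([0x30]) + encode_remaining_length(len(body)) + body


def recv_exact(sock, n):
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed by broker")
        buf += chunk
    return buf


def publish_messages(host, port, topic, message, count=1, client_id="spark-test-pub"):
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(connect_packet(client_id))
        # CONNACK: type 0x20, remaining length 2, ack flags, return code (0 = accepted).
        connack = recv_exact(sock, 4)
        if connack[0] != 0x20 or connack[1] != 0x02 or connack[3] != 0:
            raise ConnectionError(f"connection not accepted: {connack.hex()}")
        for _ in range(count):
            sock.sendall(publish_packet(topic, message.encode("utf-8")))
        sock.sendall(bytes([0xE0, 0x00]))  # DISCONNECT
```

For example, `publish_messages(host, port, "testTopic", "hello mqtt demo for spark streaming", count=100)` sends 100 messages that MQTTWordCount would count in the same way as the publisher's.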

Start a Spark job running the MQTTWordCount example, connected to the Solace message router:

./run-example streaming.MQTTWordCount tcp://__HOST__:__PORT__ testTopic 2> /dev/null

Output will look like this:

-------------------------------------------
Time: 1454455924000 ms
-------------------------------------------

-------------------------------------------
Time: 1454455926000 ms
-------------------------------------------
(mqtt, 1166)
(spark, 1166)
(for, 1166)
(hello, 1166)
(streaming, 1166)
(demo, 1166)

-------------------------------------------
Time: 1454455928000 ms
-------------------------------------------
(mqtt, 2035)
(spark, 2035)
(for, 2035)
(hello, 2035)
(streaming, 2035)
(demo, 2035)
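Each batch header is 2000 ms after the previous one, and all six words have the same count within a batch. Assuming each test message contains each of the six words exactly once (the equal counts are consistent with this) and that the counts are per batch, as `reduceByKey` on a DStream produces, one word's count is the number of messages received in that batch. The sketch below turns output like the above into message counts and an approximate rate; `messages_per_batch` is a hypothetical helper, and the 2-second batch length is inferred from the timestamps rather than read from Spark.

```python
import re

# Patterns for the print() layout shown above.
TIME_RE = re.compile(r"^Time: (\d+) ms$")
PAIR_RE = re.compile(r"^\((\S+), (\d+)\)$")
BATCH_SECONDS = 2.0  # assumption: inferred from the 2000 ms gap between batch timestamps


def messages_per_batch(output):
    """Map each batch timestamp (ms) to the highest word count printed in that batch.

    Assumes each message contains each word exactly once, so a word's count
    equals the number of messages in the batch; empty batches map to 0.
    """
    batches = {}
    current = None
    for raw in output.splitlines():
        line = raw.strip()
        time_match = TIME_RE.match(line)
        if time_match:
            current = int(time_match.group(1))
            batches[current] = 0
            continue
        pair_match = PAIR_RE.match(line)
        if pair_match and current is not None:
            batches[current] = max(batches[current], int(pair_match.group(2)))
    return batches


SAMPLE = """\
-------------------------------------------
Time: 1454455926000 ms
-------------------------------------------
(mqtt, 1166)
(spark, 1166)
(demo, 1166)

-------------------------------------------
Time: 1454455928000 ms
-------------------------------------------
(mqtt, 2035)
(spark, 2035)
(demo, 2035)
"""

for ts, n in messages_per_batch(SAMPLE).items():
    print(f"{ts} ms: {n} messages, ~{n / BATCH_SECONDS:.1f} msg/s")
```

For the two non-empty batches above, this gives 1166 and 2035 messages, or about 583 and 1017.5 messages per second.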

From the Spark Web UI you can see that, for each batch, the map function is split into 10 tasks, while the print runs as a single task.

Looking at the print task shows a single execution that prints out the results shown above. The map function, by contrast, takes the message, makes it serializable in the RDD, and splits off 10 tasks to map and reduce the message body string into word counts.
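To make the map-and-reduce step concrete, here is a single-process Python sketch of what one batch computes. The Spark example expresses this with `flatMap` (splitting each message on spaces), `map` to `(word, 1)` pairs and `reduceByKey`; this sketch substitutes `collections.Counter` for Spark's distributed operators, so it says nothing about how the 10 tasks divide the work. `word_counts` is a hypothetical name.

```python
from collections import Counter


def word_counts(messages):
    """Count words across one batch of message bodies on a single machine.

    Rough analogue of flatMap(split(" ")) -> map((word, 1)) -> reduceByKey(_ + _);
    Counter performs the per-key summing here.
    """
    # flatMap step: each message body becomes a sequence of words.
    words = (word for body in messages for word in body.split(" "))
    # map + reduceByKey step: add 1 for every occurrence of each word.
    return dict(Counter(words))


batch = ["hello mqtt demo for spark streaming"] * 3
print(word_counts(batch))
```

With three identical messages in the batch, every word is counted three times, which is the same pattern as the equal counts in the Spark output.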

The Solace message bus takes in messages from a variety of sources (enterprise, web, IoT, REST) and delivers them on a uniform interface, so you now have a simple, robust mechanism to ingest all these message types into your Spark context for analysis.

Ken Barr

Ken Barr is a Senior Product Integration Architect working with the Solace CTO group. He's focused on exploring areas in which our customers would benefit from Solace innovation, then defining how these new technologies fit into Solace’s product lines.

Prior to joining Solace, Ken was a Technical Lead at Cisco Systems holding several roles in carrier core routing business units including QA team lead for initial implementations of IPv6 and bringing next generation IOS called IOS-XR to the core routing platforms. Preceding Cisco, Ken was a Communications Electronics Engineering Officer for the Royal Canadian Air Force responsible for operational management of the National Defence Headquarters Metropolitan Area Network.