Kafka is often used to aggregate massive amounts of log data and stream it to analytics engines and big data repositories. When large amounts of collected data are processed, the analysis may produce actionable events that need to be delivered to specific destinations. Using only Kafka, it’s hard to achieve scalable, targeted distribution to many individual consumers. For more on this, see part 2 of Ken Barr’s “Why You Need to Look Beyond Kafka for Operational Use Cases” series.
Especially in IoT, consumers and the network infrastructure may lack the throughput and processing capacity to pick out relevant events from a stream, so delivery must be filtered and targeted for efficiency. A good example is collecting and analyzing real-time mechanical IoT data from a fleet of city buses and reacting to the results by delivering commands back to individual buses. The Solace PubSub+ Platform inherently helps to overcome this limitation. The goal of this blog post is to present a practical solution and to demonstrate how to integrate the Kafka platform with the PubSub+ event mesh (a network of PubSub+ event brokers) using the PubSub+ Connector for Kafka: Sink.
In the solution, you will notice that Kafka topics are coarse-grained and cannot be used to address individual vehicles, whereas PubSub+ allows fine-grained filtering because its topics aren’t configured on the broker; they are defined by the publishing application. A PubSub+ topic can be thought of as a property of a message, or metadata, rather than as “writing to a file” or “sending to a destination.” Every message could be published with a unique topic. For more information on Solace topics vs. Kafka topics, read this blog post.
As the Kafka Connector Developer Guide suggests, PubSub+ event messages must be mapped to a partitioned stream of Kafka records, and vice versa. The PubSub+ Connectors are architected so that this mapping lives in Processors (MessageProcessors in the Source Connector and RecordProcessors in the Sink Connector). This approach lets developers easily customize the data conversion code to their specific application needs, while the PubSub+ Connector code takes care of the rest, making use of the tunable, high-performance Solace-Java messaging API.
The Source and Sink Connector projects include sample Processors: simple ones that just map PubSub+ message contents to Kafka record values, and more advanced ones that can additionally map message properties to Kafka record keys.
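To picture the split between a “simple” and a “keyed” processor on the Sink side, here is a minimal, self-contained sketch. All types in it (SimpleRecord, OutboundMessage, RecordMapper and the two mappers) are hypothetical stand-ins invented for illustration; they are not the connector’s actual interfaces and not Kafka’s SinkRecord. The point is only that the processor is the single place where a record is turned into a message.

```java
import java.nio.charset.StandardCharsets;

class ProcessorDemo {
    // Hypothetical stand-in for a Kafka record (NOT Kafka's SinkRecord).
    static final class SimpleRecord {
        final String key;    // may be null, as Kafka record keys can be
        final byte[] value;
        SimpleRecord(String key, byte[] value) { this.key = key; this.value = value; }
    }

    // Hypothetical stand-in for an outbound PubSub+ message.
    static final class OutboundMessage {
        final byte[] payload;
        final String keyProperty; // null when the processor does not map the key
        OutboundMessage(byte[] payload, String keyProperty) {
            this.payload = payload;
            this.keyProperty = keyProperty;
        }
    }

    // Hypothetical processor contract: one record in, one message out.
    // In this sketch everything else (connection, sending) is assumed to be
    // handled elsewhere, as the connector does for its real processors.
    interface RecordMapper {
        OutboundMessage map(SimpleRecord record);
    }

    // "Simple" style: only the record value is carried over, as the payload.
    static final class ValueOnlyMapper implements RecordMapper {
        public OutboundMessage map(SimpleRecord record) {
            return new OutboundMessage(record.value, null);
        }
    }

    // "Keyed" style: the value becomes the payload and the key is kept as a property.
    static final class KeyedMapper implements RecordMapper {
        public OutboundMessage map(SimpleRecord record) {
            return new OutboundMessage(record.value, record.key);
        }
    }

    public static void main(String[] args) {
        SimpleRecord rec = new SimpleRecord("bus-1234",
                "start".getBytes(StandardCharsets.UTF_8));
        RecordMapper[] mappers = { new ValueOnlyMapper(), new KeyedMapper() };
        for (RecordMapper m : mappers) {
            OutboundMessage msg = m.map(rec);
            System.out.println(m.getClass().getSimpleName() + ": payload="
                    + new String(msg.payload, StandardCharsets.UTF_8)
                    + ", key=" + msg.keyProperty);
        }
    }
}
```

Swapping one mapper for the other changes the conversion without touching any of the surrounding plumbing, which is the customization point the Processor design is meant to provide.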
As an example, the following scenario is easy to implement with the Kafka Sink Connector using the DynamicDestinations record processor. DynamicDestinations makes use of a unique Solace connector feature that supports dynamically generated topics, which override the static topics in the connector configuration. The source code is available from the Sink Connector GitHub location.
In our city bus IoT scenario (similar to this demo), analytics produce Kafka records that may represent commands to a fleet of buses, each listening to a PubSub+ topic structured as ctrl/bus/<bus-id>/<command>. In this case, a command of “stop” may mean an emergency request to take the bus out of service, and “start” may mean allowing the bus back into service.
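The core idea of a dynamic destination is that the outbound topic is computed from each record rather than read from a static configuration entry. The sketch below assumes a record value of the form “<bus-id> <command>” (the format typed into the console producer later in this post) and builds the ctrl/bus/<bus-id>/<command> topic from it. It is not the connector’s actual code: the class and method names are hypothetical, and restricting each topic level to letters, digits, “_” and “-” is a design choice of this sketch.

```java
import java.util.Optional;

// Hypothetical helper illustrating per-record topic derivation.
final class BusCommandTopics {
    private BusCommandTopics() {}

    // Returns the destination topic for a record value such as "1234 start",
    // or empty if the value is not exactly two whitespace-separated fields.
    static Optional<String> topicFor(String recordValue) {
        if (recordValue == null) {
            return Optional.empty();
        }
        String[] fields = recordValue.trim().split("\\s+");
        if (fields.length != 2) {
            return Optional.empty();
        }
        String busId = fields[0];
        String command = fields[1];
        // Reject levels containing '/' or wildcard characters, which would
        // otherwise send the message to an unintended or invalid topic.
        if (!isValidLevel(busId) || !isValidLevel(command)) {
            return Optional.empty();
        }
        return Optional.of("ctrl/bus/" + busId + "/" + command);
    }

    private static boolean isValidLevel(String level) {
        return level.matches("[A-Za-z0-9_-]+");
    }

    public static void main(String[] args) {
        for (String value : new String[] {"1234 start", "3456 stop", "oops"}) {
            System.out.println(value + " -> " + topicFor(value).orElse("(rejected)"));
        }
    }
}
```

Because the topic is built per record, a new bus ID needs no new Kafka topic and no configuration change; it simply yields a new PubSub+ topic.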
Below is an example of fine-grained filtered delivery to consumers. With the Kafka Sink Connector deployed (1.), records from a single Kafka topic can be dynamically assigned any destination topic as a PubSub+ event message attribute, so each event is individually routed to the correct physical destination (2.).
In this scenario, it is possible to add any number of new destinations (i.e. buses) at any time, without adding a new Kafka topic or making any other change to the systems. Note that once the message is in the PubSub+ event mesh, it is converted to MQTT and sent to the appropriate bus without the need for an additional adapter.
The following main components will be required for this demo (all networked):
For this demo we will use the free tier of the PubSub+ Event Broker: Cloud service, with all other components deployed locally.
Once the service is active, obtain connection details for the connecting clients:
Note: the following steps assume Linux is running locally; adjust the commands to reflect your environment.
Kafka requires Java JRE 8 or later, so ensure it is already installed locally.
Follow the Apache Kafka quickstart to:
Tip: if you have issues starting the server, try clearing the Zookeeper data (default /tmp/zookeeper) and the Kafka logs (default /tmp/kafka-logs), and then try again.
Obtain the download link, then download and expand the connector to a location accessible by Kafka. In this example we will place the connector into the /opt/apache/kafka_2.12-2.5.0/connectors directory:
$ mkdir -p /opt/apache/kafka_2.12-2.5.0/connectors
$ cd /opt/apache/kafka_2.12-2.5.0/connectors
$ wget https://solaceproducts.github.io/pubsubplus-connector-kafka-sink/downloads/pubsubplus-connector-kafka-sink-<version>.tar
$ tar -xvf pubsubplus-connector-kafka-sink-<version>.tar ; rm ./*.tar
Add the connector’s location to the Kafka plugin search path. We will use Kafka Connect in standalone mode, so we will edit this properties file:
$ cd /opt/apache/kafka_2.12-2.5.0
$ vi config/connect-standalone.properties
Edit the last line and make sure it is not commented out:
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins:
plugin.path=/opt/apache/kafka_2.12-2.5.0/connectors
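As an optional sanity check, the sketch below loads the worker properties with java.util.Properties, splits plugin.path on commas as described in the comment above, and reports entries that are not existing directories. PluginPathCheck is a hypothetical helper, not part of Kafka; it only checks that the directories exist and does not replicate how Kafka Connect scans them for plugins. The default file path is the one used in this post.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: lists plugin.path entries that are not directories.
final class PluginPathCheck {
    private PluginPathCheck() {}

    static List<String> missingEntries(Properties workerProps) {
        List<String> missing = new ArrayList<>();
        String raw = workerProps.getProperty("plugin.path", "");
        for (String entry : raw.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty() && !Files.isDirectory(Paths.get(trimmed))) {
                missing.add(trimmed);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Default location used in this post; pass another path as the first argument.
        Path file = Paths.get(args.length > 0 ? args[0]
                : "/opt/apache/kafka_2.12-2.5.0/config/connect-standalone.properties");
        if (!Files.isRegularFile(file)) {
            System.out.println("Worker properties not found: " + file);
            return;
        }
        Properties props = new Properties();
        try (Reader in = Files.newBufferedReader(file)) {
            props.load(in); // lines starting with '#' are treated as comments
        }
        if (props.getProperty("plugin.path") == null) {
            System.out.println("plugin.path is not set (is the line still commented out?)");
            return;
        }
        List<String> missing = missingEntries(props);
        System.out.println(missing.isEmpty() ? "All plugin.path entries exist"
                : "Missing plugin.path entries: " + missing);
    }
}
```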
Make the necessary configurations in the PubSub+ Sink Connector’s properties:
$ cd /opt/apache/kafka_2.12-2.5.0
$ vi connectors/pubsubplus-connector-kafka-sink-<version>/etc/solace_sink.properties
Set the following in the properties file:
:
# Kafka topics to read from
topics=test
# PubSub+ connection information
sol.host=tcps://mrbkvuibog5lt.messaging.solace.cloud:55443
sol.username=solace-cloud-client
sol.password=vkghqm3aobegnmn6r2eu3manem
sol.vpn_name=kafkatest
# PubSub+ Kafka Sink connector record processor
sol.record_processor_class=com.solace.connector.kafka.connect.sink.recordprocessor.SolDynamicDestinationRecordProcessor
# Set to true only if using SolDynamicDestinationRecordProcessor and dynamic destinations
sol.dynamic_destination=true
# Connector TLS session to PubSub+ message broker properties
sol.ssl_trust_store=/usr/lib/jvm/java-8-openjdk-amd64/lib/security/cacerts
sol.ssl_trust_store_password=changeit
:
In the configuration snippet above:
Start the connector deployment in standalone mode, providing the property files just edited:
$ cd /opt/apache/kafka_2.12-2.5.0
$ bin/connect-standalone.sh \
    config/connect-standalone.properties \
    connectors/pubsubplus-connector-kafka-sink-<version>/etc/solace_sink.properties

In the console logs you should see a message similar to:

INFO ================ JCSMPSession Connected
At this point, PubSub+ Connector for Kafka: Sink is up and running, ready to interpret, convert, and forward new records from the test Kafka topic to PubSub+ as events.
There are many MQTT clients around that can be used, or one could easily build one following the MQTT tutorial from Solace Samples. Here we will use the ready-to-go, open-source, third-party client: MQTT Explorer.
Configure the following advanced settings:
Commands for this bus arrive on topics such as ctrl/bus/1234/start, so set it to listen to any message sent to a topic starting with ctrl/bus/1234: add the wildcard topic ctrl/bus/1234/# and keep only the subscriptions seen below.
We have one of our “buses”, bus 1234 listening!
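Why does the ctrl/bus/1234/# subscription receive every command for bus 1234 and nothing addressed to other buses? The sketch below implements MQTT topic-filter matching as described in the MQTT 3.1.1 specification (section 4.7): “+” matches exactly one level, and “#” must be the last level and matches the parent level plus any number of child levels. The broker performs this matching itself; the class name is hypothetical, and the special handling of “$”-prefixed system topics is left out.

```java
// Hypothetical helper: MQTT topic-filter matching with '+' and '#'.
final class MqttTopicFilter {
    private MqttTopicFilter() {}

    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1); // -1 keeps empty levels, e.g. "a//b"
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' is only valid as the last level; it also matches the
                // parent level, so "ctrl/bus/1234/#" matches "ctrl/bus/1234".
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level mismatch
            }
        }
        return f.length == t.length; // no unmatched trailing topic levels
    }

    public static void main(String[] args) {
        String filter = "ctrl/bus/1234/#";
        for (String topic : new String[] {"ctrl/bus/1234/start", "ctrl/bus/1234/stop",
                "ctrl/bus/3456/start", "ctrl/bus/12345/start"}) {
            System.out.println(topic + " matches " + filter + ": " + matches(filter, topic));
        }
    }
}
```

Note that matching is level by level, so ctrl/bus/1234/# does not accidentally match bus 12345, which a plain string-prefix check on ctrl/bus/1234 would.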
Let’s set up a second “bus”: start a second instance of MQTT Explorer.
It should already offer the saved connection to your PubSub+ Cloud broker.
Go to the advanced settings and configure:
Hit BACK, then SAVE, and CONNECT.
Now we have our second “bus” 3456 listening!
You are now ready to run the scenario!
From a command-line session, start the Kafka console producer tool and manually emulate an Analytics Engine creating Kafka control records to be delivered to the buses:
$ cd /opt/apache/kafka_2.12-2.5.0
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>1234 start
>3456 start
>3456 stop
>
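The routing you should expect from these three records can be simulated offline. This sketch involves no Kafka, connector, or broker: it assumes the “<bus-id> <command>” record format used above, builds the ctrl/bus/<bus-id>/<command> topic for each record, and delivers it to every simulated bus whose ctrl/bus/<bus-id>/# subscription matches. The class and method names are hypothetical, and the matching only supports filters ending in “/#”.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical offline simulation of the demo's end-to-end routing.
final class BusRoutingSimulation {
    private BusRoutingSimulation() {}

    // Same value format as typed into kafka-console-producer: "<bus-id> <command>".
    static String topicFor(String recordValue) {
        String[] fields = recordValue.trim().split("\\s+");
        if (fields.length != 2) {
            throw new IllegalArgumentException("expected '<bus-id> <command>': " + recordValue);
        }
        return "ctrl/bus/" + fields[0] + "/" + fields[1];
    }

    // Minimal matching, valid only for filters of the form prefix/#.
    static boolean matchesMultiLevel(String filter, String topic) {
        String prefix = filter.substring(0, filter.length() - 2); // strip "/#"
        return topic.equals(prefix) || topic.startsWith(prefix + "/");
    }

    // Delivers each record's topic to every bus whose subscription matches it.
    static Map<String, List<String>> route(List<String> records, List<String> busIds) {
        Map<String, List<String>> inbox = new LinkedHashMap<>();
        for (String busId : busIds) {
            inbox.put(busId, new ArrayList<>());
        }
        for (String record : records) {
            String topic = topicFor(record);
            for (String busId : busIds) {
                if (matchesMultiLevel("ctrl/bus/" + busId + "/#", topic)) {
                    inbox.get(busId).add(topic);
                }
            }
        }
        return inbox;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("1234 start", "3456 start", "3456 stop");
        System.out.println(route(records, Arrays.asList("1234", "3456")));
    }
}
```

Running main prints {1234=[ctrl/bus/1234/start], 3456=[ctrl/bus/3456/start, ctrl/bus/3456/stop]}. Adding a third bus ID to the list requires no change to topicFor, mirroring the point that new buses need no new Kafka topic.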
Checking the MQTT clients, you can see that the Connector has converted these records to PubSub+ events and set the event destinations to the topics the buses are listening to. The PubSub+ Event Broker has delivered them to the consumers using the MQTT protocol:
The demonstration above showed how the Kafka Sink Connector enabled fine-grained filtering, protocol conversion, and scalable delivery to endpoints by connecting Kafka to PubSub+. This would have been difficult to achieve by only using Kafka.
The DynamicDestinations record processor is an example of how a PubSub+ Kafka Connector can be easily extended to implement protocol conversion and business logic tailored to a specific use case.
Check out these additional resources to discover the possibilities unleashed by PubSub+ Platform and Kafka integration:
The Kafka page in the Solace Resource Hub.