This blog article is a follow-up to my previous post, An Architectural Look at Managed Subscriptions in Solace, where I explain to you a variety of reasons why you might need a subscription manager as part of your solution. I also showed you how to code a subscription manager and client using Solace’s JCSMP API in my Solace Managed Subscriptions in Action blog.
In this blog article, I will introduce MQTT clients into the picture. We’ll then modify the code from my Solace Managed Subscriptions in Action blog. This time, we’ll make the client an MQTT one.
What is MQTT?
The MQ Telemetry Transport, known as MQTT, is an ISO standard “lightweight” messaging protocol for use on top of TCP/IP and WebSockets. It is designed for connections with remote locations where a “small code footprint” is required or the network bandwidth is limited. Have a look in the Getting Started section here on the Solace dev portal for more background information. There are a few articles here that this blog will build upon. The basic Publish/Subscribe (MQTT) tutorial introduces the basics of using MQTT with the Paho Java Client library. The Request/Reply (MQTT) tutorial illustrates the nuances of doing request/reply in Solace over MQTT, where MQTT doesn’t intrinsically support this message exchange pattern.
Why use a subscription manager?
In my previous blog, I discussed some of the reasons for wanting a subscription manager, namely:
- Integration with external permission systems
- Integration with multiple permission systems
- Unbound numbers of ACL exceptions
- Topic abstraction
- Ease of auditability
When it comes to MQTT clients, topic abstraction may become even more important for you. If you have a large number of devices out there, it may be extremely cumbersome to have to configure each of them specific physical topics they need to subscribe to. Additionally, there are a number of reasons why you may want to change the physical topic which devices subscribe to including:
- Changes in the business model on the back end such as an additional level of services.
- Changes surrounding the introduction of shard patterns that simply were not conceived of beforehand.
Using topic abstraction, the devices can simply make requests to the subscription manager for “serviceX”, and let the subscription manager encapsulate the underlying physical topic behind the service. The subscription manager can then make the subscription on behalf of the client using the actual physical name.
API and Protocol mixing with Clients and the Backend
The MQTT interface provides no mechanism for making On Behalf Of subscription requests. No problem, we will simply leave the back end subscription manager very close to the way it was in my earlier Solace Managed Subscriptions in Action blog, which simply uses the Solace Java native interface (JCSMP) as its API to talk to the Solace router. As we learned in the Solace core concepts, Solace messages are compatible across APIs and protocols. From an API perspective, Java, C, C#, JavaScript clients can all communicate with each other. From a protocol perspective REST, HTTP, SMF, MQTT and JMS clients can all communicate with each other. Hence, I will simply build this solution with clients who request the subscriptions using MQTT, while my subscription manager application will process those requests using JCSMP.
In the previous blog, I demonstrated using a Solace MapMessage to send a set of value/pairs to the subscription agent. This time, since the client uses MQTT and MapMessages are not supported in MQTT, we will change the subscription manager so that it expects a BytesMessage request with a JSON payload. MQTT messages are binary messages, so this is a very natural mapping.
The Code
Let’s take a look at the code which accomplishes this in the following lab exercise.
Assumptions
To get the most out of this blog post you should be familiar with the following things:
- You are familiar with Solace core concepts.
- You are familiar with the mechanisms of publishing and consuming direct messages as detailed in the basic Publish/Subscribe (MQTT)
- You are familiar with the specifics of implementing the Request/Reply pattern in MQTT, as outlined in the Request/Reply (MQTT)
- You have read my blog post explaining the purpose of an OBO Subscription Manager.
- You are familiar with the concepts for coding an OBO Subscription Manager and client as illustrated in my previous blog post.
- You have access to a running Solace message router
One simple way to get access to a Solace message router is to start a Solace VMR load as outlined here. By default the Solace VMR will run with the “default” message VPN configured and ready for messaging and the MQTT service enabled on port 1883. Going forward, this blog post will assume that you are using the Solace VMR. If you are using a different Solace message router configuration, adapt the instructions to match your configuration.
Goals
The goal of this blog is to demonstrate a typical interaction between an MQTT client application and an OBO subscription manager using the Solace router. In this tutorial, we will review:
- How to control entitlements natively on the Solace router with Access Control Lists (ACLs).
- How to build and send a message on a “well known” topic to the OBO subscription manager.
- How one application can make subscriptions On-behalf-Of another application.
We will additionally learn:
- How to mix MQTT and JCSMP clients in a request/reply exchange with a JSON payload.
- How to implement topic abstraction.
Solace message router properties
In order to send or receive messages to a Solace message router, you need to know a few details of how to connect to the Solace message router. Specifically you need to know the following:
Resource | Value | Description |
---|---|---|
Host | String of the form tcp://DNS_NAME or tcp://IP:PORT | This is the address clients use when connecting to the Solace message router to send and receive messages.
For a Solace VMR this there is only a single interface so the IP is the same as the management IP address. For Solace message router appliances this is the host address of the message-backbone. |
The MQTT Port number is for MQTT clients to use when connecting to the Message VPN. The port number configured on a Solace VMR for the “default” message-vpn is 1883.
To see the port number configured on your Solace message router and Message VPN for the MQTT service, run the following command in the Solace message router CLI.
solace-vmr> show service
See the VMR getting started tutorial for default credentials and accounts. Then paste the above command into the CLI.
For the purposes of this tutorial, you will connect to the default message VPN of a Solace VMR so the only required information to proceed is the Solace VMR host string which this tutorial accepts as an argument.
Obtaining an MQTT Client Library
When you run the Gradle-based build, Gradle will automatically download the Paho Java Client library for you.
Creating the client usernames for this tutorial
Let’s first create the client username that we will need for this tutorial. To complete this tutorial, you will need to create a client username for the OBO subscription manager. For the MQTT clients, namely the subscription requesting client and the sample content publisher we will simply use the default client username. This is the behavior when MQTT sessions are created dynamically during the connect operation: they use the default username. Using the Solace Command Line interface (CLI), issue the following commands (or just copy and paste from here!).
enable configure create client-username oboManager message-vpn default password default subscription-manager no shutdown exit exit
This is the client username for the OBO subscription manager. We have created a new client username using all default values, with the notable exception of enabling the subscription manager setting. This setting is what allows the OBO subscriptions to be made.
For a complete overview of all configuration options for client applications, follow the Solace documentation that describes how to configure client username accounts.
Connecting the client
Just like the basic Publish/Subscribe (MQTT) tutorial example, the client must first connect to the router. The host address and MQTT port of your router or VMR is passed in on the command line. The MQTT Session Id is hard coded:
final MqttClient mqttClient = new MqttClient("tcp://" + args[0], "HelloWorldBasicRequestor"); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); mqttClient.connect(connOpts);
At this point your OBO client is connected to the Solace message router as an MQTT client. You can use SolAdmin or the CLI to view the client connection and related details.
Making a subscription request
First let’s look at the subscribing client who requests the subscription and then awaits a reply.
Before the client can send the message to the subscription manager, it must do a couple of chores:
- Obtain a unique reply-to topic from the Solace router
- Obtain its own underling Solace client name, which it will include in the subscription request message to the subscription manager.
Using Solace Message Routers, the unique reply-to topic can be obtained by adding a subscription to the designated special topic “$SYS/client/reply-to”. The reply-to topic is received asynchronously through callbacks as a message from the router itself. These callbacks are defined in MQTT by the MqttCallback interface. The same callback is used to receive all messages. In order to distinguish between the messages we inspect the topic string provided in the MqttCallback.messageArrived method.
Using the exact same technique, the Solace client name is obtained by adding a subscription to the designated special topic “$SYS/client/client-name”.
Note the code in the callback to not just handle these callbacks, but also the reply from the subscription manager as well as the content messages sent by the sample content publisher. In a production application, it would be best to not execute a string comparison on the topic name for each message. In that case, it would be better to install callback for the duration of the one-time setup intractions to get the reply-to topic and client name, and then install a different call back for handling application messages:
mqttClient.setCallback(new MqttCallback() { public void messageArrived(String topic, MqttMessage message) throws Exception { // If the topic is "$SYS/client/reply-to" then set our replyToTopic // to with the contents of the message payload received if (topic != null && topic.equals("$SYS/client/reply-to")) { replyToTopic = new String(message.getPayload()); } else if (topic != null && topic.equals("$SYS/client/client-name")) { clientName = new String(message.getPayload()); } else if (topic != null && topic.equals(replyToTopic)) { // Received a response to our request from sub mgr try { // Parse the response payload and convert to a JSONObject Object obj = parser.parse(new String(message.getPayload())); JSONObject jsonPayload = (JSONObject) obj; System.out.println("nReceived a response!" + "ntCorrelation Id: " + (String) jsonPayload.get("correlationId") + "ntResult: " + (String) jsonPayload.get(";result") + "n"); } catch (ParseException ex) { ex.printStackTrace(); } } else { // content from the publisher String time = new Timestamp(System.currentTimeMillis()).toString(); System.out.println("nReceived a Message!" + "ntTime: " + time + "ntTopic: " + topic + "ntMessage: " + new String(message.getPayload()) + "ntQoS: " + message.getQos() + "n"); } }
With this callback setup, the subscribing client can now add the subscription to the special designated topic to get the reply to address:
mqttClient.subscribe("$SYS/client/reply-to", 0);
The subscribing client uses a semaphore to block the main thread until the reply-to message has been received. Once the reply-to message has been received, the topic is obtained from the payload of the message as shown above in the callback. You must then subscribe to the obtained reply-to topic in order to receive responses. This tutorial uses a QoS level of 0 for at most once delivery for our response messages.
mqttClient.subscribe(replyToTopic, 0);
Next, the client can add the subscription on the special designated topic to obtain the client name:
mqttClient.subscribe("$SYS/client/client-name", 0);
After this setup, we need to create and send the request message to the OBO subscription manager, and receive the reply. We will use a JSON formatted request. The reply will either contain ‘ok’ or an error message if the subscription fails for any reason. Note that this reply is being returned from the OBO subscription manager. Note that we are not actually asking for a physical topic. This example asks for “The pub sub demo service”. This illustrates the concept of topic abstraction. Only the subscription manager knows the physical topic that the desired content will be published on; this client only knows it by this abstracted moniker.
After the reply is received, this program waits for user input before it quits, giving you the opportunity to check the Solace router and confirm that the subscription has been made. During this time, messages should be arriving from the publisher, and should be dumped to the screen.
// Create the request payload in JSON format JSONObject obj = new JSONObject(); obj.put("correlationId", UUID.randomUUID().toString()); obj.put("replyTo", replyToTopic); obj.put("clientName", clientName); obj.put("topicRequested", "The pub sub demo service"); String reqPayload = obj.toJSONString(); // Create a request message and set the request payload MqttMessage reqMessage = new MqttMessage(reqPayload.getBytes()); reqMessage.setQos(0); // Publish the request message mqttClient.publish(requestTopic, reqMessage);
Connecting the manager and processing the request
The subscription manager is essentially the same program that it was in the previous incarnation, with a few notable exceptions:
- It has been changed to accept a JSON payload.
- It now expects an abstract service name, rather than a hard coded topic name.
For brevity, will skip the subscription manager’s setup, and look at the code where the subscription manager handles the request.
public void onReceive(BytesXMLMessage msg) { try { BytesMessage bytes = (BytesMessage) msg; byte[] bytesArr = bytes.getData(); String Body = new String(bytesArr, "UTF-8"); Object payloadObj = parser.parse(new String(Body)); JSONObject jsonPayload = (JSONObject) payloadObj; String correlationId = (String) jsonPayload.get("correlationId"); String replyTo = (String) jsonPayload.get("replyTo"); String clientName = (String) jsonPayload.get("clientName"); String topicRequested = (String) jsonPayload.get("topicRequested"); // here we are simulating a topic abstraction service if (topicRequested.equals("The pub sub demo service")) { String physicalTopic = "T/GettingStarted/pubsub"; topicRequested = physicalTopic; // this is where we would check with an external data source to confirm // if the client is entitled to the requested topic JCSMPFactory fact = JCSMPFactory.onlyInstance(); ClientName clientNameObject = fact.createClientName(clientName); Topic requestedTopic = fact.createTopic(topicRequested); String replyText = "ok"; try { session.addSubscription(clientNameObject, requestedTopic, JCSMPSession.WAIT_FOR_CONFIRM); } catch (JCSMPException e) { replyText = "ERROR: " + e.getMessage(); } sendReply(correlationId, replyText, replyTo, prod); } else { sendReply(correlationId, "unknown service", replyTo, prod); } } catch (JCSMPException | ParseException | UnsupportedEncodingException e) { e.printStackTrace(); } }
The sample content publisher
This program is essentially exactly the same as the publishing client from the MQTT Pub/Sub getting started article, and hence we won’t explore it here.
Building the source code
Download the source code from the following GitHub repository and follow the instructions for building this project contained in the read me file:
https://github.com/SolaceMike/Solace-OBO-with-MQTT
Sample Output
Each of these 3 programs requires that you pass in the IP address of your VMR or router as a command line argument.
If you start the OBOSubscriptionManager
it will connect and wait for request messages.
$ ./build/staged/bin/oBOSubscriptionManager <your router ip> OBOSubscriptionManager initializing... Consumer and producer created...
Then you can launch the subscription requesting client to send the request message. If successful, the output for the client will look like the following:
$ ./build/staged/bin/basicRequestor <your router ip:MQTT port> BasicRequestor initializing... Connecting to Solace broker: tcp://192.168.164.111:1883 Connected Requesting Reply-To topic from Solace... Received Reply-to topic from Solace for the MQTT client: Reply-To: _P2P/v:lab-129-11/_mqtt/HelloWorldBasicRequestor/79 Subscribing client to Solace provide client name Requesting Client name from Solace... Received client name from Solace for the MQTT client: client name: #mqtt/HelloWorldBasicRequestor/79 Sending request to: T/obo/request Received a response! Correlation Id: 44b480e6-1baf-430c-b4bc-ce5d14eb5dc7 Result: ok Check the router to see your subscription. Press <return> to exit.
Note the body of the reply contains ‘ok’. This is the acknowledgment from the subscription manager. The BasicRequestor
now sits and waits. Just hit the return key when you want to stop it, after you have proven the subscription works by running the content producer and seeing the messages received by the BasicRequestor
.
With the subscription request now completed, the OBOSubscriptionManager
output will look like the following:
Redirecting service abstraction 'The pub sub demo service' to physical topic 'T/GettingStarted/pubsub'. Request Message received: from client '#mqtt/HelloWorldBasicRequestor/81 for topic T/GettingStarted/pubsub This request will be allowed; making subscription on behalf of the client. The subscription has been successfully made on the router. Exiting.
Finally, run the content producer.
$ ./build/staged/bin/topicPublisher <your router ip:MQTT port>; TopicPublisher initializing... Connecting to Solace broker: tcp://192.168.164.111:1883 Connected Publishing message: Hello world from MQTT 1 Publishing message: Hello world from MQTT 2 Publishing message: Hello world from MQTT 3 Publishing message: Hello world from MQTT 4 Publishing message: Hello world from MQTT 5 Publishing message: Hello world from MQTT 6 Publishing message: Hello world from MQTT 7 Publishing message: Hello world from MQTT 8 Publishing message: Hello world from MQTT 9 Publishing message: Hello world from MQTT 10 Messages published. Exiting
After the producer is up and running, you should see this in your BasicRequestor
output:
Received a Message! Time: 2016-08-19 14:27:09.249 Topic: T/GettingStarted/pubsub Message: Hello world from MQTT 1 QoS: 0
The client will continue dumping these content messages to the screen until you terminate it, or 20 seconds later the content publisher completes and exists.
Summary
You have now successfully connected an MQTT OBO client and subscription manager, made the client request an abstract service subscription, and have seen the OBO subscription manager make the subscription on behalf of the client.
In this blog we have examined the advantages realized when using an OBO subscription manager with MQTT clients, and examined how to code the exchange between MQTT OBO clients and an OBO subscription manager. We have seen how to:
- Create client profiles, ACLs and MQTT session Ids in the Solace CLI
- Create and consume JSON Payload messages
- Exchange messages between MQTT and JCSMP applications
- Implement the request/reply pattern with an MQTT client
- Make subscriptions On-Behalf-Of another program using the JCSMP API
Please feel free to leave me any comments regarding this example, or leave comments for the entire community to get involved in. If you have any issues sending and receiving a message, check the Solace community Q&A for answers to common issues seen.
Explore other posts from category: DevOps