In June 2016 Eclipse announced the release of Neon, (1.2.0), the MQTT Paho Java API. The new functionality provided is automatic reconnect and offline buffering (allowing publish to be called when the application is not connected) for C, Java, JavaScript and Android clients. It also includes WebSocket support for Java and Python.

Solace continues to support open standards and open APIs by investing engineering effort to ensure these APIs interop in the most efficient manner with the Solace message router. In this blog I will look at the new auto-reconnect, high availability and publish-offline features and show how they interop with the Solace message broker to provide an end-to-end highly available solution.

Click these links to learn more about Solace’s general support for MQTT or high availability features, or obtain a full working example on Github.

High Availability with Broker URL Lists

This new feature allows the application to provide the Paho API with a list of MQTT brokers to connect to. The API will take a primary or preferred broker to connect to then use the following list to attempt to connect to secondary and possibly disaster recovery brokers. This allows the MQTT clients to take advantage of Solace message routers’ redundancy and disaster recovery features by enabling them to connect to the primary site as active then standby message routers. If this fails then it tries the disaster recovery sites active then standby message routers. This will happen without application intervention. As described in the next section, the application will be told that it has successfully connected and which broker it has connected to. Here is an example of how this is done in Java:

connOpt = new MqttConnectOptions();
connOpt.setConnectionTimeout(60); //default is 30 seconds
String[] brokerList = new String[2];
brokerList[0] = BROKER_URL1;
brokerList[1] = BROKER_URL2;
connOpt.setServerURIs(brokerList);
myClient = new MqttClient(brokerList[0],  clientID,  persistence);
myClient.setCallback(this);
myClient.connect(connOpt);

Automatic Reconnect

This new feature allows the API to automatically reconnect in the event of a disconnect from the message broker; it’s detected via keepalives, the distant-end MQTT broker, the intermediary firewall, or the load balancer closing the TCP connection. Besides the obvious use case of reconnecting automatically each time the end device MQTT Client loses connectivity (for example, by going under a bridge or into an elevator), it also makes the reconnect of the MQTT Client seamless in the event of a Solace message broker appliance failover. In this case the message broker’s IP address will move to the standby message router appliance and the MQTT client will reconnect to the same address. All of its messaging resources would be available on the standby message router exactly as per the previous active router.

There is a new interface called MQTTCallbackExtended which adds the connectionComplete(Boolean reconnect, String serverURI) callback method. This method will be called on each successful connection or reconnection to a MQTT message broker. For clean=1 sessions this will provide an opportunity to re-apply any subscriptions as the connection is now established. Between the original connectionLost() event and the new connectionComplete() event, the API will attempt to connect with an increasing time interval, starting at 1 second, doubling on each failed attempt up to 2 minutes.

Here’s an example of how to use the new callback:

public class SimpleMqttClient implements MqttCallbackExtended {
 
    MqttClient myClient;
    MqttConnectOptions connOpt;
    connOpt.setCleanSession(true);
    connOpt.setKeepAliveInterval(30);
    connOpt.setConnectionTimeout(60);
    connOpt.setAutomaticReconnect(true);
    myClient = new MqttClient(brokerList[0],  clientID,  persistence);
    myClient.setCallback(this);
    myClient.connect(connOpt);
    //Notice to application that connection is down
@Override
    public void connectionLost(Throwable t) {
        System.out.println("Connection lost!" + t.toString());
    }
 
    //Notice to application that connection is up
    @Override
    public void connectComplete(boolean reconnect,  String url) {
        if (reconnect) {
            System.out.println("Connection Reconnected! To: " + url);
        } else {
            System.out.println("Initial Connection! To: " + url1);
        }
        addSubscriptions();
    }

Disconnected Publishing

To prevent an “is not connected” MqttException while publishing messages during a reconnect, buffered publishing is possible. This allows messages to be buffered either in memory or to disk during the time the connection is reconnecting. Then once the connection is reconnected the messages are sent in publish order (messages in the disconnect buffer are sent before new publish messages). Here’s an example of how to use this:

DisconnectedBufferOptions bufferOpts = new DisconnectedBufferOptions();
    bufferOpts.setBufferEnabled(true);
    bufferOpts.setBufferSize(100); // 100 message buffer
    bufferOpts.setDeleteOldestMessages(true); // Purge oldest messages when buffer is full
    bufferOpts.setPersistBuffer(false); // Do not buffer to disk
myClient.setBufferOpts(bufferOpts);

With these new features integrated into the Eclipse Paho MQTT Java API, the MQTT client now supports and interoperates with the Solace Message Router high availability main feature set.

Ken Barr

Ken Barr is a Senior Product Integration Architect working with the Solace CTO group. He's focused on exploring areas in which our customers would benefit from Solace innovation, then defining how these new technologies fit into Solace’s product lines.

Prior to joining Solace, Ken was a Technical Lead at Cisco Systems holding several roles in carrier core routing business units including QA team lead for initial implementations of IPv6 and bringing next generation IOS called IOS-XR to the core routing platforms. Preceding Cisco, Ken was a Communications Electronics Engineering Officer for the Royal Canadian Air Force responsible for operational management of the National Defence Headquarters Metropolitan Area Network.