What’s New with the Neon Release of MQTT Paho

In June 2016 Eclipse announced the Neon, (1.2.0), release of the MQTT Paho Java API. The new functionality provided is automatic reconnect and offline buffering (allowing publish to be called when the application is not connected) for the C, Java, JavaScript and Android clients.  As well as WebSockets support for Java, Python.

Solace continues to support open standards and open APIs by investing engineering effort to ensure these APIs interop in the most efficient manor with the Solace message router. In this blog I will look at the new auto-reconnect, high availability and publish offline features and show how they interop with Solace message broker to provide an end-to-end highly available solution.

Click these links to learn more about Solace’s general support for MQTT or high availability features, or to obtain a full working example on Github.

High Availability with Broker URL Lists

This new feature allows the application to provide the Paho API with a list of MQTT brokers to connect to.  The API will take a primary or preferred broker to connect to then use the following list to attempt to connect to secondary and possibly disaster recovery brokers.  This allows the MQTT clients to take advantage of Solace message routers redundancy and disaster recovery features by enabling them to connect to the primary site active then standby message routers, if this fails then it tries the disaster recovery sites active then standby message routers.  All this will happen without application intervention.  As described in the next section the application will be told that it has successfully connected and which broker it has connected to. Here is an example of how this is done in Java:

connOpt = new MqttConnectOptions();
connOpt.setConnectionTimeout(60); //default is 30 seconds
String[] brokerList = new String[2];
brokerList[0] = BROKER_URL1;
brokerList[1] = BROKER_URL2;
connOpt.setServerURIs(brokerList);
myClient = new MqttClient(brokerList[0],  clientID,  persistence);
myClient.setCallback(this);
myClient.connect(connOpt);

Automatic Reconnect

This new feature allows the API to automatically reconnect in the event of a disconnect from the message broker, either detected via keepalives or via the distant end MQTT broker or intermediary firewall or load balancer closing the TCP connection.   Besides the obvious use case of reconnecting automatically each time the end device MQTT Client loses connectivity, for example going under a bridge or into an elevator.   It also makes the reconnect of the MQTT Client seamless in the event of a Solace message broker appliance failover.  In this case the message brokers IP address will move to the standby message router appliance and the MQTT client will reconnect to the same address and all of its messaging resources would be available on the standby message router exactly as per the previous active router.

There is a new interface called MQTTCallbackExtended which adds the connectionComplete(Boolean reconnect, String serverURI) callback method.  This method will be called on each successful connection or reconnection to a MQTT message broker.  For clean=1 sessions this will provide an opportunity to re-apply any subscriptions as the connection is now established.   Between the original connectionLost() event and the new connectionComplete() event, the API will  attempt to connect with an increasing time interval, starting at 1 second, doubling on each failed attempt up to 2 minutes. Here’s an example of a how to use the new callback:

public class SimpleMqttClient implements MqttCallbackExtended {
 
    MqttClient myClient;
    MqttConnectOptions connOpt;
    connOpt.setCleanSession(true);
    connOpt.setKeepAliveInterval(30);
    connOpt.setConnectionTimeout(60);
    connOpt.setAutomaticReconnect(true);
    myClient = new MqttClient(brokerList[0],  clientID,  persistence);
    myClient.setCallback(this);
    myClient.connect(connOpt);
    //Notice to application that connection is down
@Override
    public void connectionLost(Throwable t) {
        System.out.println("Connection lost!" + t.toString());
    }
 
    //Notice to application that connection is up
    @Override
    public void connectComplete(boolean reconnect,  String url) {
        if (reconnect) {
            System.out.println("Connection Reconnected! To: " + url);
        } else {
            System.out.println("Initial Connection! To: " + url1);
        }
        addSubscriptions();
    }

Disconnected Publishing

To prevent an “is not connected” MqttException to occur while publishing messages during a reconnect, buffered publishing is possible. This allows messages to be buffered either in memory or to disk during the time the connection is reconnecting. Then once the connection is reconnected the messages are sent in publish order, ie. The messages in the disconnect buffer are sent before new publish messages. Here’s an example of how to use this:

DisconnectedBufferOptions bufferOpts = new DisconnectedBufferOptions();
    bufferOpts.setBufferEnabled(true);
    bufferOpts.setBufferSize(100); // 100 message buffer
    bufferOpts.setDeleteOldestMessages(true); // Purge oldest messages when buffer is full
    bufferOpts.setPersistBuffer(false); // Do not buffer to disk
myClient.setBufferOpts(bufferOpts);

With these new features integrated into the Eclipse Paho MQTT Java API, the MQTT client now supports and interoperates with the Solace Message Router high availability main feature set.