/******************************************************************************* * Copyright (c) 1999, 2015 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Ian Craggs - Per subscription message handlers bug 466579 * Ian Craggs - ack control (bug 472172) * */ package org.eclipse.paho.android.service; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.security.KeyManagementException; import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClientPersistence; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttSecurityException; import org.eclipse.paho.client.mqttv3.MqttToken; import android.content.BroadcastReceiver; import 
android.content.ComponentName; import android.content.Context; import android.content.Intent; import android.content.IntentFilter; import android.content.ServiceConnection; import android.os.Bundle; import android.os.IBinder; import android.support.v4.content.LocalBroadcastManager; import android.util.SparseArray; /** * Enables an android application to communicate with an MQTT server using non-blocking methods. *

* Implementation of the MQTT asynchronous client interface {@link IMqttAsyncClient} , using the MQTT * android service to actually interface with MQTT server. It provides android applications a simple programming interface to all features of the MQTT version 3.1 * specification including: *

*

*/ public class MqttAndroidClient extends BroadcastReceiver implements IMqttAsyncClient { /** * * The Acknowledgment mode for messages received from {@link MqttCallback#messageArrived(String, MqttMessage)} * */ public enum Ack { /** * As soon as the {@link MqttCallback#messageArrived(String, MqttMessage)} returns, * the message has been acknowledged as received . */ AUTO_ACK, /** * When {@link MqttCallback#messageArrived(String, MqttMessage)} returns, the message * will not be acknowledged as received, the application will have to make an acknowledgment call * to {@link MqttAndroidClient} using {@link MqttAndroidClient#acknowledgeMessage(String)} */ MANUAL_ACK } private static final String SERVICE_NAME = "org.eclipse.paho.android.service.MqttService"; private static final int BIND_SERVICE_FLAG = 0; private static ExecutorService pool = Executors.newCachedThreadPool(); /** * ServiceConnection to process when we bind to our service */ private final class MyServiceConnection implements ServiceConnection { @Override public void onServiceConnected(ComponentName name, IBinder binder) { mqttService = ((MqttServiceBinder) binder).getService(); bindedService = true; // now that we have the service available, we can actually // connect... 
doConnect(); } @Override public void onServiceDisconnected(ComponentName name) { mqttService = null; } } // Listener for when the service is connected or disconnected private MyServiceConnection serviceConnection = new MyServiceConnection(); // The Android Service which will process our mqtt calls private MqttService mqttService; // An identifier for the underlying client connection, which we can pass to // the service private String clientHandle; Context myContext; // We hold the various tokens in a collection and pass identifiers for them // to the service private SparseArray tokenMap = new SparseArray(); private int tokenNumber = 0; // Connection data private String serverURI; private String clientId; private MqttClientPersistence persistence = null; private MqttConnectOptions connectOptions; private IMqttToken connectToken; // The MqttCallback provided by the application private MqttCallback callback; private MqttTraceHandler traceCallback; //The acknowledgment that a message has been processed by the application private Ack messageAck; private boolean traceEnabled = false; private volatile boolean receiverRegistered = false; private volatile boolean bindedService = false; /** * Constructor - create an MqttAndroidClient that can be used to communicate with an MQTT server on android * * @param context * object used to pass context to the callback. 
* @param serverURI * specifies the protocol, host name and port to be used to * connect to an MQTT server * @param clientId * specifies the name by which this connection should be * identified to the server */ public MqttAndroidClient(Context context, String serverURI, String clientId) { this(context, serverURI, clientId, null, Ack.AUTO_ACK); } /** * Constructor - create an MqttAndroidClient that can be used to communicate * with an MQTT server on android * * @param ctx * Application's context * @param serverURI * specifies the protocol, host name and port to be used to * connect to an MQTT server * @param clientId * specifies the name by which this connection should be * identified to the server * @param ackType * how the application wishes to acknowledge a message has been * processed */ public MqttAndroidClient(Context ctx, String serverURI, String clientId, Ack ackType) { this(ctx, serverURI, clientId, null, ackType); } /** * Constructor - create an MqttAndroidClient that can be used to communicate * with an MQTT server on android * * @param ctx * Application's context * @param serverURI * specifies the protocol, host name and port to be used to * connect to an MQTT server * @param clientId * specifies the name by which this connection should be * identified to the server * @param persistence * The object to use to store persisted data */ public MqttAndroidClient(Context ctx, String serverURI, String clientId, MqttClientPersistence persistence) { this(ctx, serverURI, clientId, persistence, Ack.AUTO_ACK); } /** * Constructor- create an MqttAndroidClient that can be used to communicate * with an MQTT server on android * * @param context * used to pass context to the callback. * @param serverURI * specifies the protocol, host name and port to be used to * connect to an MQTT server * @param clientId * specifies the name by which this connection should be * identified to the server * @param persistence * the persistence class to use to store in-flight message. 
If * null then the default persistence mechanism is used * @param ackType * how the application wishes to acknowledge a message has been * processed. */ public MqttAndroidClient(Context context, String serverURI, String clientId, MqttClientPersistence persistence, Ack ackType) { myContext = context; this.serverURI = serverURI; this.clientId = clientId; this.persistence = persistence; messageAck = ackType; } /** * Determines if this client is currently connected to the server. * * @return true if connected, false otherwise. */ @Override public boolean isConnected() { if (clientHandle != null && mqttService != null) { return mqttService.isConnected(clientHandle); } else { return false; } } /** * Returns the client ID used by this client. *

* All clients connected to the same server or server farm must have a * unique ID. *

*
* @return the client ID this client was constructed with.
*/
    @Override
    public String getClientId() {
        return this.clientId;
    }

    /**
     * Returns the URI address of the server used by this client.
     *

* The format of the returned String is the same as that used on the * constructor. *

*
* @return the server's address, as a URI String.
*/
    @Override
    public String getServerURI() {
        return serverURI;
    }

    /**
     * Close the client. Releases all resource associated with the client. After
     * the client has been closed it cannot be reused. For instance attempts to
     * connect will fail.
     * <p>
     * If the client is not currently bound to the MQTT service (it was never
     * connected, or the service connection has been lost) there is nothing to
     * release and the call returns without doing anything. This implementation
     * declares no checked exceptions.
     */
    @Override
    public void close() {
        // Snapshot the field: onServiceDisconnected() may null it concurrently.
        final MqttService service = mqttService;
        if (service == null) {
            // Not bound to the service, so there is nothing to close.
            return;
        }
        if (clientHandle == null) {
            // No handle yet: obtain one so the service can identify what to close.
            clientHandle = service.getClient(serverURI, clientId,
                    myContext.getApplicationInfo().packageName, persistence);
        }
        service.close(clientHandle);
    }

    /**
     * Connects to an MQTT server using the default options.
     *

* The default options are specified in {@link MqttConnectOptions} class. *

*
* @throws MqttException
*             for any connection problems
* @return token used to track and wait for the connect to complete. The
*         token will be passed to the callback methods if a callback is
*         set.
* @see #connect(MqttConnectOptions, Object, IMqttActionListener)
*/
    @Override
    public IMqttToken connect() throws MqttException {
        // No user context and no completion listener.
        return connect(null, null);
    }

    /**
     * Connects to an MQTT server using the provided connect options.
     *

* The connection will be established using the options specified in the * {@link MqttConnectOptions} parameter. *

*
* @param options
*            a set of connection parameters that override the defaults.
* @throws MqttException
*             for any connection problems
* @return token used to track and wait for the connect to complete. The
*         token will be passed to any callback that has been set.
* @see #connect(MqttConnectOptions, Object, IMqttActionListener)
*/
    @Override
    public IMqttToken connect(MqttConnectOptions options) throws MqttException {
        // No user context and no completion listener.
        return connect(options, null, null);
    }

    /**
     * Connects to an MQTT server using the default options.
     *

* The default options are specified in {@link MqttConnectOptions} class. *

* * @param userContext * optional object used to pass context to the callback. Use null * if not required. * @param callback * optional listener that will be notified when the connect * completes. Use null if not required. * @throws MqttException * for any connected problems * @return token used to track and wait for the connect to complete. The * token will be passed to any callback that has been set. * @see #connect(MqttConnectOptions, Object, IMqttActionListener) */ @Override public IMqttToken connect(Object userContext, IMqttActionListener callback) throws MqttException { return connect(new MqttConnectOptions(), userContext, callback); } /** * Connects to an MQTT server using the specified options. *

* The server to connect to is specified on the constructor. It is * recommended to call {@link #setCallback(MqttCallback)} prior to * connecting in order that messages destined for the client can be accepted * as soon as the client is connected. *

*

* The method returns control before the connect completes. Completion can * be tracked by: *

*

* * @param options * a set of connection parameters that override the defaults. * @param userContext * optional object for used to pass context to the callback. Use * null if not required. * @param callback * optional listener that will be notified when the connect * completes. Use null if not required. * @return token used to track and wait for the connect to complete. The * token will be passed to any callback that has been set. * @throws MqttException * for any connected problems, including communication errors */ @Override public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException { IMqttToken token = new MqttTokenAndroid(this, userContext, callback); connectOptions = options; connectToken = token; /* * The actual connection depends on the service, which we start and bind * to here, but which we can't actually use until the serviceConnection * onServiceConnected() method has run (asynchronously), so the * connection itself takes place in the onServiceConnected() method */ if (mqttService == null) { // First time - must bind to the service Intent serviceStartIntent = new Intent(); serviceStartIntent.setClassName(myContext, SERVICE_NAME); Object service = myContext.startService(serviceStartIntent); if (service == null) { IMqttActionListener listener = token.getActionCallback(); if (listener != null) { listener.onFailure(token, new RuntimeException( "cannot start service " + SERVICE_NAME)); } } // We bind with BIND_SERVICE_FLAG (0), leaving us the manage the lifecycle // until the last time it is stopped by a call to stopService() myContext.bindService(serviceStartIntent, serviceConnection, Context.BIND_AUTO_CREATE); if (!receiverRegistered) registerReceiver(this); } else { pool.execute(new Runnable() { @Override public void run() { doConnect(); //Register receiver to show shoulder tap. 
if (!receiverRegistered) registerReceiver(MqttAndroidClient.this); } }); } return token; } private void registerReceiver(BroadcastReceiver receiver) { IntentFilter filter = new IntentFilter(); filter.addAction(MqttServiceConstants.CALLBACK_TO_ACTIVITY); LocalBroadcastManager.getInstance(myContext).registerReceiver(receiver, filter); receiverRegistered = true; } /** * Actually do the mqtt connect operation */ private void doConnect() { if (clientHandle == null) { clientHandle = mqttService.getClient(serverURI, clientId,myContext.getApplicationInfo().packageName, persistence); } mqttService.setTraceEnabled(traceEnabled); mqttService.setTraceCallbackId(clientHandle); String activityToken = storeToken(connectToken); try { mqttService.connect(clientHandle, connectOptions, null, activityToken); } catch (MqttException e) { IMqttActionListener listener = connectToken.getActionCallback(); if (listener != null) { listener.onFailure(connectToken, e); } } } /** * Disconnects from the server. *

* An attempt is made to quiesce the client allowing outstanding work to * complete before disconnecting. It will wait for a maximum of 30 seconds * for work to quiesce before disconnecting. This method must not be called * from inside {@link MqttCallback} methods. *

*
* @return token used to track and wait for disconnect to complete. The
*         token will be passed to any callback that has been set.
* @throws MqttException
*             for problems encountered while disconnecting
* @see #disconnect(long, Object, IMqttActionListener)
*/
    @Override
    public IMqttToken disconnect() throws MqttException {
        // No user context and no listener for this variant.
        final IMqttToken disconnectToken =
                new MqttTokenAndroid(this, null, (IMqttActionListener) null);
        final String tokenId = storeToken(disconnectToken);

        mqttService.disconnect(clientHandle, null, tokenId);
        return disconnectToken;
    }

    /**
     * Disconnects from the server.
     *

* An attempt is made to quiesce the client allowing outstanding work to * complete before disconnecting. It will wait for a maximum of the * specified quiesce time for work to complete before disconnecting. This * method must not be called from inside {@link MqttCallback} methods. *

* * @param quiesceTimeout * the amount of time in milliseconds to allow for existing work * to finish before disconnecting. A value of zero or less means * the client will not quiesce. * @return token used to track and wait for disconnect to complete. The * token will be passed to the callback methods if a callback is * set. * @throws MqttException * for problems encountered while disconnecting * @see #disconnect(long, Object, IMqttActionListener) */ @Override public IMqttToken disconnect(long quiesceTimeout) throws MqttException { IMqttToken token = new MqttTokenAndroid(this, null, (IMqttActionListener) null); String activityToken = storeToken(token); mqttService.disconnect(clientHandle, quiesceTimeout, null, activityToken); return token; } /** * Disconnects from the server. *

* An attempt is made to quiesce the client allowing outstanding work to * complete before disconnecting. It will wait for a maximum of 30 seconds * for work to quiesce before disconnecting. This method must not be called * from inside {@link MqttCallback} methods. *

* * @param userContext * optional object used to pass context to the callback. Use null * if not required. * @param callback * optional listener that will be notified when the disconnect * completes. Use null if not required. * @return token used to track and wait for the disconnect to complete. The * token will be passed to any callback that has been set. * @throws MqttException * for problems encountered while disconnecting * @see #disconnect(long, Object, IMqttActionListener) */ @Override public IMqttToken disconnect(Object userContext, IMqttActionListener callback) throws MqttException { IMqttToken token = new MqttTokenAndroid(this, userContext, callback); String activityToken = storeToken(token); mqttService.disconnect(clientHandle, null, activityToken); return token; } /** * Disconnects from the server. *

* The client will wait for {@link MqttCallback} methods to complete. It * will then wait for up to the quiesce timeout to allow for work which has * already been initiated to complete. For instance when a QoS 2 message has * started flowing to the server but the QoS 2 flow has not completed.It * prevents new messages being accepted and does not send any messages that * have been accepted but not yet started delivery across the network to the * server. When work has completed or after the quiesce timeout, the client * will disconnect from the server. If the cleanSession flag was set to * false and next time it is also set to false in the connection, the * messages made in QoS 1 or 2 which were not previously delivered will be * delivered this time. *

*

* This method must not be called from inside {@link MqttCallback} methods. *

*

* The method returns control before the disconnect completes. Completion * can be tracked by: *

*

* * @param quiesceTimeout * the amount of time in milliseconds to allow for existing work * to finish before disconnecting. A value of zero or less means * the client will not quiesce. * @param userContext * optional object used to pass context to the callback. Use null * if not required. * @param callback * optional listener that will be notified when the disconnect * completes. Use null if not required. * @return token used to track and wait for the disconnect to complete. The * token will be passed to any callback that has been set. * @throws MqttException * for problems encountered while disconnecting */ @Override public IMqttToken disconnect(long quiesceTimeout, Object userContext, IMqttActionListener callback) throws MqttException { IMqttToken token = new MqttTokenAndroid(this, userContext, callback); String activityToken = storeToken(token); mqttService.disconnect(clientHandle, quiesceTimeout, null, activityToken); return token; } /** * Publishes a message to a topic on the server. *

* A convenience method, which will create a new {@link MqttMessage} object * with a byte array payload and the specified QoS, and then publish it. *

* * @param topic * to deliver the message to, for example "finance/stock/ibm". * @param payload * the byte array to use as the payload * @param qos * the Quality of Service to deliver the message at. Valid values * are 0, 1 or 2. * @param retained * whether or not this message should be retained by the server. * @return token used to track and wait for the publish to complete. The * token will be passed to any callback that has been set. * @throws MqttPersistenceException * when a problem occurs storing the message * @throws IllegalArgumentException * if value of QoS is not 0, 1 or 2. * @throws MqttException * for other errors encountered while publishing the message. * For instance, too many messages are being processed. * @see #publish(String, MqttMessage, Object, IMqttActionListener) */ @Override public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException { return publish(topic, payload, qos, retained, null, null); } /** * Publishes a message to a topic on the server. Takes an * {@link MqttMessage} message and delivers it to the server at the * requested quality of service. * * @param topic * to deliver the message to, for example "finance/stock/ibm". * @param message * to deliver to the server * @return token used to track and wait for the publish to complete. The * token will be passed to any callback that has been set. * @throws MqttPersistenceException * when a problem occurs storing the message * @throws IllegalArgumentException * if value of QoS is not 0, 1 or 2. * @throws MqttException * for other errors encountered while publishing the message. * For instance client not connected. * @see #publish(String, MqttMessage, Object, IMqttActionListener) */ @Override public IMqttDeliveryToken publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException { return publish(topic, message, null, null); } /** * Publishes a message to a topic on the server. *

* A convenience method, which will create a new {@link MqttMessage} object * with a byte array payload, the specified QoS and retained, then publish it. *

* * @param topic * to deliver the message to, for example "finance/stock/ibm". * @param payload * the byte array to use as the payload * @param qos * the Quality of Service to deliver the message at. Valid values * are 0, 1 or 2. * @param retained * whether or not this message should be retained by the server. * @param userContext * optional object used to pass context to the callback. Use null * if not required. * @param callback * optional listener that will be notified when message delivery * has completed to the requested quality of service * @return token used to track and wait for the publish to complete. The * token will be passed to any callback that has been set. * @throws MqttPersistenceException * when a problem occurs storing the message * @throws IllegalArgumentException * if value of QoS is not 0, 1 or 2. * @throws MqttException * for other errors encountered while publishing the message. * For instance client not connected. * @see #publish(String, MqttMessage, Object, IMqttActionListener) */ @Override public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained, Object userContext, IMqttActionListener callback) throws MqttException, MqttPersistenceException { MqttMessage message = new MqttMessage(payload); message.setQos(qos); message.setRetained(retained); MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid( this, userContext, callback, message); String activityToken = storeToken(token); IMqttDeliveryToken internalToken = mqttService.publish(clientHandle, topic, payload, qos, retained, null, activityToken); token.setDelegate(internalToken); return token; } /** * Publishes a message to a topic on the server. *

* Once this method has returned cleanly, the message has been accepted for * publication by the client and will be delivered on a background thread. * In the event the connection fails or the client stops, Messages will be * delivered to the requested quality of service once the connection is * re-established to the server on condition that: *

*

* *

* When building an application, the design of the topic tree should take * into account the following principles of topic name syntax and semantics: *

* * * *

* The following principles apply to the construction and content of a topic * tree: *

* * *

*

* The method returns control before the publish completes. Completion can * be tracked by: *

*

* * @param topic * to deliver the message to, for example "finance/stock/ibm". * @param message * to deliver to the server * @param userContext * optional object used to pass context to the callback. Use null * if not required. * @param callback * optional listener that will be notified when message delivery * has completed to the requested quality of service * @return token used to track and wait for the publish to complete. The * token will be passed to callback methods if set. * @throws MqttPersistenceException * when a problem occurs storing the message * @throws IllegalArgumentException * if value of QoS is not 0, 1 or 2. * @throws MqttException * for other errors encountered while publishing the message. * For instance, client not connected. * @see MqttMessage */ @Override public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) throws MqttException, MqttPersistenceException { MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid( this, userContext, callback, message); String activityToken = storeToken(token); IMqttDeliveryToken internalToken = mqttService.publish(clientHandle, topic, message, null, activityToken); token.setDelegate(internalToken); return token; } /** * Subscribe to a topic, which may include wildcards. * * @param topic * the topic to subscribe to, which can include wildcards. * @param qos * the maximum quality of service at which to subscribe. Messages * published at a lower quality of service will be received at * the published QoS. Messages published at a higher quality of * service will be received using the QoS specified on the * subscription. * @return token used to track and wait for the subscribe to complete. The * token will be passed to callback methods if set. 
* @throws MqttSecurityException
*             for security related problems
* @throws MqttException
*             for non security related problems
*
* @see #subscribe(String[], int[], Object, IMqttActionListener)
*/
    @Override
    public IMqttToken subscribe(String topic, int qos)
            throws MqttException, MqttSecurityException {
        // No user context and no completion listener.
        return subscribe(topic, qos, null, null);
    }

    /**
     * Subscribe to multiple topics, each topic may include wildcards.
     *
     *

* Provides an optimized way to subscribe to multiple topics compared to * subscribing to each one individually. *

* * @param topic * one or more topics to subscribe to, which can include * wildcards * @param qos * the maximum quality of service at which to subscribe. Messages * published at a lower quality of service will be received at * the published QoS. Messages published at a higher quality of * service will be received using the QoS specified on the * subscription. * @return token used to track and wait for the subscription to complete. The * token will be passed to callback methods if set. * @throws MqttSecurityException * for security related problems * @throws MqttException * for non security related problems * * @see #subscribe(String[], int[], Object, IMqttActionListener) */ @Override public IMqttToken subscribe(String[] topic, int[] qos) throws MqttException, MqttSecurityException { return subscribe(topic, qos, null, null); } /** * Subscribe to a topic, which may include wildcards. * * @param topic * the topic to subscribe to, which can include wildcards. * @param qos * the maximum quality of service at which to subscribe. Messages * published at a lower quality of service will be received at * the published QoS. Messages published at a higher quality of * service will be received using the QoS specified on the * subscription. * @param userContext * optional object used to pass context to the callback. Use null * if not required. * @param callback * optional listener that will be notified when subscribe has * completed * @return token used to track and wait for the subscribe to complete. The * token will be passed to callback methods if set. * @throws MqttException * if there was an error when registering the subscription. 
*
* @see #subscribe(String[], int[], Object, IMqttActionListener)
*/
    @Override
    public IMqttToken subscribe(String topic, int qos, Object userContext,
            IMqttActionListener callback) throws MqttException {
        // The token records the single topic being subscribed to.
        final String[] subscribedTopics = new String[]{topic};
        final IMqttToken subscribeToken =
                new MqttTokenAndroid(this, userContext, callback, subscribedTopics);
        final String tokenId = storeToken(subscribeToken);

        mqttService.subscribe(clientHandle, topic, qos, null, tokenId);
        return subscribeToken;
    }

    /**
     * Subscribes to multiple topics, each topic may include wildcards.
     *

* Provides an optimized way to subscribe to multiple topics compared to * subscribing to each one individually. *

*

* The {@link #setCallback(MqttCallback)} method should be called before * this method, otherwise any received messages will be discarded. *

*

* If {@link MqttConnectOptions#setCleanSession(boolean)} was set to true, * when connecting to the server, the subscription remains in place until * either: *