/******************************************************************************* * Copyright (c) 1999, 2016 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Ian Craggs - Per subscription message handlers bug 466579 * Ian Craggs - ack control (bug 472172) * */ package org.eclipse.paho.android.service; import java.io.IOException; import java.io.InputStream; import java.security.KeyManagementException; import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions; import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClientPersistence; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttSecurityException; import org.eclipse.paho.client.mqttv3.MqttToken; import android.content.BroadcastReceiver; import android.content.ComponentName; import android.content.Context; import android.content.Intent; import android.content.IntentFilter; import android.content.ServiceConnection; import android.os.Bundle; import android.os.IBinder; import android.support.v4.content.LocalBroadcastManager; import android.util.SparseArray; /** * Enables an android application to communicate with an MQTT server using non-blocking methods. *
* Implementation of the MQTT asynchronous client interface {@link IMqttAsyncClient} , using the MQTT * android service to actually interface with MQTT server. It provides android applications a simple programming interface to all features of the MQTT version 3.1 * specification including: *
*true
if connected, false
otherwise.
*/
@Override
public boolean isConnected() {
return clientHandle != null && mqttService != null && mqttService.isConnected(clientHandle);
}
/**
* Returns the client ID used by this client.
* * All clients connected to the same server or server farm must have a * unique ID. *
* * @return the client ID used by this client. */ @Override public String getClientId() { return clientId; } /** * Returns the URI address of the server used by this client. ** The format of the returned String is the same as that used on the * constructor. *
* * @return the server's address, as a URI String. */ @Override public String getServerURI() { return serverURI; } /** * Close the client. Releases all resource associated with the client. After * the client has been closed it cannot be reused. For instance attempts to * connect will fail. * */ @Override public void close() { if(mqttService != null){ if (clientHandle == null) { clientHandle = mqttService.getClient(serverURI, clientId, myContext.getApplicationInfo().packageName,persistence); } mqttService.close(clientHandle); } } /** * Connects to an MQTT server using the default options. ** The default options are specified in {@link MqttConnectOptions} class. *
* * @throws MqttException * for any connected problems * @return token used to track and wait for the connect to complete. The * token will be passed to the callback methods if a callback is * set. * @see #connect(MqttConnectOptions, Object, IMqttActionListener) */ @Override public IMqttToken connect() throws MqttException { return connect(null, null); } /** * Connects to an MQTT server using the provided connect options. ** The connection will be established using the options specified in the * {@link MqttConnectOptions} parameter. *
* * @param options * a set of connection parameters that override the defaults. * @throws MqttException * for any connected problems * @return token used to track and wait for the connect to complete. The * token will be passed to any callback that has been set. * @see #connect(MqttConnectOptions, Object, IMqttActionListener) */ @Override public IMqttToken connect(MqttConnectOptions options) throws MqttException { return connect(options, null, null); } /** * Connects to an MQTT server using the default options. ** The default options are specified in {@link MqttConnectOptions} class. *
* * @param userContext * optional object used to pass context to the callback. Use null * if not required. * @param callback * optional listener that will be notified when the connect * completes. Use null if not required. * @throws MqttException * for any connected problems * @return token used to track and wait for the connect to complete. The * token will be passed to any callback that has been set. * @see #connect(MqttConnectOptions, Object, IMqttActionListener) */ @Override public IMqttToken connect(Object userContext, IMqttActionListener callback) throws MqttException { return connect(new MqttConnectOptions(), userContext, callback); } /** * Connects to an MQTT server using the specified options. ** The server to connect to is specified on the constructor. It is * recommended to call {@link #setCallback(MqttCallback)} prior to * connecting in order that messages destined for the client can be accepted * as soon as the client is connected. *
* ** The method returns control before the connect completes. Completion can * be tracked by: *
** An attempt is made to quiesce the client allowing outstanding work to * complete before disconnecting. It will wait for a maximum of 30 seconds * for work to quiesce before disconnecting. This method must not be called * from inside {@link MqttCallback} methods. *
* * @return token used to track and wait for disconnect to complete. The * token will be passed to any callback that has been set. * @throws MqttException * for problems encountered while disconnecting * @see #disconnect(long, Object, IMqttActionListener) */ @Override public IMqttToken disconnect() throws MqttException { IMqttToken token = new MqttTokenAndroid(this, null, null); String activityToken = storeToken(token); mqttService.disconnect(clientHandle, null, activityToken); return token; } /** * Disconnects from the server. ** An attempt is made to quiesce the client allowing outstanding work to * complete before disconnecting. It will wait for a maximum of the * specified quiesce time for work to complete before disconnecting. This * method must not be called from inside {@link MqttCallback} methods. *
* * @param quiesceTimeout * the amount of time in milliseconds to allow for existing work * to finish before disconnecting. A value of zero or less means * the client will not quiesce. * @return token used to track and wait for disconnect to complete. The * token will be passed to the callback methods if a callback is * set. * @throws MqttException * for problems encountered while disconnecting * @see #disconnect(long, Object, IMqttActionListener) */ @Override public IMqttToken disconnect(long quiesceTimeout) throws MqttException { IMqttToken token = new MqttTokenAndroid(this, null, null); String activityToken = storeToken(token); mqttService.disconnect(clientHandle, quiesceTimeout, null, activityToken); return token; } /** * Disconnects from the server. ** An attempt is made to quiesce the client allowing outstanding work to * complete before disconnecting. It will wait for a maximum of 30 seconds * for work to quiesce before disconnecting. This method must not be called * from inside {@link MqttCallback} methods. *
* * @param userContext * optional object used to pass context to the callback. Use null * if not required. * @param callback * optional listener that will be notified when the disconnect * completes. Use null if not required. * @return token used to track and wait for the disconnect to complete. The * token will be passed to any callback that has been set. * @throws MqttException * for problems encountered while disconnecting * @see #disconnect(long, Object, IMqttActionListener) */ @Override public IMqttToken disconnect(Object userContext, IMqttActionListener callback) throws MqttException { IMqttToken token = new MqttTokenAndroid(this, userContext, callback); String activityToken = storeToken(token); mqttService.disconnect(clientHandle, null, activityToken); return token; } /** * Disconnects from the server. ** The client will wait for {@link MqttCallback} methods to complete. It * will then wait for up to the quiesce timeout to allow for work which has * already been initiated to complete. For instance when a QoS 2 message has * started flowing to the server but the QoS 2 flow has not completed.It * prevents new messages being accepted and does not send any messages that * have been accepted but not yet started delivery across the network to the * server. When work has completed or after the quiesce timeout, the client * will disconnect from the server. If the cleanSession flag was set to * false and next time it is also set to false in the connection, the * messages made in QoS 1 or 2 which were not previously delivered will be * delivered this time. *
** This method must not be called from inside {@link MqttCallback} methods. *
** The method returns control before the disconnect completes. Completion * can be tracked by: *
** A convenience method, which will create a new {@link MqttMessage} object * with a byte array payload and the specified QoS, and then publish it. *
* * @param topic * to deliver the message to, for example "finance/stock/ibm". * @param payload * the byte array to use as the payload * @param qos * the Quality of Service to deliver the message at. Valid values * are 0, 1 or 2. * @param retained * whether or not this message should be retained by the server. * @return token used to track and wait for the publish to complete. The * token will be passed to any callback that has been set. * @throws MqttPersistenceException * when a problem occurs storing the message * @throws IllegalArgumentException * if value of QoS is not 0, 1 or 2. * @throws MqttException * for other errors encountered while publishing the message. * For instance, too many messages are being processed. * @see #publish(String, MqttMessage, Object, IMqttActionListener) */ @Override public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException { return publish(topic, payload, qos, retained, null, null); } /** * Publishes a message to a topic on the server. Takes an * {@link MqttMessage} message and delivers it to the server at the * requested quality of service. * * @param topic * to deliver the message to, for example "finance/stock/ibm". * @param message * to deliver to the server * @return token used to track and wait for the publish to complete. The * token will be passed to any callback that has been set. * @throws MqttPersistenceException * when a problem occurs storing the message * @throws IllegalArgumentException * if value of QoS is not 0, 1 or 2. * @throws MqttException * for other errors encountered while publishing the message. * For instance client not connected. * @see #publish(String, MqttMessage, Object, IMqttActionListener) */ @Override public IMqttDeliveryToken publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException { return publish(topic, message, null, null); } /** * Publishes a message to a topic on the server. ** A convenience method, which will create a new {@link MqttMessage} object * with a byte array payload, the specified QoS and retained, then publish it. *
* * @param topic * to deliver the message to, for example "finance/stock/ibm". * @param payload * the byte array to use as the payload * @param qos * the Quality of Service to deliver the message at. Valid values * are 0, 1 or 2. * @param retained * whether or not this message should be retained by the server. * @param userContext * optional object used to pass context to the callback. Use null * if not required. * @param callback * optional listener that will be notified when message delivery * has completed to the requested quality of service * @return token used to track and wait for the publish to complete. The * token will be passed to any callback that has been set. * @throws MqttPersistenceException * when a problem occurs storing the message * @throws IllegalArgumentException * if value of QoS is not 0, 1 or 2. * @throws MqttException * for other errors encountered while publishing the message. * For instance client not connected. * @see #publish(String, MqttMessage, Object, IMqttActionListener) */ @Override public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained, Object userContext, IMqttActionListener callback) throws MqttException, MqttPersistenceException { MqttMessage message = new MqttMessage(payload); message.setQos(qos); message.setRetained(retained); MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid( this, userContext, callback, message); String activityToken = storeToken(token); IMqttDeliveryToken internalToken = mqttService.publish(clientHandle, topic, payload, qos, retained, null, activityToken); token.setDelegate(internalToken); return token; } /** * Publishes a message to a topic on the server. ** Once this method has returned cleanly, the message has been accepted for * publication by the client and will be delivered on a background thread. * In the event the connection fails or the client stops, Messages will be * delivered to the requested quality of service once the connection is * re-established to the server on condition that: *
** When building an application, the design of the topic tree should take * into account the following principles of topic name syntax and semantics: *
* ** The following principles apply to the construction and content of a topic * tree: *
* ** The method returns control before the publish completes. Completion can * be tracked by: *
** Provides an optimized way to subscribe to multiple topics compared to * subscribing to each one individually. *
* * @param topic * one or more topics to subscribe to, which can include * wildcards * @param qos * the maximum quality of service at which to subscribe. Messages * published at a lower quality of service will be received at * the published QoS. Messages published at a higher quality of * service will be received using the QoS specified on the * subscription. * @return token used to track and wait for the subscription to complete. The * token will be passed to callback methods if set. * @throws MqttSecurityException * for security related problems * @throws MqttException * for non security related problems * * @see #subscribe(String[], int[], Object, IMqttActionListener) */ @Override public IMqttToken subscribe(String[] topic, int[] qos) throws MqttException, MqttSecurityException { return subscribe(topic, qos, null, null); } /** * Subscribe to a topic, which may include wildcards. * * @param topic * the topic to subscribe to, which can include wildcards. * @param qos * the maximum quality of service at which to subscribe. Messages * published at a lower quality of service will be received at * the published QoS. Messages published at a higher quality of * service will be received using the QoS specified on the * subscription. * @param userContext * optional object used to pass context to the callback. Use null * if not required. * @param callback * optional listener that will be notified when subscribe has * completed * @return token used to track and wait for the subscribe to complete. The * token will be passed to callback methods if set. * @throws MqttException * if there was an error when registering the subscription. * * @see #subscribe(String[], int[], Object, IMqttActionListener) */ @Override public IMqttToken subscribe(String topic, int qos, Object userContext, IMqttActionListener callback) throws MqttException { IMqttToken token = new MqttTokenAndroid(this, userContext, callback, new String[]{topic}); String activityToken = storeToken(token); mqttService.subscribe(clientHandle, topic, qos, null, activityToken); return token; } /** * Subscribes to multiple topics, each topic may include wildcards. ** Provides an optimized way to subscribe to multiple topics compared to * subscribing to each one individually. *
** The {@link #setCallback(MqttCallback)} method should be called before * this method, otherwise any received messages will be discarded. *
** If (@link MqttConnectOptions#setCleanSession(boolean)} was set to true, * when connecting to the server, the subscription remains in place until * either: *
** If (@link MqttConnectOptions#setCleanSession(boolean)} was set to false, * when connecting to the server, the subscription remains in place * until either: *
*With cleanSession set to false the MQTT server will store messages * on behalf of the client when the client is not connected. The next time * the client connects with the same client ID the server will * deliver the stored messages to the client. *
* ** The "topic filter" string is used when subscription may contain special * characters, which allows you to subscribe to multiple topics at once. *
* The number sign (#) is a wildcard character that matches any number of * levels within a topic. For example, if you subscribe to finance/stock/ibm/#, you receive messages * on these topics: *
*finance/stock/ibm
finance/stock/ibm/closingprice
finance/stock/ibm/currentprice
* The multi-level wildcard can represent zero or more levels. Therefore, * finance/# can also match the singular finance, where * # represents zero levels. The topic level separator is * meaningless in this context, because there are no levels to separate. *
* ** The multi-level wildcard can be specified only on its own or * next to the topic level separator character. Therefore, # and * finance/# are both valid, but finance# is not valid. * The multi-level wildcard must be the last character used within the * topic tree. For example, finance/# is valid but * finance/#/closingprice is not valid. *
** The plus sign (+) is a wildcard character that matches only one topic * level. For example, finance/stock/+ matches * finance/stock/ibm and finance/stock/xyz, but not * finance/stock/ibm/closingprice. Also, because the single-level * wildcard matches only a single level, finance/+ does not match * finance. *
* ** Use the single-level wildcard at any level in the topic tree, and in * conjunction with the multilevel wildcard. Specify the single-level * wildcard next to the topic level separator, except when it is specified * on its own. Therefore, + and finance/+ are both valid, * but finance+ is not valid. The single-level wildcard can * be used at the end of the topic tree or within the topic tree. For * example, finance/+ and finance/+/ibm are both * valid. *
** The method returns control before the subscribe completes. Completion can * be tracked by: *
*Provides an optimized way to subscribe to multiple topics compared to * subscribing to each one individually.
* * @see #subscribe(String[], int[], Object, IMqttActionListener) * * @param topicFilters one or more topics to subscribe to, which can include wildcards * @param qos the maximum quality of service at which to subscribe. Messages * published at a lower quality of service will be received at the published * QoS. Messages published at a higher quality of service will be received using * the QoS specified on the subscribe. * @param messageListeners an array of callbacks to handle incoming messages * @return token used to track and wait for the subscribe to complete. The token * will be passed to callback methods if set. * @throws MqttException if there was an error registering the subscription. */ public IMqttToken subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException { return subscribe(topicFilters, qos, null, null, messageListeners); } /** * Subscribe to multiple topics, each of which may include wildcards. * *Provides an optimized way to subscribe to multiple topics compared to * subscribing to each one individually.
* * @see #subscribe(String[], int[], Object, IMqttActionListener) * * @param topicFilters one or more topics to subscribe to, which can include wildcards * @param qos the maximum quality of service at which to subscribe. Messages * published at a lower quality of service will be received at the published * QoS. Messages published at a higher quality of service will be received using * the QoS specified on the subscribe. * @param userContext optional object used to pass context to the callback. Use * null if not required. * @param callback optional listener that will be notified when subscribe * has completed * @param messageListeners an array of callbacks to handle incoming messages * @return token used to track and wait for the subscribe to complete. The token * will be passed to callback methods if set. * @throws MqttException if there was an error registering the subscription. */ public IMqttToken subscribe(String[] topicFilters, int[] qos, Object userContext, IMqttActionListener callback, IMqttMessageListener[] messageListeners) throws MqttException { IMqttToken token = new MqttTokenAndroid(this, userContext, callback, topicFilters); String activityToken = storeToken(token); mqttService.subscribe(clientHandle, topicFilters, qos, null, activityToken, messageListeners); return null; } /** * Requests the server unsubscribe the client from a topic. * * @param topic * the topic to unsubscribe from. It must match a topic specified * on an earlier subscribe. * @return token used to track and wait for the unsubscribe to complete. The * token will be passed to callback methods if set. * @throws MqttException * if there was an error unregistering the subscription. * * @see #unsubscribe(String[], Object, IMqttActionListener) */ @Override public IMqttToken unsubscribe(String topic) throws MqttException { return unsubscribe(topic, null, null); } /** * Requests the server to unsubscribe the client from one or more topics. * * @param topic * one or more topics to unsubscribe from. Each topic must match * one specified on an earlier subscription. * @return token used to track and wait for the unsubscribe to complete. The * token will be passed to callback methods if set. * @throws MqttException * if there was an error unregistering the subscription. * * @see #unsubscribe(String[], Object, IMqttActionListener) */ @Override public IMqttToken unsubscribe(String[] topic) throws MqttException { return unsubscribe(topic, null, null); } /** * Requests the server to unsubscribe the client from a topics. * * @param topic * the topic to unsubscribe from. It must match a topic specified * on an earlier subscribe. * @param userContext * optional object used to pass context to the callback. Use null * if not required. * @param callback * optional listener that will be notified when unsubscribe has * completed * @return token used to track and wait for the unsubscribe to complete. The * token will be passed to callback methods if set. * @throws MqttException * if there was an error unregistering the subscription. * * @see #unsubscribe(String[], Object, IMqttActionListener) */ @Override public IMqttToken unsubscribe(String topic, Object userContext, IMqttActionListener callback) throws MqttException { IMqttToken token = new MqttTokenAndroid(this, userContext, callback); String activityToken = storeToken(token); mqttService.unsubscribe(clientHandle, topic, null, activityToken); return token; } /** * Requests the server to unsubscribe the client from one or more topics. ** Unsubcribing is the opposite of subscribing. When the server receives the * unsubscribe request it looks to see if it can find a matching * subscription for the client and then removes it. After this point the * server will send no more messages to the client for this subscription. *
** The topic(s) specified on the unsubscribe must match the topic(s) * specified in the original subscribe request for the unsubscribe to * succeed *
** The method returns control before the unsubscribe completes. Completion * can be tracked by: *
** If a client has been restarted and there are messages that were in the * process of being delivered when the client stopped, this method returns a * token for each in-flight message to enable the delivery to be tracked. * Alternately the {@link MqttCallback#deliveryComplete(IMqttDeliveryToken)} * callback can be used to track the delivery of outstanding messages. *
** If a client connects with cleanSession true then there will be no * delivery tokens as the cleanSession option deletes all earlier state. For * state to be remembered the client must connect with cleanSession set to * false *
* * @return zero or more delivery tokens */ @Override public IMqttDeliveryToken[] getPendingDeliveryTokens() { return mqttService.getPendingDeliveryTokens(clientHandle); } /** * Sets a callback listener to use for events that happen asynchronously. ** There are a number of events that the listener will be notified about. * These include: *
** Other events that track the progress of an individual operation such as * connect and subscribe can be tracked using the {@link MqttToken} returned * from each non-blocking method or using setting a * {@link IMqttActionListener} on the non-blocking method. *
*
* @param callback
* which will be invoked for certain asynchronous events
*
* @see MqttCallback
*/
@Override
public void setCallback(MqttCallback callback) {
this.callback = callback;
}
/**
* identify the callback to be invoked when making tracing calls back into
* the Activity
*
* @param traceCallback handler
*/
public void setTraceCallback(MqttTraceHandler traceCallback) {
this.traceCallback = traceCallback;
// mqttService.setTraceCallbackId(traceCallbackId);
}
/**
* turn tracing on and off
*
* @param traceEnabled
* set true
to enable trace, otherwise, set
* false
to disable trace
*
*/
public void setTraceEnabled(boolean traceEnabled) {
this.traceEnabled = traceEnabled;
if (mqttService !=null)
mqttService.setTraceEnabled(traceEnabled);
}
/**
*
* Process incoming Intent objects representing the results of operations * and asynchronous activities such as message received *
*
* Note: This is only a public method because the Android
* APIs require such.
* This method should not be explicitly invoked.
*
* A convenience method, which will help user to create a SSLSocketFactory * object *
* * @param keyStore * the SSL key store which is generated by some SSL key tool, * such as keytool in Java JDK * @param password * the password of the key store which is set when the key store * is generated * @return SSLSocketFactory used to connect to the server with SSL * authentication * @throws MqttSecurityException * if there was any error when getting the SSLSocketFactory */ public SSLSocketFactory getSSLSocketFactory (InputStream keyStore, String password) throws MqttSecurityException { try{ SSLContext ctx = null; SSLSocketFactory sslSockFactory=null; KeyStore ts; ts = KeyStore.getInstance("BKS"); ts.load(keyStore, password.toCharArray()); TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509"); tmf.init(ts); TrustManager[] tm = tmf.getTrustManagers(); ctx = SSLContext.getInstance("TLSv1"); ctx.init(null, tm, null); sslSockFactory=ctx.getSocketFactory(); return sslSockFactory; } catch (KeyStoreException | CertificateException | IOException | NoSuchAlgorithmException | KeyManagementException e) { throw new MqttSecurityException(e); } } @Override public void disconnectForcibly() throws MqttException { throw new UnsupportedOperationException(); } @Override public void disconnectForcibly(long disconnectTimeout) throws MqttException { throw new UnsupportedOperationException(); } @Override public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException { throw new UnsupportedOperationException(); } /** * Unregister receiver which receives intent from MqttService avoids * IntentReceiver leaks. */ public void unregisterResources(){ if(myContext != null && receiverRegistered){ synchronized (MqttAndroidClient.this) { LocalBroadcastManager.getInstance(myContext).unregisterReceiver(this); receiverRegistered = false; } if(bindedService){ try{ myContext.unbindService(serviceConnection); bindedService = false; }catch(IllegalArgumentException e){ //Ignore unbind issue. } } } } /** * Register receiver to receiver intent from MqttService. Call this method * when activity is hidden and become to show again. * * @param context * - Current activity context. */ public void registerResources(Context context){ if(context != null){ this.myContext = context; if(!receiverRegistered){ registerReceiver(this); } } } }