Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supporting multiple mqtt callbacks #391

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand Down Expand Up @@ -97,8 +98,8 @@ public class MqttAndroidClient extends BroadcastReceiver implements IMqttAsyncCl
private MqttClientPersistence persistence = null;
private MqttConnectOptions connectOptions;
private IMqttToken connectToken;
// The MqttCallback provided by the application
private MqttCallback callback;
// The MqttCallback list provided by the application
private ArrayList<MqttCallback> callbacksList = new ArrayList<>();
private MqttTraceHandler traceCallback;
private boolean traceEnabled = false;
private volatile boolean receiverRegistered = false;
Expand Down Expand Up @@ -1074,8 +1075,34 @@ public IMqttDeliveryToken[] getPendingDeliveryTokens() {
*/
@Override
public void setCallback(MqttCallback callback) {
this.callback = callback;
if (callbacksList == null) callbacksList = new ArrayList<>();
callbacksList.add(callback);
}

/**
* Adds a callback listener to use for events that happen asynchronously.
* <p>
* There are a number of events that the listener will be notified about.
* These include:
* </p>
* <ul>
* <li>A new message has arrived and is ready to be processed</li>
* <li>The connection to the server has been lost</li>
* <li>Delivery of a message to the server has completed</li>
* </ul>
* <p>
* Other events that track the progress of an individual operation such as
* connect and subscribe can be tracked using the {@link MqttToken} returned
* from each non-blocking method or using setting a
* {@link IMqttActionListener} on the non-blocking method.
* <p>
*
* @param callback which will be invoked for certain asynchronous events
* @see MqttCallback
*/
public void addCallback(MqttCallback callback) {
if (callbacksList == null) callbacksList = new ArrayList<>();
callbacksList.add(callback);
}

/**
Expand Down Expand Up @@ -1200,8 +1227,10 @@ private void disconnected(Bundle data) {
if (token != null) {
((MqttTokenAndroid) token).notifyComplete();
}
if (callback != null) {
callback.connectionLost(null);
if (callbacksList != null) {
for (MqttCallback callback : callbacksList) {
callback.connectionLost(null);
}
}
}

Expand All @@ -1211,21 +1240,27 @@ private void disconnected(Bundle data) {
* @param data
*/
private void connectionLostAction(Bundle data) {
if (callback != null) {
Exception reason = (Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION);
callback.connectionLost(reason);
Exception reason = (Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION);

if (callbacksList != null) {
for (MqttCallback callback : callbacksList) {
callback.connectionLost(reason);
}
}
}

private void connectExtendedAction(Bundle data) {
// This is called differently from a normal connect
boolean reconnect = data.getBoolean(MqttServiceConstants.CALLBACK_RECONNECT, false);
String serverURI = data.getString(MqttServiceConstants.CALLBACK_SERVER_URI);

if (callback instanceof MqttCallbackExtended) {
boolean reconnect = data.getBoolean(MqttServiceConstants.CALLBACK_RECONNECT, false);
String serverURI = data.getString(MqttServiceConstants.CALLBACK_SERVER_URI);
((MqttCallbackExtended) callback).connectComplete(reconnect, serverURI);
if (callbacksList != null) {
for (MqttCallback callback : callbacksList) {
if (callback instanceof MqttCallbackExtended) {
((MqttCallbackExtended) callback).connectComplete(reconnect, serverURI);
}
}
}

}

/**
Expand Down Expand Up @@ -1286,11 +1321,13 @@ private void unSubscribeAction(Bundle data) {
*/
private void messageDeliveredAction(Bundle data) {
IMqttToken token = removeMqttToken(data);
Status status = (Status) data.getSerializable(MqttServiceConstants.CALLBACK_STATUS);
if (token != null) {
if (callback != null) {
Status status = (Status) data.getSerializable(MqttServiceConstants.CALLBACK_STATUS);
if (callbacksList != null) {
if (status == Status.OK && token instanceof IMqttDeliveryToken) {
callback.deliveryComplete((IMqttDeliveryToken) token);
for (MqttCallback callback : callbacksList) {
callback.deliveryComplete((IMqttDeliveryToken) token);
}
}
}
}
Expand All @@ -1302,21 +1339,24 @@ private void messageDeliveredAction(Bundle data) {
* @param data
*/
private void messageArrivedAction(Bundle data) {
if (callback != null) {
String messageId = data.getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
String destinationName = data.getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);
String messageId = data.getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
String destinationName = data.getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);

ParcelableMqttMessage message = data.getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);
ParcelableMqttMessage message = data.getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);

if (callbacksList != null) {
try {
if (messageAck == Ack.AUTO_ACK) {
callback.messageArrived(destinationName, message);
for (MqttCallback callback : callbacksList) {
callback.messageArrived(destinationName, message);
}
mqttService.acknowledgeMessageArrival(clientHandle, messageId);
} else {
message.messageId = messageId;
callback.messageArrived(destinationName, message);
for (MqttCallback callback : callbacksList) {
callback.messageArrived(destinationName, message);
}
}

// let the service discard the saved message details
} catch (Exception e) {
mqttService.traceError(MqttService.TAG, "messageArrivedAction failed: " + e);
}
Expand Down