How to handle onConnectionInterrupted callback from AWS MQTT client

I am looking for any help suggesting how to handle a callback called from the AWS IoT SDK V2 for C++ MQTT client. The design of the communication component in the system I am working on provides a simple wrapper around the intrinsic AWS MqttClient object. The code borrows heavily from the sample code in the AWS SDK.

The code registers callbacks for several events: OnConnectionCompleted, OnDisconnect, OnConnectionInterrupted, OnConnectionResumed, and the message handler set through SetOnMessageHandler. These callbacks run on the AWS thread, and the example code shows how to use std::promise objects to synchronize between the threads. The snippet below sets the callbacks with std::bind so that they call into the wrapper class's methods (for example mqttwrapper::onConnectionCompleted()):

    connection = client.NewConnection(endpoint.c_str(), port, socketOptions, tlsContext);

    if (!connection) {
        cout << "MQTT Connection Creation failed with error " << ErrorDebugString(client.LastError());
        return false;
    }
    // connect up the callback functions... Use std::bind to call member
    // functions.
    connection->OnConnectionCompleted = std::bind(&mqttwrapper::onConnectionCompleted, this, _1, _2, _3, _4);
    connection->OnDisconnect = std::bind(&mqttwrapper::onDisconnect, this, _1);
    connection->OnConnectionInterrupted = std::bind(&mqttwrapper::onConnectionInterrupted, this, _1, _2);
    connection->OnConnectionResumed = std::bind(&mqttwrapper::onConnectionResumed, this, _1, _2, _3);

    connection->SetOnMessageHandler(std::bind(&mqttwrapper::onMessageHandler, this, _1, _2, _3, _4, _5, _6));

    connectionCompletedPromise = std::promise<bool>();

    connection->SetReconnectTimeout(10, 20);

    cout << "Connecting...";
    if (!connection->Connect(clientId.c_str(), false, 1000)) {
        cout << "MQTT Connection failed with error " << ErrorDebugString(connection->LastError());
        return false;
    } 
    // Wait for the connection completed promise to be fulfilled…
    connected = connectionCompletedPromise.get_future().get();

This code creates, configures, and opens the connection. The connection-completed callback looks like this:

void mqttwrapper::onConnectionCompleted([[maybe_unused]] Mqtt::MqttConnection &con, int errorCode, Mqtt::ReturnCode returnCode, [[maybe_unused]] bool sp) 
{
    cout << __PRETTY_FUNCTION__;

    if (errorCode) {
        cout << "Connection failed with error " << ErrorDebugString(errorCode);
        connectionCompletedPromise.set_value(false);
    } else {
        if (returnCode != AWS_MQTT_CONNECT_ACCEPTED) {
            cout << "Connection failed with mqtt return code " << (int)returnCode;
            connectionCompletedPromise.set_value(false);
        } else {
            cout << "Connection completed successfully.";
            connectionCompletedPromise.set_value(true);
        }
    }
}

When this callback is invoked it sets the value of the promise, whose future the main thread is waiting on. The value is true or false depending on the error code and return code passed in by the AWS connect logic.

This pattern, creating a promise, calling an AWS function, and waiting for the promise value to be set is used in several places in this system, as the code is based on the basic_pub_sub example. Here is the link to the code (link points to section where the connection completed promise is created): Connection Example in basic_pub_sub
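
The promise hand-off described above can be reduced to a minimal sketch that does not depend on the SDK. The class name HandoffDemo, its methods, and the simulated error code are hypothetical and exist only for illustration; a plain std::thread stands in for the AWS callback thread.

```cpp
#include <chrono>
#include <future>
#include <thread>

// Hypothetical stand-in for the wrapper: one promise per pending operation.
class HandoffDemo {
public:
    // Plays the role of an SDK callback that fires on another thread.
    void onCompleted(int errorCode) { completed.set_value(errorCode == 0); }

    // Starts the simulated operation, then blocks until the callback reports.
    bool runAndWait(int simulatedErrorCode) {
        completed = std::promise<bool>();  // fresh promise for each attempt
        std::future<bool> result = completed.get_future();
        std::thread callbackThread([this, simulatedErrorCode] {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            onCompleted(simulatedErrorCode);
        });
        bool ok = result.get();  // blocks until set_value is called
        callbackThread.join();
        return ok;
    }

private:
    std::promise<bool> completed;
};
```

Note that result.get() has no timeout: if the callback never fires, the waiting thread never returns.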

Explanation of Problem

Now the problem that I am seeking to solve is that occasionally (the frequency and cause are unknown) our onConnectionInterrupted callback is called on the AWS thread. This usually happens while we are waiting for the publish-complete callback, that is, after we have called Publish on the mqttClient and before publishCompletePromise has been given a value.

The result of this failure is that the main thread is blocked waiting for the publish complete promise (which will never come after we have gotten the connection interrupted callback). And there are no examples of what to do when the code calls this method… Here is the method from the example code: onConnectionInterrupted handler in basic_pub_sub example
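
One way to keep the main thread from blocking forever in this situation is to let the interruption callback fail the pending promise, and to bound the wait with a timeout. The following is only a sketch under assumptions: PendingOperation, begin, resolve, and waitFor are hypothetical names, nothing here comes from the AWS SDK, and whether the SDK later still delivers the publish-complete callback is not established here.

```cpp
#include <chrono>
#include <future>
#include <memory>
#include <mutex>

// Hypothetical helper: tracks one in-flight operation and lets either the
// completion callback or the interruption callback resolve it, exactly once.
class PendingOperation {
public:
    // Call before starting the operation; returns the future to wait on.
    std::future<bool> begin() {
        std::lock_guard<std::mutex> lock(mutex);
        promise = std::make_unique<std::promise<bool>>();
        return promise->get_future();
    }

    // Safe to call from any thread and more than once; only the first call
    // after begin() has an effect. Returns true if it resolved the operation.
    bool resolve(bool success) {
        std::lock_guard<std::mutex> lock(mutex);
        if (!promise) return false;
        promise->set_value(success);
        promise.reset();
        return true;
    }

private:
    std::mutex mutex;
    std::unique_ptr<std::promise<bool>> promise;
};

// Waits a bounded time on the future; a timeout is reported as failure.
inline bool waitFor(std::future<bool> &result, std::chrono::milliseconds timeout) {
    if (result.wait_for(timeout) != std::future_status::ready) return false;
    return result.get();
}
```

In a wrapper like ours, onConnectionInterrupted would call resolve(false) and the publish-complete callback would call resolve(errorCode == 0); because a second resolve is ignored, a late callback cannot set the same promise twice. A real wrapper would still need some way to tell operations apart, since a late completion arriving after a new begin() would resolve the wrong one.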

   auto onInterrupted = [&](Mqtt::MqttConnection &, int error) {
        fprintf(stdout, "Connection interrupted with error %s\n", ErrorDebugString(error));
    };

   …

   connection->OnConnectionInterrupted = std::move(onInterrupted);

Without having to shut down everything and reopen everything, is there a way to gracefully respond to this condition? Please point me to a good example or explanation of how to handle this.

asked 3 years ago, 929 views
1 Answer

You can handle the onConnectionInterrupted callback by implementing a mechanism to wait for the connection to be resumed before attempting to publish again. You can use a std::mutex and std::condition_variable to achieve this.

Here's a high-level approach:

1. Create a std::mutex, a std::condition_variable, and a connectionInterrupted flag in your mqttwrapper class.
2. In the onConnectionInterrupted callback, lock the mutex and set the flag.
3. In the onConnectionResumed callback, lock the mutex, clear the flag, and notify the condition variable.
4. Before publishing a message, lock the mutex and wait on the condition variable until the flag is clear.

Each lock is held only for the duration of the function that takes it; the mutex is not kept locked between the two callbacks.

Here's an example implementation:

#include <condition_variable>
#include <mutex>

class mqttwrapper {
    // ...
    std::mutex connectionMutex;
    std::condition_variable connectionCV;
    bool connectionInterrupted = false;

    // ...
};

void mqttwrapper::onConnectionInterrupted([[maybe_unused]] Mqtt::MqttConnection &con, int errorCode) {
    std::unique_lock<std::mutex> lock(connectionMutex);
    connectionInterrupted = true;
    cout << "Connection interrupted with error " << ErrorDebugString(errorCode);
}

void mqttwrapper::onConnectionResumed([[maybe_unused]] Mqtt::MqttConnection &con, [[maybe_unused]] Mqtt::ReturnCode ret, bool sessionPresent) {
    std::unique_lock<std::mutex> lock(connectionMutex);
    connectionInterrupted = false;
    cout << "Connection resumed. Session present: " << sessionPresent;
    connectionCV.notify_all();
}

bool mqttwrapper::publish(/*...*/) {
    // Before publishing, wait until the connection is no longer interrupted.
    // The predicate overload of wait() returns at once if the flag is already clear.
    {
        std::unique_lock<std::mutex> lock(connectionMutex);
        connectionCV.wait(lock, [this]() { return !connectionInterrupted; });
    }

    // Proceed with publishing the message
    // ...
}

This implementation will make the publish function wait for the connection to be resumed before attempting to publish again. This should help you gracefully handle the connection interruption scenario without shutting down and reopening everything.
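
If the resume callback never arrives, an unbounded wait would block publish indefinitely. A variant of the same gate with a timeout might look like the sketch below. ConnectionGate and its method names are hypothetical, and the code uses only the standard library, so it makes no assumptions about SDK behavior.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-alone version of the flag + condition variable gate.
class ConnectionGate {
public:
    // Intended to be called from the connection-interrupted callback.
    void markInterrupted() {
        std::lock_guard<std::mutex> lock(mutex);
        interrupted = true;
    }

    // Intended to be called from the connection-resumed callback.
    void markResumed() {
        {
            std::lock_guard<std::mutex> lock(mutex);
            interrupted = false;
        }
        cv.notify_all();  // wake every thread waiting to publish
    }

    // Returns true once the connection is usable, or false if the timeout
    // expires while the connection is still marked as interrupted.
    bool waitUntilUsable(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex);
        return cv.wait_for(lock, timeout, [this] { return !interrupted; });
    }

private:
    std::mutex mutex;
    std::condition_variable cv;
    bool interrupted = false;
};
```

A publish function could call waitUntilUsable first and return false to its caller on timeout, leaving the decision to retry or reconnect to the application.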

EXPERT
answered 3 years ago
  • Hello,

    Thank you for the prompt reply. It would be acceptable except that the onConnectionResumed() handler never seems to be called. We were expecting, as you posted above, that the connection would be re-established and the onConnectionResumed() callback called, but so far we have not seen this sequence of callbacks.

    As the sample code above shows, we did register a callback for connection resumed.
