C++ SDK : Publish and Subscribe using same GreengrassCoreIpcClient

0

Hello,
This is in reference to [1].

I am trying to run mixed C and C++ code, but I am currently facing an issue with the application below. I don't have any experience with C++.

The application receives a message on one topic, abc/world, and from the callback it calls another method to publish a sample message on another topic, xyz/world.

#include <iostream>

#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>

using namespace std;
using namespace Aws::Crt;
using namespace Aws::Greengrass;

// Process-wide IPC client. main() allocates it and connects it;
// publishMessage() uses it to create publish operations.
GreengrassCoreIpcClient *ipcClient;

// Publishes a sample binary message to "xyz/world"; defined after main().
void publishMessage();

// Stream handler used by main() for the "abc/world" subscription. It prints
// each JSON message it receives and then calls publishMessage().
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
    // Invoked for every message delivered on the subscription stream.
    void OnStreamEvent(SubscriptionResponseMessage *response) override {
        auto json = response->GetJsonMessage();
        const bool hasJsonPayload = json.has_value() && json.value().GetMessage().has_value();

        if (!hasJsonPayload) {
            // Not JSON: fall back to the binary form of the message.
            auto binary = response->GetBinaryMessage();
            if (binary.has_value() && binary.value().GetMessage().has_value()) {
                auto bytes = binary.value().GetMessage().value();
                std::string text(bytes.begin(), bytes.end());
                // Handle binary message.
            }
            return;
        }

        // Handle JSON message.
        auto text = json.value().GetMessage().value().View().WriteReadable();
        printf("%s\n", text.c_str());

        // publishMessage() waits on the publish futures before it returns, so
        // this handler does not return until that wait has finished.
        publishMessage();
    }

    // Invoked when the stream reports an error; the error is currently ignored.
    bool OnStreamError(OperationError *error) override {
        // Handle error.
        const bool closeStream = false; // true closes the stream, false keeps it open.
        return closeStream;
    }

    // Invoked when the stream is closed; nothing is done yet.
    void OnStreamClosed() override {
        // Handle close.
    }
};

// Connection lifecycle callbacks handed to GreengrassCoreIpcClient::Connect()
// in main(). All three are placeholders with no handling logic yet.
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
    // Hook for a successful connection to the IPC service.
    void OnConnectCallback() override {
        // Handle connection to IPC service.
    }

    // Hook for a disconnection from the IPC service; `error` is ignored here.
    void OnDisconnectCallback(RpcError error) override {
        // Handle disconnection from IPC service.
    }

    // Hook for an IPC connection error; `error` is ignored and true is always returned.
    // NOTE(review): presumably returning true asks the SDK to close the connection — confirm against the SDK docs.
    bool OnErrorCallback(RpcError error) override {
        // Handle IPC service connection error.
        return true;
    }
};

int main() {
    ApiHandle apiHandle(g_allocator);
    Io::EventLoopGroup eventLoopGroup(1);
    Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
    Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
    IpcClientLifecycleHandler ipcLifecycleHandler;
    ipcClient = new GreengrassCoreIpcClient(bootstrap);
    auto connectionStatus = ipcClient->Connect(ipcLifecycleHandler).get();
    if (!connectionStatus) {
        std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
        exit(-1);
    }
    
    String topic("abc/world");
    int timeout = 10;

    SubscribeToTopicRequest request;
    request.SetTopic(topic);
    
    SubscribeResponseHandler streamHandler;
    SubscribeToTopicOperation operation = ipcClient->NewSubscribeToTopic(streamHandler);
    auto activate = operation.Activate(request, nullptr);
    activate.wait();

    auto responseFuture = operation.GetResult();
    if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
        std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
        exit(-1);
    }
    
/*    if (!response) {
        // Handle error.
        auto errorType = response.GetResultType();
        if (errorType == OPERATION_ERROR) {
            auto *error = response.GetOperationError();
            // Handle operation error.
        } else {
            // Handle RPC error.
        }
        exit(-1);
    }
*/
    // Keep the main thread alive, or the process will exit.
    while (true) {
        cout << "Hello, World!" << endl;
        std::this_thread::sleep_for(std::chrono::seconds(5));
    }
    
    operation.Close();
    return 0;
}


void publishMessage()
{

    printf("\nWelcom to publish message\n");

    String topic("xyz/world");
    int timeout = 10;
    PublishToTopicRequest request;
    String message("MKLOLEWRWTRSGH V DSZF QWRO<D!");
    Vector<uint8_t> messageData({message.begin(), message.end()});
    BinaryMessage binaryMessage;
    binaryMessage.SetMessage(messageData);
    PublishMessage publishMessage;
    publishMessage.SetBinaryMessage(binaryMessage);
    request.SetTopic(topic);
    request.SetPublishMessage(publishMessage);

    PublishToTopicOperation operation = ipcClient->NewPublishToTopic();
    printf("\nBEEEEEEETSSTAT00000\n");
    auto activate = operation.Activate(request).get();
//    activate.wait();
    printf("\nBEEEEEEETSSTATT11111\n");

    if (!activate)
    {
        printf("Failed to publish to %s topic with error %s\n", topic.c_str(), activate.StatusToString().c_str());
        //exit(-1);
    }

    auto responseFuture = operation.GetResult();
    auto publishResult = responseFuture.get();
    if (publishResult)
    {
        fprintf(stdout, "Successfully published to %s topic\n", topic.c_str());
        auto *response = publishResult.GetOperationResponse();
        (void)response;
    }
    else
    {
        auto errorType = publishResult.GetResultType();
        if (errorType == OPERATION_ERROR)
        {
            OperationError *error = publishResult.GetOperationError();
            /*
             * This pointer can be casted to any error type like so:
             * if(error->GetModelName() == UnauthorizedError::MODEL_NAME)
             *    UnauthorizedError *unauthorizedError = static_cast<UnauthorizedError*>(error);
             */
            if (error->GetMessage().has_value())
                fprintf(stderr, "Greengrass Core responded with an error: %s\n", error->GetMessage().value().c_str());
        }
        else
        {
            fprintf(
                stderr,
                "Attempting to receive the response from the server failed with error code %s\n",
                publishResult.GetRpcError().StatusToString().c_str());
        }
    }

    printf("\nBEEEEEEETSSTATT222222\n");
}

The above application stops receiving messages after the first message; it does not report any error and does not even publish to topic xyz/world.
However, the application works well and is able to receive messages if I comment out the publishMessage() call in the callback.

Please find the logs below.

2021-11-23T07:15:23.767Z [INFO] (pool-2-thread-6) com.example.PubSubSubscriberCpp: shell-runner-start. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=STARTING, command=["/greengrass/v2/packages/artifacts/com.example.PubSubSubscr..."]}
2021-11-23T07:15:23.791Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:28.791Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. {. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:28.792Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. "hello":	{. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:28.800Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. "timeOut":	12. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:28.801Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. }. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:28.807Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. }. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:28.808Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:28.809Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Welcom to publish message. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:28.810Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:28.812Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. BEEEEEEETSSTAT00000. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:28.813Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}








2021-11-23T07:15:33.791Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}



2021-11-23T07:15:38.791Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:43.791Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:48.792Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:53.792Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:15:58.793Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:16:03.795Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:16:08.796Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:16:13.796Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:16:18.796Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:16:23.796Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}
2021-11-23T07:16:28.796Z [INFO] (Copier) com.example.PubSubSubscriberCpp: stdout. Hello, World!. {scriptName=services.com.example.PubSubSubscriberCpp.lifecycle.Run.Script, serviceName=com.example.PubSubSubscriberCpp, currentState=RUNNING}

What could be the issue here?
References -
[1] - https://forums.aws.amazon.com/message.jspa?messageID=1000581#1000581

asked 3 years ago · 473 views
2 Answers
0
Accepted Answer

Hi edgegoldberg,

The explanation and fix you've found for this problem are appropriate. As the documentation says (https://docs.aws.amazon.com/greengrass/v2/developerguide/interprocess-communication.html#ipc-subscribe-operations), you cannot block on an IPC operation's response (in your case, the publish operation) from within the subscription handler. You should make the new request and wait on its result in a separate thread, as you have already done.

Thanks,
Shagupta

AWS
answered 3 years ago
0

Hi,

I was going through the documentation to confirm whether I had missed anything, and I found the statements below in [1]:

Send new IPC requests asynchronously

The IPC client can't send a new request from within subscription handler functions, because the request blocks the handler function if you wait for a response. You can send IPC requests in a separate thread that you run from the handler function.

I updated the application as follows:

#include <pthread.h>

#include <chrono>
#include <cstdio>
#include <future>
#include <iostream>
#include <string>
#include <thread>

#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>

using namespace std;
using namespace Aws::Crt;
using namespace Aws::Greengrass;

// Process-wide IPC client. main() allocates it and connects it; sampleTask()
// uses it to create publish operations.
GreengrassCoreIpcClient *ipcClient;

// Starts a thread that runs sampleTask(); defined after main().
void publishMessage();

class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
    void OnStreamEvent(SubscriptionResponseMessage *response) override {
        auto jsonMessage = response->GetJsonMessage();
        if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
            auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
            // Handle JSON message.
            printf("%s\n",messageString.c_str());
            publishMessage();
        } else {
            auto binaryMessage = response->GetBinaryMessage();
            if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
                auto messageBytes = binaryMessage.value().GetMessage().value();
                std::string messageString(messageBytes.begin(), messageBytes.end());
                // Handle binary message.
            }
        } 
    }

    bool OnStreamError(OperationError *error) override {
        // Handle error.
        return false; // Return true to close stream, false to keep stream open.
    }
    
    void OnStreamClosed() override {
        // Handle close.
    }
};

// Connection lifecycle callbacks handed to GreengrassCoreIpcClient::Connect()
// in main(). All three are placeholders with no handling logic yet.
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
    // Hook for a successful connection to the IPC service.
    void OnConnectCallback() override {
        // Handle connection to IPC service.
    }

    // Hook for a disconnection from the IPC service; `error` is ignored here.
    void OnDisconnectCallback(RpcError error) override {
        // Handle disconnection from IPC service.
    }

    // Hook for an IPC connection error; `error` is ignored and true is always returned.
    // NOTE(review): presumably returning true asks the SDK to close the connection — confirm against the SDK docs.
    bool OnErrorCallback(RpcError error) override {
        // Handle IPC service connection error.
        return true;
    }
};

int main() {
    ApiHandle apiHandle(g_allocator);
    Io::EventLoopGroup eventLoopGroup(1);
    Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
    Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
    IpcClientLifecycleHandler ipcLifecycleHandler;
    ipcClient = new GreengrassCoreIpcClient(bootstrap);
    auto connectionStatus = ipcClient->Connect(ipcLifecycleHandler).get();
    if (!connectionStatus) {
        std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
        exit(-1);
    }
    
    String topic("abc/world");
    int timeout = 10;

    SubscribeToTopicRequest request;
    request.SetTopic(topic);
    
    SubscribeResponseHandler streamHandler;
    SubscribeToTopicOperation operation = ipcClient->NewSubscribeToTopic(streamHandler);
    auto activate = operation.Activate(request, nullptr);
    activate.wait();

    auto responseFuture = operation.GetResult();
    if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
        std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
        exit(-1);
    }
    
/*    if (!response) {
        // Handle error.
        auto errorType = response.GetResultType();
        if (errorType == OPERATION_ERROR) {
            auto *error = response.GetOperationError();
            // Handle operation error.
        } else {
            // Handle RPC error.
        }
        exit(-1);
    }
*/
    // Keep the main thread alive, or the process will exit.
    while (true) {
        cout << "Hello, World!" << endl;
        std::this_thread::sleep_for(std::chrono::seconds(5));
    }
    
    operation.Close();
    return 0;
}

// Thread entry point (pthread signature) that publishes a fixed binary
// payload to "xyz/world" through the shared ipcClient.
//
// argument: unused.
// Returns:  always nullptr. Failures are reported on stderr.
//
// The function gives up early if activation fails, and waits at most
// `timeout` seconds for the publish response, so the thread cannot block
// forever on a request that was never sent or never answered.
void *sampleTask(void *argument)
{
    (void)argument;

    printf("\nstart of sample task\n");
    String topic("xyz/world");
    const int timeout = 10; // seconds to wait for the publish response
    PublishToTopicRequest request;
    String message("MKLOLEWRWTRSGH V DSZF QWRO<D!");
    Vector<uint8_t> messageData({message.begin(), message.end()});
    BinaryMessage binaryMessage;
    binaryMessage.SetMessage(messageData);
    PublishMessage publishMessage;
    publishMessage.SetBinaryMessage(binaryMessage);
    request.SetTopic(topic);
    request.SetPublishMessage(publishMessage);

    PublishToTopicOperation operation = ipcClient->NewPublishToTopic();
    printf("\nbefore activate\n");
    auto activate = operation.Activate(request).get();
    printf("\nafter activate\n");

    if (!activate)
    {
        fprintf(
            stderr,
            "Failed to publish to %s topic with error %s\n",
            topic.c_str(),
            activate.StatusToString().c_str());
        // The request was not sent, so there is no result to wait for.
        return nullptr;
    }

    auto responseFuture = operation.GetResult();
    if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout)
    {
        fprintf(stderr, "Operation timed out while waiting for response from Greengrass Core.\n");
        return nullptr;
    }

    auto publishResult = responseFuture.get();
    if (publishResult)
    {
        fprintf(stdout, "Successfully published to %s topic\n", topic.c_str());
        auto *response = publishResult.GetOperationResponse();
        (void)response;
    }
    else
    {
        auto errorType = publishResult.GetResultType();
        if (errorType == OPERATION_ERROR)
        {
            OperationError *error = publishResult.GetOperationError();
            /*
             * This pointer can be casted to any error type like so:
             * if(error->GetModelName() == UnauthorizedError::MODEL_NAME)
             *    UnauthorizedError *unauthorizedError = static_cast<UnauthorizedError*>(error);
             */
            if (error != nullptr && error->GetMessage().has_value())
                fprintf(stderr, "Greengrass Core responded with an error: %s\n", error->GetMessage().value().c_str());
        }
        else
        {
            fprintf(
                stderr,
                "Attempting to receive the response from the server failed with error code %s\n",
                publishResult.GetRpcError().StatusToString().c_str());
        }
    }
    return nullptr;
}

// Starts sampleTask() on its own thread and returns immediately, so the
// caller (the subscription handler) is never blocked waiting for the publish
// to complete.
//
// The thread is detached because nothing ever joins it; without that, each
// call would leave a finished thread whose resources are never released.
// A failure to create the thread is reported on stderr.
void publishMessage()
{

    printf("\nWelcom to publish message\n");
    pthread_t sampleThread;
    const int createStatus = pthread_create(&sampleThread, NULL, sampleTask, NULL);
    if (createStatus != 0)
    {
        fprintf(stderr, "Failed to start publish thread (pthread_create returned %d)\n", createStatus);
        return;
    }

    const int detachStatus = pthread_detach(sampleThread);
    if (detachStatus != 0)
    {
        // The thread still runs; only its automatic cleanup could not be set up.
        fprintf(stderr, "Failed to detach publish thread (pthread_detach returned %d)\n", detachStatus);
    }
    printf("\nend of publish message\n");
}

The above updates fixed the problem.

Is this correct?

References -
[1] - https://docs.aws.amazon.com/greengrass/v2/developerguide/interprocess-communication.html#ipc-subscribe-operations

answered 3 years ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions