Accessing an IoT Core topic from within a C++ program running in Docker

0

I have a C++ program that runs within a Docker container. The Docker container recipe:

{
	"RecipeFormatVersion": "2020-01-25",
	"ComponentName": "au.com.mycompany.smartdvr.docker.streamcontroller",
	"ComponentVersion": "1.3.22",
	"ComponentType": "aws.greengrass.generic",
	"ComponentDescription": "A component that runs the smart dvr Docker container from a private Amazon ECR image. Video/Audio saving and streaming to Kinesis added logging to file, file culling,ridgun optimisation",
	"ComponentPublisher": "MYCompany",
	"ComponentConfiguration": {
		"DefaultConfiguration": {
			"accessControl": {
				"aws.greengrass.ipc.mqttproxy": {
					"au.com.mycompany.smartdvr.docker.streamcontroller:mqttproxy:1": {
						"policyDescription": "Allows access to publish/subscribe to all topics.",
						"operations": [
							"aws.greengrass#PublishToIoTCore",
							"aws.greengrass#SubscribeToIoTCore"
						],
						"resources": [
							"*"
						]
					}
				}
			}
		}
	},
	"ComponentDependencies": {
		"aws.greengrass.DockerApplicationManager": {
			"VersionRequirement": ">=2.0.0 <2.1.0",
			"DependencyType": "HARD"
		},
		"aws.greengrass.TokenExchangeService": {
			"VersionRequirement": ">=2.0.0 <2.1.0",
			"DependencyType": "HARD"
		},
		"au.com.mycompany.smartdvr.docker.gstd": {
			"VersionRequirement": ">=0.0.1 <5.0.0",
			"DependencyType": "HARD"
		}
	},
	"Manifests": [
		{
			"Platform": {
				"os": "linux"
			},
			"Lifecycle": {
				"Run": "echo '==========>>>>>>>'; $(docker kill streamcontroller || true) ; $(docker rm streamcontroller || true); docker run --name=streamcontroller --cap-add=SYS_PTRACE --runtime=nvidia -e DISPLAY=$DISPLAY --privileged --volume /tmp/.X11-unix:/tmp/.X11-unix --net=host -e NVIDIA_VISIBLE_DEVICES=all -v $HOME/.Xauthority:/root/.Xauthority -v /run/udev/control:/run/udev/control -v /greengrass/v2:/greengrass/v2 -v $AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT:$AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT  -e SVCUID -e AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT=$AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT -v /dev:/dev -v /sys/firmware/devicetree/base/serial-number:/sys/firmware/devicetree/base/serial-number -v /data:/data -e NVIDIA_DRIVER_CAPABILITIES=compute,utility,graphics   xxxxxxxx.dkr.ecr.ap-southeast-2.amazonaws.com/smartdvr:latest2 runstreamcontroller.sh"
			},
			"Artifacts": [
				{
					"Uri": "docker:xxxxxxx.dkr.ecr.ap-southeast-2.amazonaws.com/smartdvr:latest2",
					"Unarchive": "NONE",
					"Permission": {
						"Read": "OWNER",
						"Execute": "NONE"
					}
				}
			]
		}
	],
	"Lifecycle": {}
}

Inside the C++ program I subscribe to an IoT Core topic like this:

void ExternalBroker::subscribe_to_video_request_topic()
{

    string topic("mtd/");
    topic += LocalConfig::get_machine_id();
    topic += "/v1/mdvr/video/request";

    g_logging->info("External Broker subscribing to topic " + topic);

    QOS qos = QOS_AT_MOST_ONCE;
    int timeout = 10;

    g_logging->info("External Broker  : --00" );

    SubscribeToIoTCoreRequest request;
    request.SetTopicName(topic.c_str());
    request.SetQos(qos);
    g_logging->info("External Broker  : --01" );
    ExternalBrokerVideoRequestTopicHandler streamHandler;
    SubscribeToIoTCoreOperation operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
    g_logging->info("External Broker  : --02" );
    auto activate = operation.Activate(request, nullptr);
    g_logging->info("External Broker  : --03" );
    activate.wait();

    g_logging->info("External Broker  : --11" );
    auto responseFuture = operation.GetResult();
    g_logging->info("External Broker  : --22" );
    if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) 
    {
        std::cerr << "External Broker Operation timed out while waiting for response from Greengrass Core." << std::endl;
        return;
    }
    g_logging->info("External Broker  : auto response = responseFuture.get()" );
    auto response = responseFuture.get();
    g_logging->info("External Broker  : respnse" );
    if (!response) {
        // Handle error.
        g_logging->error("External Broker  : ERROR" );
        auto errorType = response.GetResultType();
        if (errorType == OPERATION_ERROR) {
            auto *error = response.GetOperationError();
            // Handle operation error.
            g_logging->error("External Broker  : operation ERROR" );
            
        } else {
            // Handle RPC error.
            g_logging->error("External Broker  : RPC ERROR" );
        }
    }
    else
    {
        g_logging->info("External Broker  : got response: " );
    }
}

but it never gets past the

activate.wait();

and when I publish something on that topic using the console's test page ( https://ap-southeast-2.console.aws.amazon.com/iot/home?region=ap-southeast-2#/test ), I never see the callback to the

class ExternalBrokerVideoRequestTopicHandler : public SubscribeToIoTCoreStreamHandler
/**
 * Handles one message delivered on the subscribed IoT Core topic.
 *
 * Messages without an MQTT message or without a payload are ignored. The
 * payload bytes are converted to a string, logged together with the topic
 * name, and parsed as JSON. A payload that is not valid JSON is logged as an
 * error and dropped.
 *
 * @param response the stream event carrying the MQTT message; a null pointer
 *                 is ignored.
 */
void ExternalBrokerVideoRequestTopicHandler::OnStreamEvent(IoTCoreMessage *response)
{
    g_logging->info("External Broker  : aa" );
    if (response == nullptr)
    {
        return;
    }

    auto message = response->GetMessage();
    if (!message.has_value() || !message.value().GetPayload().has_value())
    {
        return;
    }
    g_logging->info("External Broker  : bb" );

    auto messageBytes = message.value().GetPayload().value();
    const std::string messageString(messageBytes.begin(), messageBytes.end());

    // The topic name is optional as well; fall back to an empty name rather
    // than dereferencing an empty optional.
    auto topic = message.value().GetTopicName();
    const std::string topicName = topic.has_value() ? topic.value().c_str() : "";
    g_logging->info("External Broker received iot message on " + topicName + " messgae: " +  messageString);

    // Parse with exceptions disabled: a malformed payload must not throw out
    // of this callback. Invalid input yields a "discarded" value instead.
    const json payload_json = nlohmann::json::parse(messageString, nullptr, false);
    if (payload_json.is_discarded())
    {
        g_logging->error("External Broker can't load " +  messageString);
        return;
    }

    g_logging->info("External Broker  : cc" );
    // forward request to queue and save queue
}

log output:

[2021-12-13 05:29:59.956] [streamcontroller] [info] External Broker trying to connect to Greengrass APC
[2021-12-13 05:29:59.964] [streamcontroller] [info] External Broker conneted
[2021-12-13 05:29:59.964] [streamcontroller] [info] External Broker set_connected :
[2021-12-13 05:29:59.964] [streamcontroller] [error] External Broker  : 22
[2021-12-13 05:29:59.964] [streamcontroller] [error] External Broker  : 33
[2021-12-13 05:29:59.964] [streamcontroller] [info] External Broker subscribing to topic mtd/smartdvr-1423019132001/v1/mdvr/video/request
[2021-12-13 05:29:59.964] [streamcontroller] [info] External Broker  : --00
[2021-12-13 05:29:59.965] [streamcontroller] [info] External Broker  : --01
[2021-12-13 05:29:59.965] [streamcontroller] [info] External Broker  : --02
[2021-12-13 05:29:59.965] [streamcontroller] [info] External Broker  : --03
[2021-12-13 05:30:09.907] [streamcontroller] [info] ExternalBroker timer tick..
[2021-12-13 05:30:19.947] [streamcontroller] [info] ExternalBroker timer tick..
[2021-12-13 05:30:29.952] [streamcontroller] [info] ExternalBroker timer tick..
[2021-12-13 05:30:40.003] [streamcontroller] [info] ExternalBroker timer tick..

I can't see any errors in greengrass.log. What am I missing here?

clogwog
asked 2 years ago289 views
4 Answers
0
Accepted Answer

Your hunch is correct; when the operation goes out of scope the subscription is closed.

What you're trying to do is very reasonable (and embarrassingly, very basic, expected functionality), but the current API doesn't allow it since the operation types aren't movable.

I initially looked at making the operations movable but ultimately decided not to due to the complexity of the web of objects that make up an operation. Instead, we have tentatively decided on a second set of factory functions for operations that return pointers, giving you complete control over operation lifetime. This avoids messing with the delicate relationships between the various eventstream ipc objects.

You can find a preview version of this functionality here: https://github.com/aws/aws-iot-device-sdk-cpp-v2/tree/NewIpcApiExample

I'll be doing some testing on it next and we welcome any feedback or questions (ideally on the github repository/PR) if you'd like to try it out.

AWS
answered 2 years ago
  • Thank you ! I look forward to it being merged. Kind regards, tom

0

Hello,

I would suggest that you read https://docs.aws.amazon.com/greengrass/v2/developerguide/run-docker-container.html#docker-container-ipc if you haven't already.

There is specific setup required for a docker container to be able to use Greengrass IPC as it requires access to a specific Unix domain socket and some environment variables.

Greengrass.log file shows all IPC connections, so I'd recommend viewing the log and verifying that you see that your component is able to connect over IPC. Once the connection has been made, then the behavior is the same inside and outside of the container.

Cheers, Michael

AWS
EXPERT
answered 2 years ago
  • Hello Michael,

    Thank you for that. I've gone through that setup (see the -v $AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT:$AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT -e SVCUID -e AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT=$AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT part of my Docker startup command and the accessControl section in the recipe).

    I've been able to send a message to AWS IoT using a python script in the same Docker image using the same sockets and access control, but it looks like i'm doing something wrong for subscribing to an aws IoT Core topic.

    I'll have another good look at greengrass.log for the socket connections. thank you !

  • Yes, please do check on the logs. Make sure that you are mounting the directory which contains Greengrass as that directory also contains the IPC socket.

    If you can run without Docker, I'd also suggest doing that to remove Docker as a source of issues, that way we can narrow it down to whatever the true problem is.

  • If i Follow https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-iot-core-mqtt.html#ipc-operation-subscribetoiotcore and run it as a standalone program inside the docker then it works.

    at the end of the example shown above it does a while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); }

    however in a real program this is not an option as i need to do other stuff as well. what is the suggested way to keep a subscription alive ?

    I tried connecting, subscribing, and waiting in a separate thread, but that crashes my program.

    My guess is that it stops subscribing as soon as the SubscribeToIoTCoreOperation operation goes out of scope. Is this a good guess?

    how would i get around this ?

  • If it is going out of scope and causing a problem, then you need to create it on the heap rather than the stack and free the memory at an appropriate time.

    I would also suggest implementing onStreamError, such as

    // Prints any error reported on the subscription stream to stdout.
    // The error pointer and its optional message are checked before use so a
    // missing message cannot cause an empty-optional dereference.
    // NOTE(review): returning false presumably keeps the stream open - confirm
    // against the Greengrass IPC SDK documentation.
    bool OnStreamError(OperationError *operationError) override {
            if (operationError != nullptr && operationError->GetMessage().has_value()) {
                std::cout << "Error received from SubscriptionHandler: " << operationError->GetMessage().value() << std::endl;
            } else {
                std::cout << "Error received from SubscriptionHandler: (no message provided)" << std::endl;
            }
            return false;
        }
    

    This will then print if there are any problems on the stream.

  • How would i go about changing :

    SubscribeToIoTCoreOperation operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
    

    to be created in the heap ? It is created using the NewSubscribeToIoTCore() factory function and returns an object and SubscribeToIoTCoreOperation doesn't seem to have a default constructor that I can use to add to my class and wait for it to be created ?

0

Getting a bit closer now. I'm finding that right after the subscription goes all the way to the server (I found it in CloudWatch in the AWSIotLogsV2 group), the streamHandler (of type class ExternalBrokerVideoRequestTopicHandler : public SubscribeToIoTCoreStreamHandler) that is used in

        SubscribeToIoTCoreOperation operation = ipcClient.NewSubscribeToIoTCore(streamHandler);

receives an OnStreamClosed callback right away. Is that because operation goes out of scope in my subscribe method? If this is the case, how do I keep the subscription going? I can't seem to create an empty

SubscribeToIoTCoreOperation operation 

in my class and set it during the subscription method. The example at https://docs.aws.amazon.com/greengrass/v2/developerguide/interprocess-communication.html#ipc-subscription-handler-best-practices is a simple main() program where operation is created like:

        SubscribeToIoTCoreOperation operation = ipcClient.NewSubscribeToIoTCore(streamHandler);

but in a real program I would like to start a subscription and then continue with other stuff that my program needs to do.

Is my hunch that the subscription stops because the operation goes out of scope correct ? And if so, how can i keep the operation around ?

my latest subscription method:

void ExternalBroker::subscribe_to_video_request_topic()
{
    try
    {
        string topic("mtd/");
        topic += LocalConfig::get_machine_id();
        topic += "/v1/mdvr/video/request";

        int timeout = 10;
        subscription_request.SetTopicName(topic.c_str());
        subscription_request.SetQos(QOS_AT_LEAST_ONCE);
        SubscribeToIoTCoreOperation operation = ipcClient.NewSubscribeToIoTCore(streamHandler);

        auto requestStatus = operation.Activate(subscription_request).get();
        if (!requestStatus)
        {
            g_logging->error("Failed to send subscription request to " + topic);
            return;
        }
        auto subscribeResultFuture = operation.GetResult();
        auto subscribeResult = subscribeResultFuture.get();
        if (subscribeResult)
        {
            g_logging->info("External Broker  : Successfully subscribed to topic: " + topic);
        }
        else
        {
            auto errorType = subscribeResult.GetResultType();
            if (errorType == OPERATION_ERROR)
            {
                OperationError *error = subscribeResult.GetOperationError();
                if (error->GetMessage().has_value())
                {
                    string anerr("Greengrass Core responded with an error: ");
                    g_logging->error( anerr + error->GetMessage().value().c_str());
                }
            }
            else
            {
                string err("Attempting to receive the response from the server failed with error code: ");
                g_logging->error( err + subscribeResult.GetRpcError().StatusToString().c_str());
            }
        }
    }
    catch (std::exception &e)
    {
        string test("External Broker  : Exception subscribing: ");
        g_logging->error(test + e.what() );
    }
}
clogwog
answered 2 years ago
0

Hi Michael, I don't think it is a problem with access to the socket, as a Python program in the same Docker container can publish to an IoT Core topic. Also, I have just recently seen that the subscription makes it all the way to IoT Core.

Unfortunately I need to run this in the Docker as it uses the IPC to talk to IoT Core.

clogwog
answered 2 years ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions