- Newest
- Most votes
- Most comments
Hi edgegoldberg,
The explanation and fix you've found for this problem are appropriate. As the documentation (https://docs.aws.amazon.com/greengrass/v2/developerguide/interprocess-communication.html#ipc-subscribe-operations) says, you cannot block on an IPC operation's response (in your case, the publish operation) from within the subscription handler. Instead, make the new request and wait on its result in a separate thread, as you have already done.
Thanks,
Shagupta
Hi,
I went back through the documentation to check whether I had missed anything, and I found the following statements in [1]:
Send new IPC requests asynchronously
The IPC client can't send a new request from within subscription handler functions, because the request blocks the handler function if you wait for a response. You can send IPC requests in a separate thread that you run from the handler function.
I updated the application as shown below:
#include <pthread.h>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <future>
#include <iostream>
#include <string>
#include <thread>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace std;
using namespace Aws::Crt;
using namespace Aws::Greengrass;
// Shared IPC client. Assigned in main() and used by sampleTask() to create publish operations.
GreengrassCoreIpcClient *ipcClient;
// Forward declaration: the subscription handler below calls publishMessage() before its definition.
void publishMessage();
/**
 * Stream handler for the topic subscription created in main().
 *
 * A JSON payload is printed and then triggers publishMessage(); a binary
 * payload is only decoded into a string. Stream errors keep the stream open.
 */
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
    void OnStreamEvent(SubscriptionResponseMessage *response) override {
        auto jsonPayload = response->GetJsonMessage();
        const bool carriesJson = jsonPayload.has_value() && jsonPayload.value().GetMessage().has_value();
        if (carriesJson) {
            // Handle JSON message: echo it, then hand off to the publisher.
            auto renderedJson = jsonPayload.value().GetMessage().value().View().WriteReadable();
            printf("%s\n", renderedJson.c_str());
            publishMessage();
            return;
        }

        auto binaryPayload = response->GetBinaryMessage();
        if (!binaryPayload.has_value() || !binaryPayload.value().GetMessage().has_value()) {
            // Neither a JSON nor a binary payload is present; nothing to do.
            return;
        }

        // Handle binary message: the bytes are turned into a string but not used further yet.
        auto rawBytes = binaryPayload.value().GetMessage().value();
        std::string decodedText(rawBytes.begin(), rawBytes.end());
    }

    bool OnStreamError(OperationError *error) override {
        // Handle error.
        // Returning true would close the stream; false keeps it open.
        return false;
    }

    void OnStreamClosed() override {
        // Handle close.
    }
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
ipcClient = new GreengrassCoreIpcClient(bootstrap);
auto connectionStatus = ipcClient->Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("abc/world");
int timeout = 10;
SubscribeToTopicRequest request;
request.SetTopic(topic);
SubscribeResponseHandler streamHandler;
SubscribeToTopicOperation operation = ipcClient->NewSubscribeToTopic(streamHandler);
auto activate = operation.Activate(request, nullptr);
activate.wait();
auto responseFuture = operation.GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
/* if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
*/
// Keep the main thread alive, or the process will exit.
while (true) {
cout << "Hello, World!" << endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
}
operation.Close();
return 0;
}
void *sampleTask(void *argument)
{
printf("\nstart of sample task\n");
String topic("xyz/world");
int timeout = 10;
PublishToTopicRequest request;
String message("MKLOLEWRWTRSGH V DSZF QWRO<D!");
Vector<uint8_t> messageData({message.begin(), message.end()});
BinaryMessage binaryMessage;
binaryMessage.SetMessage(messageData);
PublishMessage publishMessage;
publishMessage.SetBinaryMessage(binaryMessage);
request.SetTopic(topic);
request.SetPublishMessage(publishMessage);
PublishToTopicOperation operation = ipcClient->NewPublishToTopic();
printf("\nbefore activate\n");
auto activate = operation.Activate(request).get();
// activate.wait();
printf("\nafter activate\n");
if (!activate)
{
printf("Failed to publish to %s topic with error %s\n", topic.c_str(), activate.StatusToString().c_str());
//exit(-1);
}
auto responseFuture = operation.GetResult();
auto publishResult = responseFuture.get();
if (publishResult)
{
fprintf(stdout, "Successfully published to %s topic\n", topic.c_str());
auto *response = publishResult.GetOperationResponse();
(void)response;
}
else
{
auto errorType = publishResult.GetResultType();
if (errorType == OPERATION_ERROR)
{
OperationError *error = publishResult.GetOperationError();
/*
* This pointer can be casted to any error type like so:
* if(error->GetModelName() == UnauthorizedError::MODEL_NAME)
* UnauthorizedError *unauthorizedError = static_cast<UnauthorizedError*>(error);
*/
if (error->GetMessage().has_value())
fprintf(stderr, "Greengrass Core responded with an error: %s\n", error->GetMessage().value().c_str());
}
else
{
fprintf(
stderr,
"Attempting to receive the response from the server failed with error code %s\n",
publishResult.GetRpcError().StatusToString().c_str());
}
}
return 0;
}
/**
 * Starts sampleTask() on a new thread and returns without waiting for it.
 *
 * Called from the subscription handler, which must not wait on another IPC
 * request itself. The thread is detached because nothing ever joins it;
 * otherwise each call would leave an unjoined thread behind.
 */
void publishMessage()
{
    printf("\nWelcom to publish message\n");
    pthread_t sampleThread;
    const int createStatus = pthread_create(&sampleThread, nullptr, sampleTask, nullptr);
    if (createStatus != 0)
    {
        // pthread_create reports failure through its return value, not errno.
        fprintf(stderr, "Failed to start publish thread (pthread_create returned %d)\n", createStatus);
    }
    else
    {
        // Nobody joins this thread, so let the system reclaim it when it finishes.
        pthread_detach(sampleThread);
    }
    printf("\nend of publish message\n");
}
The above updates fixed the problem.
Is this correct?
References -
[1] - https://docs.aws.amazon.com/greengrass/v2/developerguide/interprocess-communication.html#ipc-subscribe-operations
Relevant content
- asked 2 years ago
- asked 4 years ago
- AWS OFFICIALUpdated 4 years ago
- AWS OFFICIALUpdated 3 years ago