How to implement IPC Request/response instead of pub/sub mechanism in Greengrass V2 ?

0

Hey Team,

We want to implement request/response over IPC, the way normal REST APIs are working. Is there any provision in Greengrass v2 Sdks ? Since we are currently using pub/sub which is async. Instead of that we want sync implementation.

Could you please help with some reference for the same.

Regards, Nalay Patel

asked a year ago300 views
2 Answers
1
Accepted Answer

Greengrass does not implement request/response in our IPC, but this doesn't mean that you don't have any solutions. You're able to run any communication protocol you like, such as HTTP or gRPC by deploying a server as one of your components and then connecting to that server from other components.

AWS
EXPERT
answered a year ago
1

You can use future to make your request appear as synchronous. When you publish the request you return a future to the caller that gets completed when the response is received or after a timeout.

The following gives a blueprint for such implementation (to be completed with proper error handling and shutdown behavior for the server).

Let's say you have a client and a server. The client sends a request on topic /work/req and the server responds on a topic /work/resp. The following code allows the client to call send_request_sync() to have a synchronous process of the request.

On the "client" side:

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import ( 
    PublishMessage, JsonMessage, ReceiveMode, SubscriptionResponseMessage)
import typing
from concurrent.futures import Future
import asyncio
from uuid import uuid4

client = GreengrassCoreIPCClientV2()

def send_request_sync(val: int):
    f = Future()
    sub = None
    token = str(uuid4())
    print(token)
    def handle_response(msg: SubscriptionResponseMessage):
        print(msg.json_message.context.topic)
        print(msg.json_message.message)
        if (msg.json_message.message['req_token'] == token):
            print("Closing subcription")
            sub[1].close()
            print("Setting future")
            f.set_result(msg.json_message.message)
    
    sub = client.subscribe_to_topic(topic='/work/resp', 
                                receive_mode=ReceiveMode.RECEIVE_ALL_MESSAGES, 
                                on_stream_event=handle_response)
    
    client.publish_to_topic(topic='/work/req', 
                        publish_message=PublishMessage(json_message=JsonMessage(message={"m":val, "req_token": token})))
    
    return f

future_1 = send_request_sync(1)

result = future_1.result(timeout=10)

On the "server" side:

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import ( 
    PublishMessage, JsonMessage, ReceiveMode, SubscriptionResponseMessage)
import typing
import time
import asyncio

client = GreengrassCoreIPCClientV2()

def handle_request(msg: SubscriptionResponseMessage):
    print(msg.json_message.context.topic)
    print(msg.json_message.message)
    time.sleep(5)
    client.publish_to_topic(topic='/'.join(msg.json_message.context.topic.split('/')[:-1] + ['resp']), 
        publish_message=PublishMessage(json_message=JsonMessage(message=msg.json_message.message)))
    
sub = client.subscribe_to_topic(topic='/work/req', 
                                receive_mode=ReceiveMode.RECEIVE_ALL_MESSAGES, 
                                on_stream_event=handle_request)

async def main():
    await asyncio.Event().wait()

asyncio.run(main())
AWS
EXPERT
answered a year ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions