How to implement IPC request/response instead of the pub/sub mechanism in Greengrass V2?

Hey Team,

We want to implement request/response over IPC, the way normal REST APIs work. Is there any provision for this in the Greengrass v2 SDKs? We currently use pub/sub, which is asynchronous, and we would like a synchronous implementation instead.

Could you please point us to a reference for this?

Regards, Nalay Patel

asked a year ago, 313 views
2 Answers
Accepted Answer

Greengrass does not implement request/response in our IPC, but that does not leave you without options. You can run any communication protocol you like, such as HTTP or gRPC, by deploying a server as one of your components and then connecting to that server from the other components.
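
As a rough illustration of the server-component approach, the sketch below uses only Python's standard library: one component would run a small HTTP server bound to localhost, and another would call it and block until the reply arrives. The `/work` path, the JSON payload shape, the doubling logic and the helper names `start_server` and `call_server` are all assumptions made for this example; none of them come from Greengrass.

```python
import json
import threading
from http.client import HTTPConnection
from http.server import BaseHTTPRequestHandler, HTTPServer


class WorkHandler(BaseHTTPRequestHandler):
    """Hypothetical handler: doubles the integer sent in the 'm' field."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length) or b"{}")
        body = json.dumps({"result": payload.get("m", 0) * 2}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the example quiet


def start_server(port: int = 0) -> HTTPServer:
    """Run the server in a background thread (port 0 picks a free port)."""
    server = HTTPServer(("127.0.0.1", port), WorkHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def call_server(port: int, val: int, timeout: float = 10.0) -> dict:
    """Send one request and block until the response arrives or times out."""
    conn = HTTPConnection("127.0.0.1", port, timeout=timeout)
    try:
        conn.request("POST", "/work",
                     body=json.dumps({"m": val}).encode(),
                     headers={"Content-Type": "application/json"})
        return json.loads(conn.getresponse().read())
    finally:
        conn.close()
```

In a real deployment the server and the caller would live in separate components, and the port would have to be agreed on through component configuration rather than picked at random.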

AWS
EXPERT
answered a year ago

You can use a future to make your request appear synchronous. When you publish the request, you return a future to the caller; the future completes when the response is received, and the caller gives up waiting after a timeout.

The following gives a blueprint for such an implementation (to be completed with proper error handling and shutdown behavior for the server).

Let's say you have a client and a server. The client sends a request on the topic /work/req and the server responds on the topic /work/resp. With the following code, the client can call send_request_sync() and wait on the returned future to process the request synchronously.

On the "client" side:

from concurrent.futures import Future
from uuid import uuid4

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
    PublishMessage, JsonMessage, ReceiveMode, SubscriptionResponseMessage)

client = GreengrassCoreIPCClientV2()

def send_request_sync(val: int) -> Future:
    # Returns a future that completes when the matching response arrives.
    f = Future()
    sub = None
    # Unique token used to match the response to this request.
    token = str(uuid4())
    print(token)

    def handle_response(msg: SubscriptionResponseMessage):
        print(msg.json_message.context.topic)
        print(msg.json_message.message)
        # Ignore responses that belong to other requests.
        if msg.json_message.message.get('req_token') == token:
            print("Closing subscription")
            sub[1].close()  # sub is (response, operation); close the operation
            print("Setting future")
            f.set_result(msg.json_message.message)

    # Subscribe before publishing so the response cannot be missed.
    sub = client.subscribe_to_topic(topic='/work/resp',
                                    receive_mode=ReceiveMode.RECEIVE_ALL_MESSAGES,
                                    on_stream_event=handle_response)

    client.publish_to_topic(topic='/work/req',
                            publish_message=PublishMessage(json_message=JsonMessage(message={"m": val, "req_token": token})))

    return f

future_1 = send_request_sync(1)

# Block for up to 10 seconds waiting for the response.
result = future_1.result(timeout=10)
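
If no response arrives in time, `result(timeout=...)` raises `concurrent.futures.TimeoutError`. A minimal sketch of handling that case follows; the helper name `wait_for_response` and the choice to return `None` on timeout are assumptions for illustration. It does not close the subscription opened by `send_request_sync()`, which a complete implementation would also need to do.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeoutError
from typing import Any, Optional


def wait_for_response(future: Future, timeout: float) -> Optional[Any]:
    """Return the response, or None if it did not arrive within `timeout` seconds."""
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError:
        # The future is left pending so that a late response does not
        # make set_result() fail inside the subscription handler.
        return None
```

A caller could then write `result = wait_for_response(send_request_sync(1), 10)` and treat `None` as "the server did not answer".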

On the "server" side:

import asyncio
import time

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
    PublishMessage, JsonMessage, ReceiveMode, SubscriptionResponseMessage)

client = GreengrassCoreIPCClientV2()

def handle_request(msg: SubscriptionResponseMessage):
    print(msg.json_message.context.topic)
    print(msg.json_message.message)
    time.sleep(5)  # simulate 5 seconds of work
    # Derive the response topic from the request topic: /work/req -> /work/resp.
    resp_topic = '/'.join(msg.json_message.context.topic.split('/')[:-1] + ['resp'])
    # Echo the request back, including req_token, so the client can match it.
    client.publish_to_topic(topic=resp_topic,
                            publish_message=PublishMessage(json_message=JsonMessage(message=msg.json_message.message)))

sub = client.subscribe_to_topic(topic='/work/req',
                                receive_mode=ReceiveMode.RECEIVE_ALL_MESSAGES,
                                on_stream_event=handle_request)

async def main():
    # Keep the process alive so the subscription keeps receiving requests.
    await asyncio.Event().wait()

asyncio.run(main())
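
The blueprint above opens a new subscription for every request. One possible variation, sketched here under assumptions, keeps a single long-lived response subscription and a table of pending futures keyed by `req_token`. The `PendingRequests` class and the `request` helper are hypothetical names, and `publish` stands in for whatever function actually sends the message (for example a wrapper around `client.publish_to_topic`); the sketch itself does not talk to Greengrass.

```python
import threading
from concurrent.futures import Future
from typing import Callable, Dict, Tuple
from uuid import uuid4


class PendingRequests:
    """Maps request tokens to futures so one handler can serve all requests."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._pending: Dict[str, Future] = {}

    def create(self) -> Tuple[str, Future]:
        """Register a new request and return its token and future."""
        token = str(uuid4())
        future: Future = Future()
        with self._lock:
            self._pending[token] = future
        return token, future

    def resolve(self, message: dict) -> bool:
        """Complete the future matching message['req_token'], if any."""
        with self._lock:
            future = self._pending.pop(message.get("req_token"), None)
        if future is None:
            return False  # unknown or already timed-out request
        future.set_result(message)
        return True

    def discard(self, token: str) -> None:
        """Forget a request, for example after a timeout."""
        with self._lock:
            self._pending.pop(token, None)


def request(registry: PendingRequests, publish: Callable[[dict], None],
            val: int, timeout: float = 10.0) -> dict:
    """Publish a request and block until its response arrives or times out."""
    token, future = registry.create()
    try:
        publish({"m": val, "req_token": token})
        return future.result(timeout=timeout)
    finally:
        registry.discard(token)
```

With this layout, the handler of the single response subscription would only need to call `registry.resolve(msg.json_message.message)`.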
AWS
EXPERT
answered a year ago
