Greengrass does not implement request/response in our IPC, but you still have options. You can run any communication protocol you like, such as HTTP or gRPC, by deploying a server as one of your components and then connecting to that server from other components.
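As a rough illustration of the "deploy a server as a component" option, here is a minimal sketch that uses only the Python standard library. The handler name `WorkHandler`, the `/work` path, the loopback address, and the doubling logic are all assumptions made for the example; none of this is a Greengrass API, and a real component would need its own port management, error handling, and shutdown logic.

```python
import http.client
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class WorkHandler(BaseHTTPRequestHandler):
    """Hypothetical handler: reads a JSON body and returns a JSON result."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        request = json.loads(self.rfile.read(length))
        # Placeholder "work": double the value sent by the client.
        body = json.dumps({"result": request["m"] * 2}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the example quiet


def start_server(port: int = 0) -> HTTPServer:
    """Start the server in a background thread (port 0 picks a free port)."""
    server = HTTPServer(("127.0.0.1", port), WorkHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def call_server(port: int, val: int, timeout: float = 10.0) -> dict:
    """Send one synchronous request from the 'client' component."""
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=timeout)
    try:
        conn.request("POST", "/work", body=json.dumps({"m": val}),
                     headers={"Content-Type": "application/json"})
        return json.loads(conn.getresponse().read())
    finally:
        conn.close()
```

With this approach the request/response semantics come from HTTP itself, so the client does not need to correlate messages on its own.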
You can use a future to make your request appear synchronous. When you publish the request, you return a future to the caller; the future is completed when the matching response is received, and the caller can stop waiting after a timeout.
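To show the idea independently of Greengrass, here is a minimal sketch in which an in-memory class stands in for the pub/sub transport. `InMemoryBus`, `send_request`, and `serve` are hypothetical names invented for this example and are not part of any AWS SDK. Each request carries a unique token, and the response handler completes the future only when the token matches.

```python
import threading
from concurrent.futures import Future
from uuid import uuid4


class InMemoryBus:
    """Hypothetical stand-in for a pub/sub transport (not a Greengrass API)."""

    def __init__(self):
        self._handlers = {}
        self._lock = threading.Lock()

    def subscribe(self, topic, handler):
        with self._lock:
            self._handlers.setdefault(topic, []).append(handler)

    def unsubscribe(self, topic, handler):
        with self._lock:
            self._handlers[topic].remove(handler)

    def publish(self, topic, message):
        with self._lock:
            handlers = list(self._handlers.get(topic, []))
        for handler in handlers:
            # Deliver on a separate thread, as a real transport would.
            threading.Thread(target=handler, args=(message,), daemon=True).start()


def send_request(bus, val):
    """Publish a request and return a future completed by the matching response."""
    future = Future()
    token = str(uuid4())  # correlates the response with this request

    def on_response(message):
        if message.get("req_token") == token:
            bus.unsubscribe("/work/resp", on_response)
            future.set_result(message)

    bus.subscribe("/work/resp", on_response)
    bus.publish("/work/req", {"m": val, "req_token": token})
    return future


def serve(bus):
    """Echo every request back on the response topic, keeping its token."""
    bus.subscribe("/work/req", lambda message: bus.publish("/work/resp", message))
```

Because every handler checks the token, several requests can be in flight at once without a response being delivered to the wrong caller.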
The following is a blueprint for such an implementation (it still needs proper error handling and shutdown behavior for the server).
Let's say you have a client and a server. The client sends a request on topic `/work/req` and the server responds on topic `/work/resp`.
The following code lets the client call `send_request_sync()` and then block on the returned future, so the request is processed synchronously from the caller's point of view.
On the "client" side:
```python
from concurrent.futures import Future
from uuid import uuid4

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
    PublishMessage, JsonMessage, ReceiveMode, SubscriptionResponseMessage)

client = GreengrassCoreIPCClientV2()


def send_request_sync(val: int):
    f = Future()
    sub = None
    # Unique token used to match the response with this request.
    token = str(uuid4())
    print(token)

    def handle_response(msg: SubscriptionResponseMessage):
        print(msg.json_message.context.topic)
        print(msg.json_message.message)
        # Ignore responses that belong to other requests.
        if msg.json_message.message['req_token'] == token:
            print("Closing subscription")
            sub[1].close()  # sub is (response, operation); close the operation
            print("Setting future")
            f.set_result(msg.json_message.message)

    # Subscribe before publishing so the response cannot be missed.
    sub = client.subscribe_to_topic(
        topic='/work/resp',
        receive_mode=ReceiveMode.RECEIVE_ALL_MESSAGES,
        on_stream_event=handle_response)
    client.publish_to_topic(
        topic='/work/req',
        publish_message=PublishMessage(
            json_message=JsonMessage(message={"m": val, "req_token": token})))
    return f


future_1 = send_request_sync(1)
# Blocks until the response arrives; raises TimeoutError after 10 seconds.
result = future_1.result(timeout=10)
```
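In the client code, `future_1.result(timeout=10)` raises a timeout error if no response arrives, and the subscription then stays open. One possible way to handle this is a small wrapper like the sketch below. `wait_for_response` and its `on_timeout` parameter are hypothetical names introduced for illustration. The sketch assumes the caller can supply a cleanup callback, which would require `send_request_sync()` to also return the subscription operation.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeoutError
from typing import Any, Callable, Optional


def wait_for_response(future: Future, timeout: float,
                      on_timeout: Optional[Callable[[], None]] = None) -> Optional[Any]:
    """Return the future's result, or None if it did not complete in time.

    on_timeout is a hypothetical cleanup hook, e.g. closing the subscription.
    """
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError:
        if on_timeout is not None:
            on_timeout()
        return None
```

Returning `None` on timeout is a design choice for this sketch; re-raising the exception after the cleanup would work equally well if the caller prefers to handle the failure explicitly.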
On the "server" side:
```python
import asyncio
import time

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
    PublishMessage, JsonMessage, ReceiveMode, SubscriptionResponseMessage)

client = GreengrassCoreIPCClientV2()


def handle_request(msg: SubscriptionResponseMessage):
    print(msg.json_message.context.topic)
    print(msg.json_message.message)
    time.sleep(5)  # simulate work
    # Derive the response topic from the request topic: /work/req -> /work/resp
    response_topic = '/'.join(
        msg.json_message.context.topic.split('/')[:-1] + ['resp'])
    # Echo the request back, including req_token, so the client can match it.
    client.publish_to_topic(
        topic=response_topic,
        publish_message=PublishMessage(
            json_message=JsonMessage(message=msg.json_message.message)))


sub = client.subscribe_to_topic(
    topic='/work/req',
    receive_mode=ReceiveMode.RECEIVE_ALL_MESSAGES,
    on_stream_event=handle_request)


async def main():
    # Keep the process alive so the subscription keeps receiving requests.
    await asyncio.Event().wait()

asyncio.run(main())
```
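The server derives the response topic by replacing the last segment of the request topic with `resp`. Pulled out on its own, that step might look like the sketch below; `response_topic` is a hypothetical helper name, and the `suffix` parameter is an assumption added for illustration.

```python
def response_topic(request_topic: str, suffix: str = "resp") -> str:
    """Replace the last '/'-separated segment of the request topic with suffix."""
    parts = request_topic.split("/")
    return "/".join(parts[:-1] + [suffix])
```

For example, `response_topic("/work/req")` gives `/work/resp`, so the same server code can answer requests arriving under different topic prefixes.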