Introduction
I'm running greengrass with aws.greengrass.Nucleus=2.9.3
and ran into a problem with IoT Core topic subscription.
My code is pasted below and here's what the code is doing in English:
- publishe to the reserved topic
$aws/things/{thing_name}/jobs/get
.
- subscribe to both
$aws/things/{thing_name}/jobs/get/accepted
and $aws/things/{thing_name}/jobs/get/rejected
The goal is to get the response from any of the two subscriptions.
try:
# publish to jobs/get
topic = f"$aws/things/{self.device_name}/jobs/get"
self.logger.debug(f"pub to {topic}")
self.ipc_client_v2.publish_to_iot_core(topic_name=topic, qos=QOS.AT_LEAST_ONCE, payload="")
# subscribe to jobs/get/accepted
topic = f"$aws/things/{self.device_name}/jobs/get/accepted"
self.logger.info(f"sub to {topic}")
_, subscribe_accepted_op = self.ipc_client_v2.subscribe_to_iot_core(topic_name=topic, qos=QOS.AT_LEAST_ONCE, on_stream_event=self.on_iot_core_event, on_stream_error=self.on_iot_core_error, on_stream_closed=self.on_iot_core_closed)
# subscribe to jobs/get/rejected
topic = f"$aws/things/{self.device_name}/jobs/get/rejected"
self.logger.info(f"sub to {topic}")
_, subscribe_rejected_op = self.ipc_client_v2.subscribe_to_iot_core(topic_name=topic, qos=QOS.AT_LEAST_ONCE, on_stream_event=self.on_iot_core_event, on_stream_error=self.on_iot_core_error, on_stream_closed=self.on_iot_core_closed)
while True:
# loop forever
time.sleep(1)
except Exception:
self.logger.exception("Exception while running")
exit_code = 1
Here are the event handlers
def on_iot_core_event(self, event: IoTCoreMessage) -> None:
try:
# use monotonic timestamp for same reason as in check_timeout
self.logger.info("got response!")
except:
self.logger.exception("Error handling get response")
def on_iot_core_error(self, error: Exception) -> bool:
# Handle error.
self.logger.error(f"IoT Core stream error = {error}")
return False # Return True to close stream, False to keep stream open.
def on_iot_core_closed(self) -> None:
self.logger.info("IoT Core stream closed")
# Handle close.
pass
Problem
When I deploy this component using the CLI, I see the following in the log:
2023-02-10 15:07:28,801 - sc - DEBUG - pub to $aws/things/**ubuntu/jobs/get
2023-02-10 15:07:28,801 - sc - INFO - sub to $aws/things/**-ubuntu/jobs/get/accepted
2023-02-10 15:07:29,177 - sc - ERROR - IoT Core stream error = ('Unexpected response stream event type: aws.greengrass#SubscribeToIoTCoreResponse, expected: aws.greengrass#IoTCoreMessage', b'{}')
2023-02-10 15:07:29,177 - sc - INFO - IoT Core stream closed
2023-02-10 15:07:29,177 - sc - ERROR - Exception while running
Traceback (most recent call last):
File "/home/me/greengrass/v2/packages/artifacts/error/1.0.0/main.py", line 61, in main
_, subscribe_accepted_op = self.ipc_client_v2.subscribe_to_iot_core(topic_name=topic, qos=QOS.AT_LEAST_ONCE,on_stream_event=self.on_iot_core_event,
File "/usr/local/lib/python3.8/dist-packages/awsiot/greengrasscoreipc/clientv2.py", line 893, in subscribe_to_iot_core
return fut.result(), op
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
return self.__get_result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/usr/local/lib/python3.8/dist-packages/awsiot/eventstreamrpc.py", line 711, in _on_continuation_message
self._handle_data(model_name, payload)
File "/usr/local/lib/python3.8/dist-packages/awsiot/eventstreamrpc.py", line 739, in _handle_data
raise UnmappedDataError(msg, payload)
awsiot.eventstreamrpc.UnmappedDataError: ('Unexpected response type: aws.greengrass#IoTCoreMessage, expected: aws.greengrass#SubscribeToIoTCoreResponse', b'{"message":{"topicName":"$aws/things/**-ubuntu/jobs/get/accepted","payload":"eyJ0aW1lc3RhbX*****"}}')
Analysis & Question
The log is suggesting that
my event handler got the aws.greengrass#SubscribeToIoTCoreResponse
message while it's expecting aws.greengrass#IoTCoreMessage
and my subscribe operation got the aws.greengrass#IoTCoreMessage
while it's expecting aws.greengrass#SubscribeToIoTCoreResponse
If I change the order of subscription and publish (subscribe first, then publish) then the code works as expected. But this feels like a bug to me that subscription fails when a message is being published to the topic I'm trying to subscribe to. Is this a bug?