Greengrass v2: how to subscribe to MQTT messages from IoT Core

Hi,

Publishing works: IoT Core receives the MQTT message from this program. But I can't receive MQTT messages from IoT Core.
I have read the document and forum thread below and tried the exact sample program from the document. It exits right away and doesn't wait to receive a message from IoT Core, since the two are not synchronized. I saw that I have to use a queue, but I can't get it to work. Could you help me?

https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-iot-core-mqtt.html
https://forums.aws.amazon.com/thread.jspa?threadID=334561&tstart=0

I also tried creating two clients, ipc_client for publishing and ipc_client2 for subscribing, just in case.

------------------------------Publish works, Subscribe doesn't----------------------------
import queue
import os
import json
import datetime
import time
import random
import awsiot.greengrasscoreipc.client as client
from awsiot.eventstreamrpc import Connection, LifecycleHandler, MessageAmendment
from awscrt.io import (
    ClientBootstrap,
    DefaultHostResolver,
    EventLoopGroup,
    SocketDomain,
    SocketOptions,
)

from awsiot.greengrasscoreipc.model import (
    IoTCoreMessage,
    QOS,
    SubscribeToIoTCoreRequest,
    PublishToIoTCoreRequest
)

class IPCUtils:
    def connect(self):
        elg = EventLoopGroup()
        resolver = DefaultHostResolver(elg)
        bootstrap = ClientBootstrap(elg, resolver)
        socket_options = SocketOptions()
        socket_options.domain = SocketDomain.Local
        amender = MessageAmendment.create_static_authtoken_amender(os.getenv("SVCUID"))
        hostname = os.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT")
        print(hostname)
        connection = Connection(
            host_name=hostname,
            port=8033,
            bootstrap=bootstrap,
            socket_options=socket_options,
            connect_message_amender=amender,
        )
        self.lifecycle_handler = LifecycleHandler()
        connect_future = connection.connect(self.lifecycle_handler)
        TIMEOUT = 10
        connect_future.result(TIMEOUT)
        return connection

ipc_utils = IPCUtils()
connection = ipc_utils.connect()
ipc_client = client.GreengrassCoreIPCClient(connection)

class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
    def __init__(self):
        super().__init__()

    def on_stream_event(self, event: IoTCoreMessage) -> None:
        message = str(event.message.payload, "utf-8")
        print("MESSAGE RECEIVED: ")
        print(message)
        queue.put(message)

    def on_stream_error(self, error: Exception) -> bool:
        # Handle error.
        print('Error ---!')
        return True

    def on_stream_closed(self) -> None:
        pass

dt=datetime.datetime.now().isoformat(timespec='seconds')
message = {"deviceid":"ggc", "timestamp": dt , "Temperature":random.randint(20,37)}
request = PublishToIoTCoreRequest()
request.topic_name = "ggc/topic"
request.payload = json.dumps(message).encode('utf-8')
request.qos = QOS.AT_LEAST_ONCE
operation = ipc_client.new_publish_to_iot_core()
operation.activate(request)
future = operation.get_response()
TIMEOUT=10
future.result(TIMEOUT)

queue = queue.Queue()
request = SubscribeToIoTCoreRequest()
request.topic_name = "ggc/#"
request.qos = QOS.AT_LEAST_ONCE
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
future = operation.activate(request)
future.result(TIMEOUT)

i=0
while queue.empty():
    i=i+1
    print(i)
    time.sleep(1)

print(queue.get())

queue.join()

jx2900
Asked 4 years ago, 2037 views
18 answers

Hi,
Can you please provide the logs from both your component and Greengrass? You may also want to enable debug logging in the Nucleus by updating its logging config level through a deployment (look at the logging section of https://docs.aws.amazon.com/greengrass/v2/developerguide/greengrass-nucleus-component.html#greengrass-nucleus-component-configuration). With debug logs, you will see if we are performing the subscribe request.

Also verify your IoT policy to make sure that it does allow you to subscribe to the topic which you're attempting to subscribe to.

Cheers,
Michael Dombrowski

AWS
Expert
Answered 4 years ago

Hi,

Are you trying to subscribe to the message that you publish here on the "ggc/topic" topic? If so, try swapping the order of the publish and subscribe requests, so that the program subscribes before it publishes.
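
To illustrate why the order matters when a program wants to receive its own message: a subscription only sees messages published after it has been registered (retained messages aside). The sketch below uses a hypothetical in-memory `TinyBroker` class as a stand-in for IoT Core. It is not part of any AWS SDK and only models the ordering.

```python
from collections import defaultdict


class TinyBroker:
    """Hypothetical in-memory stand-in for a broker; not an AWS API."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, payload):
        # Only callbacks registered before this call see the message.
        for callback in self._subscribers[topic]:
            callback(payload)


def run(subscribe_first):
    broker = TinyBroker()
    received = []
    if subscribe_first:
        broker.subscribe("ggc/topic", received.append)
        broker.publish("ggc/topic", "hello")
    else:
        broker.publish("ggc/topic", "hello")
        broker.subscribe("ggc/topic", received.append)
    return received


print(run(subscribe_first=True))   # ['hello']
print(run(subscribe_first=False))  # []
```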

Best regards,
Finn Thompson

AWS
Answered 4 years ago

No, I want to exchange data between Greengrass and IoT Core/the AWS cloud using the same ggc/topic.
That way, when I use "test" in IoT Core, I can see what I am receiving from GGC, and when I publish from IoT Core, GGC should receive the message from IoT Core.

GGC-->Publish_to_IoT core (ggc/topic)---->IoT Core (first 10-15 seconds)
GGC<--Subscribe_to_IoT core (ggc/topic)<---IoT Core (after I have made sure that the IoT Core test page received the MQTT message from GGC, I manually publish an MQTT message from the IoT Core test page.)

So the order I wrote is okay: publish first, then subscribe.
Eventually I would like to publish and subscribe to messages asynchronously and repeatedly.

  1. Publish --- Every 1 sec, GGC uploads IoT data to IoT Core-->Lambda-->DynamoDB
  2. Subscribe --- When needed, GGC receives some data or a command from Lambda or a program in EC2.

Here is my recipe, in case it contains mistakes.
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.mqttproxy:
        HelloWorld:pubsub:1:
          policyDescription: "Allows access to publish to ggc/topic."
          operations:
            - "*"
          resources:
            - "ggc/topic"
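
Note that the program in the question subscribes with the filter `ggc/#`, while this recipe lists only `ggc/topic` under `resources`. As a minimal sketch of standard MQTT wildcard semantics (not of how Greengrass authorizes a filter against `resources`, which this thread does not show), the hypothetical helper `topic_matches` below illustrates that `ggc/#` covers more topics than `ggc/topic` alone. Topics starting with `$` are ignored here for brevity.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # the parent level and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


print(topic_matches("ggc/#", "ggc/topic"))          # True
print(topic_matches("ggc/#", "ggc/other/deeper"))   # True
print(topic_matches("ggc/topic", "ggc/other"))      # False
```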

jx2900
Answered 4 years ago

Can you please enable debug logging and provide the Greengrass logs?

Thank you,
Michael Dombrowski

AWS
Expert
Answered 4 years ago

Thank you, Michael,
I put "*" in the recipe as below and I am able to publish messages. But I don't see this accessControl in effectiveConfig.yaml. I don't know why. Maybe it is because I am deploying it locally.
My greengrass.log was updated overnight and no longer contains the entries from when I ran the program yesterday.
I will try to enable debug logging in the Nucleus next Thursday and then post all the proper logs.
I just wanted to reply now since I have something else to take care of for the next few days.


ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.mqttproxy:
        HelloWorld:pubsub:1:
          policyDescription: "Allows access to publish to ggc/topic."
          operations:
            - "*"
          resources:
            - "ggc/topic"

jx2900
Answered 4 years ago

The effective config file is only written when Greengrass starts and stops, so if you haven't restarted Greengrass then it would not necessarily reflect the current state of the configuration. You can use the CLI or the local debug console to view the current configuration of components.

Cheers,
Michael Dombrowski

AWS
Expert
Answered 4 years ago

Hi Michael,
Thank you for the information. I attached greengrass.log and HelloWorld.log.
<1> I added the aws.greengrass.Nucleus component along with aws.greengrass.Cli in a deployment to the core device GreengrassCore2-177b2fdecec from the AWS cloud console. I hope this is what you meant by enabling debug logging in the Nucleus.

<2> I stopped and restarted Greengrass core using systemctl. This added the HelloWorld configuration to effectiveConfig.yaml.

But it seems that HelloWorld doesn't subscribe, although HelloWorld can publish with the same topic "ggc/topic".

  1. How do I retrieve the message? Is it in def on_stream_event like below?
    class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
        :
        def on_stream_event(self, event: IoTCoreMessage) -> None:
            message = str(event.message.payload, "utf-8")

  2. How can I retrieve the message using a queue from the StreamHandler class? I tried the code below and it didn't work.
    class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
        def __init__(self):
            super().__init__()
            self.q = queue.Queue() # <------

        def on_stream_event(self, event: IoTCoreMessage) -> None:
            message = str(event.message.payload, "utf-8")
            self.queue.put(message)   # <------

    ipc_client2 = client.GreengrassCoreIPCClient(connection)
    request2 = SubscribeToIoTCoreRequest()
    request2.topic_name = "ggc/topic"
    request2.qos = QOS.AT_LEAST_ONCE
    handler = StreamHandler()
    operation2 = ipc_client2.new_subscribe_to_iot_core(handler)
    future2 = operation2.activate(request2)
    future2.result(TIMEOUT)
    i=0
    while handler.q.empty(): # <------
        i=i+1
        print(i)
        time.sleep(1)

    print(handler.q.get()) # <------

jx2900
Answered 4 years ago

Hi,
Your reply does not contain the logs, only the effectiveConfig file.

  1. Yes. When a message is published to a topic which you have successfully subscribed to, the code which you put in the on_stream_event handler will be called.
  2. Please review the example code provided in https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-iot-core-mqtt.html#ipc-operation-subscribetoiotcore-examples and use the StreamHandler from this code example exactly as it is shown.

What is ipc_client2 in your code? You should not open multiple connections to Greengrass. Use a single connection for all operations.

Edited by: MichaelDombrowski-AWS on Feb 25, 2021 10:07 PM

AWS
Expert
Answered 4 years ago

I added the logs this time. When I attached effectiveConfig.txt, I saw that the icon of the log file changed from a paper clip to a file. This might have caused the problem. Anyway, here are the logs.

Okay, I will delete ipc_client2 and keep just one.

It looks like I subscribed once.
HelloWorld.log
Line 4: This is the message that I published from HelloWorld.py.

But I got a stream_error at line 5.

And I am publishing different messages multiple times from IoT Core with the same topic name "ggc/topic". This is what I want GGC to receive from IoT Core, but I am not able to receive the messages from IoT Core.

Edited by: jx2900 on Feb 25, 2021 10:48 PM
I tried the sample code, but it ends too quickly, before it subscribes. Someone asked the same question and you mentioned using a queue and a while loop. So I am trying to use the queue and while.

jx2900
Answered 4 years ago

Everything seems fine. You should print what the actual error is, not just "Error!". The function is provided with the error as an input.
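
A minimal sketch of that advice follows. `StreamHandlerSketch` is a hypothetical plain class that mirrors only the `on_stream_error` method from the question so that it runs on its own; a real handler would subclass `client.SubscribeToIoTCoreStreamHandler` as in the question.

```python
class StreamHandlerSketch:
    """Stand-in showing only the error callback; not the SDK base class."""

    def on_stream_error(self, error: Exception) -> bool:
        # Log the exception type and message instead of a fixed string.
        print(f"Stream error: {error!r}", flush=True)
        return True  # Same return value as the question's handler.


handler = StreamHandlerSketch()
handler.on_stream_error(RuntimeError("example failure"))
```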

You should also change how you are reading from the queue. Do not check if the queue is empty or not. Simply call queue.get(). That call will block until there is something to be read out of the queue.
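
As a sketch of the blocking read described above, using only the standard library: the `fake_callback` thread is a hypothetical stand-in for `on_stream_event`, which in the real component would put the payload on the queue when a message arrives.

```python
import queue
import threading
import time

messages = queue.Queue()


def fake_callback():
    # Stands in for on_stream_event delivering a message a little later.
    time.sleep(0.2)
    messages.put('{"message": "Hello from AWS IoT console"}')


threading.Thread(target=fake_callback, daemon=True).start()

# get() blocks until an item arrives; no empty() polling or sleep loop needed.
received = messages.get()
print(received, flush=True)
```

If the program should not wait forever, `messages.get(timeout=...)` raises `queue.Empty` once the timeout expires.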

Please also upgrade your Python dependency on awsiotsdk to version 1.5.7.

Your code also mixes "q" and "queue". Please correct these, as parts of the code are referencing different variables which may or may not exist.

Edited by: MichaelDombrowski-AWS on Feb 25, 2021 10:54 PM

AWS
Expert
Answered 4 years ago

Thank you, Michael,
Greengrass core is dead again. So I will have to re-install it to get the result.

  1. To upgrade awsiotsdk to 1.5.7, is the below okay in the recipe?
    "Manifests": [
      {
        "Lifecycle": {
          "install": {
            "Timeout": 1000,
            "Script": "pip3 install awsiotsdk==1.5.7"

I corrected q and queue. Sorry, I mixed them up while I was trying different things with a lot of unresolved questions.
I will reinstall ggc and get the actual error and report back to you.

Edited by: jx2900 on Feb 26, 2021 6:52 AM
I was able to re-install ggc and it is up and running.

But I don't get any messages anymore. No error either. The stream handler gets neither an error nor a message anymore. IoT Core gets the message from HelloWorld.py. What am I doing wrong?
awsiotsdk is now 1.5.7.

jx2900
Answered 4 years ago

Since I do not see any logs from your component, please use this guide to disable Python's output buffering: https://docs.aws.amazon.com/greengrass/v2/developerguide/troubleshooting.html#python-component-no-log-output
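
The linked guide is not quoted in this thread, so the following is only a sketch of the usual Python-side remedies for buffered output: pass `flush=True` to `print`, or start the interpreter unbuffered with `python3 -u` or the `PYTHONUNBUFFERED=1` environment variable. The helper name `log` is hypothetical.

```python
import sys


def log(message: str, stream=sys.stdout) -> None:
    # flush=True pushes the line out immediately instead of waiting for
    # the stdout buffer to fill.
    print(message, file=stream, flush=True)


log("MESSAGE RECEIVED")
```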

AWS
Expert
Answered 4 years ago

Hi,
Please find the attached files of sample recipe and artifact that achieves what you're trying to do. Note that the code provided is only a sample and needs refactoring.

This code publishes a message to the cloud only once but logs the subscribed messages from the queue indefinitely, since I tried to keep it similar to your earlier code.

To test this, go to AWS IoT Console Test->MQTT Test client -> Publish to a topic -> topic = ggc/topic and message-> "Any message" . You should find the message sent in PubSub.log

Sample output in the /greengrass/v2/logs/PubSub.log

2021-02-26T18:08:48.794Z [INFO] (Copier) Subscribe: stdout. ['{\n  "message": "Hello from AWS IoT console"\n}']. {scriptName=services.PubSub.lifecycle.Run.script, serviceName=PubSub, currentState=RUNNING}
2021-02-26T18:09:04.863Z [INFO] (Copier) Subscribe: stdout. ['{\n  "message": "Hello from Greengrass"\n}']. {scriptName=services.PubSub.lifecycle.Run.script, serviceName=PubSub, currentState=RUNNING}
2021-02-26T18:09:20.344Z [INFO] (Copier) Subscribe: stdout. ['{\n  "message": "Hello again!"\n}']. {scriptName=services.PubSub.lifecycle.Run.script, serviceName=PubSub, currentState=RUNNING}

Thanks,
Saranya

Edited by: saranya-aws on Feb 26, 2021 10:30 AM

Edited by: saranya-aws on Feb 26, 2021 10:41 AM

AWS
Saranya
Answered 4 years ago

Thank you very much for the sample program, Saranya,
It finally worked! I even copied and pasted it into the HelloWorld component, which I had already deployed locally with its configuration, and it worked. So the recipe was okay.
I guess the only problem was in my program, which was using print(queue.get()): with while and print, HelloWorld.log didn't log the stdout from print. When I replaced your logger.info(queue.get()) with print(queue.get()) as below, it no longer printed the subscribed message to HelloWorld.log.
#logger.info(queue.get())
print(queue.get()) <-------------------this

Hope these are my last questions.

  1. Why doesn't print work? Your logger writes to stdout, and print writes to stdout too, so both should work.

  2. I want to publish data every 1 sec and, at the same time, subscribe without missing any messages. How can I do that? Should I create two operations under the same ipc_utils and run them in two different threads?

Note: I am using Raspy Linux. So I had to delete two lines from the recipe or it failed to deploy. Otherwise, it is beautiful to see that ggc is subscribing.
architecture: amd64
Name: 64-bit x86_64 - Linux

jx2900
Answered 4 years ago

  1. Please read https://docs.aws.amazon.com/greengrass/v2/developerguide/troubleshooting.html#python-component-no-log-output

  2. You do not need to use multiple threads, however you can choose to use multiple threads. If you just rewrite the while loop you can have it publish something every second in there. Or you can go ahead and create a separate thread to handle the publishing.
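
A single-threaded sketch of the first option, under stated assumptions: `run_loop`, `publish`, and `handle` are hypothetical names, `incoming` is the queue that `on_stream_event` would fill, and in a real component `publish` would wrap the PublishToIoTCore call from the question. The loop waits on the queue only for the time left until the next publish, so it handles incoming messages between publishes.

```python
import queue
import time


def run_loop(incoming, publish, handle, interval=1.0, ticks=3):
    """Publish every `interval` seconds and handle messages in between."""
    next_publish = time.monotonic()
    for _ in range(ticks):
        publish()
        next_publish += interval
        while True:
            remaining = next_publish - time.monotonic()
            if remaining <= 0:
                break
            try:
                # Wait for a message, but no longer than the rest of this tick.
                handle(incoming.get(timeout=remaining))
            except queue.Empty:
                break


# Demo with a short interval and one message already waiting.
incoming = queue.Queue()
incoming.put("command from cloud")
published, handled = [], []
run_loop(incoming, lambda: published.append(time.monotonic()),
         handled.append, interval=0.05, ticks=3)
print(len(published), handled)
```

In a long-running component the `for` loop would become `while True`. A separate publishing thread works as well; this version just avoids sharing state between threads.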

AWS
Expert
Answered 4 years ago

Thank you, Michael,

  1. I've read all the troubleshooting sections but couldn't find this section in the Japanese version. In fact, when I first clicked the URL, it jumped to the Japanese troubleshooting page, which didn't even contain the word "python". I cleared the cache but still got the Japanese page. When I changed the language to English, the proper page appeared.

I used the below and it worked!
print(handler.q.get(),file=sys.stdout, flush=True)

  2. Thank you for your explanation. Now I think that I understand how the class object works for publish and subscribe.

Thank you very much, Michael and Saranya!

jx2900
Answered 4 years ago

Hi. Thank you for posting this example. This is something I want to do as well. The part I'm interested in is using the cert that is already installed on the Greengrass core device to authenticate to IoT Core and listen to MQTT messages. It appears this is accomplished by means of a "MessageAmendment" (right?). Is this something that is possible using the Java API? I couldn't find any documentation about a class with that name.

Answered 3 years ago

Hi,
For Java you should really use the Greengrass IPC SDK, see https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-iot-core-mqtt.html#ipc-operation-publishtoiotcore for information about publishing and subscribing with MQTT through Greengrass.

Cheers,
Michael

AWS
Expert
Answered 3 years ago
