Subscribing to when a shadow changes and retrieving that change

I am having trouble using this mostly default code to detect a change in a shadow. The code is only slightly adapted from the AWS tutorial code: I removed the argument-parsing section and replaced it with hard-coded variables, and I added a print statement in the callback.

import sys
import time
import traceback

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
    SubscriptionResponseMessage,
    UnauthorizedError
)


def main():

    try:
        message = 'script initialized'
        topic = 'dsvvefvfq'
        ipc_client = GreengrassCoreIPCClientV2()
        # Subscription operations return a tuple with the response and the operation.
        _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
                                                     on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
        print('Successfully subscribed to topic: ' + topic)

        # Keep the main thread alive, or the process will exit.
        try:
            while True:
                time.sleep(10)
        except InterruptedError:
            print('Subscribe interrupted.')

        # To stop subscribing, close the stream.
        operation.close()
    except UnauthorizedError:
        print('Unauthorized error while subscribing to topic')
        traceback.print_exc()
        exit(1)
    except Exception:
        print('Exception occurred', file=sys.stderr)
        traceback.print_exc()
        exit(1)


def on_stream_event(event: SubscriptionResponseMessage) -> None:
    try:
        print("Callback was involked")
        message = str(event.binary_message.message, 'utf-8')
        topic = event.binary_message.context.topic
        print('Received new message on topic %s: %s' % (topic, message))
    except:
        traceback.print_exc()


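def on_stream_error(error: Exception) -> bool:
    print('Received a stream error.', file=sys.stderr)
    # traceback.print_exc() only works inside an except block, so print the passed-in error instead.
    traceback.print_exception(type(error), error, error.__traceback__)
    return False  # Return True to close stream, False to keep stream open.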


def on_stream_closed() -> None:
    print('Subscribe to topic stream closed.')


if __name__ == '__main__':
    main()

My shadow manager configuration looks like this:

{
  "reset": [],
  "merge": {
    "reset": [],
    "merge": {
      "strategy": {
        "type": "realTime"
      },
      "synchronize": {
        "coreThing": {
          "classic": true
        },
        "namedShadows": [
          "test",
          "dsvvefvfq"
        ],
        "direction": "betweenDeviceAndCloud"
      }
    }
  }
}

My component configuration looks like this:

{
  "accessControl": {
    "aws.greengrass.ShadowManager": {
      "com.xxxxxxxx.productivity.cycle_count:shadow:1": {
        "policyDescription": "allow access to shadows",
        "operations": [
          "aws.greengrass#GetThingShadow",
          "aws.greengrass#UpdateThingShadow",
          "aws.greengrass#ListNamedShadowsForThing"
        ],
        "resources": [
          "*"
        ]
      }
    },
    "aws.greengrass.ipc.pubsub": {
      "com.xxxxxxxx.productivity.cycle_count:pubsub:1": {
        "policyDescription": "Allows access to publish/subscribe to all topics.",
        "operations": [
          "aws.greengrass#PublishToTopic",
          "aws.greengrass#SubscribeToTopic"
        ],
        "resources": [
          "*"
        ]
      }
    }
  }
}

I expect the Python code to "see" that the shadow has changed and execute the callback function on_stream_event. It does not.

Why?

flycast
asked 2 years ago · 369 views
2 Answers
Hi again @flycast,

A couple things I notice:

  1. Your shadow manager configuration is not in the correct format:
{
  "reset": [],
  "merge": {
    "reset": [],
    "merge": {

You have merge inside of another merge. Please make sure that the configuration is correctly set.

  2. You are subscribing to the topic named dsvvefvfq. Who or what do you expect to be publishing to dsvvefvfq?

Shadow manager publishes to local pubsub topics using the device shadow topics (https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html) as explained here: https://docs.aws.amazon.com/greengrass/v2/developerguide/interact-with-shadows-in-components.html#react-shadow-events.

Therefore, you should be subscribing to $aws/things/<your GG thing name here>/shadow/name/dsvvefvfq/#

You can also subscribe to the wildcard filter $aws/things/#. You will then see all traffic on topics beginning with $aws/things/ and can update your code accordingly.
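The change to the subscription could look roughly like the sketch below. It assumes the component runs on the core device, where Greengrass sets the AWS_IOT_THING_NAME environment variable, and that the nucleus version supports MQTT wildcards in local publish/subscribe (v2.6.0 or later, as far as I recall). The helper names shadow_topic_filter, on_shadow_event and subscribe_to_shadow are hypothetical and not part of the SDK.

```python
import os


def shadow_topic_filter(thing_name: str, shadow_name: str) -> str:
    """Build a topic filter that matches every topic of one named shadow."""
    return f"$aws/things/{thing_name}/shadow/name/{shadow_name}/#"


def on_shadow_event(event) -> None:
    # Handle either payload type defensively; only one of the two is set.
    if event.binary_message is not None:
        topic = event.binary_message.context.topic
        payload = event.binary_message.message.decode("utf-8")
    else:
        topic = event.json_message.context.topic
        payload = event.json_message.message
    print(f"Received new message on topic {topic}: {payload}")


def subscribe_to_shadow(shadow_name: str):
    # Imported here so the helper above can be exercised without the SDK installed.
    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2

    thing_name = os.environ["AWS_IOT_THING_NAME"]
    ipc_client = GreengrassCoreIPCClientV2()
    # Subscription operations return a tuple with the response and the operation.
    _, operation = ipc_client.subscribe_to_topic(
        topic=shadow_topic_filter(thing_name, shadow_name),
        on_stream_event=on_shadow_event,
    )
    return operation
```

Calling subscribe_to_shadow('dsvvefvfq') in place of the original subscribe_to_topic call, and keeping the main thread alive as before, should be enough to see whether the callback fires.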

Cheers,

Michael

AWS
Expert
answered 2 years ago
  • Whoa... I see your #1 above. I'll fix that. On #2 I'm confused. I am subscribing to * in the component as an allow-everything setting. I will tighten that up when I get things working. The component will be able to see anything that way, and that will eliminate misconfiguration as a problem, correct?

  • Are you sure the wildcard wouldn't be $aws/things/* with an asterisk instead of hash?

  • Yes, I am quite sure that using * is not correct. Please see the documentation here: https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-publish-subscribe.html#ipc-operation-subscribetotopic which includes a note about using + or #.

    I think you're confusing the access control policy and the actual subscription. The access control policy that you set in the component configuration simply means that the component is allowed to perform an action. It does not mean that the component will do that action.
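To make the wildcard point concrete, here is a simplified illustration of MQTT-style topic filter matching. It is not the Greengrass implementation, only a sketch of the matching rules; topic_matches is a hypothetical helper and MyCore is a placeholder thing name. It shows why $aws/things/# matches shadow topics while $aws/things/* and the bare shadow name do not.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches an MQTT-style `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level,
            # and it matches everything from this level down.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        # "+" matches exactly one level; "*" has no special meaning.
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


shadow_topic = "$aws/things/MyCore/shadow/name/dsvvefvfq/update/delta"
print(topic_matches("$aws/things/#", shadow_topic))
print(topic_matches("$aws/things/*", shadow_topic))
print(topic_matches("dsvvefvfq", shadow_topic))
```

The access control policy is a separate check: a resource of "*" there only decides whether a subscription is permitted, not which topic filter the code actually subscribes to.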

Too long for a comment: To answer your question #2 - I expect to publish data in a shadow for the core device to use to publish with.

When I publish an update to a shadow I am seeing the updates get logged in the greengrass.log so I know shadowmanager is getting them.

My current component config looks like:

{
  "accessControl": {
    "aws.greengrass.ShadowManager": {
      "com.xxxxxxxx.productivity.cycle_count:shadow:1": {
        "policyDescription": "",
        "operations": [
          "aws.greengrass#GetThingShadow",
          "aws.greengrass#UpdateThingShadow",
          "aws.greengrass#DeleteThingShadow",
          "aws.greengrass#ListNamedShadowsForThing"
        ],
        "resources": [
          "*"
        ]
      }
    },
    "aws.greengrass.ipc.pubsub": {
      "com.xxxxxxxx.productivity.cycle_count:pubsub:1": {
        "policyDescription": "",
        "operations": [
          "aws.greengrass#SubscribeToTopic"
        ],
        "resources": [
          "*"
        ]
      }
    }
  }
}

As I understand it, this component configuration will receive all shadow traffic from the shadow manager and any MQTT packets from pubsub.

When I update the shadow dsvvefvfq I see the changes logged by shadowManager but nothing happens in my custom component log. I am expecting to see the callback execute. Bare minimum I expect to see the print("Callback was involked") message in the component log.

def on_stream_event(event: SubscriptionResponseMessage) -> None:
    try:
        print("Callback was involked")
        message = str(event.binary_message.message, 'utf-8')
        topic = event.binary_message.context.topic
        print('Received new message on topic %s: %s' % (topic, message))
    except:
        traceback.print_exc()

flycast
answered 2 years ago
  • In your comment below my answer you said that you are subscribing to *. This is not correct based on the code you provided. You are subscribing to a single topic in the call to subscribe_to_topic. The topic you are subscribing to is dsvvefvfq.

    Your component is allowed to subscribe to any topic, but at least in your code, you aren't subscribing to all topics, only a very specific topic. Please use my recommendation of subscribing to $aws/things/# instead.
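Once the subscription uses a shadow topic filter, the callback still has to work out which shadow and which operation each message belongs to. Below is a minimal sketch, assuming the topic layout from the device shadow MQTT topics page linked above and a JSON payload. parse_shadow_message and changed_state are hypothetical helpers, and the "interval" field in the usage example is made up for illustration.

```python
import json
from typing import Optional, Tuple


def parse_shadow_message(topic: str, payload: bytes) -> Tuple[Optional[str], str, dict]:
    """Return (shadow name, operation suffix, decoded JSON document).

    The shadow name is None for the classic (unnamed) shadow.
    """
    levels = topic.split("/")
    if len(levels) < 5 or levels[0] != "$aws" or levels[1] != "things" or levels[3] != "shadow":
        raise ValueError(f"not a shadow topic: {topic}")
    rest = levels[4:]
    shadow_name = None
    if len(rest) >= 2 and rest[0] == "name":
        shadow_name = rest[1]
        rest = rest[2:]
    return shadow_name, "/".join(rest), json.loads(payload)


def changed_state(topic: str, payload: bytes) -> Optional[dict]:
    """Return the "state" of an update/delta message, or None for other topics."""
    _, suffix, document = parse_shadow_message(topic, payload)
    if suffix == "update/delta":
        return document.get("state")
    return None
```

Inside on_stream_event this could be called as changed_state(event.binary_message.context.topic, event.binary_message.message). For a delta payload such as {"state": {"interval": 5}} it returns the inner state dictionary.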
