MQTT based file delivery - requested file ID not found


I am trying to set up MQTT based file delivery. See attached screenshot for details.

Procedure:

  1. Publish DescribeStream request to describe-topic.
  2. Publish GetStream request to get-topic with fileID supplied by description topic.
  3. Request rejected based on fileID.
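
For readers following along, the two requests in the procedure above can be sketched roughly as follows. This is a minimal sketch, not the poster's actual code: the helper names are hypothetical, and the topic layout and short field names ("c", "s", "f", "l", "o", "n", "r") are assumed from the AWS MQTT-based file delivery documentation.

```python
import json


def stream_topic(thing_name, stream_id, action):
    """Build a stream topic; action is e.g. 'describe', 'get', 'description', 'data'."""
    return f"$aws/things/{thing_name}/streams/{stream_id}/{action}/json"


def describe_request(client_token):
    """DescribeStream payload: only a client token (assumed field 'c')."""
    return json.dumps({"c": client_token})


def get_request(client_token, file_id, stream_version,
                block_size=1024, offset=0, num_blocks=1):
    """GetStream payload (assumed fields: s=version, f=file ID, l=block size,
    o=block offset, n=number of blocks)."""
    return json.dumps({
        "c": client_token,
        "s": stream_version,
        "f": file_id,
        "l": block_size,
        "o": offset,
        "n": num_blocks,
    })


def file_ids_from_description(payload):
    """Extract the file IDs from a description response (bytes)."""
    description = json.loads(payload.decode())
    return [entry["f"] for entry in description["r"]]
```

The file ID passed to `get_request` would be one of the values returned by `file_ids_from_description`, which mirrors step 2 of the procedure.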

Policies on device (italics are dummy values):

  • Allow - iot:Receive, iot:Publish - arn:aws:iot:myregion:123123123:topic/$aws/things/thingName/streams/*
  • Allow - iot:Subscribe - arn:aws:iot:myregion:123123123:topicfilter/$aws/things/thingName/streams/*
  • Allow - iot:Connect - arn:aws:iot:myregion:123123123:client/clientName
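
Written out as a policy document, the three statements above might look like the sketch below. The function name and parameters are hypothetical, and the region, account ID, thing name and client name are the same dummy values used in the list; the point is only that publish/receive use a `topic/` resource while subscribe uses `topicfilter/`.

```python
import json


def iot_policy(region, account_id, thing_name, client_id):
    """Return an IoT policy document for the stream topics of one thing."""
    base = f"arn:aws:iot:{region}:{account_id}"
    streams = f"$aws/things/{thing_name}/streams/*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # publish and receive are checked against topic/ resources
                "Effect": "Allow",
                "Action": ["iot:Publish", "iot:Receive"],
                "Resource": f"{base}:topic/{streams}",
            },
            {   # subscribe is checked against topicfilter/ resources
                "Effect": "Allow",
                "Action": "iot:Subscribe",
                "Resource": f"{base}:topicfilter/{streams}",
            },
            {
                "Effect": "Allow",
                "Action": "iot:Connect",
                "Resource": f"{base}:client/{client_id}",
            },
        ],
    }


if __name__ == "__main__":
    print(json.dumps(iot_policy("myregion", "123123123", "thingName", "clientName"), indent=2))
```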

Other

  • The censored thing names in the image are identical
  • Region on device and bucket/file are identical
  • Behaviour on device is identical to MQTT test client
  • All other MQTT operations I have tried work as expected

I cannot understand what I'm doing wrong here, and I could not find any information on the ResourceNotFound rejection in the docs. I know that the stream works with boto3. I have set up different streams and get the same error. I have tried altering the request JSON, but then I get different errors, e.g. JSON format error, value type error, wrong stream version.

If anyone has any experience with this please share.

EDIT: I originally listed the wrong ARN for the subscribe policy. It said topic; the actual policy uses topicfilter.

mqtt test client log

asked 2 years ago, 282 views
2 Answers

Hi,

The resource ARN in an IoT policy differs between publish/receive and subscribe:

  • publish/receive: arn:aws:iot:region:123456789012:topic/YOUR_TOPIC
  • subscribe: arn:aws:iot:region:123456789012:topicfilter/YOUR_TOPIC

Example:

  • Create a stream:

stream.json:

{
  "streamId": "e571acbc-16db-11ec-bb3a-026f9576feb5",
  "description": "This stream is used for MQTT-based file delivery.",
  "files": [
      {
          "fileId": 123,
          "s3Location": {
              "bucket":"REPLACE_WITH_S3_BUCKET",
              "key":"REPLACE_WITH_S3_OBJECT"
          }
      }
  ],
  "roleArn": "arn:aws:iam::123456789012:role/ROLE_WHICH_PROVIDES_IOT_ACCESS_TO_YOUR_S3_BUCKET"
}

aws iot create-stream --cli-input-json file://stream.json

  • Code snippets (Python)

Subscribe

sub_topic = f'$aws/things/{args.client_id}/streams/{args.stream_id}/#'
print("Subscribing to topic '{}'...".format(sub_topic))
subscribe_future, packet_id = mqtt_connection.subscribe(
    topic=sub_topic,
    qos=mqtt.QoS.AT_LEAST_ONCE,
    callback=on_message_received)

on_message_received handler:

import json

def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    # args, PARTIAL_DIR, NUM_BLOCKS and received_all_event are defined elsewhere in the script.
    global BLOCKS_RECEIVED, FILE_ID, FILE_SIZE
    print("Received message from topic '{}': {}".format(topic, payload))

    if topic.endswith('/description/json'):
        # Stream description: remember the ID and size of the first file.
        print('stream description')
        description = json.loads(payload.decode())
        FILE_ID = description['r'][0]['f']
        FILE_SIZE = description['r'][0]['z']

    if topic.endswith('/data/json'):
        # Data block: 'i' is the block index, 'p' the block payload.
        BLOCKS_RECEIVED += 1
        message = json.loads(payload.decode())
        block = message['i']
        block_payload = message['p']

        # Store each block in its own part file; the payload is written unchanged.
        file_name = f'{args.client_id}-{block}.part'
        with open(f'{PARTIAL_DIR}/{file_name}', "w") as f:
            f.write(block_payload)
        print(f'block: {block} file_name: {file_name}')
        print(f'NUM_BLOCKS: {NUM_BLOCKS} BLOCKS_RECEIVED: {BLOCKS_RECEIVED}')

        if NUM_BLOCKS == BLOCKS_RECEIVED:
            print('ALL BLOCKS RECEIVED')
            received_all_event.set()
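
The handler relies on NUM_BLOCKS and leaves the part files to be joined later. A minimal sketch of both steps could look like this; the helper names are hypothetical, the block size is assumed to be whatever was sent in the GetStream request, and the 'p' payloads are assumed to be base64 text as described in the AWS MQTT-based file delivery documentation.

```python
import base64
import math


def expected_blocks(file_size, block_size):
    """Number of blocks needed to transfer file_size bytes in block_size chunks."""
    return math.ceil(file_size / block_size)


def assemble(blocks):
    """Join base64-encoded block payloads, keyed by block index, into the file bytes."""
    return b"".join(base64.b64decode(blocks[index]) for index in sorted(blocks))
```

For example, `expected_blocks(FILE_SIZE, 1024)` would give a value for NUM_BLOCKS when blocks of 1024 bytes are requested, and `assemble` could be fed the contents of the .part files once all blocks have arrived.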

Cheers,
Philipp

AWS
EXPERT
answered 2 years ago
  • Hi, thanks a lot for your response. Unfortunately this was just a typo on my part: the actual subscribe policy uses topicfilter, not topic. Regards

Accepted Answer

I found the issue. When creating the stream I did not specify a fileId, so it defaulted to 0, and requesting that file does not work. After creating a new stream with an explicitly specified fileId, it works as expected.
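
As an illustration of the fix, here is a minimal sketch of a stream definition that always carries an explicit fileId. The function name is hypothetical, and the guard against 0 only reflects the behaviour observed in this thread, not a documented restriction.

```python
def stream_definition(stream_id, bucket, key, role_arn, file_id):
    """Build the arguments for creating a stream with an explicit, non-zero fileId."""
    if file_id == 0:
        # Assumption based on this thread: a fileId of 0 led to ResourceNotFound.
        raise ValueError("use an explicit non-zero fileId")
    return {
        "streamId": stream_id,
        "description": "This stream is used for MQTT-based file delivery.",
        "files": [
            {
                "fileId": file_id,
                "s3Location": {"bucket": bucket, "key": key},
            }
        ],
        "roleArn": role_arn,
    }
```

The resulting dictionary has the same shape as the stream.json shown in the other answer, so it could be dumped to a file for `aws iot create-stream --cli-input-json`, or, assuming boto3 is used, passed as keyword arguments to the IoT client's `create_stream` call.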

answered 2 years ago
