MQTT based file delivery - requested file ID not found

0

I am trying to set up MQTT based file delivery. See attached screenshot for details.

Procedure:

  1. Publish DescribeStream request to describe-topic.
  2. Publish GetStream request to get-topic with fileID supplied by description topic.
  3. Request rejected based on fileID.

Policies on device (italics are dummy values):

  • Allow - iot:Receive, Publish - arn:aws:iot:myregion:123123123:topic/$aws/things/thingName/streams/*
  • Allow - iot:Subscribe - arn:aws:iot:myregion:123123123:topicfilter/$aws/things/thingName/streams/*
  • Allow - iot:Connect - arn:aws:iot:myregion:123123123:client/clientName

Other

  • Sensored thingnames on image are identical
  • Region on device and bucket/file are identical
  • Behaviour on device is identical to MQTT test client
  • All other MQTT operations I have tried work as expected

I cannot understand what I'm doing wrong here, and I was not able to find any information on the ResourceNotFound rejection in the docs. I know that the stream works on boto3. I have set up different streams, same error. I have tried altering the request JSON, but then I get different errors eg. json format error, value type error, wrong stream version etc.

If anyone has any experience with this please share.

EDIT: Listed wrong arn for subscribe for sub policy. Was topic, is topicfilter.

mqtt test client log

asked 2 years ago277 views
2 Answers
1

Hi,

the resource arn in an IoT policy is different for publish/receive and subscribe:

  • publish/receive: arn:aws:iot:region:123456789012:topic/YOUR_TOPIC
  • subscribern:aws:iot:region:123456789012:topicfilter/YOUR_TOPIC

Example:

  • Create a stream:

stream.json:

{
  "streamId": "e571acbc-16db-11ec-bb3a-026f9576feb5",
  "description": "This stream is used for MQTT-based file delivery.",
  "files": [
      {
          "fileId": 123,
          "s3Location": {
              "bucket":"REPLACE_WITH_S3_BUCKET",
              "key":"REPLACE_WITH_S3_OBJECT"
          }
      }
  ],
  "roleArn": "arn:aws:iam::123456789012:role/ROLE_WHICH_PROVIDES_IOT_ACCESS_TO_YOUR_S3_BUCKET"
}
aws iot create-stream --cli-input-json file://stream.json
  • Code snippets (Python)

Subscribe

sub_topic = f'$aws/things/{args.client_id}/streams/{args.stream_id}/#'
    print("Subscribing to topic '{}'...".format(sub_topic))
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=sub_topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message_received)

on_message_received handler:

def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    global BLOCKS_RECEIVED, FILE_ID, FILE_SIZE
    print("Received message from topic '{}': {}".format(topic, payload))
    if topic.endswith('/description/json'):
        print('stream description')
        FILE_ID = json.loads(payload.decode())['r'][0]['f']
        FILE_SIZE = json.loads(payload.decode())['r'][0]['z']

    if topic.endswith('/data/json'):
        BLOCKS_RECEIVED += 1
        block = json.loads(payload.decode())['i']
        decoded = json.loads(payload.decode())['p']

        file_name = f'{args.client_id}-{block}.part'
        f = open(f'{PARTIAL_DIR}/{file_name}', "w")
        f.write(decoded)
        f.close()
        print(f'block: {block} file_name: {file_name}')
        print(f'NUM_BLOCKS: {NUM_BLOCKS} BLOCKS_RECEIVED: {BLOCKS_RECEIVED}')

        if NUM_BLOCKS == BLOCKS_RECEIVED:
            print('ALL BLOCKS RECEIVED')
            received_all_event.set()

Cheers,
Philipp

AWS
EXPERT
answered 2 years ago
  • Hi, Thanks a lot for your response. This was unfortunately just a typo on my part, the actual sub policy is topicfilter, not topic. rgds

0
Accepted Answer

I found the issue. When creating the stream I did not specify the fileId. Default fileId is 0. This does not work. After creating a new stream and specifying a fileId it works as expected.

answered 2 years ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions