2 Answers
1
Hi,
the resource ARN in an IoT policy is different for publish/receive and subscribe:
- publish/receive: arn:aws:iot:region:123456789012:topic/YOUR_TOPIC
- subscribe: arn:aws:iot:region:123456789012:topicfilter/YOUR_TOPIC
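To make the difference concrete, here is a minimal sketch that assembles a policy document with both ARN forms. The helper name build_iot_policy and the region value are assumptions for illustration only, and a real policy would also need an iot:Connect statement, which is left out here.

```python
import json


def build_iot_policy(region, account_id, topic):
    """Return an IoT policy document (as a dict) for a single topic.

    Hypothetical helper: it only illustrates the two ARN forms.
    """
    base = f"arn:aws:iot:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Publish and Receive are authorized against topic/ ARNs.
                "Effect": "Allow",
                "Action": ["iot:Publish", "iot:Receive"],
                "Resource": [f"{base}:topic/{topic}"],
            },
            {
                # Subscribe is authorized against topicfilter/ ARNs.
                "Effect": "Allow",
                "Action": ["iot:Subscribe"],
                "Resource": [f"{base}:topicfilter/{topic}"],
            },
        ],
    }


# Placeholder values; replace with your own region, account and topic.
policy = build_iot_policy("us-east-1", "123456789012", "YOUR_TOPIC")
print(json.dumps(policy, indent=2))
```

The printed JSON could then be used as the policy document when creating the IoT policy.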
Example:
- Create a stream:
stream.json:
{
  "streamId": "e571acbc-16db-11ec-bb3a-026f9576feb5",
  "description": "This stream is used for MQTT-based file delivery.",
  "files": [
    {
      "fileId": 123,
      "s3Location": {
        "bucket": "REPLACE_WITH_S3_BUCKET",
        "key": "REPLACE_WITH_S3_OBJECT"
      }
    }
  ],
  "roleArn": "arn:aws:iam::123456789012:role/ROLE_WHICH_PROVIDES_IOT_ACCESS_TO_YOUR_S3_BUCKET"
}
aws iot create-stream --cli-input-json file://stream.json
- Code snippets (Python)
Subscribe
sub_topic = f'$aws/things/{args.client_id}/streams/{args.stream_id}/#'
print("Subscribing to topic '{}'...".format(sub_topic))
subscribe_future, packet_id = mqtt_connection.subscribe(
    topic=sub_topic,
    qos=mqtt.QoS.AT_LEAST_ONCE,
    callback=on_message_received)
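The subscription above only covers the responses. The device still has to publish a DescribeStream and a GetStream request to trigger them. A rough sketch of how the request topics and JSON payloads could be built follows. The helper names, the client token and the block size are assumptions for illustration, and only a subset of the request fields is used.

```python
import json


def stream_topic(thing_name, stream_id, suffix):
    """Build a file-delivery topic such as .../describe/json or .../get/json."""
    return f"$aws/things/{thing_name}/streams/{stream_id}/{suffix}"


def describe_request(client_token="example-token"):
    """Payload for a DescribeStream request (published to describe/json)."""
    return json.dumps({"c": client_token})


def get_request(file_id, block_size, offset=0, num_blocks=1,
                client_token="example-token"):
    """Payload for a GetStream request (published to get/json).

    c = client token, f = file ID, l = block size in bytes,
    o = offset of the first block, n = number of blocks requested.
    """
    return json.dumps({
        "c": client_token,
        "f": file_id,
        "l": block_size,
        "o": offset,
        "n": num_blocks,
    })


# Example with placeholder names; 123 matches the fileId in stream.json.
print(stream_topic("my-thing", "my-stream", "describe/json"), describe_request())
print(stream_topic("my-thing", "my-stream", "get/json"), get_request(123, 1024))
```

Each topic/payload pair could then be passed to mqtt_connection.publish(topic=..., payload=..., qos=mqtt.QoS.AT_LEAST_ONCE). The responses arrive on the description/json and data/json topics matched by the wildcard subscription.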
on_message_received handler:
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    global BLOCKS_RECEIVED, FILE_ID, FILE_SIZE
    print("Received message from topic '{}': {}".format(topic, payload))
    if topic.endswith('/description/json'):
        # Response to DescribeStream: 'r' lists the files, 'f' is the file ID, 'z' the size.
        print('stream description')
        description = json.loads(payload.decode())
        FILE_ID = description['r'][0]['f']
        FILE_SIZE = description['r'][0]['z']
    if topic.endswith('/data/json'):
        # Response to GetStream: 'i' is the block ID, 'p' the block payload.
        BLOCKS_RECEIVED += 1
        message = json.loads(payload.decode())
        block = message['i']
        decoded = message['p']
        file_name = f'{args.client_id}-{block}.part'
        # The payload is written to the part file unchanged.
        with open(f'{PARTIAL_DIR}/{file_name}', "w") as f:
            f.write(decoded)
        print(f'block: {block} file_name: {file_name}')
        print(f'NUM_BLOCKS: {NUM_BLOCKS} BLOCKS_RECEIVED: {BLOCKS_RECEIVED}')
        if NUM_BLOCKS == BLOCKS_RECEIVED:
            print('ALL BLOCKS RECEIVED')
            received_all_event.set()
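Once all blocks have arrived, the .part files still have to be joined into the final file. This step is not part of the snippet above, so the following is only a sketch under assumptions: the helper name reassemble is hypothetical, block IDs are assumed to start at 0 and be contiguous, and each part file is assumed to hold the base64-encoded payload exactly as the handler wrote it.

```python
import base64
import os


def reassemble(partial_dir, client_id, num_blocks, out_path):
    """Join '<client_id>-<block>.part' files into out_path.

    Each part is base64-decoded and appended in block order.
    Returns the number of bytes written.
    """
    total = 0
    with open(out_path, "wb") as out:
        for block in range(num_blocks):
            part_path = os.path.join(partial_dir, f"{client_id}-{block}.part")
            with open(part_path, "r") as part:
                data = base64.b64decode(part.read())
            out.write(data)
            total += len(data)
    return total
```

It could be called after received_all_event is set, for example reassemble(PARTIAL_DIR, args.client_id, NUM_BLOCKS, 'download.bin'), and the returned byte count compared with FILE_SIZE as a basic sanity check. The output file name here is a placeholder.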
Cheers,
Philipp
0
I found the issue. When creating the stream I did not specify a fileId, so it defaulted to 0, and that does not work. After creating a new stream with an explicit fileId, it works as expected.
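A small pre-flight check on stream.json could catch this before calling create-stream. The sketch below is based only on the behaviour reported here, and the function name is hypothetical: it flags file entries whose fileId is missing or 0.

```python
import json


def files_without_nonzero_file_id(stream_definition):
    """Return the indexes of 'files' entries with a missing or zero fileId."""
    flagged = []
    for index, entry in enumerate(stream_definition.get("files", [])):
        # A missing fileId is treated like the default of 0 described above.
        if entry.get("fileId", 0) == 0:
            flagged.append(index)
    return flagged


# Example input: the first entry omits fileId, the second sets it.
example = json.loads("""
{
  "streamId": "example-stream",
  "files": [
    {"s3Location": {"bucket": "b", "key": "first"}},
    {"fileId": 123, "s3Location": {"bucket": "b", "key": "second"}}
  ]
}
""")
print(files_without_nonzero_file_id(example))
```

An empty list means every file entry carries an explicit, non-zero fileId.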
Answered 2 years ago
Hi, thanks a lot for your response. Unfortunately this was just a typo on my part; the actual subscribe policy uses topicfilter, not topic. Regards