Hello,
I'm having a problem with linking the Kinesis Firehose aws greengrass component to the AWS Kinesis Service, so i would like to know why it's not working even with following the documentation ;
In my Raspberry PI I deployed couple of components but for the sake of this question, i'm only going to invoke the Kinesis Firehose component and my custom python component to send data.
in the deployment configs
- aws.greengrass.KinesisFirehose
{
"containerMode": "GreengrassContainer",
"containerParams": {
"devices": {},
"memorySize": 65535,
"mountROSysfs": false,
"volumes": {}
},
"inputPayloadEncodingType": "binary",
"lambdaExecutionParameters": {
"EnvironmentVariables": {
"DEFAULT_DELIVERY_STREAM_ARN": "arn:aws:firehose:eu-central-1:xxxxx:deliverystream/Tiny-video-stream",
"DELIVERY_STREAM_QUEUE_SIZE": "5000",
"PUBLISH_INTERVAL": "10"
}
},
"maxIdleTimeInSeconds": 60,
"maxInstancesCount": 100,
"maxQueueSize": 1000,
"pinned": true,
"pubsubTopics": {
"0": {
"topic": "kinesisfirehose/message/binary/#",
"type": "PUB_SUB"
},
"1": {
"topic": "kinesisfirehose/message",
"type": "PUB_SUB"
},
"2": {
"topic": "tinyml/message",
"type": "PUB_SUB"
}
},
"statusTimeoutInSeconds": 60,
"timeoutInSeconds": 10
}
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.HelloWorld",
"ComponentVersion": "1.0.0",
"ComponentDescription": "My first AWS IoT Greengrass component.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"*"
]
}
},
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToTopic",
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
},
"Message": "world"
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"Run": "python3 -u {artifacts:path}/hello_world.py \"{configuration:/Message}\""
}
},
{
"Platform": {
"os": "windows"
},
"Lifecycle": {
"Run": "py -3 -u {artifacts:path}/hello_world.py \"{configuration:/Message}\""
}
}
]
}
According to the documentation, kinesis component accepts :
JSON data on the kinesisfirehose/message topic
Binary data on the kinesisfirehose/message/binary/# topic
And both of them through local topics
So here is my python code where I send a message on the local json topic and subscribe to the "kinesisfirehose/message/status" :
import json
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
PublishToTopicRequest,
PublishMessage,
JsonMessage,
BinaryMessage
)
TIMEOUT = 30
ipc_client = awsiot.greengrasscoreipc.connect()
topic = "kinesisfirehose/message"
message = "Hello, World"
message_data = {
"request": {
"data": "Data to send to the delivery stream."
},
"id": "request123"
}
request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.json_message = JsonMessage()
publish_message.json_message.message = message_data
request.publish_message = publish_message
operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future = operation.get_response()
future.result(TIMEOUT)
print(f"{operation} ============= {future}")
import time
import traceback
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
IoTCoreMessage,
QOS,
SubscribeToIoTCoreRequest
)
TIMEOUT = 10
class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: IoTCoreMessage) -> None:
try:
message = str(event.message.payload, "utf-8")
topic_name = event.message.topic_name
# Handle message.
print(f"RECIEVED =======: {topic_name} --------- {message}")
except:
traceback.print_exc()
def on_stream_error(self, error: Exception) -> bool:
# Handle error.
return True # Return True to close stream, False to keep stream open.
def on_stream_closed(self) -> None:
# Handle close.
pass
topic = "kinesisfirehose/message/status"
qos = QOS.AT_MOST_ONCE
request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
future = operation.activate(request)
future.result(TIMEOUT)
# Keep the main thread alive, or the process will exit.
try :
while True:
time.sleep(10)
except Exception as err :
print(f"{err} =====================")
finally:
# To stop subscribing, close the operation stream.
operation.close()
Policy attached to the greengrass's iam role :
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject"
],
"Resource": "arn:aws:s3:::s3-name-xxxx/*"
},
{
"Action": [
"firehose:PutRecord",
"firehose:PutRecordBatch"
],
"Effect": "Allow",
"Resource": [
"arn:aws:firehose:eu-central-1:xxxxx:deliverystream/Tiny-video-stream"
]
}
]
}
After multiple tests i noticed :
- I can send MQTT
- I can send to local topics
- No new logs in the aws.greengrass.Kinesis
Any ideas what am i have forgot to do?