Kinesis Firehose component for AWS Greengrass not sending data streams


Hello,

I'm having trouble connecting the Kinesis Firehose AWS Greengrass component to the AWS Kinesis service, and I would like to know why it isn't working even though I followed the documentation.

On my Raspberry Pi I deployed a couple of components, but for the sake of this question I'm only going to cover the Kinesis Firehose component and my custom Python component that sends the data.

In the deployment configuration:

  • aws.greengrass.KinesisFirehose
{
  "containerMode": "GreengrassContainer",
  "containerParams": {
    "devices": {},
    "memorySize": 65535,
    "mountROSysfs": false,
    "volumes": {}
  },
  "inputPayloadEncodingType": "binary",
  "lambdaExecutionParameters": {
    "EnvironmentVariables": {
      "DEFAULT_DELIVERY_STREAM_ARN": "arn:aws:firehose:eu-central-1:xxxxx:deliverystream/Tiny-video-stream",
      "DELIVERY_STREAM_QUEUE_SIZE": "5000",
      "PUBLISH_INTERVAL": "10"
    }
  },
  "maxIdleTimeInSeconds": 60,
  "maxInstancesCount": 100,
  "maxQueueSize": 1000,
  "pinned": true,
  "pubsubTopics": {
    "0": {
      "topic": "kinesisfirehose/message/binary/#",
      "type": "PUB_SUB"
    },
    "1": {
      "topic": "kinesisfirehose/message",
      "type": "PUB_SUB"
    },
    "2": {
      "topic": "tinyml/message",
      "type": "PUB_SUB"
    }
  },
  "statusTimeoutInSeconds": 60,
  "timeoutInSeconds": 10
}
  • com.example.HelloWorld
{
    "RecipeFormatVersion": "2020-01-25",
    "ComponentName": "com.example.HelloWorld",
    "ComponentVersion": "1.0.0",
    "ComponentDescription": "My first AWS IoT Greengrass component.",
    "ComponentPublisher": "Amazon",
    "ComponentConfiguration": {
      "DefaultConfiguration": {
        "accessControl": {
            "aws.greengrass.ipc.mqttproxy": {
                "com.example.MyIoTCorePubSubComponent:mqttproxy:1": {
                  "policyDescription": "Allows access to publish/subscribe to all topics.",
                  "operations": [
                    "aws.greengrass#PublishToIoTCore",
                    "aws.greengrass#SubscribeToIoTCore"
                  ],
                  "resources": [
                    "*"
                  ]
                }
              },
            "aws.greengrass.ipc.pubsub": {
              "com.example.HelloWorld:pubsub:1": {
                "policyDescription": "Allows access to publish/subscribe to all topics.",
                "operations": [
                  "aws.greengrass#PublishToTopic",
                  "aws.greengrass#SubscribeToTopic"
                ],
                "resources": [
                  "*"
                ]
              }
            }
          },
        "Message": "world"
      }
    },
    "Manifests": [
      {
        "Platform": {
          "os": "linux"
        },
        "Lifecycle": {
          "Run": "python3 -u {artifacts:path}/hello_world.py \"{configuration:/Message}\""
        }
      },
      {
        "Platform": {
          "os": "windows"
        },
        "Lifecycle": {
          "Run": "py -3 -u {artifacts:path}/hello_world.py \"{configuration:/Message}\""
        }
      }
    ]
  }

According to the documentation, the Kinesis Firehose component accepts:

  • JSON data on the kinesisfirehose/message topic
  • Binary data on the kinesisfirehose/message/binary/# topic

Both are received through local topics.

So here is my Python code, which sends a message on the local JSON topic and subscribes to "kinesisfirehose/message/status":

import json 
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
    PublishToTopicRequest,
    PublishMessage,
    JsonMessage,
    BinaryMessage
)

TIMEOUT = 30

ipc_client = awsiot.greengrasscoreipc.connect()
                    
topic = "kinesisfirehose/message"
message = "Hello, World"
message_data = {
  "request": {
    "data": "Data to send to the delivery stream."
  },
  "id": "request123"
}
request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.json_message = JsonMessage()
publish_message.json_message.message = message_data
request.publish_message = publish_message
operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future = operation.get_response()
future.result(TIMEOUT)


print(f"{operation} ============= {future}")




import time
import traceback
import awsiot.greengrasscoreipc.client as client

from awsiot.greengrasscoreipc.model import (
    IoTCoreMessage,
    QOS,
    SubscribeToIoTCoreRequest
)

TIMEOUT = 10


class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
    def __init__(self):
        super().__init__()

    def on_stream_event(self, event: IoTCoreMessage) -> None:
        try:
            message = str(event.message.payload, "utf-8")
            topic_name = event.message.topic_name
            # Handle message.
            print(f"RECEIVED =======:  {topic_name} --------- {message}")
        except Exception:
            traceback.print_exc()

    def on_stream_error(self, error: Exception) -> bool:
        # Handle error.
        return True  # Return True to close stream, False to keep stream open.

    def on_stream_closed(self) -> None:
        # Handle close.
        pass


topic = "kinesisfirehose/message/status"
qos = QOS.AT_MOST_ONCE

request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
future = operation.activate(request)
future.result(TIMEOUT)

# Keep the main thread alive, or the process will exit.
try:
    while True:
        time.sleep(10)
except Exception as err:
    print(f"{err} =====================")
finally:
    # To stop subscribing, close the operation stream.
    operation.close()

Policy attached to the Greengrass IAM role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": "arn:aws:s3:::s3-name-xxxx/*"
        },
        {
            "Action": [
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:firehose:eu-central-1:xxxxx:deliverystream/Tiny-video-stream"
            ]
        }
    ]
}

After multiple tests I noticed:

  • I can send MQTT messages
  • I can send to local topics
  • No new logs appear in the aws.greengrass.Kinesis

Any ideas what I might have forgotten to do?

1 Answer

Hi,

I am wondering if the LegacySubscriptionRouter is maybe not configured correctly and if that is causing an issue here.

https://docs.aws.amazon.com/greengrass/v2/developerguide/kinesis-firehose-component.html#kinesis-firehose-component-requirements

To receive output data from this component, you must merge the following configuration update for the legacy subscription router component (aws.greengrass.LegacySubscriptionRouter) when you deploy this component. This configuration specifies the topic where this component publishes responses.
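For illustration, the configuration update that passage refers to might look roughly like the sketch below. The key names and the "component:" source format are assumptions recalled from the legacy subscription router documentation for component version 2.1.x, and the subscription name "aws-greengrass-kinesisfirehose" is only a label, so please check the linked page before using it.

```python
import json

# Assumed structure of the LegacySubscriptionRouter merge update (v2.1.x style).
# Verify every key and value against the linked documentation page.
merge_update = {
    "subscriptions": {
        "aws-greengrass-kinesisfirehose": {
            "id": "aws-greengrass-kinesisfirehose",
            # Assumption: for v2.1.x the source is "component:<component name>".
            "source": "component:aws.greengrass.KinesisFirehose",
            # Topic on which the Kinesis Firehose component publishes status.
            "subject": "kinesisfirehose/message/status",
            # Forward the status messages to AWS IoT Core.
            "target": "cloud",
        }
    }
}

# Print the JSON so it can be pasted into the "configuration to merge" field.
print(json.dumps(merge_update, indent=2))
```

The printed JSON would go into the configuration merge for aws.greengrass.LegacySubscriptionRouter in the same deployment as the Kinesis Firehose component.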

Would you be able to provide any logs (/greengrass/v2/logs/aws.greengrass.KinesisFirehose.log and /greengrass/v2/logs/greengrass.log)? Adding debug statements to the Python code making Pub/Sub calls may also help debug further.
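As a rough idea of what those debug statements could look like, here is a minimal sketch using only the standard library. The names build_request and publish_with_logging are hypothetical helpers, and the publish argument stands in for whatever function wraps your existing IPC publish call; none of this is part of the Greengrass SDK.

```python
import json
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("firehose_debug")


def build_request(data: str, request_id: str) -> dict:
    """Build the JSON payload in the same shape as the question's message_data."""
    return {"request": {"data": data}, "id": request_id}


def publish_with_logging(publish, topic: str, payload: dict) -> bool:
    """Call publish(topic, payload) and log what was sent and whether it failed.

    `publish` is a hypothetical callable that wraps the real IPC publish code.
    """
    log.debug("Publishing to %s: %s", topic, json.dumps(payload))
    try:
        publish(topic, payload)
    except Exception:
        # log.exception records the full traceback in the component log.
        log.exception("Publish to %s failed", topic)
        return False
    log.debug("Publish to %s succeeded", topic)
    return True
```

Because the component runs with python3 -u, these messages should show up in the component's own log file under /greengrass/v2/logs, which makes it easier to line them up with the Kinesis Firehose component log.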

Since you are using Python code and making Pub/Sub calls directly, it may not be able to receive status messages from the Kinesis Firehose component: the LegacySubscriptionRouter is needed, and the only three possible values for its target key are cloud, a Lambda function name, or an ARN (https://docs.aws.amazon.com/greengrass/v2/developerguide/legacy-subscription-router-component.html#legacy-subscription-router-component-configuration). If you can set target to cloud, you could use the MQTT Test Client in the Greengrass console to check whether it is working to start with.

Let us know if that does not work.

Thanks!

AWS
answered 2 years ago
