
Questions tagged with Amazon Kinesis Data Firehose


Kinesis Firehose component for AWS Greengrass not sending data streams

Hello, I'm having a problem linking the Kinesis Firehose AWS Greengrass component to the Kinesis Data Firehose service, and I'd like to know why it isn't working even though I followed the documentation. On my Raspberry Pi I deployed a couple of components, but for the sake of this question I'm only going to cover the Kinesis Firehose component and my custom Python component that sends the data.

The deployment configs:

* aws.greengrass.KinesisFirehose

```
{
  "containerMode": "GreengrassContainer",
  "containerParams": {
    "devices": {},
    "memorySize": 65535,
    "mountROSysfs": false,
    "volumes": {}
  },
  "inputPayloadEncodingType": "binary",
  "lambdaExecutionParameters": {
    "EnvironmentVariables": {
      "DEFAULT_DELIVERY_STREAM_ARN": "arn:aws:firehose:eu-central-1:xxxxx:deliverystream/Tiny-video-stream",
      "DELIVERY_STREAM_QUEUE_SIZE": "5000",
      "PUBLISH_INTERVAL": "10"
    }
  },
  "maxIdleTimeInSeconds": 60,
  "maxInstancesCount": 100,
  "maxQueueSize": 1000,
  "pinned": true,
  "pubsubTopics": {
    "0": {
      "topic": "kinesisfirehose/message/binary/#",
      "type": "PUB_SUB"
    },
    "1": {
      "topic": "kinesisfirehose/message",
      "type": "PUB_SUB"
    },
    "2": {
      "topic": "tinyml/message",
      "type": "PUB_SUB"
    }
  },
  "statusTimeoutInSeconds": 60,
  "timeoutInSeconds": 10
}
```

* com.example.HelloWorld

```
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.HelloWorld",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "My first AWS IoT Greengrass component.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.mqttproxy": {
          "com.example.MyIoTCorePubSubComponent:mqttproxy:1": {
            "policyDescription": "Allows access to publish/subscribe to all topics.",
            "operations": [
              "aws.greengrass#PublishToIoTCore",
              "aws.greengrass#SubscribeToIoTCore"
            ],
            "resources": ["*"]
          }
        },
        "aws.greengrass.ipc.pubsub": {
          "com.example.HelloWorld:pubsub:1": {
            "policyDescription": "Allows access to publish/subscribe to all topics.",
            "operations": [
              "aws.greengrass#PublishToTopic",
              "aws.greengrass#SubscribeToTopic"
            ],
            "resources": ["*"]
          }
        }
      },
      "Message": "world"
    }
  },
  "Manifests": [
    {
      "Platform": { "os": "linux" },
      "Lifecycle": {
        "Run": "python3 -u {artifacts:path}/hello_world.py \"{configuration:/Message}\""
      }
    },
    {
      "Platform": { "os": "windows" },
      "Lifecycle": {
        "Run": "py -3 -u {artifacts:path}/hello_world.py \"{configuration:/Message}\""
      }
    }
  ]
}
```

According to the documentation, the Kinesis Firehose component accepts:

> JSON data on the kinesisfirehose/message topic
> Binary data on the kinesisfirehose/message/binary/# topic

and both of them through local topics. So here is my Python code, where I send a message on the local JSON topic and subscribe to "kinesisfirehose/message/status":

```
import time
import traceback

import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    PublishToTopicRequest,
    PublishMessage,
    JsonMessage,
    IoTCoreMessage,
    QOS,
    SubscribeToIoTCoreRequest
)

TIMEOUT = 30

ipc_client = awsiot.greengrasscoreipc.connect()

# Publish the request on the component's local JSON topic.
topic = "kinesisfirehose/message"
message_data = {
    "request": {
        "data": "Data to send to the delivery stream."
    },
    "id": "request123"
}

request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.json_message = JsonMessage()
publish_message.json_message.message = message_data
request.publish_message = publish_message
operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future = operation.get_response()
future.result(TIMEOUT)
print(f"{operation} ============= {future}")


# Subscribe to the status topic.
class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
    def __init__(self):
        super().__init__()

    def on_stream_event(self, event: IoTCoreMessage) -> None:
        try:
            message = str(event.message.payload, "utf-8")
            topic_name = event.message.topic_name
            # Handle message.
            print(f"RECEIVED =======: {topic_name} --------- {message}")
        except Exception:
            traceback.print_exc()

    def on_stream_error(self, error: Exception) -> bool:
        # Handle error.
        return True  # Return True to close stream, False to keep stream open.

    def on_stream_closed(self) -> None:
        # Handle close.
        pass


topic = "kinesisfirehose/message/status"
qos = QOS.AT_MOST_ONCE

request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
future = operation.activate(request)
future.result(TIMEOUT)

# Keep the main thread alive, or the process will exit.
try:
    while True:
        time.sleep(10)
except Exception as err:
    print(f"{err} =====================")
finally:
    # To stop subscribing, close the operation stream.
    operation.close()
```

Policy attached to the Greengrass IAM role:

```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::s3-name-xxxx/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "firehose:PutRecord",
        "firehose:PutRecordBatch"
      ],
      "Resource": [
        "arn:aws:firehose:eu-central-1:xxxxx:deliverystream/Tiny-video-stream"
      ]
    }
  ]
}
```

After multiple tests I noticed:

* I can send MQTT messages
* I can publish to local topics
* There are no new logs from aws.greengrass.KinesisFirehose

Any ideas about what I have forgotten to do?
1 answer · 0 votes · 91 views · asked 3 months ago

How to create a Kinesis Firehose delivery stream with dynamic partitioning enabled using the Python CDK?

I am trying to create a Firehose delivery stream with dynamic partitioning enabled. Below is what I have so far:

```
analytics_delivery_stream = kinesisfirehose.CfnDeliveryStream(
    self,
    "AnalyticsDeliveryStream",
    delivery_stream_name="analytics",
    extended_s3_destination_configuration=kinesisfirehose.CfnDeliveryStream.ExtendedS3DestinationConfigurationProperty(
        bucket_arn=f"arn:aws:s3:::{analytic_bucket_name}",
        buffering_hints=kinesisfirehose.CfnDeliveryStream.BufferingHintsProperty(
            interval_in_seconds=60
        ),
        dynamic_partitioning_configuration=kinesisfirehose.CfnDeliveryStream.DynamicPartitioningConfigurationProperty(
            enabled=True,
            retry_options=kinesisfirehose.CfnDeliveryStream.RetryOptionsProperty(
                duration_in_seconds=123
            )
        ),
        compression_format="UNCOMPRESSED",
        role_arn=firehose_role.role_arn,
        prefix="!{partitionKeyFromQuery:log_type}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/",
        error_output_prefix="errors/!{firehose:error-output-type}/!{timestamp:yyyy}/anyMonth/!{timestamp:dd}/"
    )
)
```

When I run this, I get the error below:

`Processing Configuration is not enabled when DataPartitioning is enabled.`

I found the following reference to ProcessingConfiguration in the docs:

```
processing_configuration=kinesisfirehose.CfnDeliveryStream.ProcessingConfigurationProperty(
    enabled=False,
    processors=[kinesisfirehose.CfnDeliveryStream.ProcessorProperty(
        type="type",
        # the properties below are optional
        parameters=[kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
            parameter_name="parameterName",
            parameter_value="parameterValue"
        )]
    )]
),
```

I am not sure what values to put for **parameters** or **type** inside processing_configuration. I have logs being put into Firehose with the structure below:

* type A - `{'log_type': 'type_A_log', ...other props...}`
* type B - `{'log_type': 'type_B_log', ...other props...}`

Using dynamic partitioning, I want all logs of type A to go into a type_A_log directory inside S3, and all type B logs into a type_B_log directory. Can someone please help here? I am going down a rabbit hole.
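For reference, the CloudFormation `AWS::KinesisFirehose::DeliveryStream` documentation pairs `partitionKeyFromQuery` with a processor of type `MetadataExtraction`, whose JQ query defines the partition keys. A sketch of what the missing `processing_configuration` might look like for the `log_type` prefix above (untested; the parameter names and the `JQ-1.6` engine value come from the CloudFormation docs, and the CDK v2 import is assumed):

```
from aws_cdk import aws_kinesisfirehose as kinesisfirehose

# Slots into the ExtendedS3DestinationConfigurationProperty above,
# alongside dynamic_partitioning_configuration.
processing_configuration = kinesisfirehose.CfnDeliveryStream.ProcessingConfigurationProperty(
    enabled=True,
    processors=[
        kinesisfirehose.CfnDeliveryStream.ProcessorProperty(
            type="MetadataExtraction",
            parameters=[
                kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
                    parameter_name="MetadataExtractionQuery",
                    # JQ expression: extracts log_type from each record.
                    parameter_value="{log_type: .log_type}",
                ),
                kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
                    parameter_name="JsonParsingEngine",
                    parameter_value="JQ-1.6",
                ),
            ],
        )
    ],
)
```

The key name in the JQ expression (`log_type`) has to match the name referenced by `!{partitionKeyFromQuery:log_type}` in `prefix`.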
0 answers · 0 votes · 42 views · asked 4 months ago