
Questions tagged with Amazon Kinesis Data Streams



Kinesis data stream iterator age spikes

Hi, I am working on a system that uses Kinesis to ingest data; the data is then processed by a Lambda function and a Kinesis delivery stream. We have CloudWatch alarms that trigger if the iterator age for a stream goes above 10 seconds, since that means the Lambda has crashed and can no longer process data, causing a backlog.

Over the last couple of weeks we have seen an increase in alarm triggers. The cause is that the iterator age spikes to an extremely high number and then drops again within 1 to 2 minutes. There is no apparent pattern to the spikes: sometimes there are multiple in a day, sometimes none for a couple of days. Some articles and posts suggest this could be caused by a data backlog or by hitting a limit of the stream. However, when these spikes happen there is usually no data being added to the stream. There have even been instances where only the stream was configured, with no system putting any data on it, and the spikes still occurred.

This is what the spikes look like over a 3-day period:

![Enter image description here](https://repost.aws/media/postImages/original/IMNvNIagSNQs6N55IZCIODCw)

I have already checked the following causes, and it is none of these issues:

* Lambda stuck processing
* Lambda error (there is also an alarm for this)
* A lot of data added in a short time

The stream is configured with 1 shard, enhanced fan-out is disabled, server-side encryption is enabled, data retention is set to 1 day, and there are only 2 consumers. The Kinesis delivery stream has record format conversion enabled, converting JSON to Parquet using the standard AWS functions; there is no custom Lambda. The delivery stream reports no failures. The Lambda has starting position "LATEST" and maximum record age "-1".

What could be the cause of these spikes? Or how could I investigate this and figure out a solution?
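One way to investigate is to pull the `GetRecords.IteratorAgeMilliseconds` datapoints from CloudWatch and classify the short-lived spikes programmatically, rather than eyeballing the graph. A minimal sketch of that classification logic, assuming hypothetical datapoints (in practice the `(timestamp, age)` pairs would come from a CloudWatch metrics query):

```python
# Classify short-lived iterator-age spikes from metric datapoints.
# Each datapoint is (timestamp_seconds, iterator_age_ms); the sample
# values below are made up for illustration.
THRESHOLD_MS = 10_000  # matches the 10-second alarm in the question


def find_spikes(datapoints, threshold_ms=THRESHOLD_MS):
    """Return (start, end, peak_ms) tuples for runs above the threshold."""
    spikes, run = [], []
    for ts, age in sorted(datapoints):
        if age > threshold_ms:
            run.append((ts, age))
        elif run:
            spikes.append((run[0][0], run[-1][0], max(a for _, a in run)))
            run = []
    if run:  # spike still in progress at the end of the window
        spikes.append((run[0][0], run[-1][0], max(a for _, a in run)))
    return spikes


datapoints = [
    (0, 0), (60, 0), (120, 86_400_000),  # sudden spike to ~1 day
    (180, 43_000_000), (240, 0), (300, 0),
]
print(find_spikes(datapoints))  # → [(120, 180, 86400000)]
```

Logging the start, end, and peak of each spike this way makes it easier to correlate spikes against shard-level metrics or deployment events.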
1 answer · 0 votes · 22 views
asked 12 days ago

Kinesis Firehose component for AWS Greengrass not sending data streams

Hello, I'm having a problem linking the Kinesis Firehose AWS Greengrass component to the AWS Kinesis service, so I would like to know why it is not working even though I followed the documentation. On my Raspberry Pi I deployed a couple of components, but for the sake of this question I am only going to invoke the Kinesis Firehose component and my custom Python component that sends data.

In the deployment configs:

* aws.greengrass.KinesisFirehose

```
{
  "containerMode": "GreengrassContainer",
  "containerParams": {
    "devices": {},
    "memorySize": 65535,
    "mountROSysfs": false,
    "volumes": {}
  },
  "inputPayloadEncodingType": "binary",
  "lambdaExecutionParameters": {
    "EnvironmentVariables": {
      "DEFAULT_DELIVERY_STREAM_ARN": "arn:aws:firehose:eu-central-1:xxxxx:deliverystream/Tiny-video-stream",
      "DELIVERY_STREAM_QUEUE_SIZE": "5000",
      "PUBLISH_INTERVAL": "10"
    }
  },
  "maxIdleTimeInSeconds": 60,
  "maxInstancesCount": 100,
  "maxQueueSize": 1000,
  "pinned": true,
  "pubsubTopics": {
    "0": {
      "topic": "kinesisfirehose/message/binary/#",
      "type": "PUB_SUB"
    },
    "1": {
      "topic": "kinesisfirehose/message",
      "type": "PUB_SUB"
    },
    "2": {
      "topic": "tinyml/message",
      "type": "PUB_SUB"
    }
  },
  "statusTimeoutInSeconds": 60,
  "timeoutInSeconds": 10
}
```

* com.example.HelloWorld

```
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.HelloWorld",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "My first AWS IoT Greengrass component.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.mqttproxy": {
          "com.example.MyIoTCorePubSubComponent:mqttproxy:1": {
            "policyDescription": "Allows access to publish/subscribe to all topics.",
            "operations": [
              "aws.greengrass#PublishToIoTCore",
              "aws.greengrass#SubscribeToIoTCore"
            ],
            "resources": ["*"]
          }
        },
        "aws.greengrass.ipc.pubsub": {
          "com.example.HelloWorld:pubsub:1": {
            "policyDescription": "Allows access to publish/subscribe to all topics.",
            "operations": [
              "aws.greengrass#PublishToTopic",
              "aws.greengrass#SubscribeToTopic"
            ],
            "resources": ["*"]
          }
        }
      },
      "Message": "world"
    }
  },
  "Manifests": [
    {
      "Platform": { "os": "linux" },
      "Lifecycle": {
        "Run": "python3 -u {artifacts:path}/hello_world.py \"{configuration:/Message}\""
      }
    },
    {
      "Platform": { "os": "windows" },
      "Lifecycle": {
        "Run": "py -3 -u {artifacts:path}/hello_world.py \"{configuration:/Message}\""
      }
    }
  ]
}
```

According to the documentation, the Kinesis Firehose component accepts:

> JSON data on the kinesisfirehose/message topic
> Binary data on the kinesisfirehose/message/binary/# topic

and both of them through local topics. So here is my Python code, where I send a message on the local JSON topic and subscribe to `kinesisfirehose/message/status`:

```
import json
import time
import traceback

import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    PublishToTopicRequest,
    PublishMessage,
    JsonMessage,
    BinaryMessage,
    IoTCoreMessage,
    QOS,
    SubscribeToIoTCoreRequest,
)

TIMEOUT = 30

ipc_client = awsiot.greengrasscoreipc.connect()

topic = "kinesisfirehose/message"
message = "Hello, World"
message_data = {
    "request": {
        "data": "Data to send to the delivery stream."
    },
    "id": "request123"
}

# Publish the request on the local pub/sub topic.
request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.json_message = JsonMessage()
publish_message.json_message.message = message_data
request.publish_message = publish_message

operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future = operation.get_response()
future.result(TIMEOUT)
print(f"{operation} ============= {future}")

TIMEOUT = 10


class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
    def __init__(self):
        super().__init__()

    def on_stream_event(self, event: IoTCoreMessage) -> None:
        try:
            message = str(event.message.payload, "utf-8")
            topic_name = event.message.topic_name
            # Handle message.
            print(f"RECEIVED =======: {topic_name} --------- {message}")
        except Exception:
            traceback.print_exc()

    def on_stream_error(self, error: Exception) -> bool:
        # Handle error.
        return True  # Return True to close stream, False to keep stream open.

    def on_stream_closed(self) -> None:
        # Handle close.
        pass


topic = "kinesisfirehose/message/status"
qos = QOS.AT_MOST_ONCE

request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
future = operation.activate(request)
future.result(TIMEOUT)

# Keep the main thread alive, or the process will exit.
try:
    while True:
        time.sleep(10)
except Exception as err:
    print(f"{err} =====================")
finally:
    # To stop subscribing, close the operation stream.
    operation.close()
```

Policy attached to the Greengrass IAM role:

```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::s3-name-xxxx/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "firehose:PutRecord",
        "firehose:PutRecordBatch"
      ],
      "Resource": [
        "arn:aws:firehose:eu-central-1:xxxxx:deliverystream/Tiny-video-stream"
      ]
    }
  ]
}
```

After multiple tests I noticed:

* I can send MQTT
* I can send to local topics
* There are no new logs for the aws.greengrass.KinesisFirehose component

Any ideas what I might have forgotten to do?
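To rule out a malformed payload, the request envelope the component expects on `kinesisfirehose/message` can be built and checked standalone, with no Greengrass connection. A minimal sketch using the same field values as the code above (the helper name `make_firehose_request` is mine, not part of any AWS SDK):

```python
import json


def make_firehose_request(data: str, request_id: str) -> str:
    """Serialize the request/id envelope published to the component."""
    payload = {
        "request": {"data": data},
        "id": request_id,
    }
    return json.dumps(payload)


encoded = make_firehose_request("Data to send to the delivery stream.", "request123")
decoded = json.loads(encoded)
print(decoded["id"])  # request123
print(decoded["request"]["data"])  # Data to send to the delivery stream.
```

Validating the envelope locally like this separates "the JSON is wrong" from "the message never reaches the component", which narrows the search to the pub/sub wiring or the component's log output.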
1 answer · 0 votes · 56 views
asked a month ago