Questions tagged with Amazon Kinesis Data Streams

Kinesis Firehose component for AWS Greengrass not sending data streams

Hello, I'm having a problem linking the Kinesis Firehose AWS Greengrass component to the AWS Kinesis service, and I would like to know why it's not working even though I followed the documentation.

On my Raspberry Pi I deployed a couple of components, but for the sake of this question I'm only going to mention the Kinesis Firehose component and my custom Python component that sends data.

In the deployment configs:

* aws.greengrass.KinesisFirehose

```
{
  "containerMode": "GreengrassContainer",
  "containerParams": {
    "devices": {},
    "memorySize": 65535,
    "mountROSysfs": false,
    "volumes": {}
  },
  "inputPayloadEncodingType": "binary",
  "lambdaExecutionParameters": {
    "EnvironmentVariables": {
      "DEFAULT_DELIVERY_STREAM_ARN": "arn:aws:firehose:eu-central-1:xxxxx:deliverystream/Tiny-video-stream",
      "DELIVERY_STREAM_QUEUE_SIZE": "5000",
      "PUBLISH_INTERVAL": "10"
    }
  },
  "maxIdleTimeInSeconds": 60,
  "maxInstancesCount": 100,
  "maxQueueSize": 1000,
  "pinned": true,
  "pubsubTopics": {
    "0": { "topic": "kinesisfirehose/message/binary/#", "type": "PUB_SUB" },
    "1": { "topic": "kinesisfirehose/message", "type": "PUB_SUB" },
    "2": { "topic": "tinyml/message", "type": "PUB_SUB" }
  },
  "statusTimeoutInSeconds": 60,
  "timeoutInSeconds": 10
}
```

* com.example.HelloWorld

```
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.HelloWorld",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "My first AWS IoT Greengrass component.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.mqttproxy": {
          "com.example.MyIoTCorePubSubComponent:mqttproxy:1": {
            "policyDescription": "Allows access to publish/subscribe to all topics.",
            "operations": [
              "aws.greengrass#PublishToIoTCore",
              "aws.greengrass#SubscribeToIoTCore"
            ],
            "resources": [ "*" ]
          }
        },
        "aws.greengrass.ipc.pubsub": {
          "com.example.HelloWorld:pubsub:1": {
            "policyDescription": "Allows access to publish/subscribe to all topics.",
            "operations": [
              "aws.greengrass#PublishToTopic",
              "aws.greengrass#SubscribeToTopic"
            ],
            "resources": [ "*" ]
          }
        }
      },
      "Message": "world"
    }
  },
  "Manifests": [
    {
      "Platform": { "os": "linux" },
      "Lifecycle": {
        "Run": "python3 -u {artifacts:path}/hello_world.py \"{configuration:/Message}\""
      }
    },
    {
      "Platform": { "os": "windows" },
      "Lifecycle": {
        "Run": "py -3 -u {artifacts:path}/hello_world.py \"{configuration:/Message}\""
      }
    }
  ]
}
```

According to the documentation, the Kinesis component accepts:

> JSON data on the kinesisfirehose/message topic
>
> Binary data on the kinesisfirehose/message/binary/# topic

Both of them go through local topics.

So here is my Python code, where I send a message on the local JSON topic and subscribe to "kinesisfirehose/message/status":

```
import json
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
    PublishToTopicRequest,
    PublishMessage,
    JsonMessage,
    BinaryMessage
)

TIMEOUT = 30

ipc_client = awsiot.greengrasscoreipc.connect()

topic = "kinesisfirehose/message"
message = "Hello, World"
message_data = {
    "request": {
        "data": "Data to send to the delivery stream."
    },
    "id": "request123"
}

request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.json_message = JsonMessage()
publish_message.json_message.message = message_data
request.publish_message = publish_message
operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future = operation.get_response()
future.result(TIMEOUT)
print(f"{operation} ============= {future}")

import time
import traceback

import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    IoTCoreMessage,
    QOS,
    SubscribeToIoTCoreRequest
)

TIMEOUT = 10


class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
    def __init__(self):
        super().__init__()

    def on_stream_event(self, event: IoTCoreMessage) -> None:
        try:
            message = str(event.message.payload, "utf-8")
            topic_name = event.message.topic_name
            # Handle message.
            print(f"RECEIVED =======: {topic_name} --------- {message}")
        except Exception:
            traceback.print_exc()

    def on_stream_error(self, error: Exception) -> bool:
        # Handle error.
        return True  # Return True to close stream, False to keep stream open.

    def on_stream_closed(self) -> None:
        # Handle close.
        pass


topic = "kinesisfirehose/message/status"
qos = QOS.AT_MOST_ONCE

request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
future = operation.activate(request)
future.result(TIMEOUT)

# Keep the main thread alive, or the process will exit.
try:
    while True:
        time.sleep(10)
except Exception as err:
    print(f"{err} =====================")
finally:
    # To stop subscribing, close the operation stream.
    operation.close()
```

Policy attached to the Greengrass IAM role:

```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [ "s3:GetObject" ],
      "Resource": "arn:aws:s3:::s3-name-xxxx/*"
    },
    {
      "Action": [ "firehose:PutRecord", "firehose:PutRecordBatch" ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:firehose:eu-central-1:xxxxx:deliverystream/Tiny-video-stream"
      ]
    }
  ]
}
```

After multiple tests I noticed:

* I can send MQTT
* I can send to local topics
* No new logs in the aws.greengrass.Kinesis

Any ideas what I have forgotten to do?
1 answer, 0 votes, 95 views, asked 3 months ago

When exactly does KCL drop records? Can't recreate (but need to handle)

From the [AWS docs](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html): "The most common cause of skipped records is an unhandled exception thrown from processRecords. The Kinesis Client Library (KCL) relies on your processRecords code to handle any exceptions that arise from processing the data records. Any exception thrown from processRecords is absorbed by the KCL. To avoid infinite retries on a recurring failure, the KCL does not resend the batch of records processed at the time of the exception. The KCL then calls processRecords for the next batch of data records without restarting the record processor. This effectively results in consumer applications observing skipped records. To prevent skipped records, handle all exceptions within processRecords appropriately."

Our KCL client is written in Node.js, following [this example](https://github.com/awslabs/amazon-kinesis-client-nodejs/blob/master/samples/basic_sample/consumer/sample_kcl_app.js). No matter how we tried crashing it, the KCL function simply exits, and the next time we start it, it resumes from the SAME checkpoint. In other words, the statement above doesn't seem to hold true: it does not skip records.

Our application can't afford dropped records, and we need to be 100% sure this can't happen. Can someone with more Kinesis/AWS experience comment on how exactly the above can happen on the consumer side (we're already handling the producer side)?
0 answers, 0 votes, 15 views, asked 5 months ago
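
The advice quoted from the docs ("handle all exceptions within processRecords") can be sketched independently of any KCL binding: catch every exception inside the batch callback, retry a bounded number of times, park records that still fail, and checkpoint only after each record has been handled or parked. The sketch below is plain Python, not the Node.js KCL API; `process_batch`, `handle`, `checkpoint`, and `dead_letter` are hypothetical names standing in for the consumer's own callbacks, and the `sequenceNumber` key is assumed to mirror the field used in the linked sample.

```python
import time


def process_batch(records, handle, checkpoint, dead_letter,
                  max_attempts=3, backoff_seconds=0.5):
    """Process records in order without letting an exception escape.

    handle(record)           -- hypothetical business logic, may raise
    checkpoint(sequence_no)  -- hypothetical callback that persists progress
    dead_letter(record, err) -- hypothetical sink for records that keep failing
    """
    last_done = None
    for record in records:
        for attempt in range(1, max_attempts + 1):
            try:
                handle(record)
                break  # record handled, stop retrying
            except Exception as err:
                if attempt == max_attempts:
                    # Park the record instead of dropping it silently.
                    dead_letter(record, err)
                else:
                    time.sleep(backoff_seconds * attempt)
        # Reached only after the record was handled or parked.
        last_done = record["sequenceNumber"]
    if last_done is not None:
        checkpoint(last_done)
    return last_done
```

Because every failure is either retried or handed to `dead_letter`, no exception leaves `process_batch`, and the checkpoint only advances past records that were handled or explicitly parked.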

Why is Records: [] empty when I consume data from a Kinesis stream with a Python script?

I am trying to consume data with a Python script from a Kinesis data stream. The stream was created successfully and data is being produced to it successfully, but when I run this consumer script in Python:

```
import boto3
import json
from datetime import datetime
import time

my_stream_name = 'stream_name'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

response = kinesis_client.describe_stream(StreamName=my_stream_name)

my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
                                                   ShardId=my_shard_id,
                                                   ShardIteratorType='LATEST')

my_shard_iterator = shard_iterator['ShardIterator']

record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator,
                                             Limit=2)

while 'NextShardIterator' in record_response:
    record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'],
                                                 Limit=2)

    print(record_response)

    # wait for 5 seconds
    time.sleep(5)
```

the output contains no message data ('Records': []):

```
{'Records': [],
 'NextShardIterator': 'AAAAAAAAAAFFVFpvvveOquLUe7WO9nZAcYNQdcS6f6a+YGrrrjZo1gULcu/ZYxC7AB+xVlUhgL9UFPrQ22qmcQa6iIsmuKWl26buBk3utXlVqiGuDUYSgqMOtkp0Y7pJwa6N/I0fYfl2PLTXp5Qz8+5ZYuTW1KDt+PeSU3992bwgdOm7744cxcSnYFaQuHqfa0vLlaRBTOACVz4fwjggUBN01WdsoEjKmgtfNmuHSA7s9LLNzAapMg==',
 'MillisBehindLatest': 0,
 'ResponseMetadata': {'RequestId': 'e451dd27-c867-cf3d-be83-edbe95e9da9f',
                      'HTTPStatusCode': 200,
                      'HTTPHeaders': {'x-amzn-requestid': 'e451dd27-c867-cf3d-be83-edbe95e9da9f',
                                      'x-amz-id-2': 'ClSlC3gRJuEqL9YJcHgC2N/TLSv56o+6406ki2+Zohnfo/erFVMDpPqkEWT+XAeeHXCdhYBbnOeZBPyesbXnVs45KQG78eRU',
                                      'date': 'Thu, 14 Apr 2022 14:23:21 GMT',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'content-length': '308'},
                      'RetryAttempts': 0}}
```
2 answers, 0 votes, 123 views, asked 6 months ago
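
One possible explanation for the empty `Records` list: a `LATEST` shard iterator only returns records written after the iterator was created, and the script reads only the first shard of the stream. The following is a minimal sketch, assuming the data is already in the stream, that starts from `TRIM_HORIZON` (the oldest available record) and keeps polling until records show up. The helper name `read_shard` and its `limit`, `max_polls`, and `wait_seconds` parameters are illustrative assumptions, not part of the original script; the client is passed in, so any object exposing `get_shard_iterator` and `get_records` (such as a boto3 Kinesis client) can be used.

```python
import time


def read_shard(client, stream_name, shard_id,
               iterator_type="TRIM_HORIZON", limit=100,
               max_polls=10, wait_seconds=1.0):
    """Collect records from one shard by following NextShardIterator."""
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType=iterator_type,
    )["ShardIterator"]

    records = []
    for _ in range(max_polls):
        if iterator is None:
            break  # no further iterator: the shard has been closed
        response = client.get_records(ShardIterator=iterator, Limit=limit)
        records.extend(response["Records"])
        iterator = response.get("NextShardIterator")
        # An empty page does not mean the shard is empty, so keep polling
        # until something was read and the reader has caught up.
        if records and response.get("MillisBehindLatest", 0) == 0:
            break
        time.sleep(wait_seconds)
    return records


# Usage with boto3 (assumes credentials and an existing stream):
#   import boto3
#   client = boto3.client("kinesis", region_name="us-east-1")
#   description = client.describe_stream(StreamName="stream_name")
#   for shard in description["StreamDescription"]["Shards"]:
#       print(read_shard(client, "stream_name", shard["ShardId"]))
```

Iterating over every shard matters because the partition key decides which shard a record lands in; reading only `Shards[0]` can miss data even with the right iterator type.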