Questions tagged with Amazon Kinesis Data Streams


When exactly does KCL drop records? Can't recreate (but need to handle)

From the [AWS docs](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html): "The most common cause of skipped records is an unhandled exception thrown from processRecords. The Kinesis Client Library (KCL) relies on your processRecords code to handle any exceptions that arise from processing the data records. Any exception thrown from processRecords is absorbed by the KCL. To avoid infinite retries on a recurring failure, the KCL does not resend the batch of records processed at the time of the exception. The KCL then calls processRecords for the next batch of data records without restarting the record processor. This effectively results in consumer applications observing skipped records. To prevent skipped records, handle all exceptions within processRecords appropriately."

Our KCL client is written in Node, following [this example](https://github.com/awslabs/amazon-kinesis-client-nodejs/blob/master/samples/basic_sample/consumer/sample_kcl_app.js). No matter how we try to crash it, the record processor simply exits, and the next time we start it, it resumes from the SAME checkpoint. In other words, the statement above doesn't seem to hold: it does not skip records. For our application we can't afford dropped records and need to be 100% sure this can't happen. Can someone with more Kinesis/AWS experience explain exactly how the scenario above can occur on the consumer side (we're handling the producer already)?
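
A minimal sketch (in Python for illustration; the helper names are hypothetical and not part of any KCL API) of the pattern the quoted docs are asking for: keep every failure contained inside processRecords and checkpoint only after the whole batch has been handled, so nothing can be silently skipped.

```
def handle(record):
    # Placeholder business logic; replace with real processing.
    print("processed", record)

def park_failed_record(record, exc):
    # Placeholder: persist the failed record somewhere durable (e.g. SQS or S3)
    # so it can be replayed later instead of being silently skipped.
    print("parking failed record", record, exc)

def process_records(records, checkpointer):
    for record in records:
        try:
            handle(record)
        except Exception as exc:
            # An exception that escapes processRecords is absorbed by the KCL and
            # the batch is NOT resent, so failures must be caught and parked here.
            park_failed_record(record, exc)
    # Checkpoint only once every record has been handled or parked; if the worker
    # dies before this line, the KCL re-delivers from the previous checkpoint.
    checkpointer.checkpoint()
```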
0 answers · 0 votes · 2 views · asked a month ago

Why is `Records: []` returned when I consume data from a Kinesis data stream with a Python script?

I am trying to consume data from a Kinesis data stream with a Python script. The stream was created successfully and data is being produced to it successfully, but when I run the consumer script:

```
import boto3
import json
from datetime import datetime
import time

my_stream_name = 'stream_name'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
                                                   ShardId=my_shard_id,
                                                   ShardIteratorType='LATEST')
my_shard_iterator = shard_iterator['ShardIterator']

record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator, Limit=2)

while 'NextShardIterator' in record_response:
    record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'], Limit=2)
    print(record_response)

    # wait for 5 seconds
    time.sleep(5)
```

the output is always empty (`'Records': []`):

```
{'Records': [], 'NextShardIterator': 'AAAAAAAAAAFFVFpvvveOquLUe7WO9nZAcYNQdcS6f6a+YGrrrjZo1gULcu/ZYxC7AB+xVlUhgL9UFPrQ22qmcQa6iIsmuKWl26buBk3utXlVqiGuDUYSgqMOtkp0Y7pJwa6N/I0fYfl2PLTXp5Qz8+5ZYuTW1KDt+PeSU3992bwgdOm7744cxcSnYFaQuHqfa0vLlaRBTOACVz4fwjggUBN01WdsoEjKmgtfNmuHSA7s9LLNzAapMg==', 'MillisBehindLatest': 0, 'ResponseMetadata': {'RequestId': 'e451dd27-c867-cf3d-be83-edbe95e9da9f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e451dd27-c867-cf3d-be83-edbe95e9da9f', 'x-amz-id-2': 'ClSlC3gRJuEqL9YJcHgC2N/TLSv56o+6406ki2+Zohnfo/erFVMDpPqkEWT+XAeeHXCdhYBbnOeZBPyesbXnVs45KQG78eRU', 'date': 'Thu, 14 Apr 2022 14:23:21 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '308'}, 'RetryAttempts': 0}}
```
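
Two things are worth noting when reading the code above: an empty `Records` list alongside a valid `NextShardIterator` is a normal `get_records` response, and a `LATEST` iterator only returns records written after the iterator was created. A minimal boto3 sketch (reusing the question's placeholder stream name) that instead starts from the oldest record still retained in the shard:

```
import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')
stream = 'stream_name'  # placeholder name, as in the question

shard_id = kinesis.describe_stream(StreamName=stream)['StreamDescription']['Shards'][0]['ShardId']

# TRIM_HORIZON starts at the oldest retained record, whereas LATEST only returns
# records written after the iterator was created.
iterator = kinesis.get_shard_iterator(
    StreamName=stream,
    ShardId=shard_id,
    ShardIteratorType='TRIM_HORIZON',
)['ShardIterator']

response = kinesis.get_records(ShardIterator=iterator, Limit=100)
print(len(response['Records']), 'records fetched')
```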
2 answers · 0 votes · 10 views · asked a month ago

Error when trying to parse OpenTelemetry 0.7.0 data from metric streams

I'm trying to parse OpenTelemetry data from a CloudWatch metric stream that is delivered to an S3 bucket without any compression. I downloaded the files from S3 (binary); they look like:

```
¹— µ— Ô  cloud.provider aws " cloud.account.id 935705392901  cloud.region us-west-2 x aws.exporter.arnd ...........// ARN... ê  NamespaceAWS/NetworkELB  MetricNameHealthyHostCount =
```

hex:

```
00000000: aec2 020a aac2 020a d401 0a17 0a0e 636c  ..............cl
00000010: 6f75 642e 7072 6f76 6964 6572 1205 0a03  oud.provider....
00000020: 6177 730a 220a 1063 6c6f 7564 2e61 6363  aws."..cloud.acc
// skipped
00000050: 2e72 6567 696f 6e12 0b0a 0975 732d 7765  .region....us-we
00000060: 7374 2d32 0a78 0a10 6177 732e 6578 706f  st-2.x..aws.expo
00000070: 7274 6572 2e61 726e 1264 0a62 6172 6e3a  rter.arn.d.barn:
// skipped
000000e0: c002 12e7 020a 2f61 6d61 7a6f 6e61 7773  ....../amazonaws
000000f0: 2e63 6f6d 2f41 5753 2f4e 6574 776f 726b  .com/AWS/Network
00000100: 454c 422f 556e 4865 616c 7468 7948 6f73  ELB/UnHealthyHos
...
```

I followed the doc https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-parse.html and tried to parse it with JavaScript, but I keep getting an error:

```
Error: Unknown base64 encoding at char: ¹
```

from this line in the sample code from the doc:

```
const reader = new pb.BinaryReader(data)
```

The `google-protobuf` version I'm using is `3.19.3`, and `metrics_service_pb` is generated from the proto referenced in the doc as well. Does anyone know how to properly parse the binary data with JavaScript? Thanks, Bill
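
For comparison, a rough sketch of the same parsing idea in Python. It assumes (a) the object is read as raw bytes rather than a decoded string (an "Unknown base64 encoding" error from `pb.BinaryReader` typically indicates it was handed a string), (b) the records are length-delimited `ExportMetricsServiceRequest` messages as the linked doc describes, and (c) Python bindings generated from the same OTLP 0.7.0 protos (the module path below comes from the `opentelemetry-proto` package and may differ for locally generated stubs).

```
from opentelemetry.proto.collector.metrics.v1 import metrics_service_pb2


def read_varint(buf, pos):
    # Decode a protobuf varint starting at pos; return (value, new_pos).
    result, shift = 0, 0
    while True:
        b = buf[pos]
        result |= (b & 0x7F) << shift
        pos += 1
        if not (b & 0x80):
            return result, pos
        shift += 7


with open('metric-stream-object', 'rb') as f:  # placeholder file name
    data = f.read()

pos = 0
while pos < len(data):
    # Each record is a varint length prefix followed by the serialized message.
    length, pos = read_varint(data, pos)
    msg = metrics_service_pb2.ExportMetricsServiceRequest()
    msg.ParseFromString(data[pos:pos + length])
    pos += length
    for rm in msg.resource_metrics:
        print(rm.resource.attributes)
```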
1 answer · 0 votes · 3 views · asked 4 months ago

Help processing Kinesis records with KCL and Java

How am I supposed to process the actual record in Java using the KCL? I'm following the guidance at https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html. I can connect to the data stream and get the number of records available, but what the example doesn't show is how to actually get the record (a JSON string). From the example I can see that I can use `r.data()` to get the record's data; it comes as a read-only `ByteBuffer`, which I can convert to a string using `StandardCharsets.US_ASCII.decode(r.data()).toString()`, but the resulting string is definitely encoded. I have tried Base64-decoding it, but I get the error `java.lang.IllegalArgumentException: Illegal base64 character 3f`. So what is the simplest way to get the payload? Below is my `processRecords` method:

```
public void processRecords(ProcessRecordsInput processRecordsInput) {
    try {
        System.out.println("Processing " + processRecordsInput.records().size() + " record(s)");
        processRecordsInput.records().forEach((r) -> {
            try {
                Decoder dec = Base64.getDecoder();
                String myString = StandardCharsets.US_ASCII.decode(r.data()).toString();
                byte[] bt = dec.decode(myString);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    } catch (Throwable t) {
        System.out.println("Caught throwable while processing records. Aborting.");
        Runtime.getRuntime().halt(1);
    } finally {
    }
}
```

From here I can get `myString`, but when I compute `bt` I get the exception shown. I have not found a single resource explaining how to get the record. I post the record to Kinesis using `aws kinesis put-record --stream-name testStream --partition-key 1 --data {"somedata":"This Data"}`.
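
One point worth keeping in mind when reading the code above: base64 only exists on Kinesis's raw HTTP API; the SDKs (and the KCL on top of them) hand back the payload bytes already decoded, so the consumer normally just character-decodes the `ByteBuffer` rather than Base64-decoding it. A small boto3 sketch in Python of the same round trip (stream name taken from the question):

```
import json
import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Producer side: the SDK takes raw bytes; base64 happens only on the wire.
kinesis.put_record(
    StreamName='testStream',
    PartitionKey='1',
    Data=json.dumps({'somedata': 'This Data'}).encode('utf-8'),
)

shard_id = kinesis.describe_stream(StreamName='testStream')['StreamDescription']['Shards'][0]['ShardId']
iterator = kinesis.get_shard_iterator(
    StreamName='testStream',
    ShardId=shard_id,
    ShardIteratorType='TRIM_HORIZON',
)['ShardIterator']

for rec in kinesis.get_records(ShardIterator=iterator)['Records']:
    # rec['Data'] is the raw payload bytes -- decode the characters, not Base64.
    print(json.loads(rec['Data'].decode('utf-8')))
```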
1 answer · 0 votes · 9 views · asked 5 months ago

DB Log Processing through Kinesis Data Streams and Time Series DB

Hi Team, I have an architecture question: how can PostgreSQL DB log processing be captured through AWS Lambda and Amazon Kinesis Data Streams, with the data finally loaded into an Amazon Timestream database?

High-level draft data flow:

**Aurora PostgreSQL DB** ---- DB log processing ----> **Lambda** ---- ingestion ----> **Kinesis Data Streams** ---- process and join context data ----> **Timestream database**

I understand that AWS IoT data (sensor/device data) can be processed and loaded into Timestream through Lambda, Kinesis Data Streams, and Kinesis Data Analytics, and that we can then run analytics on the time-series data. But I am not sure how PostgreSQL DB logs (write-ahead logs) can be processed through Lambda, ingested through Kinesis Data Streams, and finally loaded into Timestream. The flow also needs to join some tables, such as event-driven tables with the associated Account and Customer tables, before loading into Timestream.

I would like to know whether the above flow is appropriate, since we are not processing any sensor/device data (where sensors capture all measures and dimensions from the device and load them into Timestream), yet Timestream would always be the primary database. If anyone can shed some light on how PostgreSQL DB logs can be integrated with Timestream through Kinesis Data Streams and Lambda, I would appreciate the help. Thanks
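
A minimal sketch of how the final hop is often wired: a second Lambda function consuming the Kinesis data stream and writing points into Timestream. All names and record fields below are assumptions, and it presumes the join/enrichment with Account and Customer context has already happened upstream.

```
import base64
import json
import time

import boto3

timestream = boto3.client('timestream-write')


def handler(event, context):
    records = []
    for r in event['Records']:
        # Kinesis payloads arrive base64-encoded inside the Lambda event.
        change = json.loads(base64.b64decode(r['kinesis']['data']))
        records.append({
            'Dimensions': [
                {'Name': 'account_id', 'Value': str(change['account_id'])},  # assumed field
                {'Name': 'table_name', 'Value': change['table']},            # assumed field
            ],
            'MeasureName': 'row_change',
            'MeasureValue': '1',
            'MeasureValueType': 'BIGINT',
            'Time': str(int(time.time() * 1000)),  # milliseconds since epoch
        })
    if records:
        timestream.write_records(
            DatabaseName='wal_events',        # assumed database name
            TableName='postgres_changes',     # assumed table name
            Records=records,
        )
```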
1 answer · 0 votes · 18 views · asked 5 months ago