
Questions tagged with Amazon Kinesis Data Firehose



Why is Records: [] empty when I consume data from a Kinesis stream with a Python script?

I am trying to consume data with a Python script from a Kinesis data stream. The stream was created successfully and data is being produced to it, but when I run the following consumer script:

```
import boto3
import json
from datetime import datetime
import time

my_stream_name = 'stream_name'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
                                                   ShardId=my_shard_id,
                                                   ShardIteratorType='LATEST')
my_shard_iterator = shard_iterator['ShardIterator']

record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator, Limit=2)

while 'NextShardIterator' in record_response:
    record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'], Limit=2)
    print(record_response)
    # wait for 5 seconds
    time.sleep(5)
```

the message data in the output is empty (`'Records': []`):

```
{'Records': [], 'NextShardIterator': 'AAAAAAAAAAFFVFpvvveOquLUe7WO9nZAcYNQdcS6f6a+YGrrrjZo1gULcu/ZYxC7AB+xVlUhgL9UFPrQ22qmcQa6iIsmuKWl26buBk3utXlVqiGuDUYSgqMOtkp0Y7pJwa6N/I0fYfl2PLTXp5Qz8+5ZYuTW1KDt+PeSU3992bwgdOm7744cxcSnYFaQuHqfa0vLlaRBTOACVz4fwjggUBN01WdsoEjKmgtfNmuHSA7s9LLNzAapMg==', 'MillisBehindLatest': 0, 'ResponseMetadata': {'RequestId': 'e451dd27-c867-cf3d-be83-edbe95e9da9f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e451dd27-c867-cf3d-be83-edbe95e9da9f', 'x-amz-id-2': 'ClSlC3gRJuEqL9YJcHgC2N/TLSv56o+6406ki2+Zohnfo/erFVMDpPqkEWT+XAeeHXCdhYBbnOeZBPyesbXnVs45KQG78eRU', 'date': 'Thu, 14 Apr 2022 14:23:21 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '308'}, 'RetryAttempts': 0}}
```
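A likely factor here is the `LATEST` shard iterator type, which only returns records written after the iterator is obtained. Below is a minimal consumer sketch, using the same placeholder stream name and region as the question, that starts from the oldest available record with `TRIM_HORIZON` instead:

```
# Minimal consumer sketch, assuming the placeholder stream name and region
# from the question. 'TRIM_HORIZON' reads records already stored in the
# shard, whereas 'LATEST' only returns records written after the iterator
# was created, which can look like an endlessly empty 'Records' list.
import time

import boto3

STREAM_NAME = 'stream_name'  # placeholder

kinesis = boto3.client('kinesis', region_name='us-east-1')

shard_id = kinesis.describe_stream(StreamName=STREAM_NAME)[
    'StreamDescription']['Shards'][0]['ShardId']

iterator = kinesis.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId=shard_id,
    ShardIteratorType='TRIM_HORIZON',  # start from the oldest available record
)['ShardIterator']

while iterator:
    response = kinesis.get_records(ShardIterator=iterator, Limit=100)
    for record in response['Records']:
        print(record['Data'])
    iterator = response.get('NextShardIterator')  # None once the shard is closed
    time.sleep(5)  # throttle GetRecords calls per shard
```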
2 answers · 0 votes · 5 views · asked a month ago

How to set the document id when delivering data from Kinesis Data Firehose to an OpenSearch index

What I'm trying to do:

1. I am using a Kinesis data stream to ingest data from a Python client sending in JSONs.
2. I set up a Kinesis Firehose delivery stream with the Kinesis data stream from the previous step as the source and an index on OpenSearch as the destination. I also use a Lambda to transform the stream before delivering to OpenSearch.

Now I would like to set the document id for the record I'm transforming in the Lambda. I tried setting a key named `id` on the transformed record object, but that creates a document attribute named `id` and sets a value on it. What I would like is to set the `_id` of the search document. When I try to set the `_id` attribute directly by returning it within the transformed record to the Firehose delivery stream, it generates a destination error:

```
"message": "{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse field [_id] of type [_id] in document with id \\u002749627661727302149016659535428367209810930350656512327682.0\\u0027. Preview of field\\u0027s value: \\u00279b428743-b19a-4fb1-90f2-f132a423b2e8\\u0027\",\"caused_by\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.\"}}",
"errorCode": "400",
```

Is there any way to set the doc `_id` for the documents loaded from Firehose to OpenSearch, or would I need to take a different approach by choosing an HTTP endpoint as the destination and using the REST APIs provided by OpenSearch (which would be a hassle compared to directly being able to set the `_id` attribute)?

What I'm really trying to do is update the indexed documents on a change event. I understand that Firehose uses the OpenSearch bulk API, but I am unsure how the upsert operation is handled internally by the Kinesis destination connector for OpenSearch. Specifying a fixed id from another DB would therefore be ideal for both insert and update operations in my case. It would be super useful to at least be able to dictate the type of operation based on some attribute of the Kinesis record, together with a reference id for the doc to update.
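For reference, here is a hedged sketch of the alternative the question mentions: indexing documents with a caller-chosen `_id` directly through the OpenSearch REST API instead of relying on the Firehose OpenSearch destination. The endpoint, index name, and credentials below are placeholders, not values from the question.

```
# Hedged sketch: write documents with an explicit _id via the OpenSearch
# REST API (PUT /<index>/_doc/<id>), which creates or overwrites the doc.
# Endpoint, index name, and credentials are placeholders.
import json

import requests

OPENSEARCH_URL = 'https://my-domain.us-east-1.es.amazonaws.com'  # placeholder
INDEX = 'my-index'  # placeholder
AUTH = ('user', 'password')  # placeholder; use your domain's auth mechanism


def upsert(doc_id: str, document: dict) -> None:
    """Index or overwrite a document under the given _id."""
    resp = requests.put(
        f'{OPENSEARCH_URL}/{INDEX}/_doc/{doc_id}',
        data=json.dumps(document),
        headers={'Content-Type': 'application/json'},
        auth=AUTH,
        timeout=10,
    )
    resp.raise_for_status()


# Reusing the id from the question's error message purely as an example value.
upsert('9b428743-b19a-4fb1-90f2-f132a423b2e8', {'status': 'updated'})
```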
1 answer · 0 votes · 8 views · asked 2 months ago

Firehose to S3 with One Record Per Line

Hey all,

On this [post](https://forums.aws.amazon.com/thread.jspa?threadID=244858&tstart=0&messageID=981022#981022) there is a solution for making a target rule on a Firehose add a newline character to every JSON event. However, the solution is for the JS CDK and doesn't work for the Python version (1.134.0). We tried to find a way to implement it in Python, but it seems the CDK doesn't map all the needed properties from JS to Python. For now, we have a very ugly workaround that manipulates the JSON template before sending it to CloudFormation.

To create the target Firehose we use the code below, where the problem is that `RuleTargetInput` has only a few options and doesn't allow a custom `InputTransformerProperty`:

```
firehose_target = KinesisFirehoseStream(
    stream=self.delivery_stream,
    # Python CDK is not allowing Custom CfnRule.InputTransformerProperty
    # Makefile will make the workaround
    message=RuleTargetInput.from_text(f'{EventField.from_path("$")}'),
)
```

Piece of the JSON template generated by the CDK:

```
"Targets": [
  {
    "Arn": { "Fn::GetAtt": [ "firehose", "Arn" ] },
    "Id": "Target0",
    "InputTransformer": {
      "InputPathsMap": { "f1": "$" },
      "InputTemplate": "\\"<f1>\\""
    },
    "RoleArn": { "Fn::GetAtt": [ "firehoseEventsRole1814C701", "Arn" ] }
  }
]
```

To manipulate the InputTransformer, we run the code below before sending the template to CloudFormation:

```
jq -c . cdk.out/robotic-accounting-firehose.template.json \
  | sed -e 's/"InputTransformer":{"InputPathsMap":{"f1":"$$"},"InputTemplate":"\\"<f1>\\""}/"InputTransformer":{"InputPathsMap":{},"InputTemplate":"<aws.events.event>\\n"}/g' \
  | jq '.' > cdk.out/robotic-accounting-firehose.template.json.tmp

rm cdk.out/robotic-accounting-firehose.template.json
mv cdk.out/robotic-accounting-firehose.template.json.tmp cdk.out/robotic-accounting-firehose.template.json
```

That gives us the InputTransformer we need, and it works:

```
"Targets": [
  {
    "Arn": { "Fn::GetAtt": [ "firehose", "Arn" ] },
    "Id": "Target0",
    "InputTransformer": {
      "InputPathsMap": {},
      "InputTemplate": "<aws.events.event>\n"
    },
    "RoleArn": { "Fn::GetAtt": [ "firehoseEventsRole1814C701", "Arn" ] }
  }
]
```

We know it's horrible, but it works. Does someone else have this problem and a better solution? Does CDK v2 solve this?

Thanks,
Daniel
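A possible alternative to patching the synthesized template is the CDK escape hatch: drop down to the underlying `CfnRule` and override the target's `InputTransformer` directly. The sketch below is only that, a sketch; it assumes a CDK Python app, that the Firehose target is the first target on the rule, and that array-index override paths behave as in the CDK override documentation.

```
# Hedged sketch: adjust the rule target's InputTransformer via the CDK
# escape hatch instead of editing the synthesized template with jq/sed.
# Assumes the Firehose target is target index 0 on the rule.
from aws_cdk import aws_events as events


def deliver_raw_event_per_line(rule: events.Rule, target_index: int = 0) -> None:
    """Rewrite the target's InputTransformer so each event is forwarded
    as the raw event followed by a newline (one JSON record per line)."""
    cfn_rule = rule.node.default_child  # escape hatch to the low-level CfnRule
    # Drop the generated "f1" input path map...
    cfn_rule.add_property_deletion_override(
        f"Targets.{target_index}.InputTransformer.InputPathsMap"
    )
    # ...and forward the whole event plus a trailing newline.
    cfn_rule.add_property_override(
        f"Targets.{target_index}.InputTransformer.InputTemplate",
        "<aws.events.event>\n",
    )
```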
2 answers · 0 votes · 12 views · asked 4 months ago

Kinesis Dynamic Partitioning "Non JSON record provided" error

Issue: Kinesis Dynamic Partitioning error `"errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided"`

I am having issues getting Kinesis Dynamic Partitioning to process logs coming from CloudWatch Logs after they are transformed via a Lambda function. Current flow: CloudWatch log groups to Kinesis Firehose delivery stream (with data transformation via Lambda + dynamic partitioning configured) to S3.

The logs in S3 show `"errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided"`; however, if I turn off dynamic partitioning, the logs in S3 show up correctly as JSON formatted by my Lambda function (i.e. `{"Account": "123456789012","LogGroup":"<loggroupname>","Log":"<logmessage>"}`). The error output also includes the raw data, albeit in compressed/encoded form (i.e. "H4sIAAAAAAAAAJVUXXPaO...."); manually decompressing/decoding it shows data in the format below (taken from https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchlogs.html) and not the data after the Lambda transformation:

```
{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "/aws/lambda/echo-nodejs",
  "logStream": "2019/03/13/[$LATEST]94fa867e5374431291a7fc14e2f56ae7",
  "subscriptionFilters": [ "LambdaStream_cloudwatchlogs-node" ],
  "logEvents": [
    {
      "id": "34622316099697884706540976068822859012661220141643892546",
      "timestamp": 1552518348220,
      "message": "REPORT RequestId: 6234bffe-149a-b642-81ff-2e8e376d8aff\tDuration: 46.84 ms\tBilled Duration: 47 ms \tMemory Size: 192 MB\tMax Memory Used: 72 MB\t\n"
    }
  ]
}
```

My understanding of Kinesis Dynamic Partitioning is that it processes the data after it has been transformed by my Lambda function, but it seems this is not the case and it is processing the raw data from CloudWatch Logs. Can anybody shed any light on this?

Here is the Kinesis Terraform code I am using for reference:

```
extended_s3_configuration {
  role_arn            = aws_iam_role.<redacted>.arn
  bucket_arn          = "arn:aws:s3:::<redacted>"
  prefix              = "!{partitionKeyFromQuery:Account}/!{partitionKeyFromQuery:LogGroup}/"
  error_output_prefix = "processing-errors/"
  buffer_size         = 64
  buffer_interval     = 300

  dynamic_partitioning_configuration {
    enabled = "true"
  }

  processing_configuration {
    enabled = "true"

    processors {
      type = "Lambda"
      parameters {
        parameter_name  = "LambdaArn"
        parameter_value = "${aws_lambda_function.decode_cloudwatch_logs.arn}:$LATEST"
      }
    }

    processors {
      type = "MetadataExtraction"
      parameters {
        parameter_name  = "MetadataExtractionQuery"
        parameter_value = "{Account:.Account, LogGroup:.LogGroup}"
      }
      parameters {
        parameter_name  = "JsonParsingEngine"
        parameter_value = "JQ-1.6"
      }
    }

    processors {
      type = "RecordDeAggregation"
      parameters {
        parameter_name  = "SubRecordType"
        parameter_value = "JSON"
      }
    }
  }
}
```

Thanks in advance
José
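One detail worth noting: when a Lambda transform is in place, partition keys can also be returned from the Lambda itself via the record's `metadata.partitionKeys` field and referenced in the prefix with the `partitionKeyFromLambda` namespace, instead of extracting them with a JQ query. Below is a hedged sketch of such a transform; the field names mirror the question, but the actual `decode_cloudwatch_logs` Lambda in use may differ.

```
# Hedged sketch of a Firehose transform Lambda for CloudWatch Logs
# subscription records. It decodes the gzipped payload, emits one JSON
# object per record, and returns partition keys in metadata.partitionKeys,
# so the S3 prefix can use "!{partitionKeyFromLambda:Account}/..." instead
# of partitionKeyFromQuery.
import base64
import gzip
import json


def handler(event, context):
    output = []
    for record in event['records']:
        payload = json.loads(gzip.decompress(base64.b64decode(record['data'])))
        transformed = {
            'Account': payload['owner'],
            'LogGroup': payload['logGroup'],
            'Log': ''.join(e['message'] for e in payload['logEvents']),
        }
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode((json.dumps(transformed) + '\n').encode()).decode(),
            # Partition keys returned here are referenced in the delivery
            # stream prefix with the partitionKeyFromLambda namespace.
            'metadata': {
                'partitionKeys': {
                    'Account': transformed['Account'],
                    'LogGroup': transformed['LogGroup'],
                },
            },
        })
    return {'records': output}
```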
2 answers · 0 votes · 176 views · asked 5 months ago