Issue: Kinesis Dynamic Partitioning error "errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided"
I am having issues getting Kinesis Dynamic Partitioning to process logs coming from CloudWatch Logs after they are transformed by a Lambda function.
Current flow: CloudWatch log groups to Kinesis Firehose Delivery stream (with data transformation via Lambda + dynamic partitioning configured) to S3.
The records landing in S3 show "errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided". However, if I turn off Dynamic Partitioning, the logs in S3 are correctly JSON-formatted as per my Lambda function (i.e. {"Account": "123456789012","LogGroup":"<loggroupname>","Log":"<logmessage>"}).
The error records also include the raw data, albeit in compressed/encoded form (i.e. "H4sIAAAAAAAAAJVUXXPaO...."). Manually decompressing/decoding that data shows the original CloudWatch Logs payload in the format below (taken from https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchlogs.html), not the data after the Lambda transformation:
{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "/aws/lambda/echo-nodejs",
  "logStream": "2019/03/13/[$LATEST]94fa867e5374431291a7fc14e2f56ae7",
  "subscriptionFilters": [ "LambdaStream_cloudwatchlogs-node" ],
  "logEvents": [
    {
      "id": "34622316099697884706540976068822859012661220141643892546",
      "timestamp": 1552518348220,
      "message": "REPORT RequestId: 6234bffe-149a-b642-81ff-2e8e376d8aff\tDuration: 46.84 ms\tBilled Duration: 47 ms \tMemory Size: 192 MB\tMax Memory Used: 72 MB\t\n"
    }
  ]
}
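For anyone wanting to reproduce the inspection step: the "H4sI..." prefix is the base64 encoding of the gzip magic bytes, so a minimal sketch for decoding the rawData field of an error record could look like this (the function name is mine, not an AWS API):

```python
import base64
import gzip
import json

def decode_firehose_error_record(raw_data: str) -> dict:
    """Base64-decode and gunzip the rawData field of a Firehose
    processing-errors record, then parse the result as JSON."""
    decoded = base64.b64decode(raw_data)       # "H4sI..." -> gzip bytes
    decompressed = gzip.decompress(decoded)    # gzip bytes -> JSON text
    return json.loads(decompressed)
```

This is how I confirmed the payload above was the raw CloudWatch Logs message rather than the Lambda output.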
My understanding of Kinesis Dynamic Partitioning was that it would process the data after it has been transformed by my Lambda function, but it seems this is not the case: it appears to be processing the raw data from CloudWatch Logs. Can anybody shed any light on this?
Here is the Kinesis terraform code I am using for reference:
extended_s3_configuration {
  role_arn            = aws_iam_role.<redacted>.arn
  bucket_arn          = "arn:aws:s3:::<redacted>"
  prefix              = "!{partitionKeyFromQuery:Account}/!{partitionKeyFromQuery:LogGroup}/"
  error_output_prefix = "processing-errors/"
  buffer_size         = 64
  buffer_interval     = 300

  dynamic_partitioning_configuration {
    enabled = "true"
  }

  processing_configuration {
    enabled = "true"

    processors {
      type = "Lambda"
      parameters {
        parameter_name  = "LambdaArn"
        parameter_value = "${aws_lambda_function.decode_cloudwatch_logs.arn}:$LATEST"
      }
    }

    processors {
      type = "MetadataExtraction"
      parameters {
        parameter_name  = "MetadataExtractionQuery"
        parameter_value = "{Account:.Account, LogGroup:.LogGroup}"
      }
      parameters {
        parameter_name  = "JsonParsingEngine"
        parameter_value = "JQ-1.6"
      }
    }

    processors {
      type = "RecordDeAggregation"
      parameters {
        parameter_name  = "SubRecordType"
        parameter_value = "JSON"
      }
    }
  }
}
Thanks in advance
José
F.Y.I. I'm sorry — what I said earlier was wrong. Dynamic Partitioning can parse the JSON record returned by the Lambda transform.
I misunderstood the order of the processing steps. I thought JSON de-aggregation ran after Lambda, but in reality Lambda runs after JSON de-aggregation. So if the Lambda function returns aggregated JSON records, they are never de-aggregated into individual JSON records, and Dynamic Partitioning cannot parse the result as JSON.
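Given that ordering, the practical consequence is that the Lambda transform must emit exactly one JSON object per Firehose record, rather than concatenating several objects into one record. A sketch of such a transform, assuming the Account/LogGroup/Log field names from the question (illustrative only, not the asker's actual function):

```python
import base64
import gzip
import json

def lambda_handler(event, context):
    """Firehose transform for CloudWatch Logs subscription data.

    Emits exactly one JSON object per input record so that the
    MetadataExtraction (JQ) step can parse it. Returning multiple
    concatenated JSON objects in one record would trigger
    DynamicPartitioning.MetadataExtractionFailed.
    """
    output = []
    for record in event["records"]:
        # CloudWatch Logs subscription data arrives base64-encoded and gzipped.
        payload = json.loads(gzip.decompress(base64.b64decode(record["data"])))

        # Collapse all log events into a single JSON object per record.
        transformed = {
            "Account": payload["owner"],
            "LogGroup": payload["logGroup"],
            "Log": "\n".join(e["message"] for e in payload["logEvents"]),
        }

        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(
                (json.dumps(transformed) + "\n").encode()
            ).decode(),
        })
    return {"records": output}
```

With one JSON object per record, the JQ query {Account:.Account, LogGroup:.LogGroup} in the Terraform config above can extract the partition keys as intended.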
https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-s3