Kinesis Dynamic Partitioning "Non JSON record provided" error

Issue: Kinesis Dynamic Partitioning error "errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided"

I am having issues getting Kinesis Dynamic Partitioning to process logs coming from CloudWatch logs after they are transformed via a Lambda function.

Current flow: CloudWatch log groups to Kinesis Firehose Delivery stream (with data transformation via Lambda + dynamic partitioning configured) to S3.

The logs in S3 show "errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided". However, if I turn off Dynamic Partitioning, the logs in S3 appear correctly as JSON in the format produced by my Lambda function (i.e. {"Account": "123456789012","LogGroup":"<loggroupname>","Log":"<logmessage>"}).

The error records also include the raw data, albeit in compressed/encoded form (e.g. "H4sIAAAAAAAAAJVUXXPaO...."). However, manually decompressing and decoding that data shows it in the format below (taken from https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchlogs.html), not the data after the Lambda transformation:

{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "/aws/lambda/echo-nodejs",
  "logStream": "2019/03/13/[$LATEST]94fa867e5374431291a7fc14e2f56ae7",
  "subscriptionFilters": [ "LambdaStream_cloudwatchlogs-node" ],
  "logEvents": [
    {
      "id": "34622316099697884706540976068822859012661220141643892546",
      "timestamp": 1552518348220,
      "message": "REPORT RequestId: 6234bffe-149a-b642-81ff-2e8e376d8aff\tDuration: 46.84 ms\tBilled Duration: 47 ms \tMemory Size: 192 MB\tMax Memory Used: 72 MB\t\n"
    }
  ]
}
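For anyone who wants to repeat the manual decoding step, here is a minimal Python sketch. It assumes the encoded string is base64-wrapped gzip data (the "H4sI" prefix is what base64-encoded gzip normally starts with). The helper name decode_cloudwatch_payload and the sample payload are made up for illustration, since the real string above is truncated.

```python
import base64
import gzip
import json


def decode_cloudwatch_payload(encoded: str) -> dict:
    """Base64-decode, gunzip, and JSON-parse one CloudWatch Logs payload."""
    compressed = base64.b64decode(encoded)
    return json.loads(gzip.decompress(compressed))


# Round trip with a made-up payload (hypothetical values, not real log data).
sample = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "/aws/lambda/echo-nodejs",
    "logEvents": [],
}
encoded_sample = base64.b64encode(
    gzip.compress(json.dumps(sample).encode("utf-8"))
).decode("ascii")

decoded_sample = decode_cloudwatch_payload(encoded_sample)
print(decoded_sample["messageType"])  # DATA_MESSAGE
```

If the decoded output still has the "messageType"/"logEvents" shape, the failing record is the CloudWatch Logs envelope rather than the Lambda output.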

My understanding is that Kinesis Dynamic Partitioning processes the data after it has been transformed by my Lambda function, but that does not seem to be the case: it appears to be processing the raw data from CloudWatch Logs. Can anybody shed any light on this?

Here is the Kinesis terraform code I am using for reference:

extended_s3_configuration {
  role_arn            = aws_iam_role.<redacted>.arn
  bucket_arn          = "arn:aws:s3:::<redacted>"
  prefix              = "!{partitionKeyFromQuery:Account}/!{partitionKeyFromQuery:LogGroup}/"
  error_output_prefix = "processing-errors/"
  buffer_size         = 64
  buffer_interval     = 300

  dynamic_partitioning_configuration {
    enabled = "true"
  }

  processing_configuration {
    enabled = "true"

    processors {
      type = "Lambda"

      parameters {
        parameter_name  = "LambdaArn"
        parameter_value = "${aws_lambda_function.decode_cloudwatch_logs.arn}:$LATEST"
      }
    }
    processors {
      type = "MetadataExtraction"

      parameters {
        parameter_name  = "MetadataExtractionQuery"
        parameter_value = "{Account:.Account, LogGroup:.LogGroup}"
      }
      parameters {
        parameter_name  = "JsonParsingEngine"
        parameter_value = "JQ-1.6"
      }
    }
    processors {
      type = "RecordDeAggregation"

      parameters {
        parameter_name  = "SubRecordType"
        parameter_value = "JSON"
      }
    }
  }
}
}

Thanks in advance José

2 Answers

Thanks suzuats. Do you have a source for the claim that Kinesis Dynamic Partitioning cannot process data transformed by Lambda? I will also try what you suggested, thanks.

answered 3 years ago
  • F.Y.I. I'm so sorry, what I said was wrong. Dynamic Partitioning can parse the JSON record returned by the Lambda transform.

    I misunderstood the order of the processing steps. I thought JSON de-aggregation ran after the Lambda transform, but in reality the Lambda transform runs after JSON de-aggregation. If Lambda returns aggregated JSON records, they are not de-aggregated into individual JSON records, so Dynamic Partitioning cannot parse the result as JSON.

    https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-s3
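
    To illustrate the failure mode described in the correction above, here is a small local sketch. It uses Python's own JSON parser, not the Firehose JQ engine, so it only shows the general idea: a payload holding several JSON objects back to back is not one valid JSON document. The helper name is_single_json_document and the sample values are hypothetical.

```python
import json


def is_single_json_document(payload: str) -> bool:
    """Return True only if the whole payload parses as one JSON value."""
    try:
        json.loads(payload)
        return True
    except json.JSONDecodeError:
        return False


# One record in the shape produced by the Lambda in the question.
single = '{"Account": "123456789012", "LogGroup": "example-group", "Log": "first"}'
# Two such records joined in one payload, i.e. an "aggregated" record.
aggregated = single + "\n" + single

print(is_single_json_document(single))      # True
print(is_single_json_document(aggregated))  # False
```

    Running a check like this against a decoded sample of the Lambda output is one way to see whether each output record carries one JSON object or several.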

As I remember, in this case the Kinesis Data Firehose (KFH) Lambda transform needs to return the records encoded as base64, but KFH Dynamic Partitioning (DP) cannot query them. The Lambda transform can return metadata such as 'metadata': {'partitionKeys': {'key': 'value'}} in its response, and you can then reference it in the DP prefix using the format "!{partitionKeyFromLambda:key}". Try partitionKeyFromLambda instead of partitionKeyFromQuery.

https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html#dynamic-partitioning-partitioning-keys
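
A minimal sketch of what such a Lambda transform might look like, assuming a Python runtime and the CloudWatch Logs payload shape shown in the question. The output field names (Account, LogGroup, Log) follow the question; the handler body is an illustration, not the asker's actual function.

```python
import base64
import gzip
import json


def lambda_handler(event, context):
    """Firehose transform: flatten CloudWatch Logs events and attach partition keys."""
    output = []
    for record in event["records"]:
        # Firehose hands over base64 data; CloudWatch Logs gzips the payload inside it.
        payload = json.loads(gzip.decompress(base64.b64decode(record["data"])))

        # Drop anything that is not log data (for example control messages).
        if payload.get("messageType") != "DATA_MESSAGE":
            output.append({"recordId": record["recordId"], "result": "Dropped"})
            continue

        # One JSON object per log event, newline-delimited.
        lines = [
            json.dumps({
                "Account": payload["owner"],
                "LogGroup": payload["logGroup"],
                "Log": log_event["message"],
            })
            for log_event in payload["logEvents"]
        ]
        data = ("\n".join(lines) + "\n").encode("utf-8")

        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(data).decode("ascii"),
            # Partition keys travel next to the data, so no JSON query is needed.
            "metadata": {
                "partitionKeys": {
                    "Account": payload["owner"],
                    "LogGroup": payload["logGroup"],
                }
            },
        })
    return {"records": output}
```

With keys named like this, the prefix would become "!{partitionKeyFromLambda:Account}/!{partitionKeyFromLambda:LogGroup}/". Since every log event in one CloudWatch Logs record shares the same owner and log group, a single pair of keys per record should be enough.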

Hope this helps.

AWS
suzuki
answered 3 years ago
