Kinesis Dynamic Partitioning "Non JSON record provided" error


Issue: Kinesis Dynamic Partitioning error "errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided"

I am having issues getting Kinesis Dynamic Partitioning to process logs coming from CloudWatch logs after they are transformed via a Lambda function.

Current flow: CloudWatch log groups to Kinesis Firehose Delivery stream (with data transformation via Lambda + dynamic partitioning configured) to S3.

The objects written to S3 contain "errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided". However, if I turn off Dynamic Partitioning, the logs in S3 appear correctly as JSON, formatted by my Lambda function (i.e. {"Account": "123456789012","LogGroup":"<loggroupname>","Log":"<logmessage>"}).

The error codes also include the raw data, albeit in compressed/encoded form (i.e. "H4sIAAAAAAAAAJVUXXPaO...."); however, manually decompressing/decoding the data shows data in the format below (taken from https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchlogs.html) and not the data after Lambda transformation:

{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "/aws/lambda/echo-nodejs",
  "logStream": "2019/03/13/[$LATEST]94fa867e5374431291a7fc14e2f56ae7",
  "subscriptionFilters": [ "LambdaStream_cloudwatchlogs-node" ],
  "logEvents": [
    {
      "id": "34622316099697884706540976068822859012661220141643892546",
      "timestamp": 1552518348220,
      "message": "REPORT RequestId: 6234bffe-149a-b642-81ff-2e8e376d8aff\tDuration: 46.84 ms\tBilled Duration: 47 ms \tMemory Size: 192 MB\tMax Memory Used: 72 MB\t\n"
    }
  ]
}
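For anyone reproducing the manual decode step, it can be sketched roughly as follows. This assumes the raw data in the error record is base64-encoded gzip (the "H4sI" prefix is what gzip data looks like once base64-encoded). The function name `decode_cloudwatch_payload` and the sample payload are made up for illustration; the truncated string above is not reproduced.

```python
import base64
import gzip
import json


def decode_cloudwatch_payload(encoded: str) -> dict:
    """Base64-decode, gunzip, and parse a CloudWatch Logs subscription payload."""
    return json.loads(gzip.decompress(base64.b64decode(encoded)))


# Made-up sample standing in for the truncated "H4sI..." string in the error record.
sample = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "/aws/lambda/echo-nodejs",
    "logEvents": [],
}

# Build an encoded payload the same way CloudWatch Logs is assumed to: gzip, then base64.
encoded = base64.b64encode(gzip.compress(json.dumps(sample).encode("utf-8"))).decode("ascii")

decoded = decode_cloudwatch_payload(encoded)
print(decoded["messageType"], decoded["logGroup"])
```

If the decoded output is the untransformed CloudWatch Logs envelope, as it was here, the error record is carrying the original input rather than the Lambda output.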

My understanding is that Kinesis Dynamic Partitioning processes the data after it has been transformed by my Lambda function, but that does not seem to be the case: it appears to be processing the raw data from CloudWatch Logs. Can anybody shed any light on this?

Here is the Kinesis Terraform code I am using, for reference:

  extended_s3_configuration {
    role_arn            = aws_iam_role.<redacted>.arn
    bucket_arn          = "arn:aws:s3:::<redacted>"
    prefix              = "!{partitionKeyFromQuery:Account}/!{partitionKeyFromQuery:LogGroup}/"
    error_output_prefix = "processing-errors/"
    buffer_size         = 64
    buffer_interval     = 300

    dynamic_partitioning_configuration {
      enabled = "true"
    }

    processing_configuration {
      enabled = "true"

      processors {
        type = "Lambda"

        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = "${aws_lambda_function.decode_cloudwatch_logs.arn}:$LATEST"
        }
      }
      processors {
        type = "MetadataExtraction"

        parameters {
          parameter_name  = "MetadataExtractionQuery"
          parameter_value = "{Account:.Account, LogGroup:.LogGroup}"
        }
        parameters {
          parameter_name  = "JsonParsingEngine"
          parameter_value = "JQ-1.6"
        }
      }
      processors {
        type = "RecordDeAggregation"

        parameters {
          parameter_name  = "SubRecordType"
          parameter_value = "JSON"
        }
      }
    }
  }
}

Thanks in advance,
José

2 Answers

Thanks, suzuats. Do you have a source for the information about Kinesis Dynamic Partitioning being unable to process data transformed by Lambda? I will also try what you suggested. Thanks for the suggestion.

answered 3 years ago
  • FYI: I'm sorry, what I said was wrong. Dynamic Partitioning can parse the JSON record returned by the Lambda transform.

    I misunderstood the order of the processing steps. I thought JSON de-aggregation ran after the Lambda transform, but in reality the Lambda transform runs after JSON de-aggregation. If the Lambda returns aggregated JSON records, they are not de-aggregated into individual JSON records, so Dynamic Partitioning cannot parse the result as JSON.

    https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-s3
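To illustrate the point about aggregated records, here is a minimal local check. It uses Python's `json` module as a stand-in for the Firehose parser, which is an assumption: the real parser may differ in details. The helper name `is_single_json_document` and the sample strings are hypothetical.

```python
import json


def is_single_json_document(payload: str) -> bool:
    """Return True only if the whole payload parses as exactly one JSON document."""
    try:
        json.loads(payload)
        return True
    except json.JSONDecodeError:
        return False


# One JSON object per record: parses cleanly.
single = '{"Account": "123456789012", "LogGroup": "example-group", "Log": "first"}'

# Two objects concatenated into one record: not a single JSON document.
aggregated = single + '{"Account": "123456789012", "LogGroup": "example-group", "Log": "second"}'

print(is_single_json_document(single))      # True
print(is_single_json_document(aggregated))  # False
```

Running a check like this on the decoded `data` field of each record the Lambda returns can show whether it emits several JSON objects in one record.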


As I recall, in this case the KFH Lambda Transform needs to return the records encoded as base64, but KFH Dynamic Partitioning (DP) cannot query them. The KFH Lambda Transform can instead return metadata such as 'metadata': {'partitionKeys': {'key': 'value'}} in its response, and you can then reference it in the DP prefix using the format "!{partitionKeyFromLambda:key}". Try partitionKeyFromLambda instead of partitionKeyFromQuery.

https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html#dynamic-partitioning-partitioning-keys
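A rough sketch of a transform Lambda that returns partition keys this way is shown below. It is not the asker's actual function. The handler name, the output shape (one JSON object per Firehose record, with all log messages joined into "Log"), and the key names "Account" and "LogGroup" are assumptions chosen to match the question. The prefix would then need to be "!{partitionKeyFromLambda:Account}/!{partitionKeyFromLambda:LogGroup}/".

```python
import base64
import gzip
import json


def handler(event, context):
    """Firehose transformation Lambda: one output record per input record."""
    output = []
    for record in event["records"]:
        # CloudWatch Logs subscription data arrives base64-encoded and gzip-compressed.
        payload = json.loads(gzip.decompress(base64.b64decode(record["data"])))

        if payload.get("messageType") != "DATA_MESSAGE":
            # Control messages carry no log events, so drop them.
            output.append({"recordId": record["recordId"], "result": "Dropped"})
            continue

        # Assumed output shape: a single JSON object per Firehose record.
        doc = {
            "Account": payload["owner"],
            "LogGroup": payload["logGroup"],
            "Log": "\n".join(e["message"] for e in payload["logEvents"]),
        }
        data = base64.b64encode((json.dumps(doc) + "\n").encode("utf-8")).decode("ascii")
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": data,
            # Partition keys referenced in the prefix as !{partitionKeyFromLambda:<key>}.
            "metadata": {
                "partitionKeys": {
                    "Account": doc["Account"],
                    "LogGroup": doc["LogGroup"],
                }
            },
        })
    return {"records": output}
```

Because the keys travel in the response metadata, Firehose does not have to parse the record body to find them.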

Hope this helps.

AWS
suzuki
answered 3 years ago
