Kinesis Firehose transformation: JSON output


I am using a Lambda function to transform records buffered by a Kinesis Data Firehose delivery stream.

This is my Lambda function (the data arrives gzip-compressed):

import base64
import gzip
import json


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # Firehose passes each record base64-encoded; the payload itself is gzipped
        data = base64.b64decode(record['data'])
        content = gzip.decompress(data).decode('utf-8')
        payload = json.loads(content)
        message = payload["logEvents"][0]["message"]
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(message).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}
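To reproduce this locally without Firehose, here is a minimal sketch that builds a Firehose-style transformation event. It assumes the source is a CloudWatch Logs subscription (which is where the `logEvents` shape comes from); `make_test_event` is a hypothetical helper, not part of any AWS library, and the payload omits fields such as `logGroup` that the handler does not read. Decoding the record the same way the handler does shows what type `message` actually has.

```python
import base64
import gzip
import json


def make_test_event(messages):
    """Build a Firehose-style transformation event (hypothetical test helper).

    Each record's data is a gzipped, base64-encoded payload shaped like a
    CloudWatch Logs subscription message (assumed source of 'logEvents').
    """
    records = []
    for i, message in enumerate(messages):
        payload = {
            "messageType": "DATA_MESSAGE",
            "logEvents": [{"id": str(i), "timestamp": 0, "message": message}],
        }
        compressed = gzip.compress(json.dumps(payload).encode("utf-8"))
        records.append({
            "recordId": str(i),
            "data": base64.b64encode(compressed).decode("utf-8"),
        })
    return {"records": records}


# Decode the first record exactly as the handler does
event = make_test_event(['{"id": 123, "name": "Andrea"}'])
decoded = json.loads(gzip.decompress(base64.b64decode(event["records"][0]["data"])))
message = decoded["logEvents"][0]["message"]
print(type(message))  # <class 'str'>: the log message is JSON text, not a dict
```

Passing `event` to `lambda_handler(event, None)` exercises the handler end to end.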

The issue is that Firehose does not treat the data I send back as JSON; it is written out as escaped strings:

"{\"id\": 123, \"name\" : \"Andrea\"}"
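The escaping is what `json.dumps` produces when it is given a Python `str` rather than a `dict`: it serializes the value as a JSON string literal, so the inner quotes are escaped and the whole thing is wrapped in quotes. A small illustration, using a sample message modeled on the output above:

```python
import json

# A log message that is already JSON *text* (a Python str), not a dict.
message = '{"id": 123, "name": "Andrea"}'

# json.dumps on a str emits a JSON string literal: quotes escaped, wrapped in quotes.
double_encoded = json.dumps(message)
print(double_encoded)  # "{\"id\": 123, \"name\": \"Andrea\"}"

# Parsing first and then dumping yields a real JSON object.
single_encoded = json.dumps(json.loads(message))
print(single_encoded)  # {"id": 123, "name": "Andrea"}
```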

Any suggestions? I need the data in JSON format so I can query it from Athena.

asked 2 months ago, 521 views
1 Answer

Solved it. The message extracted from:

message = payload["logEvents"][0]["message"]

is actually a string, which I did not notice at first. If it contains JSON, you must parse it with json.loads before serializing it again with json.dumps; otherwise the JSON text is encoded a second time as a string.
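A sketch of the handler with that fix applied. Like the question, it assumes each record carries a single relevant log event at `logEvents[0]`. The trailing newline is an addition not in the answer above: Firehose concatenates the returned records without a delimiter, and Athena's JSON SerDe typically expects one JSON object per line.

```python
import base64
import gzip
import json


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # Firehose hands each record over base64-encoded; the payload is gzipped.
        data = base64.b64decode(record['data'])
        payload = json.loads(gzip.decompress(data).decode('utf-8'))

        # 'message' is a str containing JSON text, so parse it first...
        message = json.loads(payload['logEvents'][0]['message'])

        # ...then serialize it once. The newline (assumption, see above) keeps
        # one JSON object per line in the delivered S3 objects.
        body = json.dumps(message) + '\n'

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(body.encode('utf-8')).decode('utf-8'),
        })

    print('Successfully processed {} records.'.format(len(output)))
    return {'records': output}
```

With this change the delivered data contains `{"id": 123, "name": "Andrea"}` instead of the escaped string.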

answered 2 months ago
