Firehose to S3 with One Record Per Line

Hey all,

In this post there is a solution that uses a rule target on a Firehose to add a newline character to every JSON event. However, that solution is for the JS version of the CDK and doesn't work with the Python version (1.134.0). We tried to port it to Python, but it seems that the CDK doesn't map all the needed properties from JS to Python.

For now, we have a very ugly workaround that manipulates the JSON template before sending it to CloudFormation.

To create the Firehose target we use the code below. The problem is that RuleTargetInput has only a few options and doesn't allow a custom InputTransformerProperty.

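        # Imports (CDK v1 module layout); these normally live at the top of the file
        from aws_cdk.aws_events import EventField, RuleTargetInput
        from aws_cdk.aws_events_targets import KinesisFirehoseStream

        firehose_target = KinesisFirehoseStream(
            stream=self.delivery_stream,
            # The Python CDK does not accept a custom CfnRule.InputTransformerProperty here,
            # so the Makefile patches the synthesized template afterwards.
            message=RuleTargetInput.from_text(f'{EventField.from_path("$")}'),
        )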

Piece of the JSON template generated by the CDK:

        "Targets": [
          {
            "Arn": {
              "Fn::GetAtt": [
                "firehose",
                "Arn"
              ]
            },
            "Id": "Target0",
            "InputTransformer": {
              "InputPathsMap": {"f1":"$"},
              "InputTemplate": "\"<f1>\""
            },
            "RoleArn": {
              "Fn::GetAtt": [
                "firehoseEventsRole1814C701",
                "Arn"
              ]
            }
          }
        ]

To manipulate the InputTransformer, we run the Makefile recipe below before sending the template to CloudFormation (the `$$` is Make's escape for a literal `$`):

	jq -c . cdk.out/robotic-accounting-firehose.template.json \
		| sed -e 's/"InputTransformer":{"InputPathsMap":{"f1":"$$"},"InputTemplate":"\\"<f1>\\""}/"InputTransformer":{"InputPathsMap":{},"InputTemplate":"<aws.events.event>\\n"}/g' \
		| jq '.' > cdk.out/robotic-accounting-firehose.template.json.tmp
	rm cdk.out/robotic-accounting-firehose.template.json
	mv cdk.out/robotic-accounting-firehose.template.json.tmp cdk.out/robotic-accounting-firehose.template.json

That gives us the InputTransformer we need, and it works:

        "Targets": [
          {
            "Arn": {
              "Fn::GetAtt": [
                "firehose",
                "Arn"
              ]
            },
            "Id": "Target0",
            "InputTransformer": {
              "InputPathsMap": {},
              "InputTemplate": "<aws.events.event>\n"
            },
            "RoleArn": {
              "Fn::GetAtt": [
                "firehoseEventsRole1814C701",
                "Arn"
              ]
            }
          }
        ]

We know, it's horrible, but it works.
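
For anyone who wants to keep the template-patching approach but avoid matching JSON text with sed, the same edit can be sketched in Python. This is only an illustration: the names `patch_template` and `patch_file` are made up here, and it assumes the synthesized rule target carries exactly the `{"f1": "$"}` paths map shown above.

```python
import json
from pathlib import Path


def newline_transformer() -> dict:
    # The InputTransformer we want: the whole event followed by a newline.
    return {"InputPathsMap": {}, "InputTemplate": "<aws.events.event>\n"}


def patch_template(template: dict) -> int:
    """Rewrite CDK-generated InputTransformers in place; return how many were patched."""
    patched = 0
    for resource in template.get("Resources", {}).values():
        if resource.get("Type") != "AWS::Events::Rule":
            continue
        for target in resource.get("Properties", {}).get("Targets", []):
            transformer = target.get("InputTransformer")
            # Only touch the transformer that RuleTargetInput.from_text generated.
            if transformer and transformer.get("InputPathsMap") == {"f1": "$"}:
                target["InputTransformer"] = newline_transformer()
                patched += 1
    return patched


def patch_file(path: str) -> int:
    """Load a synthesized template, patch it, and write it back to the same path."""
    template_path = Path(path)
    template = json.loads(template_path.read_text())
    patched = patch_template(template)
    template_path.write_text(json.dumps(template, indent=1))
    return patched
```

Calling `patch_file("cdk.out/robotic-accounting-firehose.template.json")` after `cdk synth` would stand in for the jq/sed pipeline, and the returned count makes it easy to fail the build if nothing was patched.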

Has anyone else run into this problem and found a better solution? Does CDK v2 solve this?
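
One thing that may be worth trying before patching the template is a CDK escape hatch, which the Python bindings expose as `add_property_override` and `add_property_deletion_override` on the low-level Cfn resource. A minimal sketch, assuming `rule` is the `aws_events.Rule` the Firehose target was added to, that `rule.node.default_child` is its underlying `CfnRule`, and that the target sits at index 0. The helper name `force_newline_template` is hypothetical, and this has not been verified against 1.134.0.

```python
def force_newline_template(rule, target_index: int = 0) -> None:
    """Override the synthesized InputTransformer of one rule target.

    `rule` is assumed to be an aws_events.Rule; its default child is the CfnRule
    whose rendered properties the overrides are applied to at synth time.
    """
    cfn_rule = rule.node.default_child
    base = f"Targets.{target_index}.InputTransformer"
    # Drop the {"f1": "$"} map that RuleTargetInput.from_text generates.
    cfn_rule.add_property_deletion_override(f"{base}.InputPathsMap")
    # Emit the whole event followed by a newline.
    cfn_rule.add_property_override(f"{base}.InputTemplate", "<aws.events.event>\n")
```

It would be called once after the target has been added, for example `force_newline_template(my_rule)`, where `my_rule` is whatever variable holds the rule in the stack.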

Tks, Daniel

asked 2 years ago, 2342 views
2 answers

Hi there, you can implement a Lambda transformation function that adds a newline to each record. Here is a reference article: Append Newline to Amazon Kinesis Firehose JSON Formatted Records with Python and AWS Lambda - https://medium.com/analytics-vidhya/append-newline-to-amazon-kinesis-firehose-json-formatted-records-with-python-f58498d0177a

Another option is dynamic partitioning, but if you only need the newline and don't require partitioning in S3, it isn't a good fit, and dynamic partitioning is more expensive than Lambda transformation. Given the cost of dynamic partitioning and the limitation of the Python version of the CDK, the recommended approach is Lambda transformation.
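
A minimal sketch of such a transformation Lambda, assuming the usual Firehose data-transformation event shape (a `records` list whose items carry a `recordId` and base64-encoded `data`). It is an illustration, not the code from the linked article.

```python
import base64


def lambda_handler(event, context):
    """Append a newline to every Firehose record that does not already end with one."""
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"])
        if not payload.endswith(b"\n"):
            payload += b"\n"
        output.append(
            {
                "recordId": record["recordId"],  # must echo the incoming id
                "result": "Ok",                  # mark the record as transformed
                "data": base64.b64encode(payload).decode("utf-8"),
            }
        )
    return {"records": output}
```

The function would be attached to the delivery stream as its data-transformation processor, so every record written to S3 lands on its own line.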

AWS
SUPPORT ENGINEER
answered 2 years ago

Hey Harshith,

I know that our current solution isn't the best one but, at least, we avoid having lambdas and paying for them. It's really unfortunate that the CDK Python version has such a limitation but your workaround is a cheaper option.

Tks, Daniel

answered 2 years ago
