MSK TimeBasedPartition resulting in error


Hi Team, I want the S3 bucket files to be prefixed with the timestamp of generation. I tried the configurations below:

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock

or

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record

Both of them result in a Java error.

The log looks like:

[Worker-0d72378639a66c465] org.glassfish.jersey.internal.Errors logErrors

and

Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)

Nagesh
asked a year ago · 322 views
1 Answer

Hello

I understand that you would like to add a timestamp to the S3 objects written by the S3 sink connector.

The S3 sink connector writes objects to S3 with names of the form "<topic>+<kafkaPartition>+<startOffset>.<format>"; refer to [1] for the details of the format.
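For example, with the default settings and JSON format, an object written from partition 0 of a topic named my-topic (an illustrative name) would be keyed roughly as:

```
topics/my-topic/partition=0/my-topic+0+0000000000.json
```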

You can modify the object name by using a Lambda function to read the messages from the Kafka topic and write them to S3 yourself; refer to [2] for further details. Kinesis Data Firehose can also be used to write the messages in a Kafka topic to S3 in the required format; refer to [3] for further details.
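If you go the Lambda route, a minimal Python sketch of such a handler might look like the following. The bucket, key layout, and the object_key helper are illustrative choices, not part of any AWS API; the MSK event shape (records grouped under "topic-partition" keys, base64-encoded values, timestamps in milliseconds) follows the documented Lambda/MSK integration.

```python
import base64
from datetime import datetime, timezone


def object_key(topic: str, partition: int, offset: int, ts_ms: int) -> str:
    """Build an S3 key prefixed by the record timestamp (illustrative layout)."""
    ts = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    prefix = ts.strftime("year=%Y/month=%m/day=%d/hour=%H")
    return f"topics/{topic}/{prefix}/{topic}+{partition}+{offset:010d}.json"


def lambda_handler(event, context):
    # An MSK event groups records under "topic-partition" keys;
    # each record carries a base64-encoded value and a timestamp in ms.
    keys = []
    for records in event.get("records", {}).values():
        for rec in records:
            payload = base64.b64decode(rec["value"])
            key = object_key(rec["topic"], rec["partition"],
                             rec["offset"], rec["timestamp"])
            # An S3 client (e.g. boto3) would upload the payload here:
            # s3.put_object(Bucket=BUCKET, Key=key, Body=payload)
            keys.append(key)
    return {"keys": keys}
```

This gives you full control over the prefix, at the cost of losing the connector's exactly-once and batching behavior.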

With regard to the error that you have mentioned, could you kindly open a case with AWS Support so that it can be investigated with the cluster, connector, and IAM details.
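As a general note, TimeBasedPartitioner requires several properties beyond partitioner.class and timestamp.extractor, and a missing required property is a common cause of the connector task failing at startup. A minimal sketch of the full set, with illustrative values:

```properties
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock
# Width of each time partition in milliseconds (1 hour here).
partition.duration.ms=3600000
# Directory layout for the time-based prefix (Joda-Time pattern).
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en-US
timezone=UTC
```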

References:

[1] https://docs.confluent.io/kafka-connectors/s3-sink/current/overview.html#s3-object-names/

[2] https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/

[3] https://aws.amazon.com/blogs/big-data/kinesis-data-firehose-now-supports-dynamic-partitioning-to-amazon-s3/

AWS
answered a year ago
