How to add a "timestamp" field to each data record through an MSK connector?


Hello, I am using an MSK connector (S3SinkConnector) to write data from an MSK cluster to S3. I now want to add a timestamp to each record that marks when it was stored in S3. I found the Kafka Connect InsertField transformation (https://docs.confluent.io/platform/current/connect/transforms/insertfield.html#).

However, when I add the transformation configuration below to the connector properties, the MSK connector cannot be created.

transforms=InsertField
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertField.timestamp.field=TimestampField

It seems the MSK connector does not support the InsertField transformation.

Have I made a mistake, or is there another way to add a timestamp to each data record through an MSK connector?

Below are my full connector properties:

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
partition.duration.ms=60000
transforms.InsertField.timestamp.field=TimestampField
topics.dir=<topic dir>
flush.size=9
schema.compatibility=NONE
tasks.max=3
topics=<topic>
timezone=UTC
transforms=InsertField
rotate.interval.ms=5000
locale=en-US
s3.compression.type=gzip
format.class=io.confluent.connect.s3.format.json.JsonFormat
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=<bucket>
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
AWS
asked a year ago · 597 views
1 Answer

I found that this configuration does work. The connector failed because the data in the MSK topic is JSON, not a Struct object. The data needs to be converted to a Struct before the InsertField transformation can be applied.
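
One possible way to do that conversion, as a hedged sketch rather than a confirmed fix: InsertField operates on structured record values (a Struct, or a Map when no schema is present), so it fails if the value converter hands the JSON over as a plain string. Assuming the topic holds plain JSON objects without a schema envelope, setting the value converter to JsonConverter in the connector properties should let the transformation see parsed data. The two converter settings below are an assumption about this setup and are not taken from the thread.

```properties
# Assumption: topic values are plain JSON objects with no schema/payload envelope.
# Parse each value as JSON instead of passing it through as a raw string.
value.converter=org.apache.kafka.connect.json.JsonConverter
# false: values arrive as schemaless maps.
# Use true only if each message embeds "schema" and "payload", which yields a Struct.
value.converter.schemas.enable=false

# Transformation settings from the question, unchanged.
transforms=InsertField
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertField.timestamp.field=TimestampField
```

Note that `timestamp.field` inserts the Kafka record timestamp, which approximates but is not identical to the time the object is written to S3.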

AWS
answered a year ago
