How to add "timestamp" field into each data record through msk connector?

0

Hello, I am using MSK Connector(S3SinkConnector) to write data into S3 from MSK cluster. Now I want to add a timestamp to mark the time of storing into s3. I found there is Kafka Connect Transformations InsertField feature (https://docs.confluent.io/platform/current/connect/transforms/insertfield.html#),

But when I add the below Transformations configuration into connector properties, the msk connector cannot be created successfully.

transforms=InsertField
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertField.timestamp.field=TimestampField

It seem msk connector cannot support transforms InsertField feature.

Is there any mistake from me or is there any way to add timestamp into data record through msk connector?

Below is my full connector properties:

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
partition.duration.ms=60000
transforms.InsertField.timestamp.field=TimestampField
topics.dir=<topic dir>
flush.size=9
schema.compatibility=NONE
tasks.max=3
topics=<topic>
timezone=UTC
transforms=InsertField
rotate.interval.ms=5000
locale=en-US
s3.compression.type=gzip
format.class=io.confluent.connect.s3.format.json.JsonFormat
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=<bucket>
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
AWS
已提問 1 年前檢視次數 597 次
1 個回答
0

I found these configurations are workable. Failed reason is the data in msk topic is json format, not Struct object. need to convert it to Struct if we use InsertField features.

AWS
已回答 1 年前

您尚未登入。 登入 去張貼答案。

一個好的回答可以清楚地回答問題並提供建設性的意見回饋,同時有助於提問者的專業成長。

回答問題指南