How can I add a "timestamp" field to each data record through an MSK connector?


Hello, I am using an MSK connector (S3SinkConnector) to write data from an MSK cluster into S3. I now want to add a timestamp to each record to mark the time it was stored in S3. I found the Kafka Connect InsertField transformation (https://docs.confluent.io/platform/current/connect/transforms/insertfield.html#).

However, when I add the transformation configuration below to the connector properties, the MSK connector cannot be created successfully.

transforms=InsertField
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertField.timestamp.field=TimestampField

It seems the MSK connector does not support the InsertField transformation.

Have I made a mistake, or is there another way to add a timestamp to each data record through the MSK connector?

Below are my full connector properties:

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
partition.duration.ms=60000
transforms.InsertField.timestamp.field=TimestampField
topics.dir=<topic dir>
flush.size=9
schema.compatibility=NONE
tasks.max=3
topics=<topic>
timezone=UTC
transforms=InsertField
rotate.interval.ms=5000
locale=en-US
s3.compression.type=gzip
format.class=io.confluent.connect.s3.format.json.JsonFormat
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=<bucket>
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
AWS
Asked a year ago, 597 views
1 Answer

I found that these configurations do work. The failure occurred because the data in the MSK topic is in JSON format, not a Struct object. The data needs to be converted to a Struct before the InsertField feature can be used.
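
As a sketch of what that conversion could look like: assuming the topic holds plain JSON without an embedded schema, overriding the value converter in the connector properties makes Kafka Connect parse each record before the transform runs. The converter choice below is an assumption and is not stated in this answer. With `schemas.enable=false`, JsonConverter produces a schemaless Map rather than a Struct, which InsertField also accepts; producing a true Struct would require schema-carrying data (for example, JSON with a schema/payload envelope and `schemas.enable=true`).

```properties
# Assumption: topic values are plain JSON without an embedded schema.
# Parse each value as JSON instead of passing it through as a raw string or bytes.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Same transform as in the question; timestamp.field inserts the Kafka record timestamp.
transforms=InsertField
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertField.timestamp.field=TimestampField
```

Note that `timestamp.field` carries the record's Kafka timestamp, which may differ from the moment the object is actually written to S3.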

AWS
Answered a year ago
