How to add "timestamp" field into each data record through msk connector?

0

Hello, I am using MSK Connector(S3SinkConnector) to write data into S3 from MSK cluster. Now I want to add a timestamp to mark the time of storing into s3. I found there is Kafka Connect Transformations InsertField feature (https://docs.confluent.io/platform/current/connect/transforms/insertfield.html#),

But when I add the below Transformations configuration into connector properties, the msk connector cannot be created successfully.

transforms=InsertField
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertField.timestamp.field=TimestampField

It seem msk connector cannot support transforms InsertField feature.

Is there any mistake from me or is there any way to add timestamp into data record through msk connector?

Below is my full connector properties:

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
partition.duration.ms=60000
transforms.InsertField.timestamp.field=TimestampField
topics.dir=<topic dir>
flush.size=9
schema.compatibility=NONE
tasks.max=3
topics=<topic>
timezone=UTC
transforms=InsertField
rotate.interval.ms=5000
locale=en-US
s3.compression.type=gzip
format.class=io.confluent.connect.s3.format.json.JsonFormat
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=<bucket>
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
AWS
demandé il y a un an597 vues
1 réponse
0

I found these configurations are workable. Failed reason is the data in msk topic is json format, not Struct object. need to convert it to Struct if we use InsertField features.

AWS
répondu il y a un an

Vous n'êtes pas connecté. Se connecter pour publier une réponse.

Une bonne réponse répond clairement à la question, contient des commentaires constructifs et encourage le développement professionnel de la personne qui pose la question.

Instructions pour répondre aux questions