How to add "timestamp" field into each data record through msk connector?

0

Hello, I am using MSK Connector(S3SinkConnector) to write data into S3 from MSK cluster. Now I want to add a timestamp to mark the time of storing into s3. I found there is Kafka Connect Transformations InsertField feature (https://docs.confluent.io/platform/current/connect/transforms/insertfield.html#),

But when I add the below Transformations configuration into connector properties, the msk connector cannot be created successfully.

transforms=InsertField
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertField.timestamp.field=TimestampField

It seem msk connector cannot support transforms InsertField feature.

Is there any mistake from me or is there any way to add timestamp into data record through msk connector?

Below is my full connector properties:

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
partition.duration.ms=60000
transforms.InsertField.timestamp.field=TimestampField
topics.dir=<topic dir>
flush.size=9
schema.compatibility=NONE
tasks.max=3
topics=<topic>
timezone=UTC
transforms=InsertField
rotate.interval.ms=5000
locale=en-US
s3.compression.type=gzip
format.class=io.confluent.connect.s3.format.json.JsonFormat
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=<bucket>
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
AWS
posta un anno fa596 visualizzazioni
1 Risposta
0

I found these configurations are workable. Failed reason is the data in msk topic is json format, not Struct object. need to convert it to Struct if we use InsertField features.

AWS
con risposta un anno fa

Accesso non effettuato. Accedi per postare una risposta.

Una buona risposta soddisfa chiaramente la domanda, fornisce un feedback costruttivo e incoraggia la crescita professionale del richiedente.

Linee guida per rispondere alle domande