AWS MSK connector configuration for S3


I want the files that the AWS MSK connector writes to my S3 bucket to include a timestamp.

I tried the two configurations below; both resulted in an error.

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock

and also:

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=RECORD

Can you please suggest a fix? Any help is appreciated.

Thanks,

Nagesh
asked a year ago, 1644 views
2 Answers

Hi,

It seems your question is about the Confluent S3 Sink Connector with Amazon MSK.

As per the Confluent documentation [1], the Time Based Partitioner in the S3 Sink Connector requires the following connector configuration properties:

  • path.format
  • partition.duration.ms
  • locale
  • timezone
  • timestamp.extractor

Sample connector configuration when using the Time Based Partitioner [1]:

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
partition.duration.ms=600000
flush.size=1
schema.compatibility=NONE
tasks.max=2
topics=TopicName
timezone=UTC
locale=en-US
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter=org.apache.kafka.connect.storage.StringConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
timestamp.extractor=Record
s3.bucket.name=S3BucketName
key.converter=org.apache.kafka.connect.storage.StringConverter

Please include the required configuration properties listed above in your connector configuration.
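As a quick check before deploying, a short script can report which of the required properties a configuration is missing. This is a minimal sketch, not part of the connector or of MSK Connect: the helper names `parse_properties` and `missing_time_partitioner_props` are hypothetical, and the required list is simply the one quoted from the Confluent documentation in this answer.

```python
# Properties this answer lists as required by the Time Based Partitioner.
REQUIRED_PROPS = [
    "path.format",
    "partition.duration.ms",
    "locale",
    "timezone",
    "timestamp.extractor",
]


def parse_properties(text):
    """Parse simple key=value lines into a dict (no escapes or line continuations)."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, since values such as path.format contain '='.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_time_partitioner_props(props):
    """Return the required properties that are absent or empty (hypothetical helper)."""
    return [name for name in REQUIRED_PROPS if not props.get(name)]


# The first configuration from the question: only two properties were set.
question_config = """
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock
"""

print(missing_time_partitioner_props(parse_properties(question_config)))
```

For the configuration in the question, this reports path.format, partition.duration.ms, locale and timezone as missing.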

If the issue persists, we would need non-public details to troubleshoot further. Please open a support case with AWS using the link in [2].

[1] https://docs.confluent.io/kafka-connectors/s3-sink/current/overview.html#partitioning-records-into-s3-objects

[2] https://console.aws.amazon.com/support/home#/case/create

AWS
SUPPORT ENGINEER
answered a year ago
  • The MSK connector was created successfully.

    In the S3 bucket, separate folders are created, such as:

    Hour=10 / Hour=11 /

    Inside each folder, the file name looks like this:

    ddpd-testd-db.dbo.customers+0+0000000000.snappy.parquet

    However, we need the timestamp in the file name itself, like this:

    ddpd-testd-db.dbo.Timestamp_customers+0+0000000000.snappy.parquet

    How can we get the timestamp in the file name rather than in a separate timestamp folder?

    Thanks,


As per the Confluent documentation [1], the names of S3 objects uploaded by the S3 connector follow this format:

<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

Here, <encodedPartition> is generated by the S3 connector's partitioner. The path.format parameter required by the Time Based Partitioner therefore controls only the <encodedPartition> part of the object name, not the file name that follows it.
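To make the format concrete, here is a minimal sketch that assembles an object name from its parts. The function `s3_object_key` is hypothetical and only mirrors the documented pattern; the prefix value `topics` is an assumption for illustration, and the 10-digit zero padding of the offset is inferred from the file name shown in the comment above, not taken from the connector source.

```python
def s3_object_key(prefix, topic, encoded_partition, kafka_partition, start_offset, fmt):
    """Build <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>."""
    # Offset padding to 10 digits is inferred from the example file name in this thread.
    file_name = f"{topic}+{kafka_partition}+{start_offset:010d}.{fmt}"
    return f"{prefix}/{topic}/{encoded_partition}/{file_name}"


key = s3_object_key(
    prefix="topics",  # assumed prefix, for illustration only
    topic="ddpd-testd-db.dbo.customers",
    encoded_partition="Hour=10",  # the only part that path.format influences
    kafka_partition=0,
    start_offset=0,
    fmt="snappy.parquet",
)
print(key)
```

In this sketch the timestamp can only enter through the `encoded_partition` argument; the trailing file name is built purely from topic, partition and offset.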

S3 treats every forward slash (/) in an object name as a folder-like separator. You can set path.format to a value without slashes, such as the one below, so that the whole timestamp stays in a single path segment. That segment still appears as one folder in S3, because <encodedPartition> is followed by a slash in the object name.

path.format=YYYY-MM-dd-HH'thHour'

The above configuration produces a timestamp such as 2022-12-30-20thHour.
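The mapping from a record time to that string can be approximated in a few lines. This is only a sketch: the connector applies its own Java date-pattern rules to path.format, and the `strftime` translation and the function name `encoded_partition` used here are assumptions chosen for illustration.

```python
from datetime import datetime, timezone


def encoded_partition(ts):
    """Approximate path.format=YYYY-MM-dd-HH'thHour' for a given timestamp."""
    # The quoted 'thHour' in the pattern is literal text, so it is appended as-is.
    return ts.strftime("%Y-%m-%d-%H") + "thHour"


ts = datetime(2022, 12, 30, 20, 15, tzinfo=timezone.utc)
print(encoded_partition(ts))  # 2022-12-30-20thHour
```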

For more customization of S3 object names through the connector configuration, you can reach out to Confluent Community support, since the connector is provided by Confluent.

[1] https://docs.confluent.io/kafka-connectors/s3-sink/current/overview.html#s3-object-names

AWS
SUPPORT ENGINEER
answered a year ago
