AWS MSK connector configuration for S3


I want the file generated in S3 bucket with TIMESTAMP for AWS MSK connector.

I tried the two configurations below, both of which resulted in an error.

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock

also

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=RECORD

Can you please suggest something? I need help on this.

Thanks,

Nagesh
Asked a year ago · 1,702 views
2 Answers

Hi,

It seems your query is regarding Confluent S3 Sink Connector with Amazon MSK.

As per the Confluent documentation[1], the Time Based Partitioner in the S3 Sink Connector requires the following connector configuration properties:

  • path.format
  • partition.duration.ms
  • locale
  • timezone
  • timestamp.extractor

Sample connector configuration when using the Time Based Partitioner [1]:

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
partition.duration.ms=600000
flush.size=1
schema.compatibility=NONE
tasks.max=2
topics=TopicName
timezone=UTC
locale=en-US
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter=org.apache.kafka.connect.storage.StringConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
timestamp.extractor=Record
s3.bucket.name=S3BucketName
key.converter=org.apache.kafka.connect.storage.StringConverter

Please include the required configuration properties listed above in your connector configuration.

If the issue still persists, we would require details that are non-public information in order to troubleshoot further. Please open a support case with AWS using the referenced link[2].

[1] https://docs.confluent.io/kafka-connectors/s3-sink/current/overview.html#partitioning-records-into-s3-objects

[2] https://console.aws.amazon.com/support/home#/case/create

AWS
SUPPORT ENGINEER
Answered a year ago
  • The MSK connector was created successfully.

    In the S3 bucket, separate folders are being created, like below.

    Hour=10 / Hour=11 /

    Inside each folder, the file name comes out as below.

    ddpd-testd-db.dbo.customers+0+0000000000.snappy.parquet

    But what we require is the timestamp prefixing the file name, like below.

    ddpd-testd-db.dbo.Timestamp_customers+0+0000000000.snappy.parquet

    How do we get the timestamp into the file name itself, not as a separate timestamp folder?

    Thanks,


As per the Confluent documentation[1], the names of the S3 objects uploaded by the S3 connector follow this format:

<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

Here, <encodedPartition> is generated by the S3 connector's partitioner. Therefore, the path.format parameter defined in the connector config (required by the Time Based Partitioner) only controls the <encodedPartition> part of the string above.
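To make this concrete, here is a rough Python sketch (not Confluent's actual code) of how an object key is assembled from those pieces; the function name, the "topics" prefix value, and the example timestamp are illustrative assumptions:

```python
# Hypothetical sketch of how the S3 Sink Connector composes object keys,
# per the documented format:
#   <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
def s3_object_key(prefix: str, topic: str, encoded_partition: str,
                  kafka_partition: int, start_offset: int, ext: str) -> str:
    # Offsets are zero-padded to 10 digits, matching names like
    # "customers+0+0000000000.snappy.parquet" seen in the question.
    return (f"{prefix}/{topic}/{encoded_partition}/"
            f"{topic}+{kafka_partition}+{start_offset:010d}.{ext}")

# With a path.format like 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, the
# encoded partition contains slashes, so the timestamp lands in "folders"
# rather than in the file name itself:
key = s3_object_key("topics", "TopicName",
                    "year=2022/month=12/day=30/hour=20", 0, 0, "json")
print(key)
# -> topics/TopicName/year=2022/month=12/day=30/hour=20/TopicName+0+0000000000.json
```

This is why the timestamp can only ever appear in the <encodedPartition> segment, never prefixed onto the final <topic>+<kafkaPartition>+<startOffset> file name.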

Whenever there is a forward slash (/) in an object name, S3 treats it as a folder-like structure. You can set the path.format config in your connector to something like the following so that the whole timestamp stays intact; but wherever a forward slash does appear in the object name, a folder structure will still be generated in S3.

path.format=YYYY-MM-dd-HH'thHour'

The above config will create a timestamp such as 2022-12-30-20thHour.
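For illustration, path.format uses Java SimpleDateFormat-style tokens (YYYY, MM, dd, HH), with literals in single quotes. A rough Python equivalent of evaluating "YYYY-MM-dd-HH'thHour'" for a given time might look like this (the token-to-strftime mapping and function name are assumptions for the sketch):

```python
from datetime import datetime, timezone

def encode_partition(ts: datetime) -> str:
    # Approximate token mapping: YYYY -> %Y, MM -> %m, dd -> %d, HH -> %H;
    # 'thHour' in the Java pattern is a quoted literal, appended verbatim.
    return ts.strftime("%Y-%m-%d-%H") + "thHour"

# A record written at 20:00 UTC on 2022-12-30 would be partitioned as:
print(encode_partition(datetime(2022, 12, 30, 20, tzinfo=timezone.utc)))
# -> 2022-12-30-20thHour
```

Because this string contains no forward slash, it becomes a single "folder" level in S3 rather than a nested year/month/day/hour hierarchy.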

For more customization options for S3 object names via connector config, you can reach out to Confluent Community support, since the connector itself is offered by Confluent.

[1] https://docs.confluent.io/kafka-connectors/s3-sink/current/overview.html#s3-object-names

AWS
SUPPORT ENGINEER
Answered a year ago
