
Amazon S3 Sink Connector


Can somebody help with writing connector plugin code for provisioning an S3 sink connector to read messages from an MSK Kafka topic and dump the messages into an S3 bucket? I need help writing the code for the plugin.

asked 2 years ago · 420 views
1 Answer

Prerequisites

  • Apache Kafka Cluster: You have an MSK cluster running.
  • S3 Bucket: You have an S3 bucket where you want to dump the messages.
  • Kafka Connect: You have Kafka Connect set up and running.

Step-by-Step Guide

  1. Install Kafka Connect and the S3 Sink Connector. Ensure Kafka Connect is installed, then install the Confluent S3 Sink Connector by downloading the connector JARs and placing them in a directory on the worker's plugin.path.

  2. Configure the S3 Sink Connector. Create a configuration file, s3-sink-connector.properties, with the necessary settings.

name=s3-sink-connector
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=your-topic-name

# AWS credentials (optional: if omitted, the connector falls back to the
# AWS default credentials provider chain, e.g. an instance or task IAM role)
aws.access.key.id=your-access-key-id
aws.secret.access.key=your-secret-access-key

# S3 bucket configuration
s3.bucket.name=your-s3-bucket
s3.region=us-east-1
s3.part.size=5242880

# File format and schema configuration
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
# Number of records written per S3 object before a file is committed
# (3 is a small demo value; raise it for production workloads)
flush.size=3

# Kafka configuration
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

# Other configurations
storage.class=io.confluent.connect.s3.storage.S3Storage
schema.compatibility=NONE

Replace the placeholders with your specific values:

  • your-topic-name: The name of your Kafka topic.
  • your-access-key-id and your-secret-access-key: Your AWS access key and secret key.
  • your-s3-bucket: The name of your S3 bucket.
  • us-east-1: The AWS region where your S3 bucket is located.
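One quick way to fill in these placeholders is a `sed` pass over the template (the values `orders` and `my-data-lake` below are made-up examples, not from the original configuration):

```shell
# Substitute example values into the template from step 2.
# "orders" and "my-data-lake" are illustrative -- use your own topic and bucket.
sed -e 's/your-topic-name/orders/' \
    -e 's/your-s3-bucket/my-data-lake/' \
    s3-sink-connector.properties > s3-sink-connector.local.properties
```

This keeps the original template intact and writes the filled-in copy to a separate file.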
  3. Start Kafka Connect with the S3 Sink Connector

Run Kafka Connect with the configuration file. You can submit the configuration through the Kafka Connect REST API (distributed mode), or pass the properties file directly if you run Kafka Connect as a standalone process.

Using Kafka Connect REST API:

curl -X POST -H "Content-Type: application/json" --data @s3-sink-connector.json http://localhost:8083/connectors

s3-sink-connector.json:

{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "your-topic-name",
    "aws.access.key.id": "your-access-key-id",
    "aws.secret.access.key": "your-secret-access-key",
    "s3.bucket.name": "your-s3-bucket",
    "s3.region": "us-east-1",
    "s3.part.size": "5242880",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "flush.size": "3",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "schema.compatibility": "NONE"
  }
}
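Before POSTing the payload, it can be worth checking that the file is well-formed JSON; a quick local check using Python's standard library (assumes `python3` is on your PATH):

```shell
# Fail fast on malformed JSON before hitting the Connect REST API.
python3 -m json.tool s3-sink-connector.json > /dev/null && echo "valid JSON"
```

A malformed file makes the command exit non-zero and print a parse error instead.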
  4. Verify the Connector

After deploying the connector, it should start consuming messages from the specified Kafka topic and writing them to your S3 bucket.

You can monitor the connector logs to ensure it’s working correctly and check your S3 bucket to verify that the messages are being dumped as expected.
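For orientation when browsing the bucket: with the defaults used above (DefaultPartitioner, JsonFormat, and the connector's default `topics.dir` of `topics`), object keys should follow a pattern along the lines of `topics/<topic>/partition=<p>/<topic>+<p>+<startOffset>.json`, with the start offset zero-padded to 10 digits. This reflects the Confluent connector's documented default naming; verify it against what you actually see in S3. For example:

```shell
# Construct the expected object key for topic "orders", partition 0,
# first record offset 0 (10-digit zero-padded offset is the connector default).
printf 'topics/%s/partition=%d/%s+%d+%010d.json\n' orders 0 orders 0 0
# → topics/orders/partition=0/orders+0+0000000000.json
```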

Summary

By following these steps, you can set up an S3 Sink Connector to read messages from an MSK Kafka topic and dump them into an S3 bucket. The provided configurations should help you get started, but you may need to adjust them based on your specific requirements and environment. If you encounter any issues, refer to the Kafka Connect and Confluent documentation for further troubleshooting and configuration options.

EXPERT
answered 2 years ago
EXPERT
reviewed 2 years ago
  • Thank you so much for the answer. But my team doesn't allow downloading the S3 sink connector from Confluent; they want me to develop the plugin code and build the JAR file myself. Is there anything you could help with here, please?
