I have created a custom S3 Sink Connector for Kafka Connect that buffers Avro records from a topic and writes them to S3 as Parquet files. This is the connector configuration:
{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "com.example.S3SinkConnector",
    "tasks.max": "1",
    "topics": "new-test-topic",
    "s3.bucket.name": "demo-kafka-s3",
    "s3.region": "us-east-1",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "s3.batch.size": "10000",
    "s3.batch.time.ms": "60000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "schema.registry.url": "http://schema-registry:8081"
  }
}
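For reference, I register the connector through the Connect REST API; a minimal example, assuming the JSON above is saved as s3-sink-connector.json (the file name is just for illustration) and the same endpoint as the status check further down:

curl -X POST -H "Content-Type: application/json" --data @s3-sink-connector.json http://localhost:8083/connectors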
package com.example;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.PutObjectRequest;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.*;

public class S3SinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(S3SinkTask.class);

    private AmazonS3 s3Client;
    private String bucketName;
    // Per-topic buffer of deserialized records waiting to be flushed to S3.
    private Map<String, List<GenericRecord>> topicBuffers;
    private Map<String, Long> topicLastFlushTimes;
    private int batchSize;
    private long batchTimeMs;
    private int eventCounter = 0;
    private String schemaRegistryUrl;
    private SchemaRegistryClient schemaRegistryClient;
    private KafkaAvroDeserializer avroDeserializer;

    @Override
    public void start(Map<String, String> props) {
        props.forEach((key, value) -> log.info("Property: {} = {}", key, value));

        String accessKeyId = props.get(S3SinkConfig.AWS_ACCESS_KEY_ID);
        String secretAccessKey = props.get(S3SinkConfig.AWS_SECRET_ACCESS_KEY);
        bucketName = props.get(S3SinkConfig.S3_BUCKET_NAME);
        schemaRegistryUrl = props.get(S3SinkConfig.SCHEMA_REGISTRY_URL);
        if (schemaRegistryUrl == null) {
            log.error("Schema Registry URL is null");
        } else {
            log.info("Schema Registry URL: {}", schemaRegistryUrl);
        }

        BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
        s3Client = AmazonS3ClientBuilder.standard()
                .withRegion(Regions.fromName(props.get(S3SinkConfig.S3_REGION)))
                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                .build();

        topicBuffers = new HashMap<>();
        topicLastFlushTimes = new HashMap<>();
        batchSize = Integer.parseInt(props.get(S3SinkConfig.S3_BATCH_SIZE));
        batchTimeMs = Long.parseLong(props.get(S3SinkConfig.S3_BATCH_TIME_MS));

        try {
            schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
            avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
        } catch (Exception e) {
            log.error("Failed to create SchemaRegistryClient", e);
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            String topic = record.topic();
            try {
                // Assumes record.value() is the raw Avro-serialized byte[] payload;
                // this cast fails if the worker's converter has already deserialized
                // the value (e.g. AvroConverter hands the task a Struct, not bytes).
                GenericRecord avroRecord =
                        (GenericRecord) avroDeserializer.deserialize(topic, (byte[]) record.value());
                topicBuffers.computeIfAbsent(topic, k -> new ArrayList<>()).add(avroRecord);
            } catch (Exception e) {
                log.error("Failed to process record: {}", record, e);
            }

            // Flush when the buffer is full or the batch window has elapsed.
            List<GenericRecord> buffer = topicBuffers.get(topic);
            if (buffer != null && (buffer.size() >= batchSize ||
                    (System.currentTimeMillis() - topicLastFlushTimes.getOrDefault(topic, 0L)) >= batchTimeMs)) {
                flushRecords(topic);
                topicLastFlushTimes.put(topic, System.currentTimeMillis());
            }
        }
    }

    // Fetches the latest value schema for the topic from Schema Registry.
    private Schema getSchemaForTopic(String topic) throws Exception {
        String subject = topic + "-value";
        SchemaMetadata schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(subject);
        return new Schema.Parser().parse(schemaMetadata.getSchema());
    }

    // Writes the buffered records for a topic to a local Parquet file,
    // uploads it to S3, then clears the buffer.
    private void flushRecords(String topic) {
        if (!topicBuffers.get(topic).isEmpty()) {
            try {
                File tempFile = File.createTempFile("kafka-parquet-", ".parquet");
                tempFile.deleteOnExit();
                Path path = new Path(tempFile.getAbsolutePath());
                ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(path)
                        .withSchema(getSchemaForTopic(topic))
                        .withCompressionCodec(CompressionCodecName.SNAPPY)
                        .withConf(new Configuration())
                        .build();
                for (GenericRecord record : topicBuffers.get(topic)) {
                    writer.write(record);
                }
                writer.close();

                String key = String.format("%s/%s", topic, generateFileKey());
                s3Client.putObject(new PutObjectRequest(bucketName, key, tempFile));
                topicBuffers.get(topic).clear();
                tempFile.delete();
            } catch (Exception e) {
                log.error("Failed to flush records for topic: {}", topic, e);
            }
        }
    }

    private String generateFileKey() {
        eventCounter++;
        String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        return String.format("event%d-%s.parquet", eventCounter, timestamp);
    }

    @Override
    public void stop() {
        // Flush whatever is still buffered before the task shuts down.
        for (String topic : topicBuffers.keySet()) {
            if (!topicBuffers.get(topic).isEmpty()) {
                flushRecords(topic);
            }
        }
    }

    @Override
    public String version() {
        return "1.0";
    }
}
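The task references an S3SinkConfig class that I have not included here. For completeness, a minimal sketch of the constants it would need, assuming the key names mirror the JSON config above; the two AWS credential key names are placeholders, since they do not appear in the posted config:

package com.example;

// Hypothetical sketch of the S3SinkConfig constants used by S3SinkTask.
// The AWS credential key names are assumptions; the rest mirror the JSON config.
public final class S3SinkConfig {
    public static final String AWS_ACCESS_KEY_ID = "aws.access.key.id";         // assumed key name
    public static final String AWS_SECRET_ACCESS_KEY = "aws.secret.access.key"; // assumed key name
    public static final String S3_BUCKET_NAME = "s3.bucket.name";
    public static final String S3_REGION = "s3.region";
    public static final String S3_BATCH_SIZE = "s3.batch.size";
    public static final String S3_BATCH_TIME_MS = "s3.batch.time.ms";
    public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    private S3SinkConfig() {}
}

Note that the JSON config above does not set the two AWS credential keys at all, so as posted accessKeyId and secretAccessKey would be null in start() unless they are supplied some other way.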
ERRORS:
C:\Users\ADMIN\Downloads\Projects\custom-s3-sink-connector>curl -X GET http://localhost:8083/connectors/s3-sink-connector/status
{"name":"s3-sink-connector","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.kafka.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.\n\tat org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493)\n\tat org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:112)\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:132)\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.<init>(AbstractKafkaSchemaSerDeConfig.java:328)\n\tat io.confluent.connect.avro.AvroConverterConfig.<init>(AvroConverterConfig.java:27)\n\tat io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:68)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:328)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:618)\n\tat org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:521)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1722)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1772)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n"}],"type":"sink"}
The worker log shows the same stack trace in full:
[2024-07-13 07:35:58,798] ERROR Failed to start task s3-sink-connector-0 (org.apache.kafka.connect.runtime.Worker)
org.apache.kafka.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:112)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:132)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.<init>(AbstractKafkaSchemaSerDeConfig.java:328)
at io.confluent.connect.avro.AvroConverterConfig.<init>(AvroConverterConfig.java:27)
at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:68)
at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:328)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:618)
at org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:521)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1722)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1772)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
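The trace points at the converter, not my connector code: the task dies inside AvroConverter.configure (via Plugins.newConverter) before S3SinkTask.start ever runs. Kafka Connect only passes a converter the properties carrying the key.converter. or value.converter. prefix, so the top-level "schema.registry.url" in the config above never reaches the AvroConverter. The likely fix is to prefix the property; a minimal sketch of the affected config lines, with everything else unchanged:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"

One caveat: configured this way, the AvroConverter deserializes the Avro payload itself and hands put() a Struct, so the (byte[]) record.value() cast in S3SinkTask would be the next failure. If the task is meant to do its own deserialization with KafkaAvroDeserializer, setting "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" would pass it the raw bytes instead.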