
Custom S3 Sink Connector (Please Help)!!


I have created a custom S3 Sink Connector with the following configuration:

{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "com.example.S3SinkConnector",
    "tasks.max": "1",
    "topics": "new-test-topic",
    "s3.bucket.name": "demo-kafka-s3",
    "s3.region": "us-east-1",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "s3.batch.size": "10000",
    "s3.batch.time.ms": "60000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "schema.registry.url": "http://schema-registry:8081"
  }
}

package com.example;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.PutObjectRequest;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.*;

public class S3SinkTask extends SinkTask {

private static final Logger log = LoggerFactory.getLogger(S3SinkTask.class);

private AmazonS3 s3Client;
private String bucketName;
private Map<String, List<GenericRecord>> topicBuffers;
private Map<String, Long> topicLastFlushTimes;
private int batchSize;
private long batchTimeMs;
private int eventCounter = 0;
private String schemaRegistryUrl;
private SchemaRegistryClient schemaRegistryClient;
private KafkaAvroDeserializer avroDeserializer;

@Override
public void start(Map<String, String> props) {
    props.forEach((key, value) -> log.info("Property: {} = {}", key, value));

    String accessKeyId = props.get(S3SinkConfig.AWS_ACCESS_KEY_ID);
    String secretAccessKey = props.get(S3SinkConfig.AWS_SECRET_ACCESS_KEY);
    bucketName = props.get(S3SinkConfig.S3_BUCKET_NAME);
    schemaRegistryUrl = props.get(S3SinkConfig.SCHEMA_REGISTRY_URL);

    if (schemaRegistryUrl == null) {
        log.error("Schema Registry URL is null");
    } else {
        log.info("Schema Registry URL: {}", schemaRegistryUrl);
    }

    BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
    s3Client = AmazonS3ClientBuilder.standard()
            .withRegion(Regions.fromName(props.get(S3SinkConfig.S3_REGION)))
            .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
            .build();

    topicBuffers = new HashMap<>();
    topicLastFlushTimes = new HashMap<>();

    batchSize = Integer.parseInt(props.get(S3SinkConfig.S3_BATCH_SIZE));
    batchTimeMs = Long.parseLong(props.get(S3SinkConfig.S3_BATCH_TIME_MS));

    try {
        schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
        avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
    } catch (Exception e) {
        log.error("Failed to create SchemaRegistryClient", e);
    }
}

@Override
public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
        String topic = record.topic();
        try {
            Schema schema = getSchemaForTopic(topic);
            GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, (byte[]) record.value());
            topicBuffers.computeIfAbsent(topic, k -> new ArrayList<>()).add(avroRecord);
        } catch (Exception e) {
            log.error("Failed to process record: {}", record, e);
        }

        if (topicBuffers.get(topic).size() >= batchSize ||
                (System.currentTimeMillis() - topicLastFlushTimes.getOrDefault(topic, 0L)) >= batchTimeMs) {
            flushRecords(topic);
            topicLastFlushTimes.put(topic, System.currentTimeMillis());
        }
    }
}

private Schema getSchemaForTopic(String topic) throws Exception {
    String subject = topic + "-value";
    SchemaMetadata schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(subject);
    String schemaString = schemaMetadata.getSchema();
    return new Schema.Parser().parse(schemaString);
}

private void flushRecords(String topic) {
    if (!topicBuffers.get(topic).isEmpty()) {
        try {
            File tempFile = File.createTempFile("kafka-parquet-", ".parquet");
            tempFile.deleteOnExit();

            Path path = new Path(tempFile.getAbsolutePath());
            ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(path)
                    .withSchema(getSchemaForTopic(topic))
                    .withCompressionCodec(CompressionCodecName.SNAPPY)
                    .withConf(new Configuration())
                    .build();

            for (GenericRecord record : topicBuffers.get(topic)) {
                writer.write(record);
            }

            writer.close();

            String key = String.format("%s/%s", topic, generateFileKey());
            s3Client.putObject(new PutObjectRequest(bucketName, key, tempFile));
            topicBuffers.get(topic).clear();

            tempFile.delete();
        } catch (Exception e) {
            log.error("Failed to flush records for topic: {}", topic, e);
        }
    }
}

private String generateFileKey() {
    eventCounter++;
    String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
    return String.format("event%d-%s.parquet", eventCounter, timestamp);
}

@Override
public void stop() {
    for (String topic : topicBuffers.keySet()) {
        if (!topicBuffers.get(topic).isEmpty()) {
            flushRecords(topic);
        }
    }
}

@Override
public String version() {
    return "1.0";
}

}

ERRORS:

C:\Users\ADMIN\Downloads\Projects\custom-s3-sink-connector>curl -X GET http://localhost:8083/connectors/s3-sink-connector/status

{"name":"s3-sink-connector","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.kafka.common.config.ConfigException: Missing required configuration \"schema.registry.url\" which has no default value.\n\tat org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493)\n\tat org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:112)\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:132)\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.<init>(AbstractKafkaSchemaSerDeConfig.java:328)\n\tat io.confluent.connect.avro.AvroConverterConfig.<init>(AvroConverterConfig.java:27)\n\tat io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:68)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:328)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:618)\n\tat org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:521)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1722)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1772)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n"}],"type":"sink"}

C:\Users\ADMIN\Downloads\Projects\custom-s3-sink-connector>

[2024-07-13 07:35:58,798] ERROR Failed to start task s3-sink-connector-0 (org.apache.kafka.connect.runtime.Worker)
org.apache.kafka.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:112)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:132)
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.<init>(AbstractKafkaSchemaSerDeConfig.java:328)
    at io.confluent.connect.avro.AvroConverterConfig.<init>(AvroConverterConfig.java:27)
    at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:68)
    at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:328)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:618)
    at org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:521)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1722)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1772)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

asked 2 years ago · 420 views
2 Answers

Hello. Based on the provided error message, the issue looks to be with your custom code. Please note that we are only able to answer queries about issues you encounter when using Amazon MSK Connect.

AWS
SUPPORT ENGINEER
answered a year ago

Hi, as per the Confluent documentation on Schema Registry, can you retry the connector after adding the following properties to the connector configuration:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

[+] https://docs.confluent.io/platform/current/schema-registry/connect.html#avro
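
For example, applied to the configuration from the question, the connector config would look roughly like this (a sketch that reuses the bucket, topic, and registry address from the question; the key converter stays StringConverter, so only the value converter needs the prefixed URL, and the plain schema.registry.url entry is kept only because the custom task code reads it directly):

{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "com.example.S3SinkConnector",
    "tasks.max": "1",
    "topics": "new-test-topic",
    "s3.bucket.name": "demo-kafka-s3",
    "s3.region": "us-east-1",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "s3.batch.size": "10000",
    "s3.batch.time.ms": "60000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.registry.url": "http://schema-registry:8081"
  }
}

The important addition is value.converter.schema.registry.url: when the worker instantiates the AvroConverter it strips the value.converter. prefix and passes the remaining properties to the converter, which is why the trace shows AvroConverterConfig failing on a missing schema.registry.url.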

This should help resolve the issue. Furthermore, if the connector runs on MSK Connect, please consider opening a support case so that the AWS team can review the issue from the backend.

Thank you

AWS
SUPPORT ENGINEER
answered a year ago

