MSK S3 sink connector with Glue Schema Registry (Avro)


I am trying to create an S3 sink connector with MSK Connect. The record schemas are registered in the AWS Glue Schema Registry, and the goal is to write the Avro records to S3 in Parquet format.

Below is the connector configuration:

s3.bucket.name=test
s3.region=ap-northeast-2
connector.class=io.confluent.connect.s3.S3SinkConnector
storage.class=io.confluent.connect.s3.storage.S3Storage
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
flush.size=1
tasks.max=2

topics=test-004

key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.region=ap-northeast-2
value.converter.region=ap-northeast-2
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.avroRecordType=GENERIC_RECORD
value.converter.avroRecordType=GENERIC_RECORD
key.converter.schemaName=test3
value.converter.schemaName=test3
key.converter.registry.name=test
value.converter.registry.name=test
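Kafka Connect hands each converter only the properties that carry its prefix, with the prefix removed, so the `value.converter.*` lines above should reach `AWSKafkaAvroConverter` as `region`, `registry.name`, `schemaName`, and so on (as far as I know, Kafka does this internally with `AbstractConfig.originalsWithPrefix`). The sketch below imitates that mapping with plain `java.util.Properties` so you can double-check which keys the converter actually sees; the class and method names are hypothetical and not part of any library.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: shows which settings a converter receives from the
// connector configuration once its prefix has been stripped.
final class ConverterPrefixDemo {

    static Map<String, String> withPrefixStripped(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>(); // sorted for stable output
        for (String name : props.stringPropertyNames()) {
            // "value.converter" itself does not match "value.converter." and is skipped
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("value.converter",
                "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter");
        props.setProperty("value.converter.region", "ap-northeast-2");
        props.setProperty("value.converter.schemaAutoRegistrationEnabled", "true");
        props.setProperty("value.converter.avroRecordType", "GENERIC_RECORD");
        props.setProperty("value.converter.schemaName", "test3");
        props.setProperty("value.converter.registry.name", "test");
        props.setProperty("s3.region", "ap-northeast-2"); // connector setting, not a converter setting

        // prints {avroRecordType=GENERIC_RECORD, region=ap-northeast-2, registry.name=test, schemaAutoRegistrationEnabled=true, schemaName=test3}
        System.out.println(withPrefixStripped(props, "value.converter."));
    }
}
```

In this configuration the key and value converters receive identical Glue settings, so a problem reaching Glue would affect both.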

I packaged the required JAR files into the custom plugin, and the connector's authentication to the MSK cluster is set to 'None'. The worker configuration is the MSK default.

When I create the connector, the schema ID appears to be recognized, but the task then fails with the error below, and I don't understand why:

[Worker-0695cbfe2622795bc] [2023-12-14 14:50:18,970] ERROR [test-004|task-0] WorkerSinkTask{id=test-004-0} Error converting message value in topic 'test-004' partition 0 at offset 0 and timestamp 1702564893574: Converting byte[] to Kafka Connect data failed due to serialization error:  (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
[Worker-0695cbfe2622795bc] org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
[Worker-0695cbfe2622795bc] 	at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:118)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-0695cbfe2622795bc] 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-0695cbfe2622795bc] 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-0695cbfe2622795bc] 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-0695cbfe2622795bc] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-0695cbfe2622795bc] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-0695cbfe2622795bc] 	at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-0695cbfe2622795bc] Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Failed to get schema version Id = @@@@@@@@
[Worker-0695cbfe2622795bc] 	at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade.retrieveSchemaRegistrySchema(GlueSchemaRegistryDeserializationFacade.java:219)
[Worker-0695cbfe2622795bc] 	at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade.getAwsDeserializerSchema(GlueSchemaRegistryDeserializationFacade.java:201)
[Worker-0695cbfe2622795bc] 	at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade.deserialize(GlueSchemaRegistryDeserializationFacade.java:167)
[Worker-0695cbfe2622795bc] 	at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:149)
[Worker-0695cbfe2622795bc] 	at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
[Worker-0695cbfe2622795bc] 	at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:116)
[Worker-0695cbfe2622795bc] 	... 18 more
[Worker-0695cbfe2622795bc] Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Failed to get schema version Id = @@@@@@@@
[Worker-0695cbfe2622795bc] 	at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getSchemaVersionResponse(AWSSchemaRegistryClient.java:177)
[Worker-0695cbfe2622795bc] 	at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade$GlueSchemaRegistryDeserializationCacheLoader.load(GlueSchemaRegistryDeserializationFacade.java:257)
[Worker-0695cbfe2622795bc] 	at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade$GlueSchemaRegistryDeserializationCacheLoader.load(GlueSchemaRegistryDeserializationFacade.java:253)
[Worker-0695cbfe2622795bc] 	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3570)
[Worker-0695cbfe2622795bc] 	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2312)
[Worker-0695cbfe2622795bc] 	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2189)
[Worker-0695cbfe2622795bc] 	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2079)
[Worker-0695cbfe2622795bc] 	at com.google.common.cache.LocalCache.get(LocalCache.java:4011)
[Worker-0695cbfe2622795bc] 	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4034)
[Worker-0695cbfe2622795bc] 	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5010)
[Worker-0695cbfe2622795bc] 	at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade.retrieveSchemaRegistrySchema(GlueSchemaRegistryDeserializationFacade.java:217)
[Worker-0695cbfe2622795bc] 	... 23 more
[Worker-0695cbfe2622795bc] Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: connect timed out
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:102)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:47)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:211)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:83)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.services.glue.DefaultGlueClient.getSchemaVersion(DefaultGlueClient.java:7868)
[Worker-0695cbfe2622795bc] 	at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getSchemaVersionResponse(AWSSchemaRegistryClient.java:173)
[Worker-0695cbfe2622795bc] 	... 33 more
[Worker-0695cbfe2622795bc] Caused by: java.net.SocketTimeoutException: connect timed out
[Worker-0695cbfe2622795bc] 	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
[Worker-0695cbfe2622795bc] 	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
[Worker-0695cbfe2622795bc] 	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
[Worker-0695cbfe2622795bc] 	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
[Worker-0695cbfe2622795bc] 	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
[Worker-0695cbfe2622795bc] 	at java.base/java.net.Socket.connect(Socket.java:609)
[Worker-0695cbfe2622795bc] 	at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:305)
[Worker-0695cbfe2622795bc] 	at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
[Worker-0695cbfe2622795bc] 	at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:509)
[Worker-0695cbfe2622795bc] 	at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:604)
[Worker-0695cbfe2622795bc] 	at java.base/sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:266)
[Worker-0695cbfe2622795bc] 	at java.base/sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:373)
[Worker-0695cbfe2622795bc] 	at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:207)
[Worker-0695cbfe2622795bc] 	at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1187)
[Worker-0695cbfe2622795bc] 	at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)
[Worker-0695cbfe2622795bc] 	at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:193)
[Worker-0695cbfe2622795bc] 	at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:168)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.call(UrlConnectionHttpClient.java:291)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.call(UrlConnectionHttpClient.java:269)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:63)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77)
[Worker-0695cbfe2622795bc] 	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56)
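The converter can report a schema version ID even though the lookup fails because, in the Glue Schema Registry wire format as I understand it, the ID travels inside each record: byte 0 is a header version byte (3), byte 1 is a compression byte (0 for none, 5 for zlib), and the next 16 bytes are the schema version UUID, followed by the Avro payload. The deserializer reads that UUID locally and only then calls Glue `GetSchemaVersion` to fetch the schema (the `DefaultGlueClient.getSchemaVersion` frame in the trace), which is where the timeout happens. The sketch below parses such a header with `java.nio.ByteBuffer`; the constant values are my assumption about the format, the class name is hypothetical, and in practice the library's own deserializer should do this.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical sketch of reading the schema version ID from a Glue Schema
// Registry encoded record. Byte values below are assumptions about the format.
final class GsrHeaderDemo {
    static final byte HEADER_VERSION_BYTE = 3; // assumed header version
    static final byte COMPRESSION_NONE = 0;    // assumed "no compression" marker
    static final byte COMPRESSION_ZLIB = 5;    // assumed zlib marker
    static final int HEADER_LENGTH = 2 + 16;   // two marker bytes + 16-byte UUID

    static UUID schemaVersionId(byte[] record) {
        if (record == null || record.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("record too short for a Glue Schema Registry header");
        }
        ByteBuffer buf = ByteBuffer.wrap(record);
        byte version = buf.get();
        if (version != HEADER_VERSION_BYTE) {
            throw new IllegalArgumentException("unexpected header version byte: " + version);
        }
        byte compression = buf.get();
        if (compression != COMPRESSION_NONE && compression != COMPRESSION_ZLIB) {
            throw new IllegalArgumentException("unexpected compression byte: " + compression);
        }
        // The UUID is stored as its most- and least-significant 64-bit halves.
        long msb = buf.getLong();
        long lsb = buf.getLong();
        return new UUID(msb, lsb);
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("00000000-0000-0000-0000-000000000001"); // hypothetical ID
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + 3);
        buf.put(HEADER_VERSION_BYTE).put(COMPRESSION_NONE)
           .putLong(id.getMostSignificantBits()).putLong(id.getLeastSignificantBits())
           .put(new byte[] {1, 2, 3}); // stand-in for the Avro payload
        // prints 00000000-0000-0000-0000-000000000001
        System.out.println(schemaVersionId(buf.array()));
    }
}
```

No network access is needed for this step, which fits the observation that the ID is "recognized" while the subsequent Glue call fails.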
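The innermost `Caused by` in the trace is `java.net.SocketTimeoutException: connect timed out`, raised while the AWS SDK opens an HTTPS connection for `getSchemaVersion`. That points to the worker being unable to reach the Glue endpoint over the network at all, rather than to a schema or IAM problem. One plausible reason (an assumption, not confirmed by the post) is that the connector's subnets have no route to public AWS endpoints, for example no NAT gateway and no Glue interface VPC endpoint. The hypothetical probe below, a minimal sketch using only `java.net.Socket`, checks whether a TCP connection to the regional Glue endpoint can be opened within a timeout.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic: can this host open a TCP connection to the Glue endpoint?
final class GlueEndpointProbe {

    // Standard public regional endpoint name for AWS Glue.
    static String glueEndpoint(String region) {
        return "glue." + region + ".amazonaws.com";
    }

    // True if a TCP connection to host:port opens within timeoutMs.
    // DNS failures, refusals, and timeouts are all reported as false.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String region = args.length > 0 ? args[0] : "ap-northeast-2";
        String host = glueEndpoint(region);
        boolean reachable = canConnect(host, 443, 5_000);
        System.out.println(host + ":443 reachable = " + reachable);
    }
}
```

Since code cannot be run inside the MSK Connect worker itself, run this (for example `java GlueEndpointProbe.java ap-northeast-2`) from an EC2 instance in the same subnets and security group as the connector. If it reports `false`, the fix is likely on the network side rather than in the converter settings.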
Asked 4 months ago · 327 views