By using AWS re:Post, you agree to the Terms of Use
/AWS Glue Schema Registry and MSK Connect Integration for AVRO Schema/

AWS Glue Schema Registry and MSK Connect Integration for AVRO Schema

0

We have a usecase to read Oracle table and publish the records into AWS MSK topic. For that purpose we are using MSKConnect and trying to deploy Confluent JDBCSourceConnector. We are using AWS Glue schema registry for schema management. We have used below configuration in our connector but its just giving the error and connector always goes to failed status.

  key.converter= org.apache.kafka.connect.storage.StringConverter
  key.converter.schemas.enable= false
  key.converter.avroRecordType= GENERIC_RECORD
  key.converter.region= us-east-1
  key.converter.registry.name= ebx-control-tbl-registryE
  key.converter.schemaAutoRegistrationEnabled= true
  value.converter= com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
  value.converter.schemas.enable= true
  value.converter.avroRecordType= GENERIC_RECORD
  value.converter.region= us-east-1
  value.converter.registry.name= ebx-control-tbl-registry
  value.converter.schemaAutoRegistrationEnabled= true

Its giving below error.

[Worker-0dc06f886ba9272ef] Caused by: org.apache.kafka.connect.errors.DataException: Converting Kafka Connect data to byte[] failed due to serialization error:

Has anyone successfully used Confluent JDBCSourceConnector with MSK connect and AWS Glue Schema registry?

1 Answers
0

Hello,

I am able to use the Confluent Kafka jdbc connect with MSK and integrated with Glue schema registry(GSR) with the below steps. Posting the steps here, in case if it helps Note: I am using mysql as my source instead of Oracle

  1. Collect the below jars
  • Build GSR avro schema converter jar
wget https://github.com/awslabs/aws-glue-schema-registry/archive/refs/tags/v1.1.8.zip
unzip v1.1.8.zip
cd aws-glue-schema-registry
mvn clean install
mvn dependency:copy-dependencies

A jar file with name schema-registry-kafkaconnect-converter-1.1.8.jar gets created in the directory avro-kafkaconnect-converter/target/

I zipped all the above 3 jars and uploaded the zip file into an s3 bucket

  1. I created an MSK custom plugin using the above file in s3 bucket
  2. I created a simple MSK cluster(without any authentication) in the private subnets of my VPC which has a route to internet via NAT gateway
  3. I created a topic with the same name as the mysql table
  4. I created an MSK connector from the plugin created in (2) with the config like below
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://myip:3306/mydb
connection.user=XXXXX
connection.password=XXXX
table.whitelist=mytbl
tasks.max=5
mode=bulk
key.converter= org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable= true
key.converter.avroRecordType=GENERIC_RECORD
key.converter.region=us-east-1
key.converter.registry.name=testregistry
key.converter.schemaAutoRegistrationEnabled=true
value.converter= com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.schemas.enable=true
value.converter.avroRecordType=GENERIC_RECORD
value.converter.region=us-east-1
value.converter.registry.name=testregistry
value.converter.schemaAutoRegistrationEnabled= true

Ref links

https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html#jdbc-source-configs https://docs.confluent.io/platform/current/schema-registry/connect.html https://aws.amazon.com/blogs/big-data/evolve-json-schemas-in-amazon-msk-and-amazon-kinesis-data-streams-with-the-aws-glue-schema-registry/

After completing all of the above steps, the MSK JDBC connect is able to extract the table and push the rows into the MSK topic.

answered 2 months ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions