I'm testing the use of MSK Connect with IcebergSinkConnector, but I'm having some difficulty making it work.

I'm testing the use of MSK Connect with IcebergSinkConnector, but I'm having some difficulty making it work. If anyone has experience with MSK and can help me, that would be great.

I have a topic in MSK that is fed by an MSK Connect connector running Debezium, which performs CDC from an RDS PostgreSQL database. That connector uses the Glue Schema Registry to register the schemas. I am now trying to sink one of these topics into an Iceberg table, using Glue as the catalog.

My Iceberg sink configuration is shown below. The connector is running in MSK, but nothing is created or written to my table. In the CloudWatch logs, I see this error: `org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000 milliseconds while awaiting InitProducerId`.

Can someone help me understand where the error might be?

`connector.class=io.tabular.iceberg.connect.IcebergSinkConnector tasks.max=1 topics=dbz.account errors.deadletterqueue.context.headers.enable=false consumer.auto.offset.reset=latest s3.region=us-east-1 errors.log.enable=true

Iceberg Configs

iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO iceberg.catalog.warehouse=s3://***********/iceberg_tables iceberg.catalog.client.region=us-east-1 iceberg.control.topic=iceberg-connector-control-v1 iceberg.tables.dynamic-enabled=false iceberg.tables.upsert-mode-enabled=true iceberg.tables.evolve-schema-enabled=true iceberg.tables.auto-create-enabled=true iceberg.tables=db.account iceberg.tables.cdc-field=__op iceberg.tables.default-id-columns=account_id iceberg.control.commit.interval-ms=60000

Glue Schema Registry Specific

key.converter=org.apache.kafka.connect.storage.StringConverter key.converter.schemas.enable=false key.converter.endpoint=https://glue.us-east-1.amazonaws.com value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter value.converter.schemaAutoRegistrationEnabled=true value.converter.region=us-east-1 value.converter.avroRecordType=GENERIC_RECORD value.converter.compatibility=NONE value.converter.registry.name=testregistry value.converter.endpoint=https://glue.us-east-1.amazonaws.com`

asked a month ago, 168 views
2 Answers

Sorry, my configuration came out garbled in the question. Here it is again, one property per line:

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics=dbz.account
errors.deadletterqueue.context.headers.enable=false
consumer.auto.offset.reset=latest
s3.region=us-east-1
errors.log.enable=true
# Iceberg Configs
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
iceberg.catalog.warehouse=s3://***********/iceberg_tables
iceberg.catalog.client.region=us-east-1
iceberg.control.topic=iceberg-connector-control-v1
iceberg.tables.dynamic-enabled=false
iceberg.tables.upsert-mode-enabled=true
iceberg.tables.evolve-schema-enabled=true
iceberg.tables.auto-create-enabled=true
iceberg.tables=db.account
iceberg.tables.cdc-field=__op
iceberg.tables.default-id-columns=account_id
iceberg.control.commit.interval-ms=60000
# Glue Schema Registry Specific
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
key.converter.endpoint=https://glue.us-east-1.amazonaws.com
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.schemaAutoRegistrationEnabled=true
value.converter.region=us-east-1
value.converter.avroRecordType=GENERIC_RECORD
value.converter.compatibility=NONE
value.converter.registry.name=testregistry
value.converter.endpoint=https://glue.us-east-1.amazonaws.com
answered a month ago

Add the following Kafka producer settings to your connector configuration:

transaction.timeout.ms=90000
retries=5
max.block.ms=90000

Start with values somewhat above the 60000 ms timeout reported in the error to give the connector more leeway, then adjust them once it is working.
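
One caveat worth checking: `transaction.timeout.ms`, `retries`, and `max.block.ms` are Kafka producer settings, and a sink connector does not necessarily forward top-level properties to the producer it creates for its control topic. The Tabular Iceberg sink describes an `iceberg.kafka.*` pass-through for its control-topic Kafka clients, so, assuming the connector version in use honors that prefix, the same overrides might be written as in the sketch below. The values are the ones suggested in this answer, not tuned recommendations.

```properties
# Assumption: the Iceberg sink forwards iceberg.kafka.* properties
# to the Kafka clients it creates for the control topic.
# Verify the prefix against the documentation of the connector version you deploy.
iceberg.kafka.transaction.timeout.ms=90000
iceberg.kafka.retries=5
iceberg.kafka.max.block.ms=90000
```

If the `InitProducerId` timeout persists after the overrides are confirmed to reach the producer, the longer timeouts are probably only masking the cause, and the broker-side review described next becomes the more useful step.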

Review the MSK cluster's logs and metrics around the time the errors occur. Look for any broker restarts, configuration changes, scaling events, or unusual network patterns that could contribute to temporary slowdowns.

answered a month ago