I'm testing the use of MSK Connect with IcebergSinkConnector, but I'm having some difficulty making it work. If anyone has experience with MSK and can help me, that would be great.

I have a topic in MSK fed by an MSK Connect connector running Debezium, which performs CDC from an RDS PostgreSQL instance. That connector uses the Glue Schema Registry to register the schemas. I am now attempting to sink one of these topics into an Iceberg table, using Glue as the catalog.
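For reference, the Debezium source connector feeding that topic looks roughly like the sketch below; the hostname, credentials, and table list are placeholders rather than my real values, and the exact routing that produces the dbz.account topic name depends on the transforms in use:

# Upstream Debezium source connector (illustrative sketch; placeholder values)
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
database.hostname=<rds-endpoint>
database.port=5432
database.user=<db-user>
database.password=<db-password>
database.dbname=<db-name>
plugin.name=pgoutput
topic.prefix=dbz
table.include.list=public.account
# Glue Schema Registry converters, matching the sink side
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.region=us-east-1
value.converter.registry.name=testregistry
value.converter.schemaAutoRegistrationEnabled=true
value.converter.avroRecordType=GENERIC_RECORD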

My Iceberg sink configuration is below. The connector shows as running in MSK, but nothing is created or written to my table. In the CloudWatch logs I see the error org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000 milliseconds while awaiting InitProducerId.

Can someone help me understand where the error might be?

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics=dbz.account
errors.deadletterqueue.context.headers.enable=false
consumer.auto.offset.reset=latest
s3.region=us-east-1
errors.log.enable=true
# Iceberg Configs
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
iceberg.catalog.warehouse=s3://***********/iceberg_tables
iceberg.catalog.client.region=us-east-1
iceberg.control.topic=iceberg-connector-control-v1
iceberg.tables.dynamic-enabled=false
iceberg.tables.upsert-mode-enabled=true
iceberg.tables.evolve-schema-enabled=true
iceberg.tables.auto-create-enabled=true
iceberg.tables=db.account
iceberg.tables.cdc-field=__op
iceberg.tables.default-id-columns=account_id
iceberg.control.commit.interval-ms=60000
# Glue Schema Registry Specific
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
key.converter.endpoint=https://glue.us-east-1.amazonaws.com
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.schemaAutoRegistrationEnabled=true
value.converter.region=us-east-1
value.converter.avroRecordType=GENERIC_RECORD
value.converter.compatibility=NONE
value.converter.registry.name=testregistry
value.converter.endpoint=https://glue.us-east-1.amazonaws.com

Asked 2 months ago · Viewed 216 times
2 Answers

Sorry, my configuration came out messed up when I posted it; here it is again in a cleaner format:

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics=dbz.account
errors.deadletterqueue.context.headers.enable=false
consumer.auto.offset.reset=latest
s3.region=us-east-1
errors.log.enable=true
# Iceberg Configs
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
iceberg.catalog.warehouse=s3://***********/iceberg_tables
iceberg.catalog.client.region=us-east-1
iceberg.control.topic=iceberg-connector-control-v1
iceberg.tables.dynamic-enabled=false
iceberg.tables.upsert-mode-enabled=true
iceberg.tables.evolve-schema-enabled=true
iceberg.tables.auto-create-enabled=true
iceberg.tables=db.account
iceberg.tables.cdc-field=__op
iceberg.tables.default-id-columns=account_id
iceberg.control.commit.interval-ms=60000
# Glue Schema Registry Specific
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
key.converter.endpoint=https://glue.us-east-1.amazonaws.com
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.schemaAutoRegistrationEnabled=true
value.converter.region=us-east-1
value.converter.avroRecordType=GENERIC_RECORD
value.converter.compatibility=NONE
value.converter.registry.name=testregistry
value.converter.endpoint=https://glue.us-east-1.amazonaws.com
Answered 2 months ago

Add the following to your connector configuration:

transaction.timeout.ms=90000
retries=5
max.block.ms=90000

Start with slightly higher values to give the connector more leeway, then adjust accordingly once it's functional.
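Note that whether raw producer settings in a connector configuration take effect depends on your Connect worker setup; as a rule, Kafka Connect only honors per-connector producer settings when they carry the producer.override. prefix and the worker's connector.client.config.override.policy permits overrides. A sketch under that assumption:

# Assumes the Connect worker configuration allows client overrides,
# e.g. connector.client.config.override.policy=All
producer.override.transaction.timeout.ms=90000
producer.override.max.block.ms=90000
producer.override.retries=5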

Review the MSK cluster's logs and metrics around the time the errors occur. Look for any broker restarts, configuration changes, scaling events, or unusual network patterns that could contribute to temporary slowdowns.

Answered 2 months ago
