I'm testing the use of MSK Connect with IcebergSinkConnector, but I'm having some difficulty making it work.

I'm testing the use of MSK Connect with IcebergSinkConnector, but I'm having some difficulty making it work. If anyone has experience with MSK and can help me, that would be great.

I have a topic in MSK that is fed by an MSK Connect connector running Debezium, which performs CDC from an RDS PostgreSQL database. In this connector, I use the Glue Schema Registry to register the schemas. I am now trying to sink one of these topics into an Iceberg table, using Glue as the catalog.

My Iceberg sink configuration is shown below. The connector is running in MSK Connect, but no table is created and no data is written. In the CloudWatch logs, I see this error: `org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000 milliseconds while awaiting InitProducerId`.

Can someone help me understand where the error might be?

`connector.class=io.tabular.iceberg.connect.IcebergSinkConnector tasks.max=1 topics=dbz.account errors.deadletterqueue.context.headers.enable=false consumer.auto.offset.reset=latest s3.region=us-east-1 errors.log.enable=true

Iceberg Configs

iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO iceberg.catalog.warehouse=s3://***********/iceberg_tables iceberg.catalog.client.region=us-east-1 iceberg.control.topic=iceberg-connector-control-v1 iceberg.tables.dynamic-enabled=false iceberg.tables.upsert-mode-enabled=true iceberg.tables.evolve-schema-enabled=true iceberg.tables.auto-create-enabled=true iceberg.tables=db.account iceberg.tables.cdc-field=__op iceberg.tables.default-id-columns=account_id iceberg.control.commit.interval-ms=60000

Glue Schema Registry Specific

key.converter=org.apache.kafka.connect.storage.StringConverter key.converter.schemas.enable=false key.converter.endpoint=https://glue.us-east-1.amazonaws.com value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter value.converter.schemaAutoRegistrationEnabled=true value.converter.region=us-east-1 value.converter.avroRecordType=GENERIC_RECORD value.converter.compatibility=NONE value.converter.registry.name=testregistry value.converter.endpoint=https://glue.us-east-1.amazonaws.com`

asked 2 months ago, 216 views
2 answers

Sorry, the formatting of my configuration was messed up. Here it is in a more readable form:

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics=dbz.account
errors.deadletterqueue.context.headers.enable=false
consumer.auto.offset.reset=latest
s3.region=us-east-1
errors.log.enable=true
# Iceberg Configs
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
iceberg.catalog.warehouse=s3://***********/iceberg_tables
iceberg.catalog.client.region=us-east-1
iceberg.control.topic=iceberg-connector-control-v1
iceberg.tables.dynamic-enabled=false
iceberg.tables.upsert-mode-enabled=true
iceberg.tables.evolve-schema-enabled=true
iceberg.tables.auto-create-enabled=true
iceberg.tables=db.account
iceberg.tables.cdc-field=__op
iceberg.tables.default-id-columns=account_id
iceberg.control.commit.interval-ms=60000
# Glue Schema Registry Specific
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
key.converter.endpoint=https://glue.us-east-1.amazonaws.com
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.schemaAutoRegistrationEnabled=true
value.converter.region=us-east-1
value.converter.avroRecordType=GENERIC_RECORD
value.converter.compatibility=NONE
value.converter.registry.name=testregistry
value.converter.endpoint=https://glue.us-east-1.amazonaws.com
answered 2 months ago

Add the following to your connector configuration:

transaction.timeout.ms=90000
retries=5
max.block.ms=90000

Start with slightly higher values to give the connector more leeway, then adjust accordingly once it's functional.
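
One thing worth checking with the settings above: Kafka's producer property is spelled `transaction.timeout.ms` (singular), and plain producer keys at the top level of a sink connector's configuration may not reach the producer that the Iceberg sink uses for its control topic. As a sketch only, assuming your connector version forwards `iceberg.kafka.*` keys to its internal Kafka clients (the Tabular connector's documentation describes this prefix, but please verify it for the version you deployed), the same values could be written as:

```properties
# Assumption: keys prefixed with iceberg.kafka. are passed through to the
# Kafka clients the Iceberg sink creates for its control topic.
# The values are the ones suggested in this answer, not tuned recommendations.
iceberg.kafka.transaction.timeout.ms=90000
iceberg.kafka.retries=5
iceberg.kafka.max.block.ms=90000
```

Note that `transaction.timeout.ms` must not exceed the broker's `transaction.max.timeout.ms` (15 minutes by default), so 90000 is within range on a default broker configuration.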

Review the MSK cluster's logs and metrics around the time the errors occur. Look for any broker restarts, configuration changes, scaling events, or unusual network patterns that could contribute to temporary slowdowns.

EXPERT
answered 2 months ago
