I'm testing the use of MSK Connect with IcebergSinkConnector, but I'm having some difficulty making it work.

I'm testing the use of MSK Connect with IcebergSinkConnector, but I'm having some difficulty making it work. If anyone has experience with MSK and can help me, that would be great.

I have a topic in MSK that is fed by an MSK Connect connector running Debezium, which performs CDC from an RDS PostgreSQL database. That connector uses the Glue Schema Registry to register the schemas. I am now trying to sink one of these topics into an Iceberg table, using Glue as the catalog.

My Iceberg sink configuration is below. The connector is running in MSK Connect, but no table is created and no data is written. In the CloudWatch logs, I see this error: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000 milliseconds while awaiting InitProducerId.

Can someone help me understand where the error might be?

`connector.class=io.tabular.iceberg.connect.IcebergSinkConnector tasks.max=1 topics=dbz.account errors.deadletterqueue.context.headers.enable=false consumer.auto.offset.reset=latest s3.region=us-east-1 errors.log.enable=true

Iceberg Configs

iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO iceberg.catalog.warehouse=s3://***********/iceberg_tables iceberg.catalog.client.region=us-east-1 iceberg.control.topic=iceberg-connector-control-v1 iceberg.tables.dynamic-enabled=false iceberg.tables.upsert-mode-enabled=true iceberg.tables.evolve-schema-enabled=true iceberg.tables.auto-create-enabled=true iceberg.tables=db.account iceberg.tables.cdc-field=__op iceberg.tables.default-id-columns=account_id iceberg.control.commit.interval-ms=60000

Glue Schema Registry Specific

key.converter=org.apache.kafka.connect.storage.StringConverter key.converter.schemas.enable=false key.converter.endpoint=https://glue.us-east-1.amazonaws.com value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter value.converter.schemaAutoRegistrationEnabled=true value.converter.region=us-east-1 value.converter.avroRecordType=GENERIC_RECORD value.converter.compatibility=NONE value.converter.registry.name=testregistry value.converter.endpoint=https://glue.us-east-1.amazonaws.com`

Asked 2 months ago · 216 views
2 Answers

Sorry, the formatting of my configuration got mangled. Here it is again, one property per line:

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics=dbz.account
errors.deadletterqueue.context.headers.enable=false
consumer.auto.offset.reset=latest
s3.region=us-east-1
errors.log.enable=true
# Iceberg Configs
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
iceberg.catalog.warehouse=s3://***********/iceberg_tables
iceberg.catalog.client.region=us-east-1
iceberg.control.topic=iceberg-connector-control-v1
iceberg.tables.dynamic-enabled=false
iceberg.tables.upsert-mode-enabled=true
iceberg.tables.evolve-schema-enabled=true
iceberg.tables.auto-create-enabled=true
iceberg.tables=db.account
iceberg.tables.cdc-field=__op
iceberg.tables.default-id-columns=account_id
iceberg.control.commit.interval-ms=60000
# Glue Schema Registry Specific
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
key.converter.endpoint=https://glue.us-east-1.amazonaws.com
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.schemaAutoRegistrationEnabled=true
value.converter.region=us-east-1
value.converter.avroRecordType=GENERIC_RECORD
value.converter.compatibility=NONE
value.converter.registry.name=testregistry
value.converter.endpoint=https://glue.us-east-1.amazonaws.com
Answered 2 months ago

Add the following to your connector configuration:

transaction.timeout.ms=90000
retries=5
max.block.ms=90000

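The 60000 ms in the error matches the Kafka producer's default max.block.ms, which bounds how long the producer waits for InitProducerId. Start with somewhat higher values to give the connector more leeway, then tune them down once it is working.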
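One thing worth checking: properties placed at the top level of a connector configuration are not necessarily forwarded to the Kafka client that the sink uses for its control topic. As far as I know, the Tabular Iceberg sink passes settings prefixed with iceberg.kafka. to that client, but treat the prefix as an assumption and confirm it in the connector's documentation. Below is a minimal Python sketch that merges the suggested values into a config under that prefix. The helper names are hypothetical, the base config is a shortened copy of the one in the question, and the producer property is spelled transaction.timeout.ms (singular).

```python
# Assumption: the sink forwards "iceberg.kafka.*" settings to its control-topic
# Kafka client. Verify this prefix against the connector's documentation.
CONTROL_CLIENT_PREFIX = "iceberg.kafka."

# Values suggested in the answer above (Kafka producer property names).
SUGGESTED_OVERRIDES = {
    "transaction.timeout.ms": "90000",
    "retries": "5",
    "max.block.ms": "90000",
}

# Shortened copy of the connector configuration from the question.
BASE = """\
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics=dbz.account
# Iceberg Configs
iceberg.control.topic=iceberg-connector-control-v1
iceberg.control.commit.interval-ms=60000
"""


def parse_properties(text):
    """Parse key=value lines, skipping blank lines and # comments."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def with_overrides(config, overrides, prefix=CONTROL_CLIENT_PREFIX):
    """Return a copy of config with each override added under the prefix."""
    merged = dict(config)
    for key, value in overrides.items():
        merged[prefix + key] = value
    return merged


def render_properties(config):
    """Render a config dict back to one key=value pair per line."""
    return "\n".join(f"{key}={value}" for key, value in config.items())


if __name__ == "__main__":
    merged = with_overrides(parse_properties(BASE), SUGGESTED_OVERRIDES)
    print(render_properties(merged))
```

Running it prints the original properties followed by the three prefixed overrides, which can be pasted into the MSK Connect connector configuration. If the prefix turns out to be different for your connector version, only CONTROL_CLIENT_PREFIX needs to change.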

Review the MSK cluster's logs and metrics around the time the errors occur. Look for any broker restarts, configuration changes, scaling events, or unusual network patterns that could contribute to temporary slowdowns.

Expert
Answered 2 months ago
