Moving from Spring Kafka Sink Connector to MSK Connect for external Confluent Cluster

0

Hello, I am trying to create MSK Connector that will feed SQS queue with JSON events. However as I am trying to make a test one with just logging data I run into:

org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.

I believe this might be something with auth config. In Spring Boot app that is currently managing it we have such config:

fun consumerProperties() = mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to PropertyResolver.getProperty(EnvVars.ARE_KAFKA_URL),
        ConsumerConfig.GROUP_ID_CONFIG to PropertyResolver.getProperty(EnvVars.ARE_KAFKA_COST_LISTENER_GROUP_ID),
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
        ConsumerConfig.CLIENT_ID_CONFIG to InetAddress.getLocalHost().hostName,
        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
        SaslConfigs.SASL_MECHANISM to "PLAIN",
        SaslConfigs.SASL_JAAS_CONFIG to
            " org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"${PropertyResolver.getProperty(EnvVars.KAFKA_USERNAME)}\" " +
            "password=\"${PropertyResolver.getSecretByCipherPropertyName(EnvVars.KAFKA_PASSWORD_CIPHER)}\";",
    )

I tried rewriting it according to Kafka's docs into the box for properties during creation of MSK Connector I read here: https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-supported-worker-config-properties.html that for workers config types with prefix sasl are not supported, does that mean it is not possible to connect?

asked 7 days ago13 views
1 Answer
0

To connect your MSK Connector to an external Confluent Kafka cluster that uses SASL_SSL authentication, you need to understand the difference between worker configuration properties and connector configuration properties.

The documentation you referenced is correct that worker configuration properties with the "sasl" prefix are not supported directly in the worker configuration. However, this doesn't mean you can't connect to a SASL-authenticated Kafka cluster. Instead, you need to provide these authentication details in the connector configuration itself.

For your MSK Connector configuration, you should include the authentication details as part of your connector configuration JSON, not in the worker properties section. Your connector configuration should include something like:

{
"connector.class": "your.connector.class",
"tasks.max": "1",
"topics": "your-topic",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"consumer.security.protocol": "SASL_SSL",
"consumer.sasl.mechanism": "PLAIN",
"consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your-username\" password=\"your-password\";",
"consumer.bootstrap.servers": "your-kafka-bootstrap-servers"
}

Note that the authentication properties are prefixed with "consumer." to indicate they apply to the consumer client used by the connector.

When migrating a sink connector from a self-managed environment to MSK Connect, you need to:

  1. Get the connector configuration from your existing setup
  2. Create a new MSK Connector with the same name
  3. Use the appropriate custom plugin for your connector type
  4. Include the authentication details in the connector configuration
  5. Verify the connector is working by checking the logs once it's active

Since Kafka Connect sink connectors use the Kafka group membership API and store offsets in the same __consumer_offset topics as typical consumer applications, the migration should be relatively straightforward once you've properly configured the authentication.
Sources
Migrate sink connectors to Amazon MSK Connect - Amazon Managed Streaming for Apache Kafka
Migrate source connectors to Amazon MSK Connect - Amazon Managed Streaming for Apache Kafka

answered 7 days ago
  • It still fails, and in logs I can see 2025-06-10T13:38:08.000Z [Worker-0a7fd0156af60afa7] sasl.mechanism = GSSAPI

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions