Kafka transactions not working when using IAM authentication with the confluent-kafka Python producer client


We have an MSK cluster; let's call it test-cluster.

When using IAM authentication with the confluent-kafka producer client in Python, the producer tries to write data to a Kafka topic using transactions, but this never succeeds: the producer times out at the init_transactions(5) call. See the code below:

##### code #####

import json
from confluent_kafka import Producer as cf_producer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

def oauth_cb(oauth_config):
    auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("eu-central-1")
    # Note that this library expects oauth_cb to return the expiry time in seconds
    # since the epoch, while the token generator returns it in milliseconds.
    return auth_token, expiry_ms / 1000

# Now test the transactions
transactional_p = cf_producer({
    'debug': 'All',
    'bootstrap.servers': <bootstrap-urls>,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
    'transactional.id': "transaction_test1",
    'acks': 'All',
    'transaction.timeout.ms': 10000,
    'request.timeout.ms': 4000
})

try:
    transactional_p.init_transactions(5)  # Times out here
    transactional_p.begin_transaction()
except Exception as e:
    print(e)

transactional_p.produce(
    'iam-test-topic', json.dumps({"test": "test"}), "123", partition=-1
)

transactional_p.flush(5)
transactional_p.commit_transaction()
##### end code #####

Here are some logs from the init_transactions() call:

##### logs #####

%7|1711109238.481|CONNECT|rdkafka#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 47ms: acquire ProducerID
%7|1711109238.481|PIDBROKER|rdkafka#producer-1| [thrd:main]: No brokers available for Transactions (4 broker(s) known)
%7|1711109238.482|TXNCOORD|rdkafka#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (4 broker(s) known)
%7|1711109238.982|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: acquire ProducerID
%7|1711109238.982|PIDBROKER|rdkafka#producer-1| [thrd:main]: No brokers available for Transactions (4 broker(s) known)
%7|1711109238.982|TXNCOORD|rdkafka#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (4 broker(s) known)
%7|1711109239.479|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: no cluster connection

[... the same CONNECT/PIDBROKER/TXNCOORD cycle repeats every ~500 ms ...]

%7|1711109242.479|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: no cluster connection
##### end logs #####

When using the same producer with the SASL/SCRAM auth mechanism, the transaction succeeds and data is produced to the Kafka topic.
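For reference, this is a minimal sketch of the working SCRAM configuration, assuming the standard MSK SASL/SCRAM setup (SCRAM-SHA-512 on port 9096); the broker endpoints and credentials are placeholders:

# Hedged sketch of the working SASL/SCRAM producer configuration, for comparison.
# <scram-bootstrap-urls>, <username>, and <password> are placeholders; MSK serves
# SASL/SCRAM on port 9096 with the SCRAM-SHA-512 mechanism.
from confluent_kafka import Producer

scram_p = Producer({
    'bootstrap.servers': '<scram-bootstrap-urls>',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': '<username>',   # from the Secrets Manager secret attached to the cluster
    'sasl.password': '<password>',
    'transactional.id': 'transaction_test1',
    'acks': 'all',
})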

The code runs on an EC2 instance that shares a subnet with the MSK cluster. The instance has a role with the following policy attached:

##### policy #####

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "kafka-cluster:*",
            "Resource": "*"
        }
    ]
}
##### end policy #####

Another person faced the same issue in February and posted about it online; here are the links to his problem:

Thank you for your help.

-- Ibrar

4 Answers

Hello, Ibrar.

This type of connectivity issue is challenging to troubleshoot. There are a few common reasons for it not to work:

  1. Your client/producer is not configured to use the correct IAM library
  2. Your client/producer is not using the proper IAM role with the permissions to use your MSK cluster
  3. Your client/producer is not able to reach your brokers on the given ports (see the reachability sketch after this list)
  4. Your MSK cluster is not configured with IAM role-based authentication
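To rule out item 3 quickly, you can test raw TCP reachability before involving Kafka at all. Below is a minimal sketch; the broker hostnames are placeholders, and 9098 is the port MSK uses for IAM authentication:

import socket

# Hedged sketch: check that each broker is reachable on the MSK IAM port (9098).
# Replace the placeholder hostnames with your actual bootstrap brokers.
brokers = [
    ("<broker1address>", 9098),
    ("<broker2address>", 9098),
    ("<broker3address>", 9098),
]

for host, port in brokers:
    try:
        with socket.create_connection((host, port), timeout=5):
            print(f"OK: {host}:{port} is reachable")
    except OSError as e:
        print(f"FAIL: {host}:{port} -> {e}")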

Find below an example of a Glue job in Scala that works fine using IAM role-based authentication. Note that the Glue job has a network connection associated with it, so it connects over the private network.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object GlueApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkKafkaStreamingJob")
      .getOrCreate()

    import spark.implicits._

    val bootstrapServers = "<broker1address>:9098,<broker2address>:9098,<broker3address>:9098"
    val sourceTopic = "topicoOrigem3Particoes"
    val destinationTopic = "topicoDestino"

    val kafkaSourceDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", sourceTopic)
      .option("kafka.security.protocol", "SASL_SSL") 
      .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
      .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
      .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
      .option("startingOffsets", "earliest")
      .load()

    val transformedDF = kafkaSourceDF
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    val query = transformedDF
      .writeStream
      .foreachBatch { (batchDF: org.apache.spark.sql.Dataset[(String, String)], batchId: Long) =>
        // Inside this block, you can write the batch DataFrame to Kafka or perform any transformation
        batchDF
          .selectExpr("key", "value")
          .write
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("topic", destinationTopic)
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
          .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
          .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
          .save()
      }
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}

Please accept this answer if it helps. Otherwise, please share screenshots of the items above so we can troubleshoot.

Regards, Pablo Silva

  • Hi,

    Thanks for your comment, Pablo, but I think you are not using transactions in your example. My issue occurs specifically when transactions are used.

  • In place of screenshots, I posted my code as well as the corresponding logs from the Kafka producer. Let me know if that is not sufficient to troubleshoot.

  • ##### code #####

    import json
    from confluent_kafka import Producer as cf_producer
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

    def oauth_cb(oauth_config):
        auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("eu-central-1")
        # This library expects oauth_cb to return the expiry time in seconds since
        # the epoch, while the token generator returns it in milliseconds.
        return auth_token, expiry_ms / 1000

    # Now test the transactions
    transactional_p = cf_producer({
        'debug': 'All',
        'bootstrap.servers': <bootstrap-urls>,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'OAUTHBEARER',
        'oauth_cb': oauth_cb,
        'transactional.id': "transaction_test1",
        'acks': 'All',
        'transaction.timeout.ms': 10000,
        'request.timeout.ms': 4000
    })

    try:
        transactional_p.init_transactions(5)  # Times out here
        transactional_p.begin_transaction()
    except Exception as e:
        print(e)

    transactional_p.produce(
        'iam-test-topic', json.dumps({"test": "test"}), "123", partition=-1
    )

    transactional_p.flush(5)
    transactional_p.commit_transaction()
    ##### end code #####
    

Thank you, Ibrar.

I've enabled transactions and tested the following code successfully:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object GlueApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkKafkaStreamingJob")
      .getOrCreate()

    import spark.implicits._

    val bootstrapServers = "b-4.exextratolancamentoinf.d34470.c2.kafka.sa-east-1.amazonaws.com:9098,b-5.exextratolancamentoinf.d34470.c2.kafka.sa-east-1.amazonaws.com:9098,b-1.exextratolancamentoinf.d34470.c2.kafka.sa-east-1.amazonaws.com:9098"
    val sourceTopic = "pablo20240327"
    val destinationTopic = "pablo20240327target"

    val kafkaSourceDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", sourceTopic)
      .option("kafka.security.protocol", "SASL_SSL") 
      .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
      .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
      .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
      .option("startingOffsets", "earliest")
      .load()

    val transformedDF = kafkaSourceDF
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    val query = transformedDF
      .writeStream
      .foreachBatch { (batchDF: org.apache.spark.sql.Dataset[(String, String)], batchId: Long) =>
        batchDF
          .selectExpr("key", "value")
          .write
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("topic", destinationTopic)
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
          .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
          .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
          .option("enable.idempotence", "true") // Ensure idempotence
          .option("transactional.id", s"transaction_${batchId}") // Set transactional ID
          .option("transaction.timeout.ms", "30000") // Optional: Set transaction timeout
          .save()
      }
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}

Explanation:

When using Apache Kafka with Apache Spark Structured Streaming, you don't manually commit transactions when writing to Kafka; Spark handles this internally. When you use the writeStream API with Kafka as a sink and configure it for exactly-once semantics (by setting enable.idempotence to true and providing a transactional.id), Spark manages the transaction lifecycle automatically, including initiating and completing each transaction.

Here's what happens under the hood:

Starting a Transaction: When a batch of data is ready to be written to Kafka, Spark starts a transaction using the configured transactional.id. This ensures that all records in the batch are treated as a single atomic unit.

Writing Records: Spark writes the records in the batch to Kafka. If enable.idempotence is set to true, Kafka ensures that these records are written exactly once, even in the case of retries.

Committing the Transaction: After successfully writing all records in the batch, Spark commits the transaction. This commit marks the records as consumable by Kafka consumers that are reading from the topic.

At this moment I am unable to test with the confluent libraries, but would you be able to review and compare against the following?

import json
from confluent_kafka import Producer as cf_producer
# Assuming the aws-msk-iam-sasl-signer-python package provides the token generator
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

def oauth_cb(oauth_config):
    auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("eu-central-1")
    # Convert expiry time from milliseconds to seconds
    return auth_token, expiry_ms / 1000

# Create the producer with the necessary configuration
transactional_p = cf_producer({
    'debug': 'all',
    'bootstrap.servers': '<bootstrap-urls>',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
    'transactional.id': "transaction_test1",
    'acks': 'all',
    'transaction.timeout.ms': 10000,
    'request.timeout.ms': 4000
})

# Initialize transactions and produce messages within a transaction
try:
    transactional_p.init_transactions(5)
    transactional_p.begin_transaction()

    # Produce a message - ensure the value is serialized to a string if necessary
    transactional_p.produce('iam-test-topic', json.dumps({"test": "test"}), "123", partition=-1)

    # Wait for all messages to be sent
    transactional_p.flush(5)

    # Commit the transaction
    transactional_p.commit_transaction()
except Exception as e:
    print(e)

Regards, Pablo Silva

  • Thanks for checking this out; it is interesting to know that it works in Scala.

    The Python snippet you asked me to try is exactly what I had tried, and it times out at init. I retried to double-check in case I was missing something and hit the same issue. Here are the logs from the producer:

    state INIT -> TRY_CONNECT
    %7|1711554638.682|BROADCAST|rdkafka#producer-1| [thrd:sasl_ssl://b-2.bedevmskec1clusterm.zmklwn.c2.kafka.eu-central-1]: Broadcasting state change
    %7|1711554638.684|CONNECT|rdkafka#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 47ms: acquire ProducerID
    %7|1711554638.684|PIDBROKER|rdkafka#producer-1| [thrd:main]: No brokers available for Transactions (4 broker(s) known)
    %7|1711554638.684|TXNCOORD|rdkafka#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (4 broker(s) known)
    %7|1711554639.184|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: acquire ProducerID
    %7|1711554639.185|PIDBROKER|rdkafka#producer-1| [thrd:main]: No brokers available for Transactions (4 broker(s) known)
    %7|1711554639.185|TXNCOORD|rdkafka#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (4 broker(s) known)
    %7|1711554639.682|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: no cluster connection
    

Glad to help. Have you investigated the network already? In other words, are you able to test your application (without transactions) from a subnet that can reach the brokers on port 9098?

This would be my troubleshooting plan:

Check Network Connectivity: Ensure that your producer can reach the Kafka brokers over the network. Verify security groups, firewall rules, and VPC configurations.

Verify Broker Details: Make sure that the broker addresses in the producer configuration are correct and that the brokers are running.

Check for Broker Logs: Look into the logs of the Kafka brokers to see if there are any error messages that can give more insight into why the connections are failing.
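To isolate the IAM side specifically, it can also help to confirm that a token can be generated from the instance role before the producer is involved at all. Here is a minimal sketch, assuming the aws-msk-iam-sasl-signer-python package used in your oauth_cb:

# Hedged sketch: confirm the instance role can mint an MSK IAM auth token,
# independently of the Kafka client. Assumes the aws-msk-iam-sasl-signer-python
# package; "eu-central-1" is the region from the question.
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("eu-central-1")
print("token length:", len(token))
print("expires at (ms since epoch):", expiry_ms)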

Regards,

Pablo Silva

  • Yes, as mentioned in my original question, transactions work fine when using SASL/SCRAM as the auth mechanism. So this seems to be an issue with the combination of IAM auth and the confluent_kafka library.


Indeed, Ibrar. Would you like to share your Python code for me to try? If so, please remove anything confidential.

Regards, Pablo Silva

