Hello, Ibrar.
This type of connectivity issue is challenging to troubleshoot. There are a few common reasons why it may not work:
- Your client/producer is not configured to use the correct IAM library
- Your client/producer is not using an IAM role with the permissions required to use your MSK cluster
- Your client/producer is not able to reach your brokers on the given ports
- Your MSK cluster is not configured with IAM role-based authentication
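Before diving into a full job, a quick way to isolate the problem is a minimal non-transactional produce over IAM auth: if it succeeds, the first three items above can be ruled out. Below is only a sketch: the broker addresses, region, and topic are placeholders, and it assumes the confluent_kafka and aws-msk-iam-sasl-signer-python packages are installed.

```python
# Sketch only: broker addresses, region, and topic name are placeholders.
# Assumes the confluent-kafka and aws-msk-iam-sasl-signer-python packages.
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from confluent_kafka import Producer


def oauth_cb(oauth_config):
    # generate_auth_token returns (token, expiry_in_ms); librdkafka expects seconds
    token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("eu-central-1")
    return token, expiry_ms / 1000


p = Producer({
    'bootstrap.servers': '<broker1>:9098,<broker2>:9098',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
})

# If this delivers, auth and networking are fine and the problem is
# isolated to the transactional path.
p.produce('<test-topic>', b'hello')
p.flush(10)
```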
Find below an example of a Glue Job in Scala that works fine using IAM role-based authentication. By the way, the Glue Job has a network connection associated with it, so it connects over the private network.
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object GlueApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkKafkaStreamingJob")
      .getOrCreate()

    import spark.implicits._

    val bootstrapServers = "<broker1address>:9098,<broker2address>:9098,<broker3address>:9098"
    val sourceTopic = "topicoOrigem3Particoes"
    val destinationTopic = "topicoDestino"

    val kafkaSourceDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", sourceTopic)
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
      .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
      .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
      .option("startingOffsets", "earliest")
      .load()

    val transformedDF = kafkaSourceDF
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    val query = transformedDF
      .writeStream
      .foreachBatch { (batchDF: org.apache.spark.sql.Dataset[(String, String)], batchId: Long) =>
        // Inside this block, you can write the batch DataFrame to Kafka or perform any transformation
        batchDF
          .selectExpr("key", "value")
          .write
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("topic", destinationTopic)
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
          .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
          .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
          .save()
      }
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}
```
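One caveat worth noting: the software.amazon.msk.auth.iam classes referenced above ship in the aws-msk-iam-auth library, so that JAR has to be available on the job's classpath (in Glue, typically via the --extra-jars job parameter); otherwise the IAMLoginModule and IAMClientCallbackHandler classes will fail to load.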
Please accept this answer if it helps. Otherwise, please share screenshots of the items above so we can troubleshoot.
Regards, Pablo Silva
Thank you, Ibrar.
I've enabled transactions and successfully tested the following code:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object GlueApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkKafkaStreamingJob")
      .getOrCreate()

    import spark.implicits._

    val bootstrapServers = "b-4.exextratolancamentoinf.d34470.c2.kafka.sa-east-1.amazonaws.com:9098,b-5.exextratolancamentoinf.d34470.c2.kafka.sa-east-1.amazonaws.com:9098,b-1.exextratolancamentoinf.d34470.c2.kafka.sa-east-1.amazonaws.com:9098"
    val sourceTopic = "pablo20240327"
    val destinationTopic = "pablo20240327target"

    val kafkaSourceDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", sourceTopic)
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
      .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
      .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
      .option("startingOffsets", "earliest")
      .load()

    val transformedDF = kafkaSourceDF
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    val query = transformedDF
      .writeStream
      .foreachBatch { (batchDF: org.apache.spark.sql.Dataset[(String, String)], batchId: Long) =>
        batchDF
          .selectExpr("key", "value")
          .write
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("topic", destinationTopic)
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
          .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
          .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
          .option("enable.idempotence", "true")                  // Ensure idempotence
          .option("transactional.id", s"transaction_${batchId}") // Set transactional ID
          .option("transaction.timeout.ms", "30000")             // Optional: set transaction timeout
          .save()
      }
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}
```
Explanation:
When using Apache Kafka with Apache Spark Structured Streaming, you don't manually commit transactions when writing to Kafka; Spark handles this internally. When you use the writeStream API with Kafka as a sink and configure it for exactly-once semantics (by setting enable.idempotence to true and providing a transactional.id), Spark manages the transaction lifecycle automatically, including the initiation and completion of transactions.
Here's what happens under the hood:
- Starting a transaction: When a batch of data is ready to be written to Kafka, Spark starts a transaction using the configured transactional.id. This ensures that all records in the batch are treated as a single atomic unit.
- Writing records: Spark writes the records in the batch to Kafka. If enable.idempotence is set to true, Kafka ensures that these records are written exactly once, even in the case of retries.
- Committing the transaction: After successfully writing all records in the batch, Spark commits the transaction. This commit marks the records as consumable by Kafka consumers reading from the topic.
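One way to observe the commit step from the outside: a consumer configured with isolation.level set to read_committed only receives records from committed transactions, so records from open or aborted transactions never show up. A minimal sketch, assuming the same confluent_kafka/IAM setup discussed in this thread; broker, group id, and topic names are placeholders.

```python
# Sketch only: broker, group id, and topic are placeholders; assumes the
# confluent-kafka and aws-msk-iam-sasl-signer-python packages.
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from confluent_kafka import Consumer


def oauth_cb(oauth_config):
    token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("eu-central-1")
    return token, expiry_ms / 1000


c = Consumer({
    'bootstrap.servers': '<broker1>:9098',
    'group.id': 'read-committed-check',
    'auto.offset.reset': 'earliest',
    'isolation.level': 'read_committed',  # hide records from uncommitted/aborted transactions
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
})
c.subscribe(['<destination-topic>'])

msg = c.poll(10.0)
if msg is not None and msg.error() is None:
    # Only records whose transaction committed are delivered here
    print(msg.key(), msg.value())
c.close()
```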
At this moment, I am unable to test with the Confluent libraries, but would you be able to review/compare with the following?
```python
import json

from confluent_kafka import Producer as cf_producer
# Import added for completeness: MSKAuthTokenProvider comes from the
# aws-msk-iam-sasl-signer-python package
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider


def oauth_cb(oauth_config):
    auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("eu-central-1")
    # Convert expiry time from milliseconds to seconds
    return auth_token, expiry_ms / 1000


# Create the producer with the necessary configuration
transactional_p = cf_producer({
    'debug': 'all',
    'bootstrap.servers': '<bootstrap-urls>',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
    'transactional.id': "transaction_test1",
    'acks': 'all',
    'transaction.timeout.ms': 10000,
    'request.timeout.ms': 4000
})

# Initialize transactions and produce messages within a transaction
try:
    transactional_p.init_transactions(5)
    transactional_p.begin_transaction()
    # Produce a message - ensure the value is serialized to a string if necessary
    transactional_p.produce('iam-test-topic', json.dumps({"test": "test"}), "123", partition=-1)
    # Wait for all messages to be sent
    transactional_p.flush(5)
    # Commit the transaction
    transactional_p.commit_transaction()
except Exception as e:
    print(e)
```
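One note on the snippet above: init_transactions(5) passes a five-second timeout, during which the producer has to reach the transaction coordinator and acquire its producer ID; if that cannot happen in time, the call raises a timeout error rather than hanging indefinitely.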
Regards, Pablo Silva
Thanks for checking this out, and it's interesting to know that it works in Scala.
The Python snippet you asked me to try is exactly what I have tried; it times out at init. I retried to double-check in case I was missing something and hit the same issue. Here are the logs from the producer:
```
state INIT -> TRY_CONNECT
%7|1711554638.682|BROADCAST|rdkafka#producer-1| [thrd:sasl_ssl://b-2.bedevmskec1clusterm.zmklwn.c2.kafka.eu-central-1]: Broadcasting state change
%7|1711554638.684|CONNECT|rdkafka#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 47ms: acquire ProducerID
%7|1711554638.684|PIDBROKER|rdkafka#producer-1| [thrd:main]: No brokers available for Transactions (4 broker(s) known)
%7|1711554638.684|TXNCOORD|rdkafka#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (4 broker(s) known)
%7|1711554639.184|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: acquire ProducerID
%7|1711554639.185|PIDBROKER|rdkafka#producer-1| [thrd:main]: No brokers available for Transactions (4 broker(s) known)
%7|1711554639.185|TXNCOORD|rdkafka#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (4 broker(s) known)
%7|1711554639.682|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: no cluster connection
```
Glad to help. Have you investigated the network already? In other words, are you able to test your application (without transactions) from a subnet that can reach the brokers on port 9098?
This would be my troubleshooting plan:
- Check network connectivity: Ensure that your producer can reach the Kafka brokers over the network. Verify security groups, firewall rules, and VPC configurations (see the reachability sketch after this list).
- Verify broker details: Make sure that the broker addresses in the producer configuration are correct and that the brokers are running.
- Check the broker logs: Look into the logs of the Kafka brokers to see if there are any error messages that give more insight into why the connections are failing.
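For the network connectivity check, a simple TCP reachability test from the producer's subnet against each broker on port 9098 can quickly confirm or rule out security group and routing issues. A minimal sketch; the broker hostnames are placeholders:

```python
# Sketch only: broker hostnames are placeholders; run this from the same
# subnet/security group as your producer.
import socket

brokers = [
    "<broker1>.kafka.eu-central-1.amazonaws.com",
    "<broker2>.kafka.eu-central-1.amazonaws.com",
]

for host in brokers:
    try:
        # Port 9098 is the MSK listener for IAM (SASL/OAUTHBEARER) auth
        with socket.create_connection((host, 9098), timeout=5):
            print(f"{host}:9098 reachable")
    except OSError as e:
        print(f"{host}:9098 NOT reachable: {e}")
```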
Regards,
Pablo Silva
Yes, as mentioned in my original question, transactions work fine when using SASL/SCRAM as the auth mechanism. So this seems like an issue with the combination of IAM auth and the confluent_kafka library.
Indeed, Ibrar. Would you like to share your Python code for me to try? If so, please remove anything confidential.
Regards, Pablo Silva
Hi,
Thanks for your comment, Pablo, but I don't think you are using transactions in your example. My issue occurs specifically when transactions are used.
In place of screenshots, I posted my code as well as the corresponding logs from the Kafka producer. Let me know if that is not sufficient to troubleshoot.