Not receiving the messages at MSK consumer end

0

I am able to send the data by using below code in python and not receiving the message at consumer end. I have enabled all the authentications. I am able to send and received with kafka commands. i need same thing through a python. I have tried with unauthorized mode and SASL_SSL protocol. both able to send but not received. Please find the below code and suggest me to resolve the issue.

Producer code: from confluent_kafka import Producer from datetime import datetime from time import strftime import json

bootstrap_servers = 'b-1.xxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096' producer = Producer({ 'bootstrap.servers': bootstrap_servers, 'security.protocol': 'SASL_SSL', 'sasl.username': 'cccccc', 'sasl.password': 'ccccccccccccccc', 'sasl.mechanism': 'SCRAM-SHA-512' })

data = { 'message': 'hello world', 'timestamp': datetime.now().strftime("%m/%d/%Y %H:%M:%S") } #print(producer.bootstrap_connected()) producer.produce('testTopic1', json.dumps(data).encode('utf-8')) print('message sent') producer.flush()

Consumer code: from confluent_kafka import Consumer from datetime import datetime from time import strftime import json

bootstrap_servers = 'b-1.xxxxxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096' consumer = Consumer({ 'bootstrap.servers': bootstrap_servers, 'security.protocol': 'SASL_SSL', 'sasl.username': 'cccccccccc', 'sasl.password': 'cccccccccccccc', 'sasl.mechanism': 'SCRAM-SHA-512' })

print('start reading') consumer.subscribe(['testTopic1'])

while True: msg = consumer.poll(timeout=1.0) if msg is None: continue

if msg.error():
    if msg.error().code() == KafkaError._PARTITION_EOF:
        # End of partition event
        print('topic {} in partition {} reached end at offset {}'.format(msg.topic(), msg.partition(), msg.offset()))
    elif msg.error():
        raise KafkaException(msg.error())
else:
    print("message.offset={}, message.key={}, message.value={}".format(msg.offset(), msg.key(), msg.value()))
kotesh
asked 7 months ago76 views
No Answers

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions