Messages not received at the MSK consumer end

I can send data to MSK with the Python code below, but the messages never arrive at the consumer end. I have enabled all the authentication options. Sending and receiving both work with the Kafka command-line tools, and I need the same thing to work from Python. I have tried both unauthenticated mode and the SASL_SSL protocol; in both cases the send succeeds but nothing is received. Please see the code below and suggest how to resolve the issue.

Producer code:

    from confluent_kafka import Producer
    from datetime import datetime
    from time import strftime
    import json

    bootstrap_servers = 'b-1.xxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096'
    producer = Producer({
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.username': 'cccccc',
        'sasl.password': 'ccccccccccccccc',
        'sasl.mechanism': 'SCRAM-SHA-512'
    })

    data = {
        'message': 'hello world',
        'timestamp': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
    }
    #print(producer.bootstrap_connected())
    producer.produce('testTopic1', json.dumps(data).encode('utf-8'))
    print('message sent')
    producer.flush()
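The producer prints 'message sent' immediately after `produce()`, which only queues the message locally and does not show that the broker accepted it. A minimal sketch of a delivery report callback that would confirm this is shown here. The helper names `describe_delivery`, `on_delivery` and `send_with_report` are hypothetical, and the 10-second flush timeout is an arbitrary assumption.

```python
def describe_delivery(err, msg):
    """Summarise a delivery report as one line of text (hypothetical helper)."""
    if err is not None:
        return 'delivery failed: {}'.format(err)
    return 'delivered to {} [{}] at offset {}'.format(
        msg.topic(), msg.partition(), msg.offset())


def on_delivery(err, msg):
    # confluent-kafka invokes this from poll() or flush(), once per message.
    print(describe_delivery(err, msg))


def send_with_report(producer, topic, payload):
    """Queue one message, then wait for its delivery report."""
    producer.produce(topic, payload, on_delivery=on_delivery)
    # flush() returns the number of messages still waiting to be delivered;
    # the 10 second timeout is an assumption, not a recommended value.
    return producer.flush(10)
```

Calling `send_with_report(producer, 'testTopic1', json.dumps(data).encode('utf-8'))` with the producer above should print either the partition and offset the message landed at or the error the broker returned, and a non-zero return value would suggest the message was still undelivered when the timeout expired.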

Consumer code:

    from confluent_kafka import Consumer, KafkaError, KafkaException
    from datetime import datetime
    from time import strftime
    import json

    bootstrap_servers = 'b-1.xxxxxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096'
    consumer = Consumer({
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.username': 'cccccccccc',
        'sasl.password': 'cccccccccccccc',
        'sasl.mechanism': 'SCRAM-SHA-512'
    })

    print('start reading')
    consumer.subscribe(['testTopic1'])

    while True:
        # Wait up to one second for the next message
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print('topic {} in partition {} reached end at offset {}'.format(msg.topic(), msg.partition(), msg.offset()))
            else:
                raise KafkaException(msg.error())
        else:
            print("message.offset={}, message.key={}, message.value={}".format(msg.offset(), msg.key(), msg.value()))
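One thing that may be worth comparing against, offered as a sketch rather than a confirmed diagnosis: the consumer configuration above sets neither `group.id` nor `auto.offset.reset`. As far as I know, confluent-kafka expects a `group.id` for `subscribe()`, and librdkafka's default offset reset is `latest`, so a new group would only see messages produced after it joins. The helper names `build_consumer_config` and `read_messages` and the group name `test-group-1` are hypothetical.

```python
def build_consumer_config(bootstrap_servers, username, password,
                          group_id='test-group-1'):
    """Return a consumer config dict (hypothetical helper, assumed values)."""
    return {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': username,
        'sasl.password': password,
        # Assumed group name; any group the SCRAM user may join would do.
        'group.id': group_id,
        # Start from the oldest retained message when the group has no
        # committed offset, so earlier test messages are also read.
        'auto.offset.reset': 'earliest',
    }


def read_messages(config, topic, max_messages=10, timeout=1.0):
    """Poll until max_messages values have arrived, then return them."""
    # Imported lazily so the config helper works without the client library.
    from confluent_kafka import Consumer, KafkaException

    consumer = Consumer(config)
    consumer.subscribe([topic])
    received = []
    try:
        while len(received) < max_messages:
            msg = consumer.poll(timeout)
            if msg is None:
                continue  # nothing arrived within the timeout, keep polling
            if msg.error():
                raise KafkaException(msg.error())
            received.append(msg.value())
    finally:
        # Leave the group cleanly.
        consumer.close()
    return received
```

With the real broker list and credentials, `read_messages(build_consumer_config(bootstrap_servers, user, password), 'testTopic1', max_messages=1)` would block until one message arrives. If ACLs are enabled on the cluster, the SCRAM user presumably also needs read permission on both the topic and the consumer group.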
kotesh
asked 8 months ago, 82 views
No answers
