AWS Greengrass does not send data to AWS Kinesis


[The following question was translated] The main purpose of my program is to connect to an incoming MQTT channel and send the received data to my AWS Kinesis stream, named "MyKinesisStream". Here is my code:

```python
import argparse
import logging
import random

from paho.mqtt import client as mqtt_client
from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
    ReadMessagesOptions,
)

broker = 'localhost'
port = 1883
topic = "clients/test/hello/world"
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = '...'
password = '...'

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

args = ""


def connect_mqtt() -> mqtt_client.Client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print(f"Failed to connect, return code {rc}")

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def sendDataToKinesis(
    stream_name: str,
    kinesis_stream_name: str,
    payload,
    batch_size: int = None,
):
    try:
        print("Debug: sendDataToKinesis with params:", stream_name + " | ", kinesis_stream_name, " | ", batch_size)
        print("payload:", payload)
        print("type payload:", type(payload))
    except Exception as e:
        print("Error while printing out the parameters", str(e))
        logger.exception(e)

    # Defined up front so the finally block cannot raise NameError
    # if StreamManagerClient() itself fails
    kinesis_client = None
    try:
        # Create a client for the StreamManager
        kinesis_client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            kinesis_client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        )
        kinesis_client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )

        sequence_no = kinesis_client.append_message(stream_name=stream_name, data=payload)
        print(
            "Successfully appended message to stream with sequence number ", sequence_no
        )

        readValue = kinesis_client.read_messages(stream_name, ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000))
        print("DEBUG read test: ", readValue)

    except Exception as e:
        print("Exception while running: " + str(e))
        logger.exception(e)
    finally:
        # Always close the client to avoid resource leaks
        print("closing connection")
        if kinesis_client:
            kinesis_client.close()


def subscribe(client: mqtt_client.Client, args):
    def on_message(client, userdata, msg):
        print(f"Received {msg.payload.decode()} from {msg.topic} topic")
        sendDataToKinesis(args.greengrass_stream, args.kinesis_stream, msg.payload, args.batch_size)

    client.subscribe(topic)
    client.on_message = on_message


def run(args):
    mqtt_client_instance = connect_mqtt()
    subscribe(mqtt_client_instance, args)
    mqtt_client_instance.loop_forever()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--greengrass-stream', required=False, default='...')
    parser.add_argument('--kinesis-stream', required=False, default='MyKinesisStream')
    parser.add_argument('--batch-size', required=False, type=int, default=500)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    run(args)
```

(The sensitive parts have been replaced with ellipses, but their actual values are correct.)

The problem is that it does not send any data to our Kinesis stream. I get the following STDOUT output at runtime:

```
2022-11-25T12:13:47.640Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Connected to MQTT Broker!. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Received {"machineId":2, .... "timestamp":"2022-10-24T12:21:34.8777249Z","value":true} from clients/test/hello/world topic. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Debug: sendDataToKinesis with params: test | MyKinesisStream | 100. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. payload: b'{"machineId":2,... ,"timestamp":"2022-10-24T12:21:34.8777249Z","value":true}'. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. type payload: <class 'bytes'>. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Successfully appended message to stream with sequence number 0. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. DEBUG read test: [<Class Message. stream_name: 'test', sequence_number: 0, ingest_time: 1669376980985, payload: b'{"machineId":2,"mach'>]. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. closing connection. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
```

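So we can see that the data arrives from MQTT, the Python code performs the append operation, and my Kinesis stream seems to contain the data, because it can be read back in the next step. The connection is then closed without any errors.

The problem, however, is that on the AWS side we cannot see any data arriving in the stream: screenshot of the AWS console https://i.stack.imgur.com/wN5I4.png

What could be going wrong here? Our Greengrass core is configured correctly and reachable from AWS, and the components are running and healthy: screenshot of the IoT Core status https://i.stack.imgur.com/JMJmn.png, screenshot of the StreamManager component status https://i.stack.imgur.com/2qnfn.png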

Expert · asked 5 months ago · 46 views

1 answer

[The following answer was translated] Hi,

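You are deleting the stream every time you append a message. Since the stream only ever contains a single message, you are probably never reaching the minimum batch size that StreamManager needs before it uploads.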
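To illustrate this reasoning, here is a toy model of size-triggered batching in plain Python. It is not Stream Manager's implementation (the real `KinesisConfig` also has a `batch_interval_millis` setting, which this toy ignores), and `ToyBatchExporter` and `simulate` are hypothetical names. It shows that when the stream is recreated for every message, a batch size of 100 (the value in the question's log) is never reached, while a long-lived stream or a batch size of 1 lets data through.

```python
from typing import List


class ToyBatchExporter:
    """Toy model: queue messages and 'upload' only once batch_size is reached."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self.pending: List[int] = []

    def append(self, message: int) -> int:
        """Queue one message; return how many messages this call uploaded."""
        self.pending.append(message)
        if len(self.pending) >= self.batch_size:
            uploaded = len(self.pending)
            self.pending.clear()
            return uploaded
        return 0


def simulate(n_messages: int, batch_size: int, recreate_each_time: bool) -> int:
    """Return the total number of messages that would reach the cloud side."""
    total_uploaded = 0
    exporter = ToyBatchExporter(batch_size)
    for message in range(n_messages):
        if recreate_each_time:
            # Mirrors delete_message_stream + create_message_stream per message:
            # whatever was still queued is thrown away.
            exporter = ToyBatchExporter(batch_size)
        total_uploaded += exporter.append(message)
    return total_uploaded


print(simulate(250, 100, recreate_each_time=True))   # 0
print(simulate(250, 100, recreate_each_time=False))  # 200
print(simulate(250, 1, recreate_each_time=True))     # 250
```

In the long-lived case, the remaining 50 messages simply wait in the queue until the next batch fills up.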

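You should create the StreamManager client and initialize the stream only once, and reuse them every time you append data. You could also consider reducing the batch size.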
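A minimal sketch of this suggestion, assuming the Greengrass Stream Manager Python SDK (`stream_manager`) used in the question. `KinesisForwarder` and `make_forwarder` are hypothetical names. Instead of deleting and recreating the stream for every MQTT message, it creates the client and the stream once (only if `list_streams()` does not already report the stream) and afterwards only calls `append_message`. The client is passed in so the logic can be exercised with a stand-in object; `make_forwarder` builds the real one, and its `batch_size` argument is the knob the answer suggests lowering.

```python
from typing import Any


class KinesisForwarder:
    """Keeps ONE StreamManager client and ONE message stream for the process lifetime.

    `client` is assumed to behave like stream_manager.StreamManagerClient
    (list_streams, create_message_stream, append_message, close).
    """

    def __init__(self, client: Any, stream_name: str, definition: Any) -> None:
        self._client = client
        self._stream_name = stream_name
        # Create the stream only if it does not exist yet, instead of deleting
        # and recreating it per message (which discards not-yet-exported data).
        if stream_name not in client.list_streams():
            client.create_message_stream(definition)

    def forward(self, payload: bytes) -> int:
        """Append one MQTT payload and return its sequence number in the stream."""
        return self._client.append_message(stream_name=self._stream_name, data=payload)

    def close(self) -> None:
        self._client.close()


def make_forwarder(stream_name: str, kinesis_stream_name: str, batch_size: int) -> KinesisForwarder:
    # Imported here so the class above can be tried without the SDK installed.
    from stream_manager import (
        ExportDefinition,
        KinesisConfig,
        MessageStreamDefinition,
        StrategyOnFull,
        StreamManagerClient,
    )

    definition = MessageStreamDefinition(
        name=stream_name,
        strategy_on_full=StrategyOnFull.OverwriteOldestData,
        export_definition=ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        ),
    )
    return KinesisForwarder(StreamManagerClient(), stream_name, definition)
```

In the question's code, you would call `make_forwarder(args.greengrass_stream, args.kinesis_stream, args.batch_size)` once in `run()` before `loop_forever()`, have `on_message` call `forwarder.forward(msg.payload)`, and call `forwarder.close()` on shutdown. Checking `list_streams()` instead of deleting at startup keeps any messages still queued from a previous run; the trade-off is that an existing stream keeps its old export definition, so a changed `batch_size` only takes effect once that stream is updated or recreated.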

Good luck, - Joe

Expert · answered 5 months ago
