AWS Greengrass does not send data to AWS Kinesis


[The following question has been translated.] The main purpose of my program is to connect to an incoming MQTT channel and send the received data to my AWS Kinesis stream, named "MyKinesisStream". Here is my code:

```python
import argparse
import logging
import random

from paho.mqtt import client as mqtt_client
from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
    ReadMessagesOptions,
)

broker = 'localhost'
port = 1883
topic = "clients/test/hello/world"
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = '...'
password = '...'

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

args = ""


def connect_mqtt() -> mqtt_client.Client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print(f"Failed to connect, return code {rc}")

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def sendDataToKinesis(
    stream_name: str,
    kinesis_stream_name: str,
    payload,
    batch_size: int = None,
):
    try:
        print("Debug: sendDataToKinesis with params:", stream_name + " | ", kinesis_stream_name, " | ", batch_size)
        print("payload:", payload)
        print("type payload:", type(payload))
    except Exception as e:
        print("Error while printing out the parameters", str(e))
        logger.exception(e)

    kinesis_client = None  # defined up front so the finally block cannot hit a NameError
    try:
        # Create a client for the StreamManager
        kinesis_client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            kinesis_client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        )
        kinesis_client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )

        sequence_no = kinesis_client.append_message(stream_name=stream_name, data=payload)
        print(
            "Successfully appended message to stream with sequence number ", sequence_no
        )

        readValue = kinesis_client.read_messages(stream_name, ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000))
        print("DEBUG read test: ", readValue)

    except Exception as e:
        print("Exception while running: " + str(e))
        logger.exception(e)
    finally:
        # Always close the client to avoid resource leaks
        print("closing connection")
        if kinesis_client is not None:
            kinesis_client.close()


def subscribe(client: mqtt_client.Client, args):
    def on_message(client, userdata, msg):
        print(f"Received {msg.payload.decode()} from {msg.topic} topic")
        sendDataToKinesis(args.greengrass_stream, args.kinesis_stream, msg.payload, args.batch_size)

    client.subscribe(topic)
    client.on_message = on_message


def run(args):
    mqtt_client_instance = connect_mqtt()
    subscribe(mqtt_client_instance, args)
    mqtt_client_instance.loop_forever()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--greengrass-stream', required=False, default='...')
    parser.add_argument('--kinesis-stream', required=False, default='MyKinesisStream')
    parser.add_argument('--batch-size', required=False, type=int, default=500)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    run(args)
```

(Sensitive values have been replaced with ellipses, but their actual values are correct.)

The problem is that the program does not send any data to our Kinesis stream. At runtime I get the following STDOUT output:

```
2022-11-25T12:13:47.640Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Connected to MQTT Broker!. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Received {"machineId":2, .... "timestamp":"2022-10-24T12:21:34.8777249Z","value":true} from clients/test/hello/world topic. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Debug: sendDataToKinesis with params: test | MyKinesisStream | 100. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. payload: b'{"machineId":2,... ,"timestamp":"2022-10-24T12:21:34.8777249Z","value":true}'. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. type payload: <class 'bytes'>. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Successfully appended message to stream with sequence number 0. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. DEBUG read test: [<Class Message.
stream_name: 'test', sequence_number: 0, ingest_time: 1669376980985, payload: b'{"machineId":2,"mach'>]. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. closing connection. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
```

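So we can see that the data arrives from MQTT and the Python code appends the message. The data also seems to be in my Kinesis stream, since it can be read back in the next step, and the connection is then closed without any errors.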

However, from the AWS side we cannot see any data arriving in the stream. Screenshot of the AWS console: https://i.stack.imgur.com/wN5I4.png

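What could be going wrong here? Our Greengrass core is configured correctly and is reachable from AWS, and the components are running and healthy:
Screenshot of the IoT Core status: https://i.stack.imgur.com/JMJmn.png
Screenshot of the StreamManager component status: https://i.stack.imgur.com/2qnfn.png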

Expert
Asked 5 months ago · 42 views
1 answer

[The following answer has been translated.] Hi,

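You delete the stream every time you append a message. Because the stream only ever contains a single message, you are probably never reaching the minimum batch size that StreamManager requires before it uploads.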
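To make the batching argument concrete, here is a toy model. It is not Stream Manager's actual export logic: it assumes, as the answer does, that an upload happens only once `batch_size` messages are pending, it ignores any time-based flush interval and export latency, and it treats deleting the stream as discarding everything not yet exported. `exported_count` is a hypothetical helper.

```python
def exported_count(num_messages: int, batch_size: int, recreate_each_time: bool) -> int:
    """Toy model: count messages exported when uploads happen only in full batches."""
    pending = 0   # appended but not yet uploaded
    exported = 0  # uploaded to Kinesis
    for _ in range(num_messages):
        if recreate_each_time:
            # Delete + create (as in the question) wipes anything not yet exported.
            pending = 0
        pending += 1  # append_message
        if pending >= batch_size:
            exported += pending  # one batch upload
            pending = 0
    return exported


if __name__ == "__main__":
    print(exported_count(250, 100, recreate_each_time=True))   # 0
    print(exported_count(250, 100, recreate_each_time=False))  # 200
```

Under these assumptions, recreating the stream per message never lets a batch fill up, while reusing the stream (or lowering `batch_size`) lets batches complete.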

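You should create the StreamManager client and initialize the stream only once, then reuse them whenever you append data. You could also consider reducing the batch size.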
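The advice to create the client and stream once can be sketched as follows. This is a minimal, unofficial sketch that reuses only the Stream Manager calls already present in the question (`delete_message_stream`, `create_message_stream`, `append_message`, `close`). The helper names `KinesisForwarder`, `make_on_message` and `build_forwarder`, and the default `batch_size=1`, are hypothetical. The SDK import is deferred into `build_forwarder` so the reuse logic can be tried with a stand-in client.

```python
class KinesisForwarder:
    """Owns one Stream Manager client and one message stream for the process lifetime."""

    def __init__(self, client, stream_name, definition, not_found_error):
        self.client = client
        self.stream_name = stream_name
        # The "fresh start" happens once, at startup; deleting later would
        # discard messages that have not been exported to Kinesis yet.
        try:
            client.delete_message_stream(stream_name=stream_name)
        except not_found_error:
            pass
        client.create_message_stream(definition)

    def forward(self, payload: bytes) -> int:
        # Append only; Stream Manager exports to Kinesis in the background.
        return self.client.append_message(stream_name=self.stream_name, data=payload)

    def close(self) -> None:
        self.client.close()


def make_on_message(forwarder):
    """Return a paho-mqtt on_message callback that reuses the shared forwarder."""
    def on_message(client, userdata, msg):
        forwarder.forward(msg.payload)
    return on_message


def build_forwarder(stream_name, kinesis_stream_name, batch_size=1):
    """Wire up the real SDK objects (same classes as in the question)."""
    # Imported lazily so the classes above can be exercised without the SDK.
    from stream_manager import (
        ExportDefinition,
        KinesisConfig,
        MessageStreamDefinition,
        ResourceNotFoundException,
        StrategyOnFull,
        StreamManagerClient,
    )

    definition = MessageStreamDefinition(
        name=stream_name,
        strategy_on_full=StrategyOnFull.OverwriteOldestData,
        export_definition=ExportDefinition(kinesis=[KinesisConfig(
            identifier="KinesisExport" + stream_name,
            kinesis_stream_name=kinesis_stream_name,
            # Assumption: a small batch so a few test messages can trigger an upload.
            batch_size=batch_size,
        )]),
    )
    return KinesisForwarder(StreamManagerClient(), stream_name, definition,
                            not_found_error=ResourceNotFoundException)
```

On the device, one would call `build_forwarder(args.greengrass_stream, args.kinesis_stream, args.batch_size)` once in `run()`, set `client.on_message = make_on_message(forwarder)`, and call `forwarder.close()` on shutdown, so each MQTT message costs a single `append_message` call.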

Good luck, - Joe

Expert
Answered 5 months ago
