AWS Greengrass doesn't send data to AWS Kinesis


The main purpose of my program is to subscribe to an incoming MQTT topic and send the received data to my AWS Kinesis stream called "MyKinesisStream".

Here is my code:

import argparse
import logging
import random

from paho.mqtt import client as mqtt_client
from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient, ReadMessagesOptions,
)

broker = 'localhost'
port = 1883
topic = "clients/test/hello/world"
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = '...'
password = '...'

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

args = ""

def connect_mqtt() -> mqtt_client.Client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d" % rc)

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def sendDataToKinesis(
        stream_name: str,
        kinesis_stream_name: str,
        payload,
        batch_size: int = None,
):
    try:
        print("Debug: sendDataToKinesis with params:", stream_name + " | ", kinesis_stream_name, " | ", batch_size)
        print("payload:", payload)
        print("type payload:", type(payload))
    except Exception as e:
        print("Error while printing out the parameters", str(e))
        logger.exception(e)
    # Initialise first so the finally block below never hits an unbound name
    kinesis_client = None
    try:
        # Create a client for the StreamManager
        kinesis_client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            kinesis_client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        )
        kinesis_client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )

        sequence_no = kinesis_client.append_message(stream_name=stream_name, data=payload)
        print(
            "Successfully appended message to stream with sequence number ", sequence_no
        )

        readValue = kinesis_client.read_messages(stream_name, ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000))
        print("DEBUG read test: ", readValue)

    except Exception as e:
        print("Exception while running: " + str(e))
        logger.exception(e)
    finally:
        # Always close the client to avoid resource leaks
        print("closing connection")
        if kinesis_client:
            kinesis_client.close()


def subscribe(client: mqtt_client.Client, args):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
        sendDataToKinesis(args.greengrass_stream, args.kinesis_stream, msg.payload, args.batch_size)

    client.subscribe(topic)
    client.on_message = on_message


def run(args):
    mqtt_client_instance = connect_mqtt()
    subscribe(mqtt_client_instance, args)
    mqtt_client_instance.loop_forever()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--greengrass-stream', required=False, default='...')
    parser.add_argument('--kinesis-stream', required=False, default='MyKinesisStream')
    parser.add_argument('--batch-size', required=False, type=int, default=500)
    return parser.parse_args()

if __name__ == '__main__':
    args = parse_args()
    run(args)

(The values shown as ... are redacted because they are sensitive, but the real values are correct.)

The problem is that it won't send any data to our Kinesis stream. I get the following stdout from the run:

2022-11-25T12:13:47.640Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Connected to MQTT Broker!. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Received `{"machineId":2, .... "timestamp":"2022-10-24T12:21:34.8777249Z","value":true}` from `clients/test/hello/world` topic. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Debug: sendDataToKinesis with params: test |  MyKinesisStream  |  100. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. payload: b'{"machineId":2,... ,"timestamp":"2022-10-24T12:21:34.8777249Z","value":true}'. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. type payload: <class 'bytes'>. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Successfully appended message to stream with sequence number  0. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. DEBUG read test:  [<Class Message. stream_name: 'test', sequence_number: 0, ingest_time: 1669376980985, payload: b'{"machineId":2,"mach'>]. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. closing connection. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}

So we can see that the data arrives from MQTT, the Python code executes append_message, and the stream seems to hold the data, since it can be read back in the next step. Then the connection closes without any error.

But on the AWS side, we cannot see any data arriving on the stream: [screenshot of the AWS console]

What could be the problem here? Our Greengrass core device is configured properly and reachable from AWS, and the component is running and healthy: [screenshot of IoT Core status] [screenshot of the StreamManager component state]

ForestG
asked a year ago
1 Answer
Accepted Answer

Hi,

You are deleting and recreating your stream every time you append a message to it. Since the stream only ever contains a single message, you likely never reach batch_size, the number of queued messages StreamManager waits for before it uploads a batch to Kinesis.
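To illustrate why the per-message delete matters, here is a toy model in plain Python. It is a sketch under a stated assumption: it only captures the "upload once batch_size messages are pending" rule described above, ignores batch_interval_millis, retries and persistence, and the class and function names are hypothetical. It is not how StreamManager is implemented.

```python
from typing import List


class ToyBatchingExporter:
    """Toy model of a batched export (NOT StreamManager's implementation):
    messages wait in `pending` until `batch_size` have accumulated, then the
    whole batch is moved to `uploaded`."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self.pending: List[bytes] = []
        self.uploaded: List[bytes] = []

    def append(self, payload: bytes) -> None:
        self.pending.append(payload)
        if len(self.pending) >= self.batch_size:
            # Batch is full: "upload" everything that was pending
            self.uploaded.extend(self.pending)
            self.pending.clear()


def recreate_per_message(payloads: List[bytes], batch_size: int) -> List[bytes]:
    """Mimics the question's code: a brand-new stream for every message."""
    uploaded: List[bytes] = []
    for payload in payloads:
        exporter = ToyBatchingExporter(batch_size)  # like delete + create_message_stream
        exporter.append(payload)
        uploaded.extend(exporter.uploaded)
        # Anything still pending is discarded when the next delete happens
    return uploaded


def reuse_stream(payloads: List[bytes], batch_size: int) -> List[bytes]:
    """Mimics the suggested fix: one stream, created once, reused for every append."""
    exporter = ToyBatchingExporter(batch_size)
    for payload in payloads:
        exporter.append(payload)
    return exporter.uploaded


if __name__ == "__main__":
    msgs = [b"msg-%d" % i for i in range(250)]
    print(len(recreate_per_message(msgs, 100)))  # 0
    print(len(reuse_stream(msgs, 100)))          # 200 (50 still pending)
```

With batch_size=100, the recreate-per-message pattern never fills a batch, while reusing one stream uploads two full batches and keeps the remainder queued.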

You'll want to create your StreamManager client and initialize the stream a single time, and then reuse them when appending data. You may also want to consider reducing your batch size.
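A minimal sketch of that restructuring, assuming a hypothetical `StreamForwarder` helper that is not part of the Stream Manager SDK. It takes any client exposing the methods the question already uses (`delete_message_stream`, `create_message_stream`, `append_message`, `close`), does the delete/create setup once in its constructor (mirroring the question's "fresh start"), and leaves only an append in the per-message path. `not_found_error` is the exception to ignore when there is no stream to delete; in real code that would be `ResourceNotFoundException`.

```python
from typing import Any, Type


class StreamForwarder:
    """Hypothetical helper: set up the message stream once, then reuse one client.

    `client` is assumed to provide delete_message_stream, create_message_stream,
    append_message and close (as StreamManagerClient does); `definition` would be
    the MessageStreamDefinition carrying the Kinesis export.
    """

    def __init__(self, client: Any, stream_name: str, definition: Any,
                 not_found_error: Type[BaseException]) -> None:
        self._client = client
        self._stream_name = stream_name
        # One-time setup, done here instead of inside the MQTT callback:
        # drop a stale stream if there is one, then create it with its export.
        try:
            client.delete_message_stream(stream_name=stream_name)
        except not_found_error:
            pass  # nothing to delete, e.g. on the first run
        client.create_message_stream(definition)

    def send(self, payload: bytes) -> int:
        # Per-message work is only an append, so messages accumulate in the
        # same stream until the export's batching condition is met.
        return self._client.append_message(stream_name=self._stream_name, data=payload)

    def close(self) -> None:
        # Close once at shutdown, not after every message.
        self._client.close()

    def __enter__(self) -> "StreamForwarder":
        return self

    def __exit__(self, *exc_info: Any) -> bool:
        self.close()
        return False  # do not swallow exceptions
```

In the question's script, `run()` could build one forwarder, for example `StreamForwarder(StreamManagerClient(), args.greengrass_stream, MessageStreamDefinition(...), ResourceNotFoundException)`, have `on_message` call `forwarder.send(msg.payload)`, and call `forwarder.close()` in a `finally` block after `loop_forever()` returns.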

Good luck, -joe

AWS
answered a year ago
  • Thank you, your solution worked for us. I guess we misunderstood the official examples at https://docs.aws.amazon.com/greengrass/v2/developerguide/work-with-streams.html#streammanagerclient-append-message where they explicitly say that we should close the connection :\

  • Great! Regarding the confusing documentation, are you referring to this note? If so, it refers specifically to Lambda functions:

    If you do instantiate StreamManagerClient in the handler, you must explicitly call the close() method when the client completes its work. Otherwise, the client keeps the connection open and another thread running until the script exits.
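The note covers the case where a client lives only for one handler invocation. A minimal sketch of that pattern, assuming a hypothetical `make_client` factory standing in for `StreamManagerClient` and a stream that was already created elsewhere, once. For a long-running component like the MQTT bridge in the question, the answer's advice still applies: one client for the lifetime of the process, closed once at shutdown.

```python
import contextlib
from typing import Any, Callable


def append_in_handler(make_client: Callable[[], Any], stream_name: str, payload: bytes) -> int:
    """Hypothetical per-invocation handler: the client is created inside the
    handler, so the handler is responsible for closing it."""
    # contextlib.closing() calls client.close() on exit, even if the append
    # raises, so no connection or helper thread outlives this invocation.
    with contextlib.closing(make_client()) as client:
        return client.append_message(stream_name=stream_name, data=payload)
```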
    
