Greengrass Stream Manager | Missing Messages When Retrying Kinesis Export


Intro

I'm verifying the retry capability of aws.greengrass.StreamManager == 2.1.1 and noticed that some messages were missing during retries.

Setup

This is what I'm trying to do (shown as a diagram in the original post):
I'm running Greengrass on a device that has internet access through ethernet. My custom component appends ~400 bytes of data to a StreamManager message stream every 15 seconds. The message stream is configured with a Kinesis export like so:

kinesis_export = ExportDefinition(
    kinesis=[
        KinesisConfig(
            identifier="KinesisExport" + self.modbus_data_stream_name,
            kinesis_stream_name=kinesis_stream_name,
            batch_size=5,
        )
    ]
)
client.create_message_stream(
    MessageStreamDefinition(
        name=self.modbus_data_stream_name,
        strategy_on_full=StrategyOnFull.OverwriteOldestData,
        export_definition=kinesis_export,
    )
)

The aws.greengrass.StreamManager component is running version 2.1.1 with all default configuration.
On my PC, I'm running a script that uses boto3 to continuously read data from Kinesis. Whenever it reads a record from Kinesis, it prints two things: a) the current timestamp and b) the timestamp when the data was generated on the device. The script is attached at the end of the post.

Experimentation

Nominal Run

I ran my device with ethernet connected in parallel with the script on PC and the log appears as shown below.
A few observations that are consistent with my expectations:

  1. Each Record has ~ 5 messages (because batch size = 5)
  2. Time gap between each message is ~15 seconds (because messages appended to stream every 15 seconds)
  3. Records are hitting Kinesis every ~75 seconds (which I believe comes from batch size [5 msgs] x append interval [15 seconds/msg])

The log format is:
Found record for MyDevice in Kinesis at [time when the PC received the record, in YY-MM-DD_hh:mm:ss]
[time when the message was created on the device, in YY-MM-DD_hh:mm:ss]

Fault Injection

Then I unplugged the ethernet cable from the device, effectively disconnecting the device from the internet.
As I expected, the PC no longer receives any record from the cloud during this time.

Resume

I then reconnected the ethernet cable to the device. About 4 minutes later, I observed a surge of records hitting the PC program, with message timestamps continuing from when the internet was disconnected, which means the previously failed Kinesis exports are being retried (good!).
However, I'm also observing large time gaps (relative to the expected 15 seconds) between the retried messages, indicating that some messages are missing.

In the log I captured below, the internet was disconnected at ~22-12-14_09:33:xx and restored at ~22-12-14_10:28:xx.
I manually marked the time gaps with !!! for better visibility.
I noticed the following gaps in the device timestamps:
a. 22-12-14_09:34:54 to 22-12-14_09:37:39 - 2 minutes 45 seconds gap (11 intervals of 15 seconds) = 10 messages missing
b. 22-12-14_09:39:55 to 22-12-14_09:41:25 - 1 minute 30 seconds gap (6 intervals) = 5 messages missing
c. 22-12-14_09:44:56 to 22-12-14_09:51:27 - 6 minutes 31 seconds gap (~26 intervals) = ~25 messages missing
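
The gap arithmetic can be checked mechanically. Below is a minimal sketch, assuming the device timestamps have been collected into a list of strings in the same YY-MM-DD_hh:mm:ss format as the log. The helper name find_gaps and the 2-second tolerance are illustrative choices, not part of the original script. Note that a gap spanning N append intervals between two received messages corresponds to N - 1 missing messages.

```python
from datetime import datetime

TS_FORMAT = "%y-%m-%d_%H:%M:%S"  # same format the read script prints


def find_gaps(timestamps, interval_s=15, tolerance_s=2):
    """Return (start, end, gap_seconds, estimated_missing) for each oversized gap.

    interval_s is the expected append interval; tolerance_s (an assumption)
    absorbs the occasional 16-second spacing visible in the log.
    """
    parsed = [datetime.strptime(t, TS_FORMAT) for t in timestamps]
    gaps = []
    for prev, curr in zip(parsed, parsed[1:]):
        delta = (curr - prev).total_seconds()
        if delta > interval_s + tolerance_s:
            # N intervals between two received messages means N - 1 missing ones.
            missing = round(delta / interval_s) - 1
            gaps.append(
                (prev.strftime(TS_FORMAT), curr.strftime(TS_FORMAT), int(delta), missing)
            )
    return gaps


if __name__ == "__main__":
    sample = [
        "22-12-14_09:34:39",
        "22-12-14_09:34:54",
        "22-12-14_09:37:39",
        "22-12-14_09:37:54",
    ]
    for gap in find_gaps(sample):
        print(gap)  # ('22-12-14_09:34:54', '22-12-14_09:37:39', 165, 10)
```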

Found record for MyDevice in Kinesis at 22-12-14_09:32:27
22-12-14_09:31:23
Found record for MyDevice in Kinesis at 22-12-14_09:32:27
22-12-14_09:31:38
Found record for MyDevice in Kinesis at 22-12-14_09:32:27
22-12-14_09:31:53
Found record for MyDevice in Kinesis at 22-12-14_09:32:27
22-12-14_09:32:08
Found record for MyDevice in Kinesis at 22-12-14_09:32:27
22-12-14_09:32:24
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:32:39
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:32:54
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:33:09
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:33:24
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:33:39
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:33:54
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:34:09
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:34:24
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:34:39
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
!!! 22-12-14_09:34:54
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
!!! 22-12-14_09:37:39
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:37:54
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:38:10
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:38:25
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:38:40
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:38:55
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:39:10
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:39:25
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:39:40
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
!!! 22-12-14_09:39:55
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
!!! 22-12-14_09:41:25
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:41:40
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:41:55
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:42:10
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:42:25
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:42:40
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:42:55
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:43:10
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:43:25
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:43:40
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:43:56
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:44:11
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:44:26
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:44:41
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
!!! 22-12-14_09:44:56
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
!!! 22-12-14_09:51:27
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:51:42
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:51:57

Continue

I let the PC program keep running for another 5 hours to see whether StreamManager would retry the missing messages again, but I did not see any trace of them.

Question

Based on my observation, aws.greengrass.StreamManager does not reliably retransmit all previously failed-to-export data to Kinesis. Did I misunderstand the capability of StreamManager? Or is this a bug?


Kinesis Read Script

import logging as logger
from botocore.exceptions import ClientError
import boto3
from datetime import datetime

class KinesisStream:
    """Encapsulates a Kinesis stream."""
    def __init__(self, kinesis_client):
        """
        :param kinesis_client: A Boto3 Kinesis client.
        """
        self.kinesis_client = kinesis_client
        self.name = None
        self.details = None
        self.stream_exists_waiter = kinesis_client.get_waiter('stream_exists')

    def get_records(self, max_records):
        """
        Gets records from the stream. This function is a generator that first gets
        a shard iterator for the stream, then uses the shard iterator to get records
        in batches from the stream. Each batch of records is yielded back to the
        caller until the specified maximum number of records has been retrieved.

        :param max_records: The maximum number of records to retrieve.
        :return: Yields the current batch of retrieved records.
        """
        try:
            response = self.kinesis_client.get_shard_iterator(
                StreamName=self.name, ShardId=self.details['Shards'][0]['ShardId'],
                ShardIteratorType='LATEST')

            shard_iter = response['ShardIterator']
            record_count = 0
            while record_count < max_records:
                response = self.kinesis_client.get_records(
                    ShardIterator=shard_iter, Limit=10)
                shard_iter = response['NextShardIterator']
                records = response['Records']
                logger.info("Got %s records.", len(records))
                record_count += len(records)
                yield records
        except ClientError:
            logger.exception("Couldn't get records from stream %s.", self.name)
            raise

    def describe(self, name):
        """
        Gets metadata about a stream.
        :param name: The name of the stream.
        :return: Metadata about the stream.
        """
        try:
            response = self.kinesis_client.describe_stream(StreamName=name)
            self.name = name
            self.details = response['StreamDescription']
            logger.info("Got stream %s.", name)
        except ClientError:
            logger.exception("Couldn't get %s.", name)
            raise
        else:
            return self.details

def loads_compressed_data(compressed_data: bytes) -> dict:
    import zlib
    import json
    return json.loads(zlib.decompress(compressed_data).decode())

def read_kinesis(DeviceThingName):
    while True:
        for records in kinesis.get_records(10):
            if records:
                for r in records:
                    data = loads_compressed_data(r['Data'])
                    if data["device_id"] == DeviceThingName:
                        print(f"Found record for {DeviceThingName} in Kinesis at {datetime.now().strftime('%y-%m-%d_%H:%M:%S')}")
                        ts = data['measurement_ts']
                        measurement_ts = datetime.fromtimestamp(ts).strftime("%y-%m-%d_%H:%M:%S")
                        print(f"{measurement_ts}")

# Remember to update credential using aws-cli configure
session = boto3.Session(profile_name="ecotec-dev", region_name='us-west-2')
client = session.client('kinesis')
kinesis = KinesisStream(client)
# need to describe kinesis before getting record
details= kinesis.describe("raw_data")
print(f"details {details}")
read_kinesis("Moxa-UC-8112A-TBBCB1098353")

Edit Regarding ochoanel's comment on 12/15/2022

Thanks for your follow-up and suggestions.

Changes I Made for Better Debug

I added a read_messages API call to my device code to confirm that my messages are successfully written to the StreamManager persistent files.
Here's a snippet of the code I added (posted as an image in the original post):

The measurement_ts is the timestamp of when the message is created.

I also changed the rate of append_message to every 5 seconds instead of 15, just to speed up debug.

Logs Collected

I started my custom component and the PC program at ~12:30, then unplugged the ethernet cable at ~12:33:20 for ~10 minutes. Part of the device log was posted as images (I'm not able to paste it all here due to the text limit).
Based on the logs, my messages are indeed reaching StreamManager.

However, I'm still seeing missing messages in my PC program, which could be caused by either of the following:

  1. StreamManager did not export the data to Kinesis
  2. My PC program did not read all data that reached Kinesis

Honestly, I suspect No. 2 is the reason. I'm working on a new tool to replace my PC program.

ictwist
asked a year ago, 314 views
1 Answer
Accepted Answer

Hey ictwist,

Thanks for the detailed explanation. We will have to look further into this issue to determine whether it is a bug in StreamManager. For us to debug the issue, could you provide:

  1. Anything you see in the logs between and around the times of the gaps
  2. (If possible, and if it doesn't expose any private/proprietary information) the code of your component (you posted a few lines, but having the full code would help us reproduce it on our end more easily)

It seems like your component is reading from a controller using Modbus (this is just a guess based on modbus_data_stream_name). Is there a chance the component fails to post new messages to the stream manager? Seeing what the custom component is doing would help. One thing you could do in the meantime is add a call to client.read_messages in your custom component and log what it returns. When a message is appended to a stream manager stream, it is stored on disk before being exported to Kinesis, so this would tell us which of the following is the issue:

  1. The custom component failing to post messages to the local stream manager stream on the device
  2. Messages not being saved to disk by the stream manager
  3. Failures exporting the messages to Kinesis
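
The read-back check suggested above could look roughly like the sketch below. It assumes the Stream Manager SDK for Python, where client.read_messages(stream_name, options) returns messages that carry sequence_number and payload attributes. The helper name log_stream_tail and the option values in the usage comment are illustrative, not taken from the original component.

```python
def log_stream_tail(client, stream_name, read_options, log=print):
    """Read messages back from a local stream and log each sequence number.

    client is expected to behave like a StreamManagerClient. With the real SDK,
    read_messages may raise if fewer than the requested minimum number of
    messages are available before the timeout.
    """
    messages = client.read_messages(stream_name, read_options)
    previous = None
    for message in messages:
        # Sequence numbers in a stream are expected to be consecutive, so a
        # jump here would suggest a message that was never stored locally.
        if previous is not None and message.sequence_number != previous + 1:
            log(f"sequence gap: {previous} -> {message.sequence_number}")
        log(f"seq={message.sequence_number} bytes={len(message.payload)}")
        previous = message.sequence_number
    return [message.sequence_number for message in messages]


# Hypothetical usage on the device (requires the stream_manager package):
#
#   from stream_manager import ReadMessagesOptions, StreamManagerClient
#
#   client = StreamManagerClient()
#   options = ReadMessagesOptions(
#       desired_start_sequence_number=0,
#       min_message_count=1,
#       max_message_count=100,
#       read_timeout_millis=5000,
#   )
#   log_stream_tail(client, "MyStreamName", options)
```

If every appended message shows up here with consecutive sequence numbers, the first two causes in the list above become unlikely, and attention shifts to the export path or to whatever is reading from Kinesis.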
AWS
answered a year ago
  • Thanks for your follow-up. Please see my response in the Edit section of the original post. I think the problem might have been my PC program.

  • Just to follow up, I was able to verify the data in Kinesis in a different way, using Lambda functions and CloudWatch, and I found all the "missing" messages. I will continue to soak test StreamManager, but you may treat this post as a false alarm for now.

  • Nice to hear that, and happy to help. Keep us posted if you find any issues. Good luck!

  • @ictwist, your script reads from only a single shard in Kinesis. You may have configured more than one shard, or autoscaling may have created more than one. Stream manager writes to shards randomly to distribute the load and increase maximum throughput. I suspect this is the problem, given that you were able to find the data by other means.

  • @MichaelDombrowski-AWS You're correct
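
A reader that covers every shard could be sketched as follows. This is a minimal illustration, not the script used in the thread: it assumes a boto3 Kinesis client, and the function names, poll interval, and max_polls parameter are hypothetical. It also keeps one iterator per shard for the whole run. The posted script requests a fresh LATEST iterator each time its generator finishes, which by itself may skip records already waiting in the shard during a burst.

```python
import time


def list_shard_ids(kinesis_client, stream_name):
    """Return the IDs of all shards in the stream, following pagination."""
    shard_ids = []
    kwargs = {"StreamName": stream_name}
    while True:
        response = kinesis_client.list_shards(**kwargs)
        shard_ids.extend(shard["ShardId"] for shard in response["Shards"])
        token = response.get("NextToken")
        if not token:
            return shard_ids
        # ListShards does not accept StreamName together with NextToken.
        kwargs = {"NextToken": token}


def read_all_shards(kinesis_client, stream_name, iterator_type="LATEST",
                    poll_interval_s=1.0, max_polls=None):
    """Yield records from every shard, reusing each shard's iterator."""
    iterators = {}
    for shard_id in list_shard_ids(kinesis_client, stream_name):
        response = kinesis_client.get_shard_iterator(
            StreamName=stream_name, ShardId=shard_id,
            ShardIteratorType=iterator_type)
        iterators[shard_id] = response["ShardIterator"]

    polls = 0
    while iterators and (max_polls is None or polls < max_polls):
        for shard_id in list(iterators):
            response = kinesis_client.get_records(
                ShardIterator=iterators[shard_id], Limit=100)
            yield from response["Records"]
            next_iterator = response.get("NextShardIterator")
            if next_iterator is None:
                del iterators[shard_id]  # the shard has been closed
            else:
                iterators[shard_id] = next_iterator
        polls += 1
        time.sleep(poll_interval_s)


# Hypothetical usage (requires boto3 and AWS credentials):
#
#   import boto3
#   client = boto3.Session(region_name="us-west-2").client("kinesis")
#   for record in read_all_shards(client, "raw_data"):
#       print(record["SequenceNumber"])
```

This sketch does not discover child shards created by resharding while it runs. Passing "TRIM_HORIZON" as iterator_type would read from the oldest retained record instead of only new arrivals.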
