Intro
I'm verifying the retry capability of aws.greengrass.StreamManager version 2.1.1 and noticed that some messages were missing during retries.
Setup
The diagram below depicts what I'm trying to do.
I'm running Greengrass on a device that has internet access through Ethernet. My custom component appends ~400 bytes of data to a StreamManager message stream every 15 seconds. The message stream is configured with a Kinesis export like so:
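from stream_manager import (ExportDefinition, KinesisConfig, MessageStreamDefinition,
                            StrategyOnFull, StreamManagerClient)

# Runs inside the component class; self.modbus_data_stream_name and kinesis_stream_name are set elsewhere.
client = StreamManagerClient()
kinesis_export = ExportDefinition(
    kinesis=[KinesisConfig(identifier="KinesisExport" + self.modbus_data_stream_name,
                           kinesis_stream_name=kinesis_stream_name, batch_size=5)]
)
client.create_message_stream(
    MessageStreamDefinition(
        name=self.modbus_data_stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData,
        export_definition=kinesis_export
    )
)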
The aws.greengrass.StreamManager component is running version 2.1.1 with all default configurations.
On my PC, I'm running a boto3 script that continuously reads data from Kinesis. Whenever it reads a record from Kinesis, it prints two things: a) the current timestamp and b) the timestamp at which the data was generated on the device. The script is attached at the end of the post.
Experimentation
Nominal Run
I ran the device with Ethernet connected while running the script on the PC in parallel.
A few observations that are consistent with my expectations:
- Records arrive in batches of ~5 messages (because batch size = 5)
- The time gap between consecutive messages is ~15 seconds (because a message is appended to the stream every 15 seconds)
- A new batch of records reaches Kinesis every ~75 seconds (which I believe comes from batch size [5 msgs] × append interval [15 s/msg])
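Assuming StreamManager flushes an export batch only once it holds batch_size messages (i.e. no batch interval is configured, which matches the timing observed here), the expected arrival cadence works out as:

```latex
T_{\text{batch}} \approx N_{\text{batch}} \times T_{\text{append}} = 5 \times 15\,\text{s} = 75\,\text{s}
```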
The format of the log is such:
Found record for MyDevice in Kinesis at [time when PC received record in YY-MM-DD_hh:mm:ss]
[time when message was created on device in YY-MM-DD_hh:mm:ss]
Fault Injection
Then I unplugged the Ethernet cable from the device, effectively disconnecting it from the internet.
As expected, the PC stopped receiving records from the cloud during this time.
Resume
I then reconnected the Ethernet cable to the device. About 4 minutes later, I observed a surge of records reaching the PC program whose device timestamps continue from the moment the internet was disconnected, which means the previously failed Kinesis exports were being retried (good!).
However, I also observed large gaps between consecutive retried messages (compared with the expected 15 seconds), indicating that some messages are missing.
In the log I captured below, the internet was disconnected at ~22-12-14_09:33:xx and restored at ~22-12-14_10:28:xx.
I manually marked the gaps with !!! for better visibility.
I noticed the following gaps in the device timestamps:
a. 22-12-14_09:34:54 to 22-12-14_09:37:39: 2 minutes 45 seconds gap = 11 intervals of 15 s, i.e. ~10 messages missing
b. 22-12-14_09:39:55 to 22-12-14_09:41:25: 1 minute 30 seconds gap = 6 intervals, i.e. ~5 messages missing
c. 22-12-14_09:44:56 to 22-12-14_09:51:27: 6 minutes 31 seconds gap = ~26 intervals, i.e. ~25 messages missing
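The missing-message counts follow from the gap length: a gap of Δ seconds spans Δ/15 append intervals, so Δ/15 − 1 messages should have arrived between the two received ones. A small sketch that scans the device timestamps printed by the PC script and flags such gaps; the 15 s interval and the timestamp format come from this post, while the function name find_gaps and the 5 s jitter tolerance are illustrative assumptions.

```python
from datetime import datetime
from typing import List, Tuple

TS_FORMAT = "%y-%m-%d_%H:%M:%S"  # device timestamp format printed by the PC script


def find_gaps(timestamps: List[str], interval_s: float = 15.0,
              tolerance_s: float = 5.0) -> List[Tuple[str, str, int]]:
    """Return (start, end, estimated_missing) for each gap longer than one interval."""
    parsed = [datetime.strptime(ts, TS_FORMAT) for ts in timestamps]
    gaps = []
    for prev, curr in zip(parsed, parsed[1:]):
        delta = (curr - prev).total_seconds()
        if delta > interval_s + tolerance_s:
            # n intervals between two received messages leave n - 1 messages unaccounted for
            missing = round(delta / interval_s) - 1
            gaps.append((prev.strftime(TS_FORMAT), curr.strftime(TS_FORMAT), missing))
    return gaps
```

Feeding it the device-timestamp lines from the log flags the three gaps listed above; the tolerance absorbs the occasional 16 s spacing (e.g. 09:37:54 to 09:38:10).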
Found record for MyDevice in Kinesis at 22-12-14_09:32:27
22-12-14_09:31:23
Found record for MyDevice in Kinesis at 22-12-14_09:32:27
22-12-14_09:31:38
Found record for MyDevice in Kinesis at 22-12-14_09:32:27
22-12-14_09:31:53
Found record for MyDevice in Kinesis at 22-12-14_09:32:27
22-12-14_09:32:08
Found record for MyDevice in Kinesis at 22-12-14_09:32:27
22-12-14_09:32:24
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:32:39
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:32:54
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:33:09
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:33:24
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:33:39
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:33:54
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:34:09
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:34:24
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:34:39
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
!!! 22-12-14_09:34:54
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
!!! 22-12-14_09:37:39
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:37:54
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:38:10
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:38:25
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:38:40
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:38:55
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:39:10
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:39:25
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:39:40
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
!!! 22-12-14_09:39:55
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
!!! 22-12-14_09:41:25
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:41:40
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:41:55
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:42:10
Found record for MyDevice in Kinesis at 22-12-14_10:32:54
22-12-14_09:42:25
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:42:40
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:42:55
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:43:10
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:43:25
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:43:40
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:43:56
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:44:11
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:44:26
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:44:41
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
!!! 22-12-14_09:44:56
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
!!! 22-12-14_09:51:27
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:51:42
Found record for MyDevice in Kinesis at 22-12-14_10:32:55
22-12-14_09:51:57
Continue
I let the PC program run for another 5 hours to see whether StreamManager would retry the missing messages, but I did not see any trace of them.
Question
Based on my observations, aws.greengrass.StreamManager does not reliably retransmit all data that previously failed to export to Kinesis. Did I misunderstand StreamManager's capabilities, or is this a bug?
Kinesis Read Script
import json
import logging
import zlib
from datetime import datetime

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class KinesisStream:
    """Encapsulates a Kinesis stream."""

    def __init__(self, kinesis_client):
        """
        :param kinesis_client: A Boto3 Kinesis client.
        """
        self.kinesis_client = kinesis_client
        self.name = None
        self.details = None
        self.stream_exists_waiter = kinesis_client.get_waiter('stream_exists')

    def get_records(self, max_records):
        """
        Gets records from the stream. This function is a generator that first gets
        a shard iterator for the stream, then uses the shard iterator to get records
        in batches from the stream. Each batch of records is yielded back to the
        caller until the specified maximum number of records has been retrieved.

        :param max_records: The maximum number of records to retrieve.
        :return: Yields the current batch of retrieved records.
        """
        try:
            # Reads only the first shard, starting after its newest record (LATEST).
            response = self.kinesis_client.get_shard_iterator(
                StreamName=self.name, ShardId=self.details['Shards'][0]['ShardId'],
                ShardIteratorType='LATEST')
            shard_iter = response['ShardIterator']
            record_count = 0
            while record_count < max_records:
                response = self.kinesis_client.get_records(
                    ShardIterator=shard_iter, Limit=10)
                shard_iter = response['NextShardIterator']
                records = response['Records']
                logger.info("Got %s records.", len(records))
                record_count += len(records)
                yield records
        except ClientError:
            logger.exception("Couldn't get records from stream %s.", self.name)
            raise

    def describe(self, name):
        """
        Gets metadata about a stream.

        :param name: The name of the stream.
        :return: Metadata about the stream.
        """
        try:
            response = self.kinesis_client.describe_stream(StreamName=name)
            self.name = name
            self.details = response['StreamDescription']
            logger.info("Got stream %s.", name)
        except ClientError:
            logger.exception("Couldn't get %s.", name)
            raise
        else:
            return self.details


def loads_compressed_data(compressed_data: bytes) -> dict:
    """Decodes a zlib-compressed JSON payload into a dict."""
    return json.loads(zlib.decompress(compressed_data).decode())


def read_kinesis(DeviceThingName):
    # Each pass calls get_records(10), which requests a new LATEST shard iterator.
    while True:
        for records in kinesis.get_records(10):
            for r in records:
                data = loads_compressed_data(r['Data'])
                if data["device_id"] == DeviceThingName:
                    print(f"Found record for {DeviceThingName} in Kinesis at "
                          f"{datetime.now().strftime('%y-%m-%d_%H:%M:%S')}")
                    ts = data['measurement_ts']
                    measurement_ts = datetime.fromtimestamp(ts).strftime("%y-%m-%d_%H:%M:%S")
                    print(f"{measurement_ts}")


if __name__ == "__main__":
    # Remember to update credentials using `aws configure`
    session = boto3.Session(profile_name="ecotec-dev", region_name='us-west-2')
    client = session.client('kinesis')
    kinesis = KinesisStream(client)
    # The stream must be described first so that its shard IDs are known
    details = kinesis.describe("raw_data")
    print(f"details {details}")
    read_kinesis("Moxa-UC-8112A-TBBCB1098353")
Edit regarding ochoanel's comment on 12/15/2022
Thanks for your follow-up and suggestions.
Changes I Made for Better Debugging
I added a read_messages call to my device code to confirm that my messages are successfully written to StreamManager's persistent files.
Here's a snippet of code I added:
The measurement_ts field is the timestamp at which the message was created.
I also changed the append_message interval from 15 seconds to 5 seconds, just to speed up debugging.
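As an illustration of this kind of write check, and not the author's actual code, a minimal append-then-read-back sketch follows. It assumes, based on the Stream Manager Python SDK, that append_message() returns the new message's sequence number and that read_messages() returns Message objects exposing sequence_number and payload; verify this against your SDK version. The client is passed in so the logic can be exercised without a Greengrass core, and the function name verify_append and the options_factory parameter are hypothetical.

```python
from typing import Any, Callable


def verify_append(client: Any, stream_name: str, payload: bytes,
                  options_factory: Callable[[int], Any]) -> bool:
    """Append one message, read it back by sequence number, and compare payloads.

    `client` is assumed to behave like stream_manager.StreamManagerClient.
    """
    sequence_number = client.append_message(stream_name, payload)
    messages = client.read_messages(stream_name, options_factory(sequence_number))
    return (
        len(messages) > 0
        and messages[0].sequence_number == sequence_number
        and messages[0].payload == payload
    )


# Assumed wiring with the Greengrass Stream Manager SDK (not exercised here):
#   from stream_manager import ReadMessagesOptions, StreamManagerClient
#   client = StreamManagerClient()
#   ok = verify_append(client, "MyStream", data, lambda seq: ReadMessagesOptions(
#       desired_start_sequence_number=seq, min_message_count=1,
#       max_message_count=1, read_timeout_millis=1000))
```

Reading back by the returned sequence number checks the specific message just written rather than whatever happens to be newest in the stream.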
Logs Collected
I started my custom component and the PC program at ~12:30, then unplugged the Ethernet cable at ~12:33:20 for ~10 minutes.
The following is part of the device log (I can't paste all of it here due to the text limit).
Based on the logs, my messages are indeed reaching StreamManager.
However, my PC program still shows missing messages, which could be caused by either of the following:
- StreamManager did not export the data to Kinesis
- My PC program did not read all data that reached Kinesis
Honestly, I suspect No. 2 is the reason. I'm working on a new tool to replace my PC program.
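One way to tell the two causes apart is to compare measurement_ts values across three sources: what the device appended, what the PC program printed, and what a second, independent Kinesis reader saw (for example a Lambda function consuming the stream). A minimal sketch; the function name classify_losses and the notion of a second reader are illustrative assumptions.

```python
from typing import Dict, Iterable, Set


def classify_losses(device_ts: Iterable[float], pc_reader_ts: Iterable[float],
                    other_reader_ts: Iterable[float]) -> Dict[str, Set[float]]:
    """Split messages the PC program never printed into the two suspected causes."""
    device, pc, other = set(device_ts), set(pc_reader_ts), set(other_reader_ts)
    missing_on_pc = device - pc
    return {
        # Not seen by either reader: consistent with cause 1 (not exported to Kinesis)
        "not_seen_in_kinesis": missing_on_pc - other,
        # Seen by the other reader but not by the PC program: consistent with cause 2
        "missed_by_pc_program": missing_on_pc & other,
    }
```

If "missed_by_pc_program" holds everything and "not_seen_in_kinesis" is empty, the data reached Kinesis and the loss is on the reading side.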
Thanks for your follow-up. Please see my response in the Edit section of the original post. I think the problem might have been my PC program.
Just to follow up: I verified the data in Kinesis a different way, using Lambda and CloudWatch, and found all the "missing" messages. I will continue to soak-test StreamManager, but you may treat this post as a false alarm for now.
Nice to hear that, and happy to help. Keep us posted if you find any issues. Good luck!
@ictwist, your Kinesis read script only reads a single shard. You may have configured more than one shard, or autoscaling, which can create more than one shard. Stream manager writes to shards randomly to distribute the load and increase maximum throughput. I suspect this is the problem, given that you were able to find the data by other means.
@MichaelDombrowski-AWS You're correct
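Following that diagnosis, here is a sketch of a reader that covers every shard of the stream. It also keeps one iterator chain per shard across polls instead of requesting a fresh LATEST iterator each time, and defaults to TRIM_HORIZON (the oldest retained record) so records written before the reader started are included. It does not follow child shards after a reshard; the function names list_shard_ids and read_all_shards are illustrative, and the client is any boto3 Kinesis client.

```python
import time
from typing import Any, Dict, Iterator, List, Optional


def list_shard_ids(client: Any, stream_name: str) -> List[str]:
    """Return the ID of every shard in the stream, following NextToken pagination."""
    shard_ids: List[str] = []
    response = client.list_shards(StreamName=stream_name)
    while True:
        shard_ids.extend(shard["ShardId"] for shard in response["Shards"])
        next_token = response.get("NextToken")
        if not next_token:
            return shard_ids
        # ListShards does not accept StreamName together with NextToken.
        response = client.list_shards(NextToken=next_token)


def read_all_shards(client: Any, stream_name: str,
                    iterator_type: str = "TRIM_HORIZON",
                    poll_seconds: float = 1.0,
                    max_polls: Optional[int] = None) -> Iterator[Dict[str, Any]]:
    """Yield records from every shard, keeping one iterator chain per shard."""
    iterators: Dict[str, Optional[str]] = {}
    for shard_id in list_shard_ids(client, stream_name):
        response = client.get_shard_iterator(
            StreamName=stream_name, ShardId=shard_id, ShardIteratorType=iterator_type)
        iterators[shard_id] = response["ShardIterator"]

    polls = 0
    while any(iterators.values()) and (max_polls is None or polls < max_polls):
        for shard_id, shard_iter in list(iterators.items()):
            if shard_iter is None:
                continue  # shard is closed and fully read
            response = client.get_records(ShardIterator=shard_iter, Limit=100)
            # Reuse NextShardIterator instead of requesting a new LATEST iterator,
            # so records written between polls are not skipped.
            iterators[shard_id] = response.get("NextShardIterator")
            yield from response["Records"]
        polls += 1
        time.sleep(poll_seconds)
```

With boto3 it could be driven as `read_all_shards(boto3.client("kinesis", region_name="us-west-2"), "raw_data")`, decoding each record's `Data` with `loads_compressed_data` as in the original script. Polling once per second per shard stays well under the per-shard GetRecords limit.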