I have a sample Lambda function that uses the newly introduced StreamManager. The idea is that a device publishes data to a channel, and a Greengrass Lambda subscribed to that channel writes each received message to StreamManager, which then exports the data to Kinesis.
Sometimes (after deployments) I get the following error in the lambda log:
ERROR-streammanagerclient.py:177,Unable to read from socket, likely socket is closed or server died
My Lambda is pinned (long-running), and the code (Python 3.7) is:
import asyncio
import logging
import random
import time
from greengrasssdk.stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ReadMessagesOptions,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)
stream_name = "FarmDataStream"
iot_channel_name = "farmdatachannel"
kinesis_stream_name = "farmDataKinesisStream"
# Create a client for the StreamManager
client = StreamManagerClient()
try:
    client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
    pass
exports = ExportDefinition(
    kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
)
client.create_message_stream(
    MessageStreamDefinition(
        name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
    )
)
# initialize the logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
    logger.info(event)
    global stream_name
    client.append_message(stream_name, event)
    return
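
For reference, here is a minimal sketch of one way the handler could tolerate a dropped connection. It assumes the cause is that the single module-level client keeps using a socket that was closed when StreamManager restarted during the deployment, which the log line suggests but does not prove. The names `to_payload` and `ReconnectingAppender` are hypothetical helpers, not part of the Greengrass SDK. Which exception type the SDK raises on a closed socket is also not confirmed here, so `retry_on` defaults to `Exception` and should be narrowed once the real type is known. The sketch also serializes the event to bytes, since as far as I know `append_message` expects a bytes payload rather than a dict.

```python
import json
import logging
import time

logger = logging.getLogger(__name__)


def to_payload(event):
    """Serialize an event to UTF-8 JSON bytes; pass bytes through unchanged."""
    if isinstance(event, (bytes, bytearray)):
        return bytes(event)
    return json.dumps(event).encode("utf-8")


class ReconnectingAppender:
    """Append to a stream, rebuilding the client whenever a call fails.

    client_factory is any zero-argument callable returning an object with
    append_message(stream_name, data) and close(). In the Lambda above this
    could be the StreamManagerClient class itself (an assumption).
    """

    def __init__(self, client_factory, stream_name, retry_on=(Exception,),
                 max_attempts=3, delay_seconds=1.0):
        self._factory = client_factory
        self._stream_name = stream_name
        self._retry_on = retry_on
        self._max_attempts = max_attempts
        self._delay_seconds = delay_seconds
        self._client = None

    def _drop_client(self):
        # Close the broken client on a best-effort basis and forget it.
        if self._client is not None:
            try:
                self._client.close()
            except Exception:
                pass
            self._client = None

    def append(self, event):
        payload = to_payload(event)
        last_error = None
        for attempt in range(1, self._max_attempts + 1):
            try:
                # Create the client lazily so a failed connect is retried too.
                if self._client is None:
                    self._client = self._factory()
                return self._client.append_message(self._stream_name, payload)
            except self._retry_on as error:
                last_error = error
                logger.warning("append attempt %d failed: %s", attempt, error)
                self._drop_client()
                if attempt < self._max_attempts:
                    time.sleep(self._delay_seconds)
        # Every attempt failed: surface the last error to the caller.
        raise last_error
```

In the Lambda, this would be used roughly as `appender = ReconnectingAppender(StreamManagerClient, stream_name)` at module level and `appender.append(event)` inside `lambda_handler`. The stream creation code would still need to run against a live connection, so it may need the same retry treatment.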