Greengrass StreamManager export error

I have a custom Greengrass component that collects multiple data streams and publishes them to a single Kinesis stream. Stream Manager ingests the data correctly (I can see it in the work folder), but I also see many log errors that prevent the data from reaching Kinesis.

  • All message streams are newly created and were not used before
  • Three streams collect different data within the same Greengrass component
  • A single StreamManagerClient instance is shared among them

The error that I see repeatedly, for different streams (including new ones):

2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. 2023 Oct 06 14:04:39,680 [ERROR] (pool-7-thread-2) com.amazonaws.iot.greengrass.streammanager.export.upload.MessageUploaderTask: Encountered Throwable when exporting messages. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. com.amazonaws.iot.greengrass.streammanager.store.exceptions.InvalidStreamPositionException: Unable to find sequence number 2888279 in log file for stream iot. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at com.amazonaws.iot.greengrass.streammanager.store.log.FileLogSegment.read(FileLogSegment.java:274) ~[AWSGreengrassGreenlake-1.0-super.jar:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at com.amazonaws.iot.greengrass.streammanager.store.log.MessageStreamLog.read(MessageStreamLog.java:379) ~[AWSGreengrassGreenlake-1.0-super.jar:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at com.amazonaws.iot.greengrass.streammanager.store.log.MessageInputStreamHandleLogImpl.read(MessageInputStreamHandleLogImpl.java:32) ~[AWSGreengrassGreenlake-1.0-super.jar:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at com.amazonaws.iot.greengrass.streammanager.export.upload.MessageUploaderTask.upload(MessageUploaderTask.java:66) ~[AWSGreengrassGreenlake-1.0-super.jar:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at java.lang.Thread.run(Thread.java:829) [?:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}

Component logic


import sys
import traceback

from stream_manager import (
    ExportDefinition,
    InvalidRequestException,
    KinesisConfig,
    MessageStreamDefinition,
    StrategyOnFull,
    StreamManagerClient,
)

# A single client instance is shared by all three streams.
STREAM_MANAGER_CLIENT = StreamManagerClient()

def on_a():
    ...
    STREAM_MANAGER_CLIENT.append_message("Av1", data=encoded_msg)

def on_b():
    ...
    STREAM_MANAGER_CLIENT.append_message("Bv1", data=encoded_msg)

def on_c():
    ...
    STREAM_MANAGER_CLIENT.append_message("Cv1", data=encoded_msg)


def create_stream_definition(identifier, version):
    kinesisConfig = KinesisConfig(identifier="KinesisExport" + identifier + version, kinesis_stream_name=STREAM_NAME)
    exportDefinition = ExportDefinition(kinesis=[kinesisConfig])

    streamDefinition = MessageStreamDefinition(
        name=identifier + version,  # Required.
        stream_segment_size=1024,
        strategy_on_full=StrategyOnFull.OverwriteOldestData,  # Required.
        export_definition=exportDefinition
    )

    return streamDefinition


def init_stream_client():
    try:
        STREAM_MANAGER_CLIENT.create_message_stream(create_stream_definition("A", "v1"))
        STREAM_MANAGER_CLIENT.create_message_stream(create_stream_definition("B", "v1"))
        STREAM_MANAGER_CLIENT.create_message_stream(create_stream_definition("C", "v1"))
    except InvalidRequestException as e:
        print("Exception occurred", file=sys.stderr)
        traceback.print_exc()

        if "the message stream already exists" in str(e):
            print("Message stream exists. Ignoring exception. ")
        else:
            raise

if __name__ == '__main__':
    init_stream_client()
  • Hello, did you manually delete anything from the disk? Did the device lose power or shut down unsafely? This error indicates that data was lost due to corruption, but ultimately it should not be a problem: Stream Manager will retry forever and will eventually recover as the old data is overwritten. You may want to delete the stream using the delete-stream API, or reset the Kinesis export for the affected stream to start from sequence number 0.
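The delete-and-recreate path suggested above can be sketched as follows. `reset_export` is a hypothetical helper; `delete_message_stream` and `create_message_stream` are the Stream Manager SDK client methods, and the client wiring is assumed to come from the component code above:

```python
def reset_export(client, stream_name, make_definition):
    """Drop a stream whose on-disk log is corrupted and recreate it,
    so its Kinesis export restarts from sequence number 0.

    client          -- a connected StreamManagerClient (assumed)
    make_definition -- builds the MessageStreamDefinition for the name
    """
    try:
        client.delete_message_stream(stream_name)
    except Exception:
        pass  # the stream may already be gone; recreate regardless
    client.create_message_stream(make_definition(stream_name))
```

With the component code above this could be invoked as, e.g., `reset_export(STREAM_MANAGER_CLIENT, "Av1", lambda n: create_stream_definition("A", "v1"))`.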

  • Hi,

    My device is powered by an external battery, and it may lose power without notice. When this happens, Stream Manager will not upload existing data until the "corrupted" block is overwritten, which may never happen because new "corrupted" blocks keep appearing.

    Is there any way to configure Stream Manager to flush data to disk after each insert (to prevent a mismatch between the data and the log), or to skip corrupted blocks?

  • Not at the moment, no. We are working on resiliency for Stream Manager, but you may want to make some changes on your end as well. Some things to consider: change the filesystem options to use data=journal for ext filesystems. You may also consider whether you need Stream Manager at all, or whether you can publish directly over MQTT using the Greengrass IPC APIs, which give you access to MQTT.
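For the data=journal suggestion, the mount option goes in /etc/fstab. The device path and mount point below are assumptions; substitute the partition that holds the Stream Manager work directory. Note that data=journal writes file data through the journal as well as metadata, at the cost of write throughput:

```
# /etc/fstab — journal file data as well as metadata (ext4)
# /dev/mmcblk0p2 and the mount point are placeholders; adjust for your device.
/dev/mmcblk0p2  /greengrass  ext4  defaults,data=journal  0  2
```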

Ruslan
Asked 7 months ago · 108 views
No answers
