aws greengrass lambda function WorkItemTimeoutException ERROR

0

I'm testing an example to subscribe to topic using Greengrass V2 and when I deploy lambda function works for a seconds and then I get this error:

In greengrass.log:

2022-12-10T01:58:15.724Z [DEBUG] (pool-7-thread-1) io.moquette.interception.BrokerInterceptor: Notifying MQTT PUBLISH message to interceptor. CId=ClienteDemoLab1, messageId=-1, topic=BlueMQTT/ClienteDemoLab1/0x00158d00054dc762, interceptorId=ClientDeviceConnectionTerminationListener. {}
2022-12-10T01:58:16.878Z [ERROR] (pool-1-thread-1) com.aws.greengrass.lambdamanager.WorkManager: work-item-timeout. lambda work item timed out. {workItem=c2f5ac43-c0a9-45ca-a54d-2ede26a45a40, arn=arn:aws:lambda:eu-west-1:812817525665:function:SubscribeToTopic:15}
2022-12-10T01:58:16.878Z [ERROR] (pool-1-thread-1) com.aws.greengrass.lambdamanager.UserLambdaService: service-errored. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=RUNNING}
com.aws.greengrass.lambdamanager.WorkItemTimeoutException: The work item with invocation id c2f5ac43-c0a9-45ca-a54d-2ede26a45a40 has timed out, worker lambda arn: arn:aws:lambda:eu-west-1:812817525665:function:SubscribeToTopic:15
        at com.aws.greengrass.lambdamanager.WorkManager.lambda$getNextWork$12(WorkManager.java:332)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

2022-12-10T01:58:16.879Z [DEBUG] (pool-1-thread-1) com.aws.greengrass.lambdamanager.UserLambdaService: service-report-state. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=RUNNING, newState=ERRORED}
2022-12-10T01:58:16.879Z [DEBUG] (pool-1-thread-1) com.aws.greengrass.lambdamanager.WorkManager: work-item-timeout. {InvocationType=Event, workItem=c2f5ac43-c0a9-45ca-a54d-2ede26a45a40, arn=arn:aws:lambda:eu-west-1:812817525665:function:SubscribeToTopic:15}
2022-12-10T01:58:16.879Z [INFO] (SubscribeToTopic-lifecycle) com.aws.greengrass.lambdamanager.UserLambdaService: service-set-state. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=RUNNING, newState=BROKEN}

In lambda function error:

2022-12-10T11:30:20.631Z [INFO] (pool-2-thread-454) SubscribeToTopic: shell-runner-start. {scriptName=services.SubscribeToTopic.lifecycle.shutdown.script, serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN, command=["/greengrass/v2/packages/artifacts/aws.greengrass.LambdaLauncher/2.0.10/lambda-..."]}
2022-12-10T11:30:20.633Z [DEBUG] (pool-2-thread-454) SubscribeToTopic: Created process with pid 20339. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.678Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Destroy process. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.679Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Process is running. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}

2022-12-10T11:30:20.679Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: signal process   {"pid": 20331, "signal": "terminated"}. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.684Z [INFO] (pool-2-thread-458) SubscribeToTopic: lambda_runtime.py:370,Caught signal 15. Stopping runtime.. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.894Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Process does not exist   {"pid": 20331}. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.894Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Destroy container. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.894Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Run PostStop hooks. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:21.045Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Unmounting with arguments        {"dest": "/greengrass/v2/work/SubscribeToTopic/work/dns", "flags": 0}. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:21.055Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Unmounting with arguments        {"dest": "/greengrass/v2/work/SubscribeToTopic/work/worker/w7idbgAuneQwQlZic8eKzIPDyP2j2Jr-PP3AY4L2v7M/overlays", "flags": 0}. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:21.056Z [INFO] (pool-2-thread-458) SubscribeToTopic: Tearing down overlay tmpfs mounts. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}

I tried to assing until 512MB of memory, and I checked it out of containers with the same response.

My function code is too simple, like samples:

import sys
import time
import traceback
import logging

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
    SubscriptionResponseMessage,
    UnauthorizedError
)

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


def lambda_handler(event, context):
    print('Successfully BEGIN');
    logger.info("SubscribeToTopic Lambda is ONLINE")
    topic = "BlueMQTT/#"

    try:
        ipc_client = GreengrassCoreIPCClientV2()
        # Subscription operations return a tuple with the response and the operation.
        _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event)
        print('Successfully subscribed to topic: ' + topic)

        # Keep the main thread alive, or the process will exit.
        try:
            while True:
                time.sleep(10)
        except InterruptedError:
            print('Subscribe interrupted.')

        # To stop subscribing, close the stream.
        operation.close()
        logger.info("SubscribeToTopic Lambda is END")
        
    except UnauthorizedError:
        print('Unauthorized error while subscribing to topic: ' +
              topic, file=sys.stderr)
        traceback.print_exc()
        exit(1)
    except Exception:
        print('Exception occurred', file=sys.stderr)
        traceback.print_exc()
        exit(1)
        
    return true;


def on_stream_event(event: SubscriptionResponseMessage) -> None:
    try:
        message = str(event.binary_message.message, 'utf-8')
        topic = event.binary_message.context.topic
        print('Received new message on topic %s: %s' % (topic, message))
    except:
        traceback.print_exc()


def on_stream_error(error: Exception) -> bool:
    print('Received a stream error.', file=sys.stderr)
    traceback.print_exc()
    return False  # Return True to close stream, False to keep stream open.


def on_stream_closed() -> None:
    print('Subscribe to topic stream closed.')

Thanks

2 Answers
0

Hello,

This is timing out because you have put blocking code inside the lambda_handler method. The lambda handler will be invoked for every event the lambda is configured to trigger for. This means, for example, every message sent to a topic which you've configured your lambda with as an "event source". You should not be subscribing and waiting for a message inside of the lambda handler, rather the lambda handler will be called automatically for each message that comes in.

In the lambda handler you put only the code needed to handle the incoming message and nothing more. See what the message looks like by logging the event and context data.

I would recommend you to use a standard component though, rather than a lambda. This tutorial should help: https://docs.aws.amazon.com/greengrass/v2/developerguide/client-devices-tutorial.html#develop-client-device-subscriber-component.

Cheers, Michael

AWS
EXPERT
answered a year ago
  • Thank you very much for answering Michael. The example that you indicate is the one that I have followed and as a component it works well.

    I have the problem when using it in a lambda function, and as you say, it is more of a concept problem. If I understand you correctly, I shouldn't use the subscribe_to_topic method to read posts, just reading events and contexts would have everything I need. The GreengrassCoreIPCClientV2() should only be required to publish messages to AWS IoT. I'll make the changes and keep you informed, thank you very much. To be honest, I was obsessed with using a subscription in lambda functions when it's implicit within them.

    To end, why would it be better to use component than lambda function?

  • The example in the tutorial tells you to create a component and not a lambda function which is why the code doesn't work. Lambdas and components work quite differently.

    You should prefer to use a standard component as lambda exists primarily for backward compatibility with Greengrass V1. The more natural and preferred way to run code in Greengrass V2 is with a component.

0

I have more logs from lambda function. I had to delete the previous function because it was having strange behaviors, I preferred to start again with other function. Logs are of redeploying lambda function.

These are the logs, I don't understand why lambda function exit, do you think there may be an error in the code?

Logs are very big, I published them in this url:

https://pastebin.com/dNG0q3s9

Thanks in advance.

cespar
answered a year ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions