I'm testing an example that subscribes to a topic using Greengrass V2. When I deploy it, the Lambda function works for a few seconds and then I get this error:
In greengrass.log:
2022-12-10T01:58:15.724Z [DEBUG] (pool-7-thread-1) io.moquette.interception.BrokerInterceptor: Notifying MQTT PUBLISH message to interceptor. CId=ClienteDemoLab1, messageId=-1, topic=BlueMQTT/ClienteDemoLab1/0x00158d00054dc762, interceptorId=ClientDeviceConnectionTerminationListener. {}
2022-12-10T01:58:16.878Z [ERROR] (pool-1-thread-1) com.aws.greengrass.lambdamanager.WorkManager: work-item-timeout. lambda work item timed out. {workItem=c2f5ac43-c0a9-45ca-a54d-2ede26a45a40, arn=arn:aws:lambda:eu-west-1:812817525665:function:SubscribeToTopic:15}
2022-12-10T01:58:16.878Z [ERROR] (pool-1-thread-1) com.aws.greengrass.lambdamanager.UserLambdaService: service-errored. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=RUNNING}
com.aws.greengrass.lambdamanager.WorkItemTimeoutException: The work item with invocation id c2f5ac43-c0a9-45ca-a54d-2ede26a45a40 has timed out, worker lambda arn: arn:aws:lambda:eu-west-1:812817525665:function:SubscribeToTopic:15
at com.aws.greengrass.lambdamanager.WorkManager.lambda$getNextWork$12(WorkManager.java:332)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2022-12-10T01:58:16.879Z [DEBUG] (pool-1-thread-1) com.aws.greengrass.lambdamanager.UserLambdaService: service-report-state. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=RUNNING, newState=ERRORED}
2022-12-10T01:58:16.879Z [DEBUG] (pool-1-thread-1) com.aws.greengrass.lambdamanager.WorkManager: work-item-timeout. {InvocationType=Event, workItem=c2f5ac43-c0a9-45ca-a54d-2ede26a45a40, arn=arn:aws:lambda:eu-west-1:812817525665:function:SubscribeToTopic:15}
2022-12-10T01:58:16.879Z [INFO] (SubscribeToTopic-lifecycle) com.aws.greengrass.lambdamanager.UserLambdaService: service-set-state. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=RUNNING, newState=BROKEN}
In the Lambda function log:
2022-12-10T11:30:20.631Z [INFO] (pool-2-thread-454) SubscribeToTopic: shell-runner-start. {scriptName=services.SubscribeToTopic.lifecycle.shutdown.script, serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN, command=["/greengrass/v2/packages/artifacts/aws.greengrass.LambdaLauncher/2.0.10/lambda-..."]}
2022-12-10T11:30:20.633Z [DEBUG] (pool-2-thread-454) SubscribeToTopic: Created process with pid 20339. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.678Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Destroy process. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.679Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Process is running. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.679Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: signal process {"pid": 20331, "signal": "terminated"}. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.684Z [INFO] (pool-2-thread-458) SubscribeToTopic: lambda_runtime.py:370,Caught signal 15. Stopping runtime.. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.894Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Process does not exist {"pid": 20331}. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.894Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Destroy container. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:20.894Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Run PostStop hooks. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:21.045Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Unmounting with arguments {"dest": "/greengrass/v2/work/SubscribeToTopic/work/dns", "flags": 0}. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:21.055Z [DEBUG] (pool-2-thread-455) SubscribeToTopic: Unmounting with arguments {"dest": "/greengrass/v2/work/SubscribeToTopic/work/worker/w7idbgAuneQwQlZic8eKzIPDyP2j2Jr-PP3AY4L2v7M/overlays", "flags": 0}. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
2022-12-10T11:30:21.056Z [INFO] (pool-2-thread-458) SubscribeToTopic: Tearing down overlay tmpfs mounts. {serviceInstance=0, serviceName=SubscribeToTopic, currentState=BROKEN}
I tried assigning up to 512 MB of memory, and I also ran the function outside a container, with the same result.
My function code is very simple, like the samples:
import sys
import time
import traceback
import logging

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
    SubscriptionResponseMessage,
    UnauthorizedError
)

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


def lambda_handler(event, context):
    print('Successfully BEGIN')
    logger.info("SubscribeToTopic Lambda is ONLINE")
    topic = "BlueMQTT/#"
    try:
        ipc_client = GreengrassCoreIPCClientV2()
        # Subscription operations return a tuple with the response and the operation.
        _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event)
        print('Successfully subscribed to topic: ' + topic)
        # Keep the main thread alive, or the process will exit.
        # Note: this loop means the handler never returns.
        try:
            while True:
                time.sleep(10)
        except InterruptedError:
            print('Subscribe interrupted.')
        # To stop subscribing, close the stream.
        operation.close()
        logger.info("SubscribeToTopic Lambda is END")
    except UnauthorizedError:
        print('Unauthorized error while subscribing to topic: ' +
              topic, file=sys.stderr)
        traceback.print_exc()
        sys.exit(1)
    except Exception:
        print('Exception occurred', file=sys.stderr)
        traceback.print_exc()
        sys.exit(1)
    return True


def on_stream_event(event: SubscriptionResponseMessage) -> None:
    try:
        message = str(event.binary_message.message, 'utf-8')
        topic = event.binary_message.context.topic
        print('Received new message on topic %s: %s' % (topic, message))
    except Exception:
        traceback.print_exc()


def on_stream_error(error: Exception) -> bool:
    print('Received a stream error.', file=sys.stderr)
    traceback.print_exc()
    return False  # Return True to close stream, False to keep stream open.


def on_stream_closed() -> None:
    print('Subscribe to topic stream closed.')
Thanks
Thank you very much for answering, Michael. The example you point to is the one I followed, and as a component it works well.
The problem appears when I use it in a Lambda function and, as you say, it is more of a conceptual problem. If I understand you correctly, I shouldn't call subscribe_to_topic to read messages; the event and context passed to the handler already contain everything I need. GreengrassCoreIPCClientV2() should only be needed to publish messages to AWS IoT. I'll make the changes and keep you informed. To be honest, I was fixated on subscribing explicitly inside the Lambda function when the subscription is already implicit in it. One last question: why is a component better than a Lambda function?
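A minimal sketch of that approach, assuming the function is configured with an event source for the topic so that Greengrass invokes the handler once per message. The payload arrives in `event`. Reading the topic from `context.client_context.custom['subject']` is my understanding of the Greengrass Lambda convention and should be verified on the core device; `extract_topic` is a hypothetical helper name.

```python
import logging
import sys

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


def extract_topic(context):
    """Return the source topic if the runtime provides it, else None.

    Assumption: the topic is exposed as
    context.client_context.custom['subject'].
    """
    client_context = getattr(context, "client_context", None)
    custom = getattr(client_context, "custom", None) or {}
    return custom.get("subject")


def lambda_handler(event, context):
    # Invoked once per message: no subscribe call and no infinite loop.
    topic = extract_topic(context)
    logger.info("Received message on topic %s: %s", topic, event)
    # Return promptly so the work item finishes before the function timeout.
    return True
```

The handler keeps no state between messages and returns immediately, so there is nothing left running for the Lambda manager to time out.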
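The example in the tutorial tells you to create a component, not a Lambda function, which is why the code doesn't work. Lambda functions and components work quite differently: a Lambda handler is expected to return before the function timeout, so a handler that subscribes and then loops forever ends in the work-item timeout shown in your greengrass.log.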
You should prefer a standard component, because Lambda support exists primarily for backward compatibility with Greengrass V1. The more natural and preferred way to run code in Greengrass V2 is with a component.
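For comparison, here is a rough sketch of the same subscription written as a component script, where blocking forever is the expected behavior. It reuses the calls from the question's code; `format_message` and `run_component` are hypothetical names, and the component recipe (including the pub/sub access control policy) is not shown.

```python
import time
import traceback


def format_message(topic, payload):
    """Decode a binary payload and build the log line for one message."""
    return "Received new message on topic %s: %s" % (topic, str(payload, "utf-8"))


def on_stream_event(event):
    # Called by the IPC client for every message on the subscribed topic.
    try:
        print(format_message(event.binary_message.context.topic,
                             event.binary_message.message))
    except Exception:
        traceback.print_exc()


def run_component(topic="BlueMQTT/#"):
    # Imported here so the helpers above can be exercised without the SDK.
    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2

    ipc_client = GreengrassCoreIPCClientV2()
    # Subscription operations return a tuple with the response and the operation.
    _, operation = ipc_client.subscribe_to_topic(
        topic=topic, on_stream_event=on_stream_event)
    try:
        # A component is a long-running process, so this loop is fine here.
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        print("Subscribe interrupted.")
    finally:
        # To stop subscribing, close the stream.
        operation.close()
```

The component's run script would call `run_component()` as its last statement. Unlike the Lambda handler, nothing waits for this process to return.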