Skip to content

Subscribing to LocalMqtt topic from custom component in IoT GreengrassCoreV2

0

I have a greengrass core device gg_core_device_1 with a custom component that subscribes to topic sensor_data. The component starts up fine and also subscribes to the topic with no errors, but it never receives any of the published messages. I have 2 separate client things (client_device_1_1, client_device_1_2) one does pub, one does sub over LocalMqtt with no issues. My custom component does log that it subscribed to the topic ok, it just never gets any messages. I'm out of ideas. Any help appreciated.

deployment json: { "components": { "aws.greengrass.Nucleus": { "componentVersion": "2.13.0" }, "aws.greengrass.clientdevices.IPDetector": { "componentVersion": "2.2.0" }, "aws.greengrass.clientdevices.Auth": { "componentVersion": "2.5.1", "configurationUpdate": { "reset": [], "merge": "{"deviceGroups": {"formatVersion": "2021-03-05", "definitions": {"DeviceGroup": {"selectionRule": "thingName: ", "policyName": "ClientDevicePolicy"}}, "policies": {"ClientDevicePolicy": {"AllowConnect": {"statementDescription": "Allow client devices to connect.", "operations": ["mqtt:connect"], "resources": [""]}, "AllowPublish": {"statementDescription": "Allow client devices to publish to all topics.", "operations": ["mqtt:publish"], "resources": [""]}, "AllowSubscribe": {"statementDescription": "Allow client devices to subscribe to all topics.", "operations": ["mqtt:subscribe"], "resources": [""]}}}}}" } }, "aws.greengrass.clientdevices.mqtt.Moquette": { "componentVersion": "2.3.7" }, "aws.greengrass.clientdevices.mqtt.Bridge": { "componentVersion": "2.3.2", "configurationUpdate": { "reset": [], "merge": "{"mqttTopicMapping": {"LocalToLocalMapping": {"topic": "sensor_data", "source": "LocalMqtt", "target": "LocalMqtt"}}}" } }, "aws.greengrass.Cli": { "componentVersion": "2.13.0" }, "com.ecp.sensor_data.subscriber": { "componentVersion": "1.0.0" } }

recipe: { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.ecp.sensor_data.subscriber", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.ecp.sensor_data.subscriber:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "", "run": "python3 /greengrass/v2/shared_data/docker_code/lambdas/sensor_data_subscriber/index.py" } } ] } this is my component: import sys import time import traceback import os import json import logging from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError, QOS )

Set up logging

LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper() logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO), format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(name)

def main():

topic = os.getenv("MQTT_TOPIC")

try:
    logger.info(f"Connecting to Greengrass IPC and subscribing to topic: {topic}...")
    ipc_client = GreengrassCoreIPCClientV2()
    _, operation = ipc_client.subscribe_to_topic(
        topic=topic,
        on_stream_event=on_stream_event,
        on_stream_error=on_stream_error,
        on_stream_closed=on_stream_closed
    )
    logger.info(f"Successfully subscribed to topic: {topic}")

    # Keep the main thread alive
    try:
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        logger.info("Subscriber interrupted by user.")

    # Close the subscription stream
    operation.close()
except UnauthorizedError:
    logger.error(f"Unauthorized error while subscribing to topic: {topic}")
    traceback.print_exc()
    sys.exit(1)
except Exception as e:
    logger.error(f"Exception occurred while subscribing to topic {topic}: {e}")
    traceback.print_exc()
    sys.exit(1)

def on_stream_event(event: SubscriptionResponseMessage) -> None: try: # Decode the message payload message = event.binary_message.message.decode('utf-8') topic = event.binary_message.context.topic

    logger.info(f"Received new message on topic '{topic}': {message}")

    # Process the message
    process_message(message)
except Exception as e:
    logger.error(f"Error processing message: {e}")
    traceback.print_exc()

def on_stream_error(error: Exception) -> bool: logger.error(f"Received a stream error: {error}") traceback.print_exc() return False # Return True to close stream, False to keep stream open.

def on_stream_closed(): logger.info("Subscribe to topic stream closed.")

def process_message(message_str: str): try: # Parse the JSON message payload = json.loads(message_str)

    client_id = payload.get("client_id", "unknown_client")
    temperature = payload.get("temperature")
    humidity = payload.get("humidity")
    timestamp = payload.get("timestamp", time.strftime("%Y-%m-%dT%H:%M:%S"))

    # Define output directory from environment variable
    OUTPUT_DIR = os.getenv("OUTPUT_DIR")
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)
        logger.debug(f"Created output directory at {OUTPUT_DIR}")

    # Create a unique file name based on client ID and current time
    file_name = f"{client_id}_{int(time.time())}.json"
    file_path = os.path.join(OUTPUT_DIR, file_name)

    # Log detailed payload information
    logger.info(f"Received payload for client ID: {client_id}")
    logger.debug(f"Payload: {json.dumps(payload, indent=2)}")
    logger.info(f"Saving data to {file_path}")

    # Save the payload to a JSON file
    with open(file_path, "w") as f:
        json.dump(payload, f)

    logger.info(f"Data successfully written to {file_path}")
except Exception as e:
    logger.error(f"Failed to process message: {e}")
    traceback.print_exc()

if name == 'main': logger.info(f"Starting the {'com.ecp.sensor_data.subscriber'} component...") main()

asked 2 years ago188 views
4 Answers
0

Hello rePost-User-6569438!

Really sorry about what you are going through.

Let's start by clarifying what you are trying to achieve:

  • 2 client devices that connect to a local MQTT Broker (The Moquette MQTT broker Greengrass component) - Client 1 does Pub, Client 2 does Sub
  • A custom component that collects data published by client devices.

From what I read in your code, your custom component is NOT subscribing to the local MQTT broker topics but rather to Greengrass Core IPC topics (which also works in a pub/sub mode but is NOT your local MQTT broker). Greengrass Core IPC function is to allow internal communication between Greengrass components.

In your case, you might want to leverage the MQTT Bridge component to forward messages from local MQTT plane (where client device are connecting) to local IPC plane (where your Custom component currently connects).

All you need to do is to update your "LocalToLocalMapping" topic mapping source to "LocalMqtt" and destination to "Pubsub", and ensure that the mapping is properly set (See here)

You should be able to receive messages after that. Let me know how that worked for you.

AWS
EXPERT
answered 2 years ago
  • I wasn't sure how to use this forum, and did not have enough space to put my response in this comment, so posted it in the answers.

  • see latest response at the bottom.

0

Thanks for the response. I spent some time on this and still see the same results. I updated my recipe and my deployment components json, provided below. Everything deploys fine and this message show up as expected: Successfully subscribed to topic: sensor_data. {scriptName=services.com.ecp.sensor_data.subscriber.lifecycle.run, serviceName=com.ecp.sensor_data.subscriber, currentState=RUNNING}

using this it works and writes the log file as expected: /greengrass/v2/bin/greengrass-cli pubsub pub --topic 'sensor_data' --message '{"test": "Test here...."}'

but still no love from the client things, they still communicate with each-other, but not my component: { "components": { "aws.greengrass.Nucleus": { "componentVersion": "2.13.0" }, "aws.greengrass.clientdevices.IPDetector": { "componentVersion": "2.2.0" }, "aws.greengrass.clientdevices.Auth": { "componentVersion": "2.5.1", "configurationUpdate": { "merge": "{\"deviceGroups\":{\"formatVersion\":\"2021-03-05\",\"definitions\":{\"DeviceGroup\":{\"selectionRule\":\"thingName: *\",\"policyName\":\"ClientDevicePolicy\"}},\"policies\":{\"ClientDevicePolicy\":{\"AllowConnect\":{\"statementDescription\":\"Allow client devices to connect.\",\"operations\":[\"mqtt:connect\"],\"resources\":[\"*\"]},\"AllowPublish\":{\"statementDescription\":\"Allow client devices to publish to all topics.\",\"operations\":[\"mqtt:publish\"],\"resources\":[\"*\"]},\"AllowSubscribe\":{\"statementDescription\":\"Allow client devices to subscribe to all topics.\",\"operations\":[\"mqtt:subscribe\"],\"resources\":[\"*\"]}}}}}" } }, "aws.greengrass.clientdevices.mqtt.Moquette": { "componentVersion": "2.3.7" }, "aws.greengrass.clientdevices.mqtt.Bridge": { "componentVersion": "2.3.2", "configurationUpdate": { "merge": "{\"mqttTopicMapping\":{\"LocalToPubsubMapping\":{\"topic\":\"#\",\"source\":\"LocalMqtt\",\"target\":\"Pubsub\"}},\"logLevel\":\"DEBUG\",\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"aws.greengrass.clientdevices.mqtt.Bridge\":{\"policyDescription\":\"Allows bridging messages to PubSub.\"}}}}" } }, "aws.greengrass.Cli": { "componentVersion": "2.13.0" }, "com.ecp.sensor_data.subscriber": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\": {\"aws.greengrass.ipc.pubsub\": {\"com.ecp.sensor_data.subscriber\": {\"policyDescription\": \"Allows subscribe on sensor_data topic.\", \"operations\": [\"aws.greengrass#SubscribeToTopic\"], \"resources\": [\"sensor_data\"]}}}}" } } } }

and recipe: { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.ecp.sensor_data.subscriber", "ComponentVersion": "1.0.0", "ComponentType": "aws.greengrass.generic", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "", "run": "python3 /greengrass/v2/shared_data/docker_code/lambdas/sensor_data_subscriber/index.py" }, "Artifacts": [] } ], "Lifecycle": {} }

answered 2 years ago
  • Thanks for the effort! I think the remaining bit that needs to be removed is the Access control on the Bridge component. It is not needed. Try this first for your MQTT Bridge update { "mqttTopicMapping": {

    "LocalToPubsubMapping": {
      "topic": "sensor_data",
      "source": "LocalMqtt",
      "target": "Pubsub"
    }
    

    } }

    Then try with the wildcard topic #.

0

I have gotten a little further now. my messages are no longer publishing to iot core which is what I wanted. Here is my latest deployment config json

{
    "aws.greengrass.Cli": {
        "componentVersion": "2.13.0"
    },
    "aws.greengrass.Nucleus": {
        "componentVersion": "2.13.0"
    },
    "aws.greengrass.clientdevices.Auth": {
        "componentVersion": "2.5.1",
        "configurationUpdate": {
            "reset": ["/deviceGroups"],
            "merge": {
                "deviceGroups": {
                    "formatVersion": "2021-03-05",
                    "definitions": {
                        "AllDevicesGroup": {
                            "selectionRule": "thingName:*",
                            "policyName": "AllowAllPolicy"
                        }
                    },
                    "policies": {
                        "AllowAllPolicy": {
                            "AllowConnect": {
                                "statementDescription": "Allow all client devices to connect.",
                                "operations": [
                                    "mqtt:connect"
                                ],
                                "resources": [
                                    "*"
                                ]
                            },
                            "AllowPublish": {
                                "statementDescription": "Allow all client devices to publish to any topic.",
                                "operations": [
                                    "mqtt:publish"
                                ],
                                "resources": [
                                    "*"
                                ]
                            },
                            "AllowSubscribe": {
                                "statementDescription": "Allow all client devices to subscribe to any topic.",
                                "operations": [
                                    "mqtt:subscribe"
                                ],
                                "resources": [
                                    "*"
                                ]
                            }
                        }
                    }
                }
            }
        }
    },
    "aws.greengrass.clientdevices.IPDetector": {
        "componentVersion": "2.2.0"
    },
    "aws.greengrass.clientdevices.mqtt.Moquette": {
        "componentVersion": "2.3.7",
        "configurationUpdate": {
            "reset": ["/accessControl"],
            "merge": {
                "bindAddress": "0.0.0.0",
                "port": 8883,
                "logLevel": "DEBUG"
            }
        }
    },
    "aws.greengrass.clientdevices.mqtt.Bridge": {
        "componentVersion": "2.3.2",
        "configurationUpdate": {
            "reset": ["/mqttTopicMapping"],
            "merge": {
                "mqttTopicMapping": {
                    "LocalToPubsubMapping": {
                        "topic": "#",
                        "source": "LocalMqtt",
                        "target": "Pubsub"
                    },
                    "PubsubToLocalMapping": {
                        "topic": "#",
                        "source": "Pubsub",
                        "target": "LocalMqtt"
                    }
                }
            }
        }
    },
    "com.ecp.topic_subscriber_sensor_data": {
        "componentVersion": "1.0.0",
        "configurationUpdate": {
            "reset": ["/lambdaExecutionParameters"],
            "merge": {
                "lambdaExecutionParameters": {
                    "pinned": true,
                    "maxIdleTimeInSeconds": 60,
                    "timeoutInSeconds": 3,
                    "maxInstancesCount": 100,
                    "maxQueueSize": 1000,
                    "statusTimeoutInSeconds": 60,
                    "inputPayloadEncodingType": "json",
                    "eventSources": [
                        {
                            "type": "PUBSUB",
                            "topic": "sensor_data"
                        }
                    ],
                    "environmentVariables": {},
                    "linuxProcessParams": {
                        "isolationMode": "NoContainer",
                        "containerParams": {
                            "memorySizeInKB": 65536,
                            "mountROSysfs": false,
                            "volumes": [],
                            "devices": []
                        }
                    },
                    "execArgs": [
                        "python3.11",
                        "-u",
                        "/greengrass/v2/runtime/python/lambda_runtime.py",
                        "--handler=index.handler"
                    ]
                }
            }
        }
    }
}

and i don't see any errors in my greengrass.log file after deployment. here is a new version of the publisher code that discover the ip then uses that.


import sys
import os
import json
import time
import random
import logging
from datetime import datetime

from awscrt import io, mqtt
from awsiot import mqtt_connection_builder, greengrass_discovery

# Configure logging with a consistent format
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# AWS IoT Core and Greengrass configuration
aws_region = "us-east-1"
thing_name = "gg_client_device_1_1"

# Certificate file paths (Ensure these paths are correct based on your environment)
root_ca_path = "/greengrass/v2/AmazonRootCA1.pem"  # Path to Amazon Root CA
cert_path = "./certs/certificate.pem.crt"          # Path to client certificate
key_path = "./certs/private.pem.key"               # Path to client private key

# Check if certificate files exist
for path in [root_ca_path, cert_path, key_path]:
    if not os.path.isfile(path):
        logging.error(f"File not found: {path}")
        sys.exit(1)

# Initialize the event loop group, host resolver, and client bootstrap
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
socket_options = io.SocketOptions()

# Create a TLS context options with the root CA
tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(cert_path, key_path)
tls_ctx_options.override_default_trust_store_from_path(None, root_ca_path)
tls_context = io.ClientTlsContext(tls_ctx_options)

# Perform Greengrass discovery to get the connectivity info
def perform_greengrass_discovery():
    # Create the Discovery Client
    gg_discovery_client = greengrass_discovery.DiscoveryClient(
        bootstrap=client_bootstrap,
        socket_options=socket_options,
        tls_context=tls_context,
        region=aws_region
    )

    logging.info("Performing Greengrass discovery...")

    try:
        # Perform discovery
        discover_future = gg_discovery_client.discover(thing_name)
        response = discover_future.result()
        logging.info("Greengrass discovery successful.")
        return response
    except Exception as e:
        logging.error(f"Failed to perform Greengrass discovery: {e}")
        sys.exit(1)

# Callback when connection is accidentally lost
def on_connection_interrupted(connection, error, **kwargs):
    logging.error(f"Connection interrupted. Error: {error}")

# Callback when an interrupted connection is re-established
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    logging.info(f"Connection resumed. Return code: {return_code}, Session present: {session_present}")

# Connect to Greengrass Core
def connect_to_greengrass_core(response):
    global mqtt_connection
    for gg_group in response.gg_groups:
        ca_list = gg_group.certificate_authorities
        cores = gg_group.cores

        # Use the CA certificate from the group
        ca = ca_list[0]
        ca_bytes = ca.encode('utf-8')

        # Iterate through the connectivity info to find a valid endpoint
        for core in cores:
            connectivity_info_list = core.connectivity
            for conn_info in connectivity_info_list:
                # Debugging: Print all attributes of conn_info
                logging.info("---- ConnectivityInfo Attributes ----")
                attributes = {}
                for attribute in dir(conn_info):
                    if not attribute.startswith('_') and not callable(getattr(conn_info, attribute)):
                        value = getattr(conn_info, attribute)
                        attributes[attribute] = value
                        logging.info(f"{attribute}: {value}")
                logging.info("--------------------------------------")

                # Determine host and port based on available attributes
                host = getattr(conn_info, 'host_address', None) or getattr(conn_info, 'host', None) or getattr(conn_info, 'address', None)
                port = getattr(conn_info, 'port_number', None) or getattr(conn_info, 'port', None) or getattr(conn_info, 'portNumber', None)

                if host is None or port is None:
                    logging.warning("Host address or port number not found in ConnectivityInfo. Skipping...")
                    continue

                logging.info(f"Attempting to connect to Greengrass Core at {host}:{port}...")
                try:
                    mqtt_connection = mqtt_connection_builder.mtls_from_path(
                        endpoint=host,
                        port=port,
                        cert_filepath=cert_path,
                        pri_key_filepath=key_path,
                        ca_bytes=ca_bytes,  # Pass the CA bytes directly
                        client_bootstrap=client_bootstrap,
                        on_connection_interrupted=on_connection_interrupted,
                        on_connection_resumed=on_connection_resumed,
                        client_id=thing_name,
                        clean_session=False,
                        keep_alive_secs=30
                    )

                    # Connect
                    connect_future = mqtt_connection.connect()
                    connect_future.result()
                    logging.info(f"Connected to Greengrass Core at {host}:{port}")
                    return
                except Exception as e:
                    logging.error(f"Failed to connect to {host}:{port}: {e}")
                    continue
    logging.error("Failed to connect to any Greengrass Core endpoint.")
    sys.exit(1)

# Publish sensor data
def publish_sensor_data():
    try:
        while True:
            sensor_data = {
                "client_id": thing_name,
                "temperature": round(random.uniform(20.0, 25.0), 2),
                "humidity": round(random.uniform(30.0, 50.0), 2),
                "timestamp": datetime.now().isoformat()
            }
            message_json = json.dumps(sensor_data)
            mqtt_connection.publish(
                topic="sensor_data",
                payload=message_json,
                qos=mqtt.QoS.AT_LEAST_ONCE
            )
            logging.info(f"Published message to topic 'sensor_data': {message_json}")
            time.sleep(5)
    except KeyboardInterrupt:
        logging.info("Publishing stopped by user.")
    finally:
        # Disconnect
        disconnect_future = mqtt_connection.disconnect()
        disconnect_future.result()
        logging.info("Disconnected from Greengrass Core.")

if __name__ == "__main__":
    # Perform Greengrass discovery
    discovery_response = perform_greengrass_discovery()
    # Connect to Greengrass Core
    connect_to_greengrass_core(discovery_response)
    # Publish sensor data
    publish_sensor_data()

when i run the publisher it prints this which I think is good:

python3 pub.py
2024-11-19 10:59:27,548 - INFO - Performing Greengrass discovery...
2024-11-19 10:59:28,459 - INFO - Greengrass discovery successful.
2024-11-19 10:59:28,459 - INFO - ---- ConnectivityInfo Attributes ----
2024-11-19 10:59:28,459 - INFO - host_address: 172.28.0.2
2024-11-19 10:59:28,459 - INFO - id: 172.28.0.2
2024-11-19 10:59:28,459 - INFO - metadata: 
2024-11-19 10:59:28,459 - INFO - port: 8883
2024-11-19 10:59:28,459 - INFO - --------------------------------------
2024-11-19 10:59:28,459 - INFO - Attempting to connect to Greengrass Core at 172.28.0.2:8883...
2024-11-19 10:59:29,332 - INFO - Connected to Greengrass Core at 172.28.0.2:8883
2024-11-19 10:59:29,332 - INFO - Published message to topic 'sensor_data': {"client_id": "gg_client_device_1_1", "temperature": 21.19, "humidity": 36.74, "timestamp": "2024-11-19T10:59:29.332103"}

my sub.py does things similar and when running it, it also receives these messages as expect. So I am no longer using iot core, but my lambda is not getting these messages, is any final changes needed in my config file?

answered 2 years ago
0

Just want to give a final update I finally got this working with lot's of pain and frustration, but all I care is it's working. My ultimiate goal was to disconnect internet and it all still works (for mfg machines that are offline). For those that need help, I will follow up with a blog post later this week and update this. Essentially although this is pretty general and not specific to the greengrass setup, here is the config for the lambda function:

{
  "lambdaExecutionParameters": {
    "EnvironmentVariables": {}
  },
  "containerParams": {
    "memorySize": 16384,
    "mountROSysfs": false,
    "volumes": {},
    "devices": {}
  },
  "containerMode": "NoContainer",
  "timeoutInSeconds": 15,
  "maxInstancesCount": 100,
  "inputPayloadEncodingType": "json",
  "maxQueueSize": 1000,
  "pinned": true,
  "maxIdleTimeInSeconds": 60,
  "statusTimeoutInSeconds": 60,
  "pubsubTopics": {
    "0": {
      "topic": "#",
      "type": "PUB_SUB"
    }
  }
}

my deployment config (for doing a greengrass deployment (This is really where most of my pain-points were, getting it correct):

{
    "aws.greengrass.Cli": {
        "componentVersion": "2.13.0"
    },
    "aws.greengrass.Nucleus": {
        "componentVersion": "2.13.0"
    },
    "aws.greengrass.clientdevices.Auth": {
        "componentVersion": "2.5.1",
        "configurationUpdate": {
            "reset": ["/deviceGroups"],
            "merge": {
                "deviceGroups": {
                    "formatVersion": "2021-03-05",
                    "definitions": {
                        "AllDevicesGroup": {
                            "selectionRule": "thingName:*",
                            "policyName": "AllowAllPolicy"
                        }
                    },
                    "policies": {
                        "AllowAllPolicy": {
                            "AllowConnect": {
                                "statementDescription": "Allow all client devices to connect.",
                                "operations": [
                                    "mqtt:connect"
                                ],
                                "resources": [
                                    "*"
                                ]
                            },
                            "AllowPublish": {
                                "statementDescription": "Allow all client devices to publish to any topic.",
                                "operations": [
                                    "mqtt:publish"
                                ],
                                "resources": [
                                    "*"
                                ]
                            },
                            "AllowSubscribe": {
                                "statementDescription": "Allow all client devices to subscribe to any topic.",
                                "operations": [
                                    "mqtt:subscribe"
                                ],
                                "resources": [
                                    "*"
                                ]
                            }
                        }
                    }
                }
            }
        }
    },
    "aws.greengrass.clientdevices.IPDetector": {
        "componentVersion": "2.2.0"
    },
    "aws.greengrass.clientdevices.mqtt.Moquette": {
        "componentVersion": "2.3.7",
        "configurationUpdate": {
            "reset": ["/accessControl"],
            "merge": {
                "bindAddress": "0.0.0.0",
                "port": 8883,
                "logLevel": "DEBUG"
            }
        }
    },
    "aws.greengrass.clientdevices.mqtt.Bridge": {
        "componentVersion": "2.3.2",
        "configurationUpdate": {
            "reset": ["/mqttTopicMapping"],
            "merge": {
                "mqttTopicMapping": {
                    "LocalToPubsubMapping": {
                        "topic": "#",
                        "source": "LocalMqtt",
                        "target": "Pubsub"
                    },
                    "PubsubToLocalMapping": {
                        "topic": "#",
                        "source": "Pubsub",
                        "target": "LocalMqtt"
                    }
                }
            }
        }
    },
    "com.ecp.topic_subscriber_sensor_data": {
        "componentVersion": "1.0.0",
        "configurationUpdate": {
            "reset": ["/lambdaExecutionParameters"],
            "merge": {
                "DefaultConfiguration": {
                  "accessControl": {
                    "aws.greengrass.ipc.pubsub": {
                      "com.ecp.topic_subscriber_sensor_data": {
                        "policyDescription": "Allows access to subscribe to all topics.",
                        "operations": [
                          "aws.greengrass#SubscribeToTopic"
                        ],
                        "resources": [
                          "*"
                        ]
                      }
                    }
                  }
                },
                "lambdaExecutionParameters": {
                    "pinned": true,
                    "maxIdleTimeInSeconds": 60,
                    "timeoutInSeconds": 3,
                    "maxInstancesCount": 100,
                    "maxQueueSize": 1000,
                    "statusTimeoutInSeconds": 60,
                    "inputPayloadEncodingType": "json",
                    "eventSources": [
                        {
                            "type": "PUB_SUB",
                            "topic": "sensor_data"
                        }
                    ],
                    "environmentVariables": {},
                    "linuxProcessParams": {
                        "isolationMode": "NoContainer",
                        "containerParams": {
                            "memorySizeInKB": 65536,
                            "mountROSysfs": false,
                            "volumes": [],
                            "devices": []
                        }
                    },
                    "execArgs": [
                        "python3.11",
                        "-u",
                        "/greengrass/v2/runtime/python/lambda_runtime.py",
                        "--handler=index.handler"
                    ]
                }
            }
        }
    }
}

The lambda function itself is no different than any other lambda, it just receives the payload. When you create a new component just go into the web console greegrass/components, create a new one, and select the lambda option and find your lambbda, I am not using the docker container option, and pin set to true.

answered 2 years ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.