- Newest
- Most votes
- Most comments
Hello rePost-User-6569438!
Really sorry about what you are going through.
Let's start by clarifying what you are trying to achieve:
- 2 client devices that connect to a local MQTT Broker (The Moquette MQTT broker Greengrass component) - Client 1 does Pub, Client 2 does Sub
- A custom component that collects data published by client devices.
From what I read in your code, your custom component is NOT subscribing to the local MQTT broker topics but rather to Greengrass Core IPC topics (which also works in a pub/sub mode but is NOT your local MQTT broker). Greengrass Core IPC function is to allow internal communication between Greengrass components.
In your case, you might want to leverage the MQTT Bridge component to forward messages from local MQTT plane (where client device are connecting) to local IPC plane (where your Custom component currently connects).
All you need to do is to update your "LocalToLocalMapping" topic mapping source to "LocalMqtt" and destination to "Pubsub", and ensure that the mapping is properly set (See here)
You should be able to receive messages after that. Let me know how that worked for you.
Thanks for the response. I spent some time on this and still see the same results. I updated my recipe and my deployment components json, provided below. Everything deploys fine and this message show up as expected: Successfully subscribed to topic: sensor_data. {scriptName=services.com.ecp.sensor_data.subscriber.lifecycle.run, serviceName=com.ecp.sensor_data.subscriber, currentState=RUNNING}
using this it works and writes the log file as expected: /greengrass/v2/bin/greengrass-cli pubsub pub --topic 'sensor_data' --message '{"test": "Test here...."}'
but still no love from the client things, they still communicate with each-other, but not my component:
{ "components": { "aws.greengrass.Nucleus": { "componentVersion": "2.13.0" }, "aws.greengrass.clientdevices.IPDetector": { "componentVersion": "2.2.0" }, "aws.greengrass.clientdevices.Auth": { "componentVersion": "2.5.1", "configurationUpdate": { "merge": "{\"deviceGroups\":{\"formatVersion\":\"2021-03-05\",\"definitions\":{\"DeviceGroup\":{\"selectionRule\":\"thingName: *\",\"policyName\":\"ClientDevicePolicy\"}},\"policies\":{\"ClientDevicePolicy\":{\"AllowConnect\":{\"statementDescription\":\"Allow client devices to connect.\",\"operations\":[\"mqtt:connect\"],\"resources\":[\"*\"]},\"AllowPublish\":{\"statementDescription\":\"Allow client devices to publish to all topics.\",\"operations\":[\"mqtt:publish\"],\"resources\":[\"*\"]},\"AllowSubscribe\":{\"statementDescription\":\"Allow client devices to subscribe to all topics.\",\"operations\":[\"mqtt:subscribe\"],\"resources\":[\"*\"]}}}}}" } }, "aws.greengrass.clientdevices.mqtt.Moquette": { "componentVersion": "2.3.7" }, "aws.greengrass.clientdevices.mqtt.Bridge": { "componentVersion": "2.3.2", "configurationUpdate": { "merge": "{\"mqttTopicMapping\":{\"LocalToPubsubMapping\":{\"topic\":\"#\",\"source\":\"LocalMqtt\",\"target\":\"Pubsub\"}},\"logLevel\":\"DEBUG\",\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"aws.greengrass.clientdevices.mqtt.Bridge\":{\"policyDescription\":\"Allows bridging messages to PubSub.\"}}}}" } }, "aws.greengrass.Cli": { "componentVersion": "2.13.0" }, "com.ecp.sensor_data.subscriber": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\": {\"aws.greengrass.ipc.pubsub\": {\"com.ecp.sensor_data.subscriber\": {\"policyDescription\": \"Allows subscribe on sensor_data topic.\", \"operations\": [\"aws.greengrass#SubscribeToTopic\"], \"resources\": [\"sensor_data\"]}}}}" } } } }
and recipe:
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.ecp.sensor_data.subscriber", "ComponentVersion": "1.0.0", "ComponentType": "aws.greengrass.generic", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "", "run": "python3 /greengrass/v2/shared_data/docker_code/lambdas/sensor_data_subscriber/index.py" }, "Artifacts": [] } ], "Lifecycle": {} }
Thanks for the effort! I think the remaining bit that needs to be removed is the Access control on the Bridge component. It is not needed. Try this first for your MQTT Bridge update { "mqttTopicMapping": {
"LocalToPubsubMapping": { "topic": "sensor_data", "source": "LocalMqtt", "target": "Pubsub" }} }
Then try with the wildcard topic #.
I have gotten a little further now. my messages are no longer publishing to iot core which is what I wanted. Here is my latest deployment config json
{
"aws.greengrass.Cli": {
"componentVersion": "2.13.0"
},
"aws.greengrass.Nucleus": {
"componentVersion": "2.13.0"
},
"aws.greengrass.clientdevices.Auth": {
"componentVersion": "2.5.1",
"configurationUpdate": {
"reset": ["/deviceGroups"],
"merge": {
"deviceGroups": {
"formatVersion": "2021-03-05",
"definitions": {
"AllDevicesGroup": {
"selectionRule": "thingName:*",
"policyName": "AllowAllPolicy"
}
},
"policies": {
"AllowAllPolicy": {
"AllowConnect": {
"statementDescription": "Allow all client devices to connect.",
"operations": [
"mqtt:connect"
],
"resources": [
"*"
]
},
"AllowPublish": {
"statementDescription": "Allow all client devices to publish to any topic.",
"operations": [
"mqtt:publish"
],
"resources": [
"*"
]
},
"AllowSubscribe": {
"statementDescription": "Allow all client devices to subscribe to any topic.",
"operations": [
"mqtt:subscribe"
],
"resources": [
"*"
]
}
}
}
}
}
}
},
"aws.greengrass.clientdevices.IPDetector": {
"componentVersion": "2.2.0"
},
"aws.greengrass.clientdevices.mqtt.Moquette": {
"componentVersion": "2.3.7",
"configurationUpdate": {
"reset": ["/accessControl"],
"merge": {
"bindAddress": "0.0.0.0",
"port": 8883,
"logLevel": "DEBUG"
}
}
},
"aws.greengrass.clientdevices.mqtt.Bridge": {
"componentVersion": "2.3.2",
"configurationUpdate": {
"reset": ["/mqttTopicMapping"],
"merge": {
"mqttTopicMapping": {
"LocalToPubsubMapping": {
"topic": "#",
"source": "LocalMqtt",
"target": "Pubsub"
},
"PubsubToLocalMapping": {
"topic": "#",
"source": "Pubsub",
"target": "LocalMqtt"
}
}
}
}
},
"com.ecp.topic_subscriber_sensor_data": {
"componentVersion": "1.0.0",
"configurationUpdate": {
"reset": ["/lambdaExecutionParameters"],
"merge": {
"lambdaExecutionParameters": {
"pinned": true,
"maxIdleTimeInSeconds": 60,
"timeoutInSeconds": 3,
"maxInstancesCount": 100,
"maxQueueSize": 1000,
"statusTimeoutInSeconds": 60,
"inputPayloadEncodingType": "json",
"eventSources": [
{
"type": "PUBSUB",
"topic": "sensor_data"
}
],
"environmentVariables": {},
"linuxProcessParams": {
"isolationMode": "NoContainer",
"containerParams": {
"memorySizeInKB": 65536,
"mountROSysfs": false,
"volumes": [],
"devices": []
}
},
"execArgs": [
"python3.11",
"-u",
"/greengrass/v2/runtime/python/lambda_runtime.py",
"--handler=index.handler"
]
}
}
}
}
}
and i don't see any errors in my greengrass.log file after deployment. here is a new version of the publisher code that discover the ip then uses that.
import sys
import os
import json
import time
import random
import logging
from datetime import datetime
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder, greengrass_discovery
# Configure logging with a consistent format
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# AWS IoT Core and Greengrass configuration
aws_region = "us-east-1"
thing_name = "gg_client_device_1_1"
# Certificate file paths (Ensure these paths are correct based on your environment)
root_ca_path = "/greengrass/v2/AmazonRootCA1.pem" # Path to Amazon Root CA
cert_path = "./certs/certificate.pem.crt" # Path to client certificate
key_path = "./certs/private.pem.key" # Path to client private key
# Check if certificate files exist
for path in [root_ca_path, cert_path, key_path]:
if not os.path.isfile(path):
logging.error(f"File not found: {path}")
sys.exit(1)
# Initialize the event loop group, host resolver, and client bootstrap
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
socket_options = io.SocketOptions()
# Create a TLS context options with the root CA
tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path(cert_path, key_path)
tls_ctx_options.override_default_trust_store_from_path(None, root_ca_path)
tls_context = io.ClientTlsContext(tls_ctx_options)
# Perform Greengrass discovery to get the connectivity info
def perform_greengrass_discovery():
# Create the Discovery Client
gg_discovery_client = greengrass_discovery.DiscoveryClient(
bootstrap=client_bootstrap,
socket_options=socket_options,
tls_context=tls_context,
region=aws_region
)
logging.info("Performing Greengrass discovery...")
try:
# Perform discovery
discover_future = gg_discovery_client.discover(thing_name)
response = discover_future.result()
logging.info("Greengrass discovery successful.")
return response
except Exception as e:
logging.error(f"Failed to perform Greengrass discovery: {e}")
sys.exit(1)
# Callback when connection is accidentally lost
def on_connection_interrupted(connection, error, **kwargs):
logging.error(f"Connection interrupted. Error: {error}")
# Callback when an interrupted connection is re-established
def on_connection_resumed(connection, return_code, session_present, **kwargs):
logging.info(f"Connection resumed. Return code: {return_code}, Session present: {session_present}")
# Connect to Greengrass Core
def connect_to_greengrass_core(response):
global mqtt_connection
for gg_group in response.gg_groups:
ca_list = gg_group.certificate_authorities
cores = gg_group.cores
# Use the CA certificate from the group
ca = ca_list[0]
ca_bytes = ca.encode('utf-8')
# Iterate through the connectivity info to find a valid endpoint
for core in cores:
connectivity_info_list = core.connectivity
for conn_info in connectivity_info_list:
# Debugging: Print all attributes of conn_info
logging.info("---- ConnectivityInfo Attributes ----")
attributes = {}
for attribute in dir(conn_info):
if not attribute.startswith('_') and not callable(getattr(conn_info, attribute)):
value = getattr(conn_info, attribute)
attributes[attribute] = value
logging.info(f"{attribute}: {value}")
logging.info("--------------------------------------")
# Determine host and port based on available attributes
host = getattr(conn_info, 'host_address', None) or getattr(conn_info, 'host', None) or getattr(conn_info, 'address', None)
port = getattr(conn_info, 'port_number', None) or getattr(conn_info, 'port', None) or getattr(conn_info, 'portNumber', None)
if host is None or port is None:
logging.warning("Host address or port number not found in ConnectivityInfo. Skipping...")
continue
logging.info(f"Attempting to connect to Greengrass Core at {host}:{port}...")
try:
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=host,
port=port,
cert_filepath=cert_path,
pri_key_filepath=key_path,
ca_bytes=ca_bytes, # Pass the CA bytes directly
client_bootstrap=client_bootstrap,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=thing_name,
clean_session=False,
keep_alive_secs=30
)
# Connect
connect_future = mqtt_connection.connect()
connect_future.result()
logging.info(f"Connected to Greengrass Core at {host}:{port}")
return
except Exception as e:
logging.error(f"Failed to connect to {host}:{port}: {e}")
continue
logging.error("Failed to connect to any Greengrass Core endpoint.")
sys.exit(1)
# Publish sensor data
def publish_sensor_data():
try:
while True:
sensor_data = {
"client_id": thing_name,
"temperature": round(random.uniform(20.0, 25.0), 2),
"humidity": round(random.uniform(30.0, 50.0), 2),
"timestamp": datetime.now().isoformat()
}
message_json = json.dumps(sensor_data)
mqtt_connection.publish(
topic="sensor_data",
payload=message_json,
qos=mqtt.QoS.AT_LEAST_ONCE
)
logging.info(f"Published message to topic 'sensor_data': {message_json}")
time.sleep(5)
except KeyboardInterrupt:
logging.info("Publishing stopped by user.")
finally:
# Disconnect
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
logging.info("Disconnected from Greengrass Core.")
if __name__ == "__main__":
# Perform Greengrass discovery
discovery_response = perform_greengrass_discovery()
# Connect to Greengrass Core
connect_to_greengrass_core(discovery_response)
# Publish sensor data
publish_sensor_data()
when i run the publisher it prints this which I think is good:
python3 pub.py
2024-11-19 10:59:27,548 - INFO - Performing Greengrass discovery...
2024-11-19 10:59:28,459 - INFO - Greengrass discovery successful.
2024-11-19 10:59:28,459 - INFO - ---- ConnectivityInfo Attributes ----
2024-11-19 10:59:28,459 - INFO - host_address: 172.28.0.2
2024-11-19 10:59:28,459 - INFO - id: 172.28.0.2
2024-11-19 10:59:28,459 - INFO - metadata:
2024-11-19 10:59:28,459 - INFO - port: 8883
2024-11-19 10:59:28,459 - INFO - --------------------------------------
2024-11-19 10:59:28,459 - INFO - Attempting to connect to Greengrass Core at 172.28.0.2:8883...
2024-11-19 10:59:29,332 - INFO - Connected to Greengrass Core at 172.28.0.2:8883
2024-11-19 10:59:29,332 - INFO - Published message to topic 'sensor_data': {"client_id": "gg_client_device_1_1", "temperature": 21.19, "humidity": 36.74, "timestamp": "2024-11-19T10:59:29.332103"}
my sub.py does things similar and when running it, it also receives these messages as expect. So I am no longer using iot core, but my lambda is not getting these messages, is any final changes needed in my config file?
Just want to give a final update I finally got this working with lot's of pain and frustration, but all I care is it's working. My ultimiate goal was to disconnect internet and it all still works (for mfg machines that are offline). For those that need help, I will follow up with a blog post later this week and update this. Essentially although this is pretty general and not specific to the greengrass setup, here is the config for the lambda function:
{
"lambdaExecutionParameters": {
"EnvironmentVariables": {}
},
"containerParams": {
"memorySize": 16384,
"mountROSysfs": false,
"volumes": {},
"devices": {}
},
"containerMode": "NoContainer",
"timeoutInSeconds": 15,
"maxInstancesCount": 100,
"inputPayloadEncodingType": "json",
"maxQueueSize": 1000,
"pinned": true,
"maxIdleTimeInSeconds": 60,
"statusTimeoutInSeconds": 60,
"pubsubTopics": {
"0": {
"topic": "#",
"type": "PUB_SUB"
}
}
}
my deployment config (for doing a greengrass deployment (This is really where most of my pain-points were, getting it correct):
{
"aws.greengrass.Cli": {
"componentVersion": "2.13.0"
},
"aws.greengrass.Nucleus": {
"componentVersion": "2.13.0"
},
"aws.greengrass.clientdevices.Auth": {
"componentVersion": "2.5.1",
"configurationUpdate": {
"reset": ["/deviceGroups"],
"merge": {
"deviceGroups": {
"formatVersion": "2021-03-05",
"definitions": {
"AllDevicesGroup": {
"selectionRule": "thingName:*",
"policyName": "AllowAllPolicy"
}
},
"policies": {
"AllowAllPolicy": {
"AllowConnect": {
"statementDescription": "Allow all client devices to connect.",
"operations": [
"mqtt:connect"
],
"resources": [
"*"
]
},
"AllowPublish": {
"statementDescription": "Allow all client devices to publish to any topic.",
"operations": [
"mqtt:publish"
],
"resources": [
"*"
]
},
"AllowSubscribe": {
"statementDescription": "Allow all client devices to subscribe to any topic.",
"operations": [
"mqtt:subscribe"
],
"resources": [
"*"
]
}
}
}
}
}
}
},
"aws.greengrass.clientdevices.IPDetector": {
"componentVersion": "2.2.0"
},
"aws.greengrass.clientdevices.mqtt.Moquette": {
"componentVersion": "2.3.7",
"configurationUpdate": {
"reset": ["/accessControl"],
"merge": {
"bindAddress": "0.0.0.0",
"port": 8883,
"logLevel": "DEBUG"
}
}
},
"aws.greengrass.clientdevices.mqtt.Bridge": {
"componentVersion": "2.3.2",
"configurationUpdate": {
"reset": ["/mqttTopicMapping"],
"merge": {
"mqttTopicMapping": {
"LocalToPubsubMapping": {
"topic": "#",
"source": "LocalMqtt",
"target": "Pubsub"
},
"PubsubToLocalMapping": {
"topic": "#",
"source": "Pubsub",
"target": "LocalMqtt"
}
}
}
}
},
"com.ecp.topic_subscriber_sensor_data": {
"componentVersion": "1.0.0",
"configurationUpdate": {
"reset": ["/lambdaExecutionParameters"],
"merge": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.ecp.topic_subscriber_sensor_data": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToTopic"
],
"resources": [
"*"
]
}
}
}
},
"lambdaExecutionParameters": {
"pinned": true,
"maxIdleTimeInSeconds": 60,
"timeoutInSeconds": 3,
"maxInstancesCount": 100,
"maxQueueSize": 1000,
"statusTimeoutInSeconds": 60,
"inputPayloadEncodingType": "json",
"eventSources": [
{
"type": "PUB_SUB",
"topic": "sensor_data"
}
],
"environmentVariables": {},
"linuxProcessParams": {
"isolationMode": "NoContainer",
"containerParams": {
"memorySizeInKB": 65536,
"mountROSysfs": false,
"volumes": [],
"devices": []
}
},
"execArgs": [
"python3.11",
"-u",
"/greengrass/v2/runtime/python/lambda_runtime.py",
"--handler=index.handler"
]
}
}
}
}
}
The lambda function itself is no different than any other lambda, it just receives the payload. When you create a new component just go into the web console greegrass/components, create a new one, and select the lambda option and find your lambbda, I am not using the docker container option, and pin set to true.
Relevant content
- asked 3 years ago
- asked 2 years ago

I wasn't sure how to use this forum, and did not have enough space to put my response in this comment, so posted it in the answers.
see latest response at the bottom.