Publish MQTT local messages and read them from client as Zigbee2MQTT

0

I'm having trouble communicating with an end device connected to an IoT client. I'm sure it doesn't work for some configuration issue, I'll explain my scheme in case you can help me.

I have a greengrass device with a client configured with the following components and settings: aws.greengrass.clientdevices.Auth

{
	"deviceGroups": {
		"formatVersion": "2021-03-05",
		"definitions": {
			"MyDeviceGroup": {
				"selectionRule": "thingName: Cliente*",
				"policyName": "MyClientDevicePolicy"
			}
		},
		"policies": {
			"MyClientDevicePolicy": {
				"AllowConnect": {
					"statementDescription": "Allow client devices to connect.",
					"operations": [
						"mqtt:connect"
					],
					"resources": [
						"*"
					]
				},
				"AllowPublish": {
					"statementDescription": "Allow client devices to publish to all topics.",
					"operations": [
						"mqtt:publish"
					],
					"resources": [
						"*"
					]
				},
				"AllowSubscribe": {
					"statementDescription": "Allow client devices to subscribe to all topics.",
					"operations": [
						"mqtt:subscribe"
					],
					"resources": [
						"*"
					]
				}
			}
		}
	}
}

aws.greengrass.clientdevices.mqtt.Bridge

{
	"mqttTopicMapping": {
		"HelloWorldPubsubMapping": {
			"topic": "BlueMQTT/#",
			"source": "LocalMqtt",
			"target": "Pubsub"
		}
	}
}

aws.greengrass.clientdevices.mqtt.Moquette 2.3.3

Also I have a custom component who is suscribed to messages with topic "BlueMQTT/#" and relay them with other topic to AWS IoT using IPC.

The data is generated by a zigbee device that connects with moquette using Zigbee2MQTT (Z2M), Z2M is connected to moquette using certificates and allowed client_id. So far everything works fine. The zigbee device sends data, z2m captures it, sends it via mqtt to moquette, the custom component is subscribed to that topic, reads the message and sends it to AWS IoT. All perfect.

The problem comes when I have to interact with the zigbee device, some parameters can be configured and this is done through MQTT messages, in theory if I send the messages to the broker, Z2M should read them and send them to the zigbee device and that doesn't happen.

I am trying to send the MQTT messages in a different way, with a client like MQTT Explorer, with a custom python script and even using the basic_discovery.py script that comes in the examples. In all of them it seems that it sends the message well but the reality is that Z2M does not pick it up and therefore I cannot communicate with the zigbee device.

I don't know if the problem is authorizations, if the problem is that I always connect with the same client id, I don't know if I don't send the message correctly. I do not know what to do.

I have also tried replacing moquette with a borker mosquitto and everything works fine, there must be some configuration that is not right and that blocks those messages between moquette and z2m.

I hope I have been able to explain myself. Thanks for the help.

  • Do I understand correctly that you are able to send messages to the Zigbee device using Z2M and a Mosquitto broker, but you are not able to do the same when using Moquette, despite Z2M being connected and authorized? I also assume that your goal is to send messages from your component to Z2M via Moquette, in which case you should add further configuration to the Bridge, with "topic": "toZigbee/BlueMQTT/#", "source": "PubSub", "target": "LocalMqtt". Be careful to not create loops between source and target by using non overlapping topic spaces.

  • Thanks for your answer. It is just as you have explained. My question is how to do this configuration in the bridge without making loops. In "moquette" the BlueMQTT/# messages have to arrive but they are also the ones that I send to IoT. Any idea how I can do this?

  • Anyway, if I post directly from python or MQTT Explorer, shouldn't they get to Moquette? Apparently they arrive but where I don't see them is in the Z2M

  • to avoid loops you need to have different topic spaces. I do not know Z2M, but you might have something like BlueMQTT/toZigbee/# and BlueMQTT/fromZigbee/#

  • Regarding the posting of messages, can you verify if you have any error message in the /greengrass/v2/logs/greengrass.log? The Moquette component is a plugin hence it runs in the same JVM as Nucleus and therefore the log are in greengrass.log

cespar
asked 9 months ago374 views
2 Answers
0

What do you think about this log? The message I'm sending to moquette and doesn't reach z2m is BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set:


2023-08-01T19:05:13.093Z [DEBUG] (nioEventLoopGroup-3-3) io.moquette.broker.PostOffice: Routing cmd [PUB QoS0] for session [ClienteHT002] to event processor 0. {}
2023-08-01T19:05:13.093Z [DEBUG] (nioEventLoopGroup-3-3) io.moquette.broker.MQTTConnection: readCompleted client CId: ClienteHT002. {}
2023-08-01T19:05:13.093Z [DEBUG] (Session Executor 0) com.aws.greengrass.clientdevices.auth.DeviceAuthClient: Processing authorization request. {resource=mqtt:topic:BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set, action=mqtt:publish, sessionId=3ee4dd67-356d-4467-96b4-d9dfb6c3fb51}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 0) com.aws.greengrass.clientdevices.auth.PermissionEvaluationUtils: Hit policy with permission Permission(principal=MyDeviceGroup, operation=mqtt:publish, resource=*). {}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 0) com.aws.greengrass.clientdevices.auth.api.ClientDevicesAuthServiceApi: Successfully authorized client device action. {}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 0) com.aws.greengrass.clientdevices.auth.api.DomainEvents: Invoking event handler. {event=AuthorizeClientDeviceActionEvent, eventHandler=AuthorizeClientDeviceActionsMetricHandler}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 0) com.aws.greengrass.mqtt.moquette.ClientDeviceAuthorizer: MQTT publish request. {isAllowed=true, clientId=ClienteHT002, topic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 0) io.moquette.broker.PostOffice: Routing cmd [batched PUB] for session [ClienteHT002] to event processor 0. {}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 0) io.moquette.broker.PostOffice: Sending PUBLISH message to active subscriber CId: ClienteHT002, topicFilter: BlueMQTT/00:23:24:7f:0d:f4/#, qos: AT_MOST_ONCE. {}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 0) io.moquette.broker.MQTTConnection: Sending PUBLISH(AT_MOST_ONCE) message. MessageId=0, topic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set. {}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 0) io.moquette.broker.MQTTConnection: OUT PUBLISH. {}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 0) io.moquette.broker.PostOffice: Routing cmd [batched PUB] for session [mqtt-bridge-4u7l7d74scq] to event processor 2. {}
2023-08-01T19:05:13.094Z [DEBUG] (nioEventLoopGroup-3-3) io.moquette.broker.metrics.MQTTMessageLogger: C<-B PUBLISH <ClienteHT002> to topics <BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set>. {}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 2) io.moquette.broker.PostOffice: Sending PUBLISH message to active subscriber CId: mqtt-bridge-4u7l7d74scq, topicFilter: BlueMQTT/#, qos: AT_MOST_ONCE. {}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 2) io.moquette.broker.MQTTConnection: Sending PUBLISH(AT_MOST_ONCE) message. MessageId=0, topic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set. {}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 2) io.moquette.broker.MQTTConnection: OUT PUBLISH. {}
2023-08-01T19:05:13.094Z [DEBUG] (nioEventLoopGroup-3-1) io.moquette.broker.metrics.MQTTMessageLogger: C<-B PUBLISH <mqtt-bridge-4u7l7d74scq> to topics <BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set>. {}
2023-08-01T19:05:13.094Z [DEBUG] (pool-8-thread-1) io.moquette.interception.BrokerInterceptor: Notifying MQTT PUBLISH message to interceptor. CId=ClienteHT002, messageId=-1, topic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set, interceptorId=ClientDeviceConnectionTerminationListener. {}
2023-08-01T19:05:13.094Z [DEBUG] (nioEventLoopGroup-3-3) io.moquette.broker.metrics.MQTTMessageLogger: C->B PUBLISH <ClienteHT002> to topics <BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set>. {}
2023-08-01T19:05:13.094Z [DEBUG] (nioEventLoopGroup-3-3) io.moquette.broker.MQTTConnection: Received MQTT message, type: PUBLISH. {}
2023-08-01T19:05:13.094Z [DEBUG] (MQTT Call: mqtt-bridge-4u7l7d74scq) com.aws.greengrass.mqtt.bridge.MessageBridge: Message received. {sourceTopic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set, source=LocalMqtt}
2023-08-01T19:05:13.094Z [DEBUG] (MQTT Call: mqtt-bridge-4u7l7d74scq) com.aws.greengrass.builtin.services.pubsub.PubSubIPCEventStreamAgent: Sending publish event for topic BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set. {componentName=aws.greengrass.clientdevices.mqtt.Bridge}
2023-08-01T19:05:13.094Z [DEBUG] (MQTT Call: mqtt-bridge-4u7l7d74scq) com.aws.greengrass.builtin.services.pubsub.PubSubIPCEventStreamAgent: Sending publish event for topic BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set. {componentName=aws.greengrass.clientdevices.mqtt.Bridge}
2023-08-01T19:05:13.094Z [DEBUG] (MQTT Call: mqtt-bridge-4u7l7d74scq) com.aws.greengrass.mqtt.bridge.MessageBridge: Published message. {sourceTopic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set, source=LocalMqtt, targetTopic=, target=Pubsub, resolvedTargetTopic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set}
2023-08-01T19:05:13.094Z [DEBUG] (nioEventLoopGroup-3-3) io.moquette.broker.PostOffice: Routing cmd [PUB QoS0] for session [ClienteHT002] to event processor 0. {}
2023-08-01T19:05:13.094Z [DEBUG] (nioEventLoopGroup-3-3) io.moquette.broker.MQTTConnection: readCompleted client CId: ClienteHT002. {}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 0) com.aws.greengrass.clientdevices.auth.DeviceAuthClient: Processing authorization request. {resource=mqtt:topic:BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set, action=mqtt:publish, sessionId=3ee4dd67-356d-4467-96b4-d9dfb6c3fb51}
2023-08-01T19:05:13.094Z [DEBUG] (Session Executor 0) com.aws.greengrass.clientdevices.auth.PermissionEvaluationUtils: Hit policy with permission Permission(principal=MyDeviceGroup, operation=mqtt:publish, resource=*). {}
2023-08-01T19:05:13.095Z [DEBUG] (Session Executor 0) com.aws.greengrass.clientdevices.auth.api.ClientDevicesAuthServiceApi: Successfully authorized client device action. {}
2023-08-01T19:05:13.095Z [DEBUG] (Session Executor 0) com.aws.greengrass.clientdevices.auth.api.DomainEvents: Invoking event handler. {event=AuthorizeClientDeviceActionEvent, eventHandler=AuthorizeClientDeviceActionsMetricHandler}
2023-08-01T19:05:13.095Z [DEBUG] (Session Executor 0) com.aws.greengrass.mqtt.moquette.ClientDeviceAuthorizer: MQTT publish request. {isAllowed=true, clientId=ClienteHT002, topic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set}
2023-08-01T19:05:13.095Z [DEBUG] (Session Executor 0) io.moquette.broker.PostOffice: Routing cmd [batched PUB] for session [ClienteHT002] to event processor 0. {}
2023-08-01T19:05:13.095Z [DEBUG] (Session Executor 0) io.moquette.broker.PostOffice: Sending PUBLISH message to active subscriber CId: ClienteHT002, topicFilter: BlueMQTT/00:23:24:7f:0d:f4/#, qos: AT_MOST_ONCE. {}
2023-08-01T19:05:13.095Z [DEBUG] (Session Executor 0) io.moquette.broker.MQTTConnection: Sending PUBLISH(AT_MOST_ONCE) message. MessageId=0, topic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set. {}
2023-08-01T19:05:13.095Z [DEBUG] (Session Executor 0) io.moquette.broker.MQTTConnection: OUT PUBLISH. {}
2023-08-01T19:05:13.095Z [DEBUG] (Session Executor 0) io.moquette.broker.PostOffice: Routing cmd [batched PUB] for session [mqtt-bridge-4u7l7d74scq] to event processor 2. {}
2023-08-01T19:05:13.095Z [DEBUG] (Session Executor 2) io.moquette.broker.PostOffice: Sending PUBLISH message to active subscriber CId: mqtt-bridge-4u7l7d74scq, topicFilter: BlueMQTT/#, qos: AT_MOST_ONCE. {}
2023-08-01T19:05:13.095Z [DEBUG] (Session Executor 2) io.moquette.broker.MQTTConnection: Sending PUBLISH(AT_MOST_ONCE) message. MessageId=0, topic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set. {}
2023-08-01T19:05:13.095Z [DEBUG] (Session Executor 2) io.moquette.broker.MQTTConnection: OUT PUBLISH. {}
2023-08-01T19:05:13.095Z [DEBUG] (pool-8-thread-1) io.moquette.interception.BrokerInterceptor: Notifying MQTT PUBLISH message to interceptor. CId=ClienteHT002, messageId=-1, topic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set, interceptorId=ClientDeviceConnectionTerminationListener. {}
2023-08-01T19:05:13.095Z [INFO] (Thread-6) software.amazon.awssdk.eventstreamrpc.RpcServer: New connection code [AWS_ERROR_SUCCESS] for [Id 915, Class ServerConnection, Refs 1](2023-08-01T19:05:13.095597Z) - <null>. {}
2023-08-01T19:05:13.095Z [INFO] (Thread-6) software.amazon.awssdk.eventstreamrpc.ServiceOperationMappingContinuationHandler: aws.greengrass#GreengrassCoreIPC authenticated identity: eu.bluece.BlueCareMQTTMessage. {}
2023-08-01T19:05:13.095Z [INFO] (Thread-6) software.amazon.awssdk.eventstreamrpc.ServiceOperationMappingContinuationHandler: Connection accepted for eu.bluece.BlueCareMQTTMessage. {}
2023-08-01T19:05:13.095Z [INFO] (Thread-6) software.amazon.awssdk.eventstreamrpc.ServiceOperationMappingContinuationHandler: Sending connect response for eu.bluece.BlueCareMQTTMessage. {}
2023-08-01T19:05:13.095Z [DEBUG] (nioEventLoopGroup-3-1) io.moquette.broker.metrics.MQTTMessageLogger: C<-B PUBLISH <mqtt-bridge-4u7l7d74scq> to topics <BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set>. {}
2023-08-01T19:05:13.095Z [DEBUG] (nioEventLoopGroup-3-3) io.moquette.broker.metrics.MQTTMessageLogger: C<-B PUBLISH <ClienteHT002> to topics <BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set>. {}
2023-08-01T19:05:13.096Z [DEBUG] (Thread-6) software.amazon.awssdk.eventstreamrpc.OperationContinuationHandler: Continuation native id: 140694204474192. {}
2023-08-01T19:05:13.096Z [DEBUG] (MQTT Call: mqtt-bridge-4u7l7d74scq) com.aws.greengrass.mqtt.bridge.MessageBridge: Message received. {sourceTopic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set, source=LocalMqtt}
2023-08-01T19:05:13.096Z [DEBUG] (Thread-6) com.aws.greengrass.authorization.AuthorizationHandler: Hit policy with principal eu.bluece.BlueCareMQTTMessage, operation aws.greengrass#PublishToIoTCore, resource BlueCare/new/00:23:24:7f:0d:f4/0x0013a200421c82ff/set. {}
2023-08-01T19:05:13.096Z [DEBUG] (MQTT Call: mqtt-bridge-4u7l7d74scq) com.aws.greengrass.builtin.services.pubsub.PubSubIPCEventStreamAgent: Sending publish event for topic BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set. {componentName=aws.greengrass.clientdevices.mqtt.Bridge}
2023-08-01T19:05:13.096Z [DEBUG] (MQTT Call: mqtt-bridge-4u7l7d74scq) com.aws.greengrass.builtin.services.pubsub.PubSubIPCEventStreamAgent: Sending publish event for topic BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set. {componentName=aws.greengrass.clientdevices.mqtt.Bridge}
2023-08-01T19:05:13.096Z [DEBUG] (MQTT Call: mqtt-bridge-4u7l7d74scq) com.aws.greengrass.mqtt.bridge.MessageBridge: Published message. {sourceTopic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set, source=LocalMqtt, targetTopic=, target=Pubsub, resolvedTargetTopic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set}
2023-08-01T19:05:13.096Z [DEBUG] (Thread-6) software.amazon.awssdk.eventstreamrpc.OperationContinuationHandler: aws.greengrass#PublishToIoTCore stream continuation closed.. {}
2023-08-01T19:05:13.097Z [INFO] (Thread-6) software.amazon.awssdk.eventstreamrpc.RpcServer: New connection code [AWS_ERROR_SUCCESS] for [Id 917, Class ServerConnection, Refs 1](2023-08-01T19:05:13.097110Z) - <null>. {}
2023-08-01T19:05:13.097Z [INFO] (Thread-6) software.amazon.awssdk.eventstreamrpc.RpcServer: Server connection closed code [socket is closed.]: [Id 915, Class ServerConnection, Refs 1](2023-08-01T19:05:13.095597Z) - <null>. {}
2023-08-01T19:05:13.097Z [INFO] (Thread-6) software.amazon.awssdk.eventstreamrpc.ServiceOperationMappingContinuationHandler: aws.greengrass#GreengrassCoreIPC authenticated identity: eu.bluece.BlueCareMQTTMessage. {}
2023-08-01T19:05:13.097Z [INFO] (Thread-6) software.amazon.awssdk.eventstreamrpc.ServiceOperationMappingContinuationHandler: Connection accepted for eu.bluece.BlueCareMQTTMessage. {}
2023-08-01T19:05:13.097Z [INFO] (Thread-6) software.amazon.awssdk.eventstreamrpc.ServiceOperationMappingContinuationHandler: Sending connect response for eu.bluece.BlueCareMQTTMessage. {}
2023-08-01T19:05:13.098Z [DEBUG] (Thread-6) software.amazon.awssdk.eventstreamrpc.OperationContinuationHandler: Continuation native id: 140694204474192. {}
2023-08-01T19:05:13.098Z [DEBUG] (Thread-6) com.aws.greengrass.authorization.AuthorizationHandler: Hit policy with principal eu.bluece.BlueCareMQTTMessage, operation aws.greengrass#PublishToIoTCore, resource BlueCare/new/00:23:24:7f:0d:f4/0x0013a200421c82ff/set. {}

Do you see something wrong? I don't

cespar
answered 9 months ago
  • Where are you expecting the messages to go? The following log shows that the message was successfully published to Pubsub: 2023-08-01T19:05:13.096Z [DEBUG] (MQTT Call: mqtt-bridge-4u7l7d74scq) com.aws.greengrass.mqtt.bridge.MessageBridge: Published message. {sourceTopic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set, source=LocalMqtt, targetTopic=, target=Pubsub, resolvedTargetTopic=BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set}

  • I want the message to be published in the local mqtt moquette broker so that it can be read by Z2M. And that is not happening right now.

  • That is happening, as proved by the MQTT bridge being able to consume the message from the local mqtt broker.

    How are you checking to see if "Z2M" can get the message? Can you use a standard MQTT client to subscribe to the local broker?

    Where are these messages coming from exactly? The log shows a message is published to the topic BlueMQTT/00:23:24:7f:0d:f4/0x0013a200421c82ff/set over local MQTT.

  • I am aware that I have a bit of a mess between mqtt local and pubsub aws concept. The answer that has crossed with your comment the same clarifies the situation. Until now it was posting the "set" message from standard MQTT clients and python scripts. While it used the same client id as Z2M, I think Z2M was left out and didn't read it. Now with a new client there is no problem. If you can read the last answer, my next goal is to send that "set" message from a component.

0

I have news. Z2M connects to the MQTT broker as another client, for this reason it had created a client device called "ClientHT002" with its certificates that Z2M uses to connect to the broker.

As you know, only one client device with the same "client_id" can be connected. Well, in all the examples that I was testing with the same "client_id", what it did was that it disconnected Z2M at that moment and that's why it didn't collect the messages.

I have created a new client with other "client_id" and own certificates and I have tried to send messages to moquette and it receives them perfectly, it is great!.

Now I would like to make a component that would subscribe to the messages and with a filter publish the "set" to the mqtt broker. What would be the best approach to develop the component? Use the paho.mqtt library as a normal client or use something from grennegrass V2, ¿is there an example of publishing to the local mqtt directly??

Regards.

cespar
answered 9 months ago
  • To consume local MQTT messages in Greengrass components, the supported way to do this is using MQTT Bridge and local pubsub IPC to subscribe to the messages. Configure bridge to forward the relevant messages onto pubusb. In your component, use the IPC library to subscribe to local pubsub: https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-publish-subscribe.html#ipc-operation-subscribetotopic.

    Likewise for publishing, configure bridge to publish from local pubsub to local mqtt and use the IPC library to publish to the topic.

    You can use the LocalDebugConsole component to interact with local pubsub for debugging.

  • When we are talking about "local pubsub" is Pubsub in MQTT Bridge, isn' t it? There are more pubsub? Then.. I could use MQTT Bridge with this configuration: { "mqttTopicMapping": { "HelloWorldPubsubMapping": { "topic": "BlueMQTT/#", "source": "LocalMqtt", "target": "Pubsub" }, "PubSubToLocalMqtt": { "topic": "BlueMQTT/#", "source": "Pubsub", "target": "LocalMqtt" } } } I take note of LocalDebugConsole Regards.

  • Bridge simply moves messages between different communication methods. It supports 3 sources/targets which are IoT Core (IotCore), Local MQTT (LocalMqtt), and Local PubSub (Pubsub).

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions