How to publish to the same AWS IoT Thing topic from multiple endpoints?

0

I'm currently testing an IoT project that uses the AWS IoT Core service. I have a Thing set up on the MQTT broker, with connection policies defined and authentication certificates generated. Using these with the client ID defined in the policy, I can successfully publish the project's sensor data (i.e. temperature, humidity, soil pH and ambient light values) to the MQTT broker from a Raspberry Pi Pico (RP2040) microcontroller. From the broker, the received data is stored in DynamoDB tables. A Node.js API listener then fetches the data for display in a Node.js web application in real time. All of this works well so far.

However, my actual problem starts whenever I try to publish data (threshold settings) from the platform to the MQTT broker on the very same topic the RP2040 device publishes to. As soon as the broker receives the platform's data, it stops receiving the device's data.

Here's the API for handling platform publishing of DHT sensor threshold:

// -----------Publish to broker---------------
// DHT sensor component threshold
router.get('/dht-threshold-mqtt-publish', async (req, res) => {
  console.log("\nData Publishing ---------------------------");
  try {
    const { project_id, user_id } = req.query;

    // Fetch data from MongoDB based on project_id and user_id
    const dhtSensorData = await DHTSensor.findOne({ project_id, user_id });

    if (!dhtSensorData) { // Validate that data is available
      return res.status(404).json({ success: false, message: 'DHTSensor data not found' });
    }

    // Extract the temperature and humidity thresholds from the retrieved data
    const tempThreshold = dhtSensorData.temp_threshold.toString();
    const humidThreshold = dhtSensorData.humid_threshold.toString();

    console.log("Component: DHT\nPublished thresholds;\n -Temperature: "+tempThreshold+"\n -Humidity: "+humidThreshold);

    // Construct the JSON payload
    const jsonPayload = JSON.stringify({
      user_id: user_id.toString(),
      project_id: project_id.toString(),
      dht_temperature_threshold: tempThreshold,
      dht_humidity_threshold: humidThreshold,
    });

    // call MQTT setup
    await mqttDataPublishHandler(jsonPayload, 'DHTSensor', res);

  } catch (error) {
    console.error('Error:', error);
    res.status(500).json({ success: false, message: 'Internal server error' });
  }
});

And the publishing function handler:

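// MQTT broker data publish handler
// Assumes the `mqtt` (MQTT.js) package is installed; `fs` is a Node.js built-in.
const fs = require('fs');
const mqtt = require('mqtt');
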
async function mqttDataPublishHandler(dataPayload, targetComponent, res) {
  const mqttBrokerUrl = process.env.MQTT_BROKER_URL; // MQTT broker URL
  const topic = process.env.MQTT_PUB_TOPIC;
  const clientId = process.env.MQTT_CLIENT_ID; // Set your unique client_id
  console.log("MQTT broker url: ", mqttBrokerUrl);

  const mqttOptions = {
    clientId: clientId,
    protocol: 'mqtts', // Use 'mqtts' for MQTT over TLS/SSL
    rejectUnauthorized: true, // Set to "false" to ignore certificate validation (for testing only)
    key: fs.readFileSync(process.env.MQTT_CLIENT_KEY_PATH), // Set path to your client key
    cert: fs.readFileSync(process.env.MQTT_CLIENT_CERT_PATH), // Set path to your client certificate
    ca: fs.readFileSync(process.env.MQTT_CA_CERT_PATH), // Set path to your CA certificate
  };

  const mqttClient = mqtt.connect(mqttBrokerUrl, mqttOptions);

  let responseSent = false;

  mqttClient.on('error', (err) => {
    // Stop MQTT.js from auto-reconnecting after a failure
    mqttClient.end(true);
    if (!responseSent) {
      console.error('MQTT Connection Error:', err);
      res.status(500).json({ success: false, message: 'MQTT Connection Error' });
      responseSent = true;
    }
  });

  mqttClient.on('connect', () => {
    // Publish payload to the specified MQTT topic
    mqttClient.publish(topic, dataPayload.toString(), (err) => {
      // Close the MQTT connection whether or not the publish succeeded
      mqttClient.end();
      if (responseSent) return;
      responseSent = true;
      if (err) {
        console.error('MQTT Publish Error:', err);
        res.status(500).json({ success: false, message: 'MQTT Publish Error' });
      } else {
        console.log(targetComponent+" threshold data published to MQTT broker.");
        // Respond to the client
        res.json({ success: true, data: dataPayload });
      }
    });
  });
}

Does this mean an AWS IoT Thing can't support multiple devices publishing to a common topic, so I should consider creating a topic under Many Things? When I first ran into this, I thought it could be caused by concurrent connections to the MQTT broker from different endpoints (the platform API and the device firmware). I addressed this by ensuring the platform only connects to the broker when a new sensor threshold value is being set and closes the connection afterwards. However, this did not resolve the issue.

Any assistance would be appreciated if you've come across this. Thank you.

asked 2 months ago, 172 views
3 Answers
1
Accepted Answer

Using separate topics for publishing and subscribing, as you mentioned, is a standard approach to handling bidirectional communication in MQTT-based systems. This allows you to segregate the data flow and ensure that messages are processed appropriately by each part of your system.

Regarding your question about using certificates for multiple Things in AWS IoT:

  • Single Certificate for Multiple Things: In AWS IoT, you can attach the same certificate to multiple Things. This allows devices or services using that certificate to assume the identity of any of the Things it's attached to. This approach could work for your scenario, where the platform and the RP2040 device could use the same certificate and, depending on the topic they are publishing to or subscribing from, they could act as different Things.
  • Creating and Attaching Policies: Make sure that the IoT policy attached to your certificate allows publishing and subscribing to the topics you plan to use for both the device and the platform. Policies in AWS IoT define the permissions for the certificate and can include multiple statements allowing different actions (publish, subscribe, connect, etc.) on different topics.
  • Implementing in Firmware: When implementing this in your device firmware, you'll use the same certificate as before but make sure your device logic includes the capability to publish or subscribe to the two different topics as needed. The device will authenticate using the same certificate, and the permissions granted to that certificate via the attached policy will determine what actions it can perform.
  • Testing and Monitoring: After implementing the changes, use the AWS IoT Core Test Client available in the AWS Console to monitor the messages being published and subscribed to both topics. This will help you ensure that your setup is working as intended and debug any issues.
  • Security Considerations: Be mindful of the security implications of using the same certificate for multiple Things. Ensure that your IoT policies are as restrictive as possible to limit actions to only what's necessary for each Thing's role.

If you decide to proceed with this setup, it's essential to test thoroughly to ensure that the platform and device interactions are working as expected and that there are no security loopholes introduced by using the same certificate for multiple purposes.

EXPERT
answered a month ago
  • Thank you so much @Giovanni for the well elaborated response.

1

Can you confirm that the MQTT broker in this case is AWS IoT Core, and that you have a device (thing) successfully publishing to a topic (from process.env.MQTT_PUB_TOPIC) that invokes an AWS IoT Rule sending data to DynamoDB?

If that is the case, based on your code, device-to-broker publishing is configured. For the client code to receive data, it needs to subscribe to the topic where the threshold data is sent. It is best practice to use two separate topics for publishing and subscribing. For example, you can publish on thingX/telemetry and subscribe on thingX/commands. Normal sensor data is sent to the telemetry topic (and evaluated by the rules engine), and when a command or other payload is published to the commands topic by a cloud resource or another device, your device receives it.

Depending on the SDK or library you are using, look for the subscribe method. These are normally implemented with callbacks, so that when a message is received you can process it and act accordingly.
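The subscribe-with-callback pattern described above could look roughly like the following in Node.js. This is a minimal sketch, not a definitive implementation: `listenForCommands` is a hypothetical helper, the `thingX/commands` naming follows the example topic above, and `client` is assumed to be any object exposing MQTT.js-style `subscribe(topic, options, callback)` and `on('message', handler)` methods (for instance, the value returned by `mqtt.connect`).

```javascript
// Hypothetical helper: subscribes to "<thingName>/commands" and passes each
// decoded JSON command to onCommand. `client` is an MQTT.js-style client.
function listenForCommands(client, thingName, onCommand) {
  const commandTopic = `${thingName}/commands`; // naming scheme assumed from the example above

  client.on('message', (topic, payload) => {
    if (topic !== commandTopic) return; // ignore messages from other subscriptions
    try {
      onCommand(JSON.parse(payload.toString()));
    } catch (err) {
      // Reached if the payload is not valid JSON or onCommand itself throws
      console.error('Ignoring command that could not be processed:', err.message);
    }
  });

  client.subscribe(commandTopic, { qos: 1 }, (err) => {
    if (err) console.error('Subscribe failed:', err.message);
  });

  return commandTopic;
}
```

Telemetry would still be published on a separate topic such as thingX/telemetry, so the rule that writes to DynamoDB never sees command traffic.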

Please let me know if this helps and if you have any follow-up questions add a comment.

AWS
Gavin_A
answered 2 months ago
  • Hello. Thank you for the response. Yes, I'm using the AWS IoT MQTT broker, and with the Thing currently created, the device can successfully publish to the provided MQTT_PUB_TOPIC. Your idea of using two topics to support the two endpoints (platform and device) sounds good, and I would like to test it. However, may I know how to handle the certificates? In my project model, this approach means the device's MQTT_PUB_TOPIC would come from THING_1, for instance, and its MQTT_SUB_TOPIC from THING_2. If this interpretation is correct, I find it challenging to implement, because the device firmware would have to use both Things' certificates in the way it currently uses the certificates of the existing Thing. Is this possible? Or can I use the same certificates for the two Things?

    If this is okay, I'll then publish to the THING_2 topic from the platform and listen on its subscribe topic from the device.

    Thank you once again.

0

Many thanks to @Giovanni and @Gavin_A for your insightful responses. By combining them, I have managed to get this working.

Yes, using the same certificate for the two provisioned Things (although I'll consider using an independent certificate for each, as recommended), I've managed to publish to the same AWS IoT Thing topic (device_dht22/pub) from multiple endpoints, i.e. the RP2040 device and the Node.js-based platform.

First, I had the main AWS IoT Thing, DEVICE_DHT22, with client ID DEVICE_DHT22 and pub/sub topics device_dht22/pub and device_dht22/sub respectively. The device publishes sensor data to the broker via device_dht22/pub.

//DEVICE_DHT22 thing policy ---------------------------------------------------
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": "arn:XXXXXX:client/DEVICE_DHT22"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Publish",
      "Resource": "arn:XXXXXX:topic/device_dht22/pub"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Subscribe",
      "Resource": "arn:XXXXXX:topicfilter/device_dht22/sub"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Receive",
      "Resource": "arn:XXXXXX:topic/device_dht22/sub"
    }
  ]
}
// END ------------------------------------------------------------------------

The platform, on the other hand, publishes threshold settings back to the connected device through the broker on device_dht22/sub, the topic the device is subscribed to for threshold updates.
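For illustration, a subscriber on device_dht22/sub might decode the threshold message along these lines. This is a sketch under assumptions: the field names are taken from the JSON payload built in the question, `parseDhtThresholds` and `toFiniteNumber` are hypothetical helpers, and the real RP2040 firmware would implement the equivalent logic in its own language rather than in Node.js.

```javascript
// Hypothetical helper: accepts a number or a numeric string and rejects
// anything else (missing fields, empty strings, non-numeric text).
function toFiniteNumber(value, fieldName) {
  const parsed = typeof value === 'string' && value.trim() !== '' ? Number(value) : value;
  if (typeof parsed !== 'number' || !Number.isFinite(parsed)) {
    throw new Error(`${fieldName} must be a finite number`);
  }
  return parsed;
}

// Decodes a threshold message (Buffer or string) published by the platform.
function parseDhtThresholds(rawPayload) {
  const message = JSON.parse(rawPayload.toString());
  return {
    userId: message.user_id,
    projectId: message.project_id,
    temperature: toFiniteNumber(message.dht_temperature_threshold, 'dht_temperature_threshold'),
    humidity: toFiniteNumber(message.dht_humidity_threshold, 'dht_humidity_threshold'),
  };
}
```

The platform sends the thresholds as strings, so converting them explicitly on the receiving side avoids comparing sensor readings against text.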

The second AWS IoT Thing is PLATFORM_DHT, with client ID PLATFORM_DHT, publish topic device_dht22/sub and subscribe topic platform_dht/sub.


//PLATFORM_DHT thing policy ---------------------------------------------------
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": "arn:aws:XXXXXX:client/PLATFORM_DHT"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Publish",
      "Resource": "arn:aws:XXXXXX:topic/device_dht22/sub"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Subscribe",
      "Resource": "arn:aws:XXXXXX:topicfilter/platform_dht/sub"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Receive",
      "Resource": "arn:aws:XXXXXX:topic/platform_dht/sub"
    }
  ]
}
// END ------------------------------------------------------------------------
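Both policies follow the same four-statement shape, so they could be generated from a small helper when provisioning further Things. The following is a sketch only: `buildThingPolicy` is a hypothetical function, and `arnPrefix` is a placeholder for the elided part of the ARNs above (in AWS IoT it has the form `arn:aws:iot:<region>:<account-id>`).

```javascript
// Hypothetical helper: builds a policy document with the same four
// statements as the policies above (Connect, Publish, Subscribe, Receive).
function buildThingPolicy({ arnPrefix, clientId, publishTopic, subscribeTopic }) {
  const allow = (action, resource) => ({
    Effect: 'Allow',
    Action: action,
    Resource: `${arnPrefix}:${resource}`,
  });

  return {
    Version: '2012-10-17',
    Statement: [
      allow('iot:Connect', `client/${clientId}`),
      allow('iot:Publish', `topic/${publishTopic}`),
      // Subscribe uses a topicfilter resource, Receive uses a topic resource
      allow('iot:Subscribe', `topicfilter/${subscribeTopic}`),
      allow('iot:Receive', `topic/${subscribeTopic}`),
    ],
  };
}

// Example with placeholder region and account values:
const platformPolicy = buildThingPolicy({
  arnPrefix: 'arn:aws:iot:REGION:ACCOUNT_ID',
  clientId: 'PLATFORM_DHT',
  publishTopic: 'device_dht22/sub',
  subscribeTopic: 'platform_dht/sub',
});
console.log(JSON.stringify(platformPolicy, null, 2));
```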

Because these two AWS IoT Things share the same certificate but have different client IDs in their policies, the device and the platform backend can exchange data simultaneously through the MQTT broker using the topics of a single AWS IoT Thing, DEVICE_DHT22.
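The distinct client IDs matter because AWS IoT Core allows only one active connection per client ID: when a second client connects with an ID already in use, the broker disconnects the first one. That would explain the original symptom, assuming the platform was connecting with the device's client ID (the question does not show the actual value). A minimal guard on the platform side might look like this; `buildPlatformConnection` is a hypothetical helper, and the environment variable names are the ones used in the question's code.

```javascript
// Hypothetical helper: prepares the platform's connection settings and
// refuses to reuse the device's client ID, which would make the broker
// drop the device's connection.
function buildPlatformConnection(env, deviceClientId) {
  const clientId = env.MQTT_CLIENT_ID;
  if (!clientId) {
    throw new Error('MQTT_CLIENT_ID is not set');
  }
  if (clientId === deviceClientId) {
    throw new Error(`Platform client ID must differ from the device client ID (${deviceClientId})`);
  }
  return {
    url: env.MQTT_BROKER_URL,
    topic: env.MQTT_PUB_TOPIC,
    options: { clientId, protocol: 'mqtts', rejectUnauthorized: true },
  };
}
```

With the setup above, the platform would run with MQTT_CLIENT_ID=PLATFORM_DHT and MQTT_PUB_TOPIC=device_dht22/sub, and the returned `url` and `options` (plus the key, cert and CA files) would be passed to `mqtt.connect`.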

Thank you!

answered a month ago
  • Glad you got it working, Ochieno! If you wouldn't mind, could you accept the answer on this question? It helps guide others here and with search optimization. One other thing: having unique certificates (and/or policies) per "thing" (device or platform) is best practice from a security perspective. It reduces the blast radius if a certificate is ever compromised.
