MQTT Disconnects Without Notifying Default Linsteners- "disconnect" and "close"

0

Hello everyone,

We're encountering an issue with our MQTT client and would appreciate your help.

Our MQTT client runs on an EKS service, and it's on as long as the service is up. We configure the client using 'aws-iot-device-sdk-v2' package, with a certificate and a private key that come from a k8s secret (and are attached to a dummy thing).

After a few hours when the service is up, we stop receiving messages from subscribed topics. We don't receive any notification from "disconnect", "closed" or "error" listeners about any error, so we assume the MQTT is supposed to work. When we restart the service, we get all the messages that were supposed to arrive in retro (so it means, that no one read the message from the specific queue- we have a unique client ID, and we have one replica for the specific service).

If anyone has encountered similar issues or has expertise in MQTT setup, we would greatly appreciate any guidance or suggestions on how to fix it. We have attached the connection code below.

Thanks

import { TextDecoder } from 'util'
import { iot } from 'aws-crt'
import { mqtt } from 'aws-iot-device-sdk-v2'
import { mqttConfig } from '../config/mqtt'
import { generalConfig } from '../config'
import iotClient from './iotClient'
import logger from './logger'

const { certificateFile, privateKeyFile, caCert } = mqttConfig

const decoder = new TextDecoder('utf-8')

class MqttClientHandler {
  private connection: mqtt.MqttClientConnection | null = null

  async connectToIotCore() {
    try {
      logger.debug('Connecting to IoT Core broker')
      const brokerEndpoint = await iotClient.getEndpoint()
      await this.connect({
        mqttClientId: `mqttClientId-envUnique-serviceName`,
        brokerEndpoint,
        certificateFile,
        privateKeyFile,
        caCert
      })
    } catch (error) {
      logger.error('Failed to connect to IoT Core broker', error)
      throw error
    }
  }

  private registerListeners() {
    if (!this.connection) return
    this.connection.on('connect', () => {
      logger.info('MQTT connection established')
    })
    this.connection.on('error', error => {
      logger.error(`MQTT connection error`, { err: error })
    })
    this.connection.on('disconnect', () => {
      logger.error('MQTT connection disconnected. Exiting service.')
      process.exit(1)
    })
    this.connection.on('closed', () => {
      logger.error('MQTT connection closed. Exiting service.')
      process.exit(1)
    })
  }

  private async connect({
    certificateFile,
    privateKeyFile,
    mqttClientId,
    brokerEndpoint,
    caCert
  }: {
    certificateFile: string
    privateKeyFile: string
    mqttClientId: string
    brokerEndpoint: string
    caCert: string
  }) {
    const configBuilder =
      iot.AwsIotMqttConnectionConfigBuilder.new_mtls_builder(
        certificateFile,
        privateKeyFile
      )
    configBuilder.with_certificate_authority(caCert)
    configBuilder.with_clean_session(false)
    configBuilder.with_client_id(mqttClientId)
    configBuilder.with_endpoint(brokerEndpoint)
    const config = configBuilder.build()
    const client = new mqtt.MqttClient()
    this.connection = client.new_connection(config)
    this.registerListeners()
    await this.connection.connect()
  }

  async publish({
    topic,
    payload
  }: {
    topic: string
    payload: mqtt.Payload
  }): Promise<void> {
    if (!this.connection) {
      logger.error('MQTT connection not established')
      return
    }
    const json = typeof payload === 'string' ? payload : JSON.stringify(payload)
    logger.debug(`Publishing a message to topic ${topic}`, {
      topic,
      payload: json
    })
    try {
      await this.connection.publish(topic, json, mqtt.QoS.AtLeastOnce)
    } catch (error) {
      logger.error(`Failed to publish to topic ${topic}`, error)
      throw error
    }
  }

  async subscribe({
    topic,
    onMessage
  }: {
    topic: string
    onMessage: (topic: string, payload: string) => void
  }): Promise<void> {
    if (!this.connection) {
      logger.error('MQTT connection not established')
      return
    }
    await this.connection.subscribe(
      topic,
      mqtt.QoS.AtLeastOnce,
      (topic, payload) => {
        const json = decoder.decode(payload)
        logger.debug(`Received message on topic ${topic}, payload: ${json}`)
        onMessage(topic, json)
      }
    )
    logger.info(`Subscribed to MQTT topic: ${topic}`)
  }
}
Adim
asked 3 months ago197 views
1 Answer
0

Hello,

From the description I understand that you are unable to connect to your MQTT client, rather not receiving any errors such as "disconnect", "closed" or "error". Can you kindly share the sample IoT logs for the mentioned behaviour. Also, kindly check the IoT core Cloudwatch metrics and confirm if you are observing any errors during the timeframe of the issue. Please refer the below documentation for more information on the Cloudwatch metrics:

[+]. https://docs.aws.amazon.com/iot/latest/developerguide/metrics_dimensions.html#iot-metrics

AWS
answered 3 months ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions