MQTT Disconnects Without Notifying Default Listeners: "disconnect" and "closed"


Hello everyone,

We're encountering an issue with our MQTT client and would appreciate your help.

Our MQTT client runs in a service on EKS and stays up for as long as the service is running. We configure the client using the 'aws-iot-device-sdk-v2' package, with a certificate and a private key that come from a k8s secret (and are attached to a dummy thing).

After the service has been up for a few hours, we stop receiving messages from subscribed topics. None of the "disconnect", "closed" or "error" listeners fire, so we assume the MQTT connection is still working. When we restart the service, we receive all the messages that should have arrived in the meantime (which means no one else read the messages from the queue: we use a unique client ID and run a single replica of this service).

If anyone has encountered similar issues or has expertise in MQTT setup, we would greatly appreciate any guidance or suggestions on how to fix it. We have attached the connection code below.

Thanks

import { TextDecoder } from 'util'
import { iot } from 'aws-crt'
import { mqtt } from 'aws-iot-device-sdk-v2'
import { mqttConfig } from '../config/mqtt'
import { generalConfig } from '../config'
import iotClient from './iotClient'
import logger from './logger'

const { certificateFile, privateKeyFile, caCert } = mqttConfig

const decoder = new TextDecoder('utf-8')

class MqttClientHandler {
  private connection: mqtt.MqttClientConnection | null = null

  async connectToIotCore() {
    try {
      logger.debug('Connecting to IoT Core broker')
      const brokerEndpoint = await iotClient.getEndpoint()
      await this.connect({
        mqttClientId: `mqttClientId-envUnique-serviceName`,
        brokerEndpoint,
        certificateFile,
        privateKeyFile,
        caCert
      })
    } catch (error) {
      logger.error('Failed to connect to IoT Core broker', error)
      throw error
    }
  }

  private registerListeners() {
    if (!this.connection) return
    this.connection.on('connect', () => {
      logger.info('MQTT connection established')
    })
    this.connection.on('error', error => {
      logger.error(`MQTT connection error`, { err: error })
    })
    this.connection.on('disconnect', () => {
      logger.error('MQTT connection disconnected. Exiting service.')
      process.exit(1)
    })
    this.connection.on('closed', () => {
      logger.error('MQTT connection closed. Exiting service.')
      process.exit(1)
    })
  }

  private async connect({
    certificateFile,
    privateKeyFile,
    mqttClientId,
    brokerEndpoint,
    caCert
  }: {
    certificateFile: string
    privateKeyFile: string
    mqttClientId: string
    brokerEndpoint: string
    caCert: string
  }) {
    const configBuilder =
      iot.AwsIotMqttConnectionConfigBuilder.new_mtls_builder(
        certificateFile,
        privateKeyFile
      )
    configBuilder.with_certificate_authority(caCert)
    configBuilder.with_clean_session(false)
    configBuilder.with_client_id(mqttClientId)
    configBuilder.with_endpoint(brokerEndpoint)
    const config = configBuilder.build()
    const client = new mqtt.MqttClient()
    this.connection = client.new_connection(config)
    this.registerListeners()
    await this.connection.connect()
  }

  async publish({
    topic,
    payload
  }: {
    topic: string
    payload: mqtt.Payload
  }): Promise<void> {
    if (!this.connection) {
      logger.error('MQTT connection not established')
      return
    }
    const json = typeof payload === 'string' ? payload : JSON.stringify(payload)
    logger.debug(`Publishing a message to topic ${topic}`, {
      topic,
      payload: json
    })
    try {
      await this.connection.publish(topic, json, mqtt.QoS.AtLeastOnce)
    } catch (error) {
      logger.error(`Failed to publish to topic ${topic}`, error)
      throw error
    }
  }

  async subscribe({
    topic,
    onMessage
  }: {
    topic: string
    onMessage: (topic: string, payload: string) => void
  }): Promise<void> {
    if (!this.connection) {
      logger.error('MQTT connection not established')
      return
    }
    await this.connection.subscribe(
      topic,
      mqtt.QoS.AtLeastOnce,
      (topic, payload) => {
        const json = decoder.decode(payload)
        logger.debug(`Received message on topic ${topic}, payload: ${json}`)
        onMessage(topic, json)
      }
    )
    logger.info(`Subscribed to MQTT topic: ${topic}`)
  }
}
Adim
asked 3 months ago · 210 views
1 Answer

Hello,

From the description, I understand that your MQTT client loses its connection but you do not receive any "disconnect", "closed" or "error" events. Could you kindly share sample IoT logs for this behaviour? Also, please check the AWS IoT Core CloudWatch metrics and confirm whether you observe any errors during the timeframe of the issue. Please refer to the documentation below for more information on the CloudWatch metrics:

[+]. https://docs.aws.amazon.com/iot/latest/developerguide/metrics_dimensions.html#iot-metrics
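
While gathering those logs, it may also help to log the connection's 'interrupt' and 'resume' events. As far as I know, the aws-crt connection emits 'interrupt' when the connection drops unexpectedly and 'resume' when it reconnects automatically, while 'disconnect' only follows a client-initiated disconnect; please verify this against the SDK version you use. The following is a minimal sketch, not a definitive fix: `ConnectionLike`, `registerInterruptListeners` and `connectionLog` are hypothetical names, and a plain Node `EventEmitter` stands in for `mqtt.MqttClientConnection` so the snippet runs on its own.

```typescript
import { EventEmitter } from 'events'

// Hypothetical minimal shape of the connection. In the real service this would be
// the mqtt.MqttClientConnection created by client.new_connection(config).
interface ConnectionLike {
  on(event: string, listener: (...args: any[]) => void): unknown
}

// Collected log lines; in the service you would call your own logger instead.
const connectionLog: string[] = []

function registerInterruptListeners(
  connection: ConnectionLike,
  log: (line: string) => void = line => connectionLog.push(line)
): void {
  // Assumed to fire when the connection is lost unexpectedly.
  connection.on('interrupt', (error: Error) => {
    log(`interrupt: ${error.message}`)
  })
  // Assumed to fire after an automatic reconnect. If sessionPresent is false,
  // the broker did not keep the session, so subscriptions may need to be redone.
  connection.on('resume', (returnCode: number, sessionPresent: boolean) => {
    log(`resume: returnCode=${returnCode} sessionPresent=${sessionPresent}`)
  })
}

// Demonstration with a stand-in emitter that simulates a drop and a reconnect.
const fakeConnection = new EventEmitter()
registerInterruptListeners(fakeConnection)
fakeConnection.emit('interrupt', new Error('keep-alive timeout'))
fakeConnection.emit('resume', 0, true)
console.log(connectionLog)
```

If 'interrupt' shows up in the logs around the time messages stop, the timestamps can be compared with the CloudWatch metrics mentioned above.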

AWS
answered 3 months ago
