MQTT Disconnects Without Notifying Default Linsteners- "disconnect" and "close"

0

Hello everyone,

We're encountering an issue with our MQTT client and would appreciate your help.

Our MQTT client runs on an EKS service, and it's on as long as the service is up. We configure the client using 'aws-iot-device-sdk-v2' package, with a certificate and a private key that come from a k8s secret (and are attached to a dummy thing).

After a few hours when the service is up, we stop receiving messages from subscribed topics. We don't receive any notification from "disconnect", "closed" or "error" listeners about any error, so we assume the MQTT is supposed to work. When we restart the service, we get all the messages that were supposed to arrive in retro (so it means, that no one read the message from the specific queue- we have a unique client ID, and we have one replica for the specific service).

If anyone has encountered similar issues or has expertise in MQTT setup, we would greatly appreciate any guidance or suggestions on how to fix it. We have attached the connection code below.

Thanks

import { TextDecoder } from 'util'
import { iot } from 'aws-crt'
import { mqtt } from 'aws-iot-device-sdk-v2'
import { mqttConfig } from '../config/mqtt'
import { generalConfig } from '../config'
import iotClient from './iotClient'
import logger from './logger'

const { certificateFile, privateKeyFile, caCert } = mqttConfig

const decoder = new TextDecoder('utf-8')

class MqttClientHandler {
  private connection: mqtt.MqttClientConnection | null = null

  async connectToIotCore() {
    try {
      logger.debug('Connecting to IoT Core broker')
      const brokerEndpoint = await iotClient.getEndpoint()
      await this.connect({
        mqttClientId: `mqttClientId-envUnique-serviceName`,
        brokerEndpoint,
        certificateFile,
        privateKeyFile,
        caCert
      })
    } catch (error) {
      logger.error('Failed to connect to IoT Core broker', error)
      throw error
    }
  }

  private registerListeners() {
    if (!this.connection) return
    this.connection.on('connect', () => {
      logger.info('MQTT connection established')
    })
    this.connection.on('error', error => {
      logger.error(`MQTT connection error`, { err: error })
    })
    this.connection.on('disconnect', () => {
      logger.error('MQTT connection disconnected. Exiting service.')
      process.exit(1)
    })
    this.connection.on('closed', () => {
      logger.error('MQTT connection closed. Exiting service.')
      process.exit(1)
    })
  }

  private async connect({
    certificateFile,
    privateKeyFile,
    mqttClientId,
    brokerEndpoint,
    caCert
  }: {
    certificateFile: string
    privateKeyFile: string
    mqttClientId: string
    brokerEndpoint: string
    caCert: string
  }) {
    const configBuilder =
      iot.AwsIotMqttConnectionConfigBuilder.new_mtls_builder(
        certificateFile,
        privateKeyFile
      )
    configBuilder.with_certificate_authority(caCert)
    configBuilder.with_clean_session(false)
    configBuilder.with_client_id(mqttClientId)
    configBuilder.with_endpoint(brokerEndpoint)
    const config = configBuilder.build()
    const client = new mqtt.MqttClient()
    this.connection = client.new_connection(config)
    this.registerListeners()
    await this.connection.connect()
  }

  async publish({
    topic,
    payload
  }: {
    topic: string
    payload: mqtt.Payload
  }): Promise<void> {
    if (!this.connection) {
      logger.error('MQTT connection not established')
      return
    }
    const json = typeof payload === 'string' ? payload : JSON.stringify(payload)
    logger.debug(`Publishing a message to topic ${topic}`, {
      topic,
      payload: json
    })
    try {
      await this.connection.publish(topic, json, mqtt.QoS.AtLeastOnce)
    } catch (error) {
      logger.error(`Failed to publish to topic ${topic}`, error)
      throw error
    }
  }

  async subscribe({
    topic,
    onMessage
  }: {
    topic: string
    onMessage: (topic: string, payload: string) => void
  }): Promise<void> {
    if (!this.connection) {
      logger.error('MQTT connection not established')
      return
    }
    await this.connection.subscribe(
      topic,
      mqtt.QoS.AtLeastOnce,
      (topic, payload) => {
        const json = decoder.decode(payload)
        logger.debug(`Received message on topic ${topic}, payload: ${json}`)
        onMessage(topic, json)
      }
    )
    logger.info(`Subscribed to MQTT topic: ${topic}`)
  }
}
Adim
질문됨 3달 전211회 조회
1개 답변
0

Hello,

From the description I understand that you are unable to connect to your MQTT client, rather not receiving any errors such as "disconnect", "closed" or "error". Can you kindly share the sample IoT logs for the mentioned behaviour. Also, kindly check the IoT core Cloudwatch metrics and confirm if you are observing any errors during the timeframe of the issue. Please refer the below documentation for more information on the Cloudwatch metrics:

[+]. https://docs.aws.amazon.com/iot/latest/developerguide/metrics_dimensions.html#iot-metrics

AWS
답변함 3달 전

로그인하지 않았습니다. 로그인해야 답변을 게시할 수 있습니다.

좋은 답변은 질문에 명확하게 답하고 건설적인 피드백을 제공하며 질문자의 전문적인 성장을 장려합니다.

질문 답변하기에 대한 가이드라인

관련 콘텐츠