Hello everyone,
We're encountering an issue with our MQTT client and would appreciate your help.
Our MQTT client runs on an EKS service, and it's on as long as the service is up.
We configure the client using 'aws-iot-device-sdk-v2' package, with a certificate and a private key that come from a k8s secret (and are attached to a dummy thing).
After a few hours when the service is up, we stop receiving messages from subscribed topics.
We don't receive any notification from "disconnect", "closed" or "error" listeners about any error, so we assume the MQTT is supposed to work.
When we restart the service, we get all the messages that were supposed to arrive in retro (so it means, that no one read the message from the specific queue- we have a unique client ID, and we have one replica for the specific service).
If anyone has encountered similar issues or has expertise in MQTT setup, we would greatly appreciate any guidance or suggestions on how to fix it.
We have attached the connection code below.
Thanks
import { TextDecoder } from 'util'
import { iot } from 'aws-crt'
import { mqtt } from 'aws-iot-device-sdk-v2'
import { mqttConfig } from '../config/mqtt'
import { generalConfig } from '../config'
import iotClient from './iotClient'
import logger from './logger'
const { certificateFile, privateKeyFile, caCert } = mqttConfig
const decoder = new TextDecoder('utf-8')
class MqttClientHandler {
private connection: mqtt.MqttClientConnection | null = null
async connectToIotCore() {
try {
logger.debug('Connecting to IoT Core broker')
const brokerEndpoint = await iotClient.getEndpoint()
await this.connect({
mqttClientId: `mqttClientId-envUnique-serviceName`,
brokerEndpoint,
certificateFile,
privateKeyFile,
caCert
})
} catch (error) {
logger.error('Failed to connect to IoT Core broker', error)
throw error
}
}
private registerListeners() {
if (!this.connection) return
this.connection.on('connect', () => {
logger.info('MQTT connection established')
})
this.connection.on('error', error => {
logger.error(`MQTT connection error`, { err: error })
})
this.connection.on('disconnect', () => {
logger.error('MQTT connection disconnected. Exiting service.')
process.exit(1)
})
this.connection.on('closed', () => {
logger.error('MQTT connection closed. Exiting service.')
process.exit(1)
})
}
private async connect({
certificateFile,
privateKeyFile,
mqttClientId,
brokerEndpoint,
caCert
}: {
certificateFile: string
privateKeyFile: string
mqttClientId: string
brokerEndpoint: string
caCert: string
}) {
const configBuilder =
iot.AwsIotMqttConnectionConfigBuilder.new_mtls_builder(
certificateFile,
privateKeyFile
)
configBuilder.with_certificate_authority(caCert)
configBuilder.with_clean_session(false)
configBuilder.with_client_id(mqttClientId)
configBuilder.with_endpoint(brokerEndpoint)
const config = configBuilder.build()
const client = new mqtt.MqttClient()
this.connection = client.new_connection(config)
this.registerListeners()
await this.connection.connect()
}
async publish({
topic,
payload
}: {
topic: string
payload: mqtt.Payload
}): Promise<void> {
if (!this.connection) {
logger.error('MQTT connection not established')
return
}
const json = typeof payload === 'string' ? payload : JSON.stringify(payload)
logger.debug(`Publishing a message to topic ${topic}`, {
topic,
payload: json
})
try {
await this.connection.publish(topic, json, mqtt.QoS.AtLeastOnce)
} catch (error) {
logger.error(`Failed to publish to topic ${topic}`, error)
throw error
}
}
async subscribe({
topic,
onMessage
}: {
topic: string
onMessage: (topic: string, payload: string) => void
}): Promise<void> {
if (!this.connection) {
logger.error('MQTT connection not established')
return
}
await this.connection.subscribe(
topic,
mqtt.QoS.AtLeastOnce,
(topic, payload) => {
const json = decoder.decode(payload)
logger.debug(`Received message on topic ${topic}, payload: ${json}`)
onMessage(topic, json)
}
)
logger.info(`Subscribed to MQTT topic: ${topic}`)
}
}