IoT Core MQTT shared subscription not working as expected


Hi,

I have 2 instances of a Spring Boot REST API application, running on 2 servers, that subscribe to an IoT Core MQTT shared subscription topic. When 1 instance of the application was shut down, the other instance did not receive all the messages published to the IoT Core MQTT broker. I expected that once 1 instance shuts down, the remaining instance would receive all the messages from the MQTT server. I have set keepAliveIntervalSeconds to 60 seconds. Are there any settings that I missed or misconfigured?

Below is the simplified version of my code:

import java.util.concurrent.*;

import org.bson.types.ObjectId;
import org.slf4j.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.*;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.mqtt5.*;
import software.amazon.awssdk.crt.mqtt5.packets.*;
import software.amazon.awssdk.iot.AwsIotMqtt5ClientBuilder;

@Component
public class TestRunner implements ApplicationRunner {
  private static final Logger log = LoggerFactory.getLogger(TestRunner.class);
	
  private ApplicationContext appContext;
  private Mqtt5Client client;
  private MqttPublishEventsHandler publishEventsHandler = new MqttPublishEventsHandler();
  private MqttLifecycleEventsHandler lifecycleEventsHandler = new MqttLifecycleEventsHandler();
	
  @Value("${application.mqttHost}")
  private String mqttHost;
	
  @Value("${application.mqttPrivateKeyPath}")
  private String mqttPrivateKeyPath;
	
  @Value("${application.mqttDeviceCertPath}")
  private String mqttDeviceCertPath;
	
  public TestRunner(ApplicationContext appContext) {
	this.appContext = appContext;
  }

  @Override
  public void run(ApplicationArguments args) {
	_initMqttClient();
	_subscribeMqttTopic();
  }
	
  private void _initMqttClient() {
        AwsIotMqtt5ClientBuilder builder = AwsIotMqtt5ClientBuilder.newDirectMqttBuilderWithMtlsFromPath(
        		mqttHost, mqttDeviceCertPath, mqttPrivateKeyPath);
        ConnectPacket.ConnectPacketBuilder connectProperties = new ConnectPacket.ConnectPacketBuilder();
        connectProperties.withClientId("testing-" + new ObjectId().toHexString());
        connectProperties.withKeepAliveIntervalSeconds(60L);
        
        builder.withConnectProperties(connectProperties);
        builder.withMinReconnectDelayMs(5000L);
        builder.withMaxReconnectDelayMs(60000L);
        builder.withLifeCycleEvents(lifecycleEventsHandler);

        builder.withPublishEvents(publishEventsHandler);
        client = builder.build();
        builder.close();
        
        try {
            client.start(); // Connect to mqtt broker
            lifecycleEventsHandler.connectedFuture.get(15, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("Exception occurred during connecting to mqtt broker", e);
            // Exit with a non-zero status because startup failed
            System.exit(SpringApplication.exit(appContext, () -> 1));
        }
  }
	
  private void _subscribeMqttTopic() {
        String topic = "$share/testing/topic1";
        SubscribePacket.SubscribePacketBuilder subscribeBuilder = new SubscribePacket.SubscribePacketBuilder();
        subscribeBuilder.withSubscription(topic, QOS.AT_LEAST_ONCE, false, false, SubscribePacket.RetainHandlingType.DONT_SEND);
        try {
            SubAckPacket a = client.subscribe(subscribeBuilder.build()).get(15, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("Exception occurred while subscribing to " + topic, e);
            System.exit(SpringApplication.exit(appContext, () -> 1));
        }
  }
	
    static final class MqttLifecycleEventsHandler implements Mqtt5ClientOptions.LifecycleEvents {
        CompletableFuture<Void> connectedFuture = new CompletableFuture<>();
        CompletableFuture<Void> stoppedFuture = new CompletableFuture<>();
        
        @Override public void onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn) {
            log.info("Mqtt5 client: Attempting connection...");
        }

        @Override public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn) {
            log.info("Mqtt5 client is connected successfully, client ID: " + onConnectionSuccessReturn.getNegotiatedSettings().getAssignedClientID());
            connectedFuture.complete(null);
        }

        @Override public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) {
            String errorString = CRT.awsErrorString(onConnectionFailureReturn.getErrorCode());
            log.warn("Mqtt5 client: Connection failed with error: " + errorString);
            connectedFuture.completeExceptionally(new Exception("Could not connect: " + errorString));
        }

        @Override public void onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn) {
            log.info("Mqtt5 client has disconnected");
            DisconnectPacket disconnectPacket = onDisconnectionReturn.getDisconnectPacket();
            if (disconnectPacket != null) {
                log.info("Disconnection packet code: " + disconnectPacket.getReasonCode());
                log.info("Disconnection packet reason: " + disconnectPacket.getReasonString());

                if (disconnectPacket.getReasonCode() == DisconnectPacket.DisconnectReasonCode.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED) {
                    client.stop(null);
                }
            }
        }

        @Override public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) {
            stoppedFuture.complete(null);
        }
    }
    
    static final class MqttPublishEventsHandler implements Mqtt5ClientOptions.PublishEvents {
        @Override public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
            PublishPacket publishPacket = publishReturn.getPublishPacket();
            // Decode explicitly as UTF-8 rather than relying on the platform default charset
            String strMsgJson = new String(publishPacket.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
            log.info("Received message from mqtt broker for topic {}. json = {}", publishPacket.getTopic(), strMsgJson);
        }
    }
}  
Kenneth
asked 3 months ago, 228 views
2 Answers

Since you are using MQTT 5 and the AWS IoT SDK, here are the key things to check regarding persistent sessions:

  • When creating the MQTT client connections, set the Clean Start flag to 0 (false) in the client configuration, so that the broker resumes an existing session instead of discarding it. See https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt-persistent-sessions for more details.

  • Also set the Session Expiry Interval property to a non-zero value (for example, 3600 seconds) when creating the clients. This controls how long the broker keeps the session state after a disconnect.

  • The AWS IoT SDK handles reconnect logic automatically in most languages, but confirm that reconnect is enabled and adjust the retry strategy if needed.

  • Do you want to use shared subscriptions? Shared subscriptions allow multiple clients to share a subscription to a topic; each message published to that topic is delivered to only one of those clients, chosen at random. Shared subscriptions can effectively load balance MQTT messages across a number of subscribers. See https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt5-shared-subscription for more details.

  • Monitor the persistent session metrics in CloudWatch to validate sessions are resumed properly after disconnects/reconnects.
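How the first two settings interact can be modeled in a few lines. This is not SDK code: `SessionResumptionModel` is a hypothetical, simplified model of the MQTT 5 rules for Clean Start and Session Expiry Interval, assuming the client reconnects with the same client ID. It ignores the special "never expires" value and any maximum expiry the broker (AWS IoT Core included) enforces.

```java
import java.time.Instant;

/**
 * Simplified model of the MQTT 5 rules for resuming a session on reconnect.
 * Hypothetical helper, not part of the AWS IoT SDK.
 */
final class SessionResumptionModel {

    private SessionResumptionModel() {}

    /**
     * Returns true if the broker would still hold the previous session when the
     * client reconnects with the same client ID.
     *
     * @param cleanStart           Clean Start flag sent in the new CONNECT
     * @param sessionExpirySeconds Session Expiry Interval of the previous connection
     * @param disconnectedAt       when the previous network connection closed
     * @param reconnectAt          when the new CONNECT arrives
     */
    static boolean sessionResumed(boolean cleanStart, long sessionExpirySeconds,
                                  Instant disconnectedAt, Instant reconnectAt) {
        if (cleanStart) {
            // Clean Start = 1: the broker discards any existing session.
            return false;
        }
        if (sessionExpirySeconds == 0) {
            // Expiry 0 (the MQTT 5 default): the session ends with the connection.
            return false;
        }
        // Otherwise the session survives only until disconnect time + expiry.
        Instant expiresAt = disconnectedAt.plusSeconds(sessionExpirySeconds);
        return reconnectAt.isBefore(expiresAt);
    }

    public static void main(String[] args) {
        Instant down = Instant.parse("2024-01-01T00:00:00Z");
        Instant back = down.plusSeconds(30);
        System.out.println(sessionResumed(false, 60, down, back)); // true
        System.out.println(sessionResumed(true, 60, down, back));  // false
        System.out.println(sessionResumed(false, 0, down, back));  // false
    }
}
```

Both settings have to be right: Clean Start = 0 with an expiry of 0 still loses the session as soon as the connection drops.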

I guess you are looking for a combination of persistent sessions and shared subscriptions. Let me know if you have any other specific configuration details to discuss or need clarification on using persistent sessions with the AWS IoT SDK.
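Shared subscriptions are requested with a topic filter of the form `$share/{ShareName}/{TopicFilter}`, as in the question's `$share/testing/topic1`, and every instance must use the same ShareName to join the same group. The hypothetical `SharedTopicFilter` parser below is a sketch of that format based on the MQTT 5 rules (a non-empty ShareName without `/`, `+` or `#`, followed by `/` and a topic filter). It treats two subscriptions as one group when both ShareName and topic filter match, which is our reading of the spec.

```java
/**
 * Minimal parser for MQTT 5 shared-subscription topic filters:
 * $share/{ShareName}/{TopicFilter}. Illustrative only, not an SDK type.
 */
final class SharedTopicFilter {
    final String shareName;
    final String topicFilter;

    private SharedTopicFilter(String shareName, String topicFilter) {
        this.shareName = shareName;
        this.topicFilter = topicFilter;
    }

    static SharedTopicFilter parse(String filter) {
        final String prefix = "$share/";
        if (!filter.startsWith(prefix)) {
            throw new IllegalArgumentException("not a shared subscription: " + filter);
        }
        String rest = filter.substring(prefix.length());
        int slash = rest.indexOf('/');
        if (slash <= 0) {
            // ShareName must be non-empty and followed by '/'.
            throw new IllegalArgumentException("missing ShareName or topic filter: " + filter);
        }
        String shareName = rest.substring(0, slash);
        if (shareName.contains("+") || shareName.contains("#")) {
            throw new IllegalArgumentException("ShareName may not contain '+' or '#': " + filter);
        }
        String topicFilter = rest.substring(slash + 1);
        if (topicFilter.isEmpty()) {
            throw new IllegalArgumentException("empty topic filter: " + filter);
        }
        return new SharedTopicFilter(shareName, topicFilter);
    }

    /** True if both subscriptions land in the same shared group. */
    boolean sameGroupAs(SharedTopicFilter other) {
        return shareName.equals(other.shareName) && topicFilter.equals(other.topicFilter);
    }

    public static void main(String[] args) {
        SharedTopicFilter f = parse("$share/testing/topic1");
        System.out.println(f.shareName + " / " + f.topicFilter); // testing / topic1
    }
}
```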

AWS
answered 3 months ago
  • I am new to MQTT and AWS IoT Core. Could you explain why enabling persistent sessions would solve the issue where the last instance is not able to receive all messages when the other instances are shut down?

  • When all other instances are shut down and only the last instance is left running, shouldn't this last instance receive all the messages from the MQTT server?

  • My guess here is that you want a group of nodes to process messages with QoS 1 (at least once). If message 1 was consumed by instance A, but the instance fails before acknowledging the message, instance B won't see the message again without a persistent session. A persistent session keeps any unacknowledged messages, so if you are using a shared subscription, after instance A fails, instance B (if it is in the same consumer group) will receive the unacknowledged message 1 again.

  • Let's say I send messages to the MQTT broker after instance A is down. All messages that I send after instance A is down should be routed to instance B, right? However, instance B is not receiving all the messages that were sent after instance A went down.
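The redelivery behavior described in the comment about instance A and instance B can be sketched as a toy model. `SharedGroupModel` is hypothetical and follows the comment's description, not a verified account of AWS IoT Core internals; it also picks members round-robin instead of at random so the result is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a QoS 1 shared-subscription group (illustrative only). */
final class SharedGroupModel {
    private final boolean persistentSessions;
    private final List<String> members = new ArrayList<>();
    // Messages delivered to a member but not yet acknowledged with PUBACK.
    private final Map<String, Deque<String>> inFlight = new HashMap<>();
    // Messages each member has processed and acknowledged.
    private final Map<String, List<String>> processed = new HashMap<>();
    private int next = 0;

    SharedGroupModel(boolean persistentSessions, String... names) {
        this.persistentSessions = persistentSessions;
        for (String name : names) {
            members.add(name);
            inFlight.put(name, new ArrayDeque<>());
            processed.put(name, new ArrayList<>());
        }
    }

    /** Hands a publish to exactly one member the broker believes is connected. */
    void publish(String message) {
        if (members.isEmpty()) {
            return; // nobody left in the group, so the message is not delivered
        }
        String target = members.get(next % members.size());
        next++;
        inFlight.get(target).add(message);
    }

    /** The member processes everything it received and acknowledges it. */
    void ack(String member) {
        Deque<String> pending = inFlight.get(member);
        while (!pending.isEmpty()) {
            processed.get(member).add(pending.poll());
        }
    }

    /** The member dies before acknowledging and the broker removes it. */
    void fail(String member) {
        Deque<String> unacked = inFlight.remove(member);
        members.remove(member);
        if (persistentSessions) {
            // As described in the comment: unacknowledged messages go to another member.
            for (String message : unacked) {
                publish(message);
            }
        }
        // Without a persistent session the unacknowledged messages are lost.
    }

    List<String> processedBy(String member) {
        return processed.get(member);
    }

    public static void main(String[] args) {
        SharedGroupModel group = new SharedGroupModel(true, "A", "B");
        group.publish("m1"); // delivered to A
        group.publish("m2"); // delivered to B
        group.ack("B");
        group.fail("A");     // A never acknowledged m1
        group.ack("B");
        System.out.println(group.processedBy("B")); // [m2, m1]
    }
}
```

With `persistentSessions` set to false, the same sequence leaves B with only `[m2]`. In both cases, messages published after `fail("A")` go only to B; the model assumes the broker has already noticed that A is gone, which is exactly what the keep-alive question in the next answer is probing.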


Let's say I send a message to the MQTT broker after instance A is down. All messages that I send after instance A is down should be routed to instance B, right? However, instance B is not receiving all the messages that were sent after instance A went down.

Is this only while the keep-alive is timing out, or does the problem persist even after the keep-alive period expires?
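The keep-alive window in question can be computed directly. The MQTT specification requires the server to close a connection once 1.5 times the Keep Alive interval passes with no control packet from the client, so with the question's 60-second setting a silently dead instance can look connected for up to 90 seconds after its last packet. We assume AWS IoT Core follows this rule; `KeepAliveWindow` is a hypothetical helper, not SDK code.

```java
import java.time.Instant;

/** Sketch of how long a broker may treat a silent client as still connected. */
final class KeepAliveWindow {

    private KeepAliveWindow() {}

    /** Latest instant at which the broker may still consider the client connected. */
    static Instant disconnectDeadline(Instant lastPacketFromClient, long keepAliveSeconds) {
        // 1.5 x keep-alive, computed in milliseconds so odd values are not truncated.
        long graceMillis = keepAliveSeconds * 1500L;
        return lastPacketFromClient.plusMillis(graceMillis);
    }

    /** True while the spec still allows the broker to treat the client as connected. */
    static boolean brokerMayStillTreatAsConnected(Instant lastPacketFromClient,
                                                  long keepAliveSeconds, Instant now) {
        return now.isBefore(disconnectDeadline(lastPacketFromClient, keepAliveSeconds));
    }

    public static void main(String[] args) {
        Instant last = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(disconnectDeadline(last, 60)); // 2024-01-01T00:01:30Z
    }
}
```

During that window the broker may keep choosing the dead instance as the target for shared-subscription messages. The window matters mainly when an instance disappears without closing its connection (a crash or a network loss); a cleanly closed connection is seen by the broker right away.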

Greg_B (AWS EXPERT)
answered 3 months ago
  • After adding the following configuration, the issue no longer occurs:

        connectProperties.withSessionExpiryIntervalSeconds(60L);
        builder.withPingTimeoutMs(30000L);
        builder.withSessionBehavior(ClientSessionBehavior.REJOIN_ALWAYS);
    
  • May I know what the difference is between ClientSessionBehavior.REJOIN_ALWAYS and ClientSessionBehavior.REJOIN_POST_SUCCESS? I tried to understand it by reading the comments but still couldn't figure out the differences. Should I use ClientSessionBehavior.REJOIN_ALWAYS or ClientSessionBehavior.REJOIN_POST_SUCCESS for a persistent session?
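One way to frame the difference is by the Clean Start flag each behavior sends. The sketch below reflects our reading of the aws-crt-java Javadoc for `Mqtt5ClientOptions.ClientSessionBehavior` (CLEAN always starts a new session, REJOIN_POST_SUCCESS rejoins only after an initial successful connection, REJOIN_ALWAYS tries to rejoin every time); the `Behavior` enum here is a local stand-in, not the SDK type, and the model should be checked against the SDK documentation.

```java
/** Hypothetical model of the Clean Start decision per session behavior. */
final class SessionBehaviorSketch {

    enum Behavior { CLEAN, REJOIN_POST_SUCCESS, REJOIN_ALWAYS }

    private SessionBehaviorSketch() {}

    /**
     * @param hadSuccessfulConnection whether this client object has already
     *        completed at least one successful connection
     * @return the Clean Start flag the client would send on this attempt
     */
    static boolean cleanStart(Behavior behavior, boolean hadSuccessfulConnection) {
        switch (behavior) {
            case CLEAN:
                return true;                     // always begin a fresh session
            case REJOIN_POST_SUCCESS:
                return !hadSuccessfulConnection; // fresh first time, rejoin on reconnects
            case REJOIN_ALWAYS:
                return false;                    // try to rejoin even on the first connect
            default:
                throw new IllegalArgumentException("unknown behavior: " + behavior);
        }
    }

    public static void main(String[] args) {
        for (Behavior b : Behavior.values()) {
            System.out.println(b + ": first connect cleanStart=" + cleanStart(b, false)
                    + ", reconnect cleanStart=" + cleanStart(b, true));
        }
    }
}
```

If this reading is right, the two REJOIN options differ only on the first connection of a client object. In the question's code the client ID is regenerated on every start (`"testing-" + new ObjectId().toHexString()`), so there is never an earlier session to rejoin on that first connection and both options would likely behave the same; REJOIN_ALWAYS would only make a difference with a client ID that stays stable across restarts.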
