Hi,
I have two instances of a Spring Boot REST API application, running on two servers, that subscribe to an AWS IoT Core MQTT shared-subscription topic. When one instance of the application is shut down, the other instance does not receive all the messages published through the IoT Core MQTT broker. Once one instance is shut down, I expect the remaining instance to receive every message from the MQTT broker. I have set keepAliveIntervalSeconds to 60 seconds. Are there any settings that I missed or misconfigured?
Below is a simplified version of my code:
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import org.bson.types.ObjectId;
import org.slf4j.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.*;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.mqtt5.*;
import software.amazon.awssdk.crt.mqtt5.packets.*;
import software.amazon.awssdk.iot.AwsIotMqtt5ClientBuilder;
/**
 * Spring Boot runner that connects an AWS IoT Core MQTT5 client at startup and subscribes to the
 * shared-subscription filter {@code $share/testing/topic1}, logging every message it receives.
 *
 * <p>The bean implements {@link AutoCloseable}. Spring's destroy-method inference calls
 * {@link #close()} on AutoCloseable beans when the application context is closed. That happens on
 * normal shutdown, on SIGTERM via Spring Boot's shutdown hook, and on {@link SpringApplication#exit}.
 * {@code close()} sends an MQTT DISCONNECT, so the broker removes this instance from the shared
 * group immediately. Without that DISCONNECT, the broker only notices the instance is gone once the
 * keep-alive expires (1.5 x keep-alive per the MQTT 5 spec).
 *
 * <p>NOTE(review): messages the broker routes to a departed group member during that window are
 * presumably not redelivered to the surviving member. This class uses a fresh random client ID on
 * every start, so no session is ever resumed. Confirm against the AWS IoT Core shared-subscription
 * documentation.
 */
@Component
public class TestRunner implements ApplicationRunner, AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(TestRunner.class);

    /** Shared-subscription filter: share group "testing", topic "topic1". */
    private static final String SHARED_TOPIC = "$share/testing/topic1";

    /** Upper bound on waiting for connect, subscribe and stop to complete. */
    private static final long OPERATION_TIMEOUT_SECONDS = 15;

    private final ApplicationContext appContext;
    private final MqttPublishEventsHandler publishEventsHandler = new MqttPublishEventsHandler();
    private final MqttLifecycleEventsHandler lifecycleEventsHandler = new MqttLifecycleEventsHandler();

    // Set during run() and cleared in close(). close() may run on a different thread
    // (e.g. a JVM shutdown hook), so the field is volatile for visibility.
    private volatile Mqtt5Client client;

    @Value("${application.mqttHost}")
    private String mqttHost;
    @Value("${application.mqttPrivateKeyPath}")
    private String mqttPrivateKeyPath;
    @Value("${application.mqttDeviceCertPath}")
    private String mqttDeviceCertPath;

    public TestRunner(ApplicationContext appContext) {
        this.appContext = appContext;
    }

    /** Connects to the broker, then joins the shared subscription. */
    @Override
    public void run(ApplicationArguments args) {
        _initMqttClient();
        _subscribeMqttTopic();
    }

    /**
     * Builds and starts the MQTT5 client over mutual TLS, then waits for the first connection
     * attempt to succeed. Terminates the application with a non-zero status if it does not.
     */
    private void _initMqttClient() {
        AwsIotMqtt5ClientBuilder builder = AwsIotMqtt5ClientBuilder.newDirectMqttBuilderWithMtlsFromPath(
                mqttHost, mqttDeviceCertPath, mqttPrivateKeyPath);
        try {
            ConnectPacket.ConnectPacketBuilder connectProperties = new ConnectPacket.ConnectPacketBuilder();
            // A new random client ID on every start: each process begins a brand-new session,
            // so no earlier session (persistent or not) can ever be resumed with this scheme.
            connectProperties.withClientId("testing-" + new ObjectId().toHexString());
            // Per the MQTT 5 spec the broker considers the client gone after 1.5 x keep-alive
            // without traffic. This bounds how long an instance that dies WITHOUT sending
            // DISCONNECT stays a member of the shared-subscription group.
            connectProperties.withKeepAliveIntervalSeconds(60L);
            builder.withConnectProperties(connectProperties);
            builder.withMinReconnectDelayMs(5000L);
            builder.withMaxReconnectDelayMs(60000L);
            builder.withLifeCycleEvents(lifecycleEventsHandler);
            builder.withPublishEvents(publishEventsHandler);
            client = builder.build();
        } finally {
            // Release the builder even if build() throws.
            builder.close();
        }
        try {
            client.start(); // Connect to mqtt broker
            lifecycleEventsHandler.connectedFuture.get(OPERATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            _exitOnFatalError("Interrupted while connecting to mqtt broker", e);
        } catch (Exception e) {
            _exitOnFatalError("Exception occurred during connecting to mqtt broker", e);
        }
    }

    /**
     * Subscribes to {@link #SHARED_TOPIC} with QoS 1 and verifies that the broker granted the
     * subscription. Terminates the application with a non-zero status on any failure.
     */
    private void _subscribeMqttTopic() {
        SubscribePacket.SubscribePacketBuilder subscribeBuilder = new SubscribePacket.SubscribePacketBuilder();
        subscribeBuilder.withSubscription(SHARED_TOPIC, QOS.AT_LEAST_ONCE, false, false,
                SubscribePacket.RetainHandlingType.DONT_SEND);
        SubAckPacket subAck;
        try {
            subAck = client.subscribe(subscribeBuilder.build()).get(OPERATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            _exitOnFatalError("Interrupted while subscribing to " + SHARED_TOPIC, e);
            return;
        } catch (Exception e) {
            _exitOnFatalError("Exception occurred during subscribing to " + SHARED_TOPIC, e);
            return;
        }
        // A SUBACK can arrive normally and still reject the subscription (e.g. NOT_AUTHORIZED by the
        // IoT policy). MQTT 5 reason codes >= 0x80 indicate failure.
        for (SubAckPacket.SubAckReasonCode reasonCode : subAck.getReasonCodes()) {
            if (reasonCode.getValue() >= 0x80) {
                _exitOnFatalError("Subscription to " + SHARED_TOPIC + " was rejected: " + reasonCode, null);
                return;
            }
        }
        log.info("Subscribed to {} with reason codes {}", SHARED_TOPIC, subAck.getReasonCodes());
    }

    /**
     * Logs a fatal startup error, closes the Spring context and terminates the JVM with a
     * non-zero exit status (the original exited with 0, which signalled success on failure).
     *
     * @param message description of the failure
     * @param cause   underlying exception, or {@code null} if there is none
     */
    private void _exitOnFatalError(String message, Throwable cause) {
        log.error(message, cause);
        System.exit(SpringApplication.exit(appContext, () -> 1));
    }

    /**
     * Disconnects gracefully and releases the MQTT client. Sending DISCONNECT lets the broker drop
     * this instance from the shared group at once, instead of waiting for the keep-alive to expire.
     * Safe to call more than once. After the first call, later calls are no-ops.
     */
    @Override
    public void close() {
        Mqtt5Client current = client;
        if (current == null) {
            return;
        }
        client = null;
        DisconnectPacket.DisconnectPacketBuilder disconnectBuilder = new DisconnectPacket.DisconnectPacketBuilder();
        disconnectBuilder.withReasonCode(DisconnectPacket.DisconnectReasonCode.NORMAL_DISCONNECTION);
        try {
            current.stop(disconnectBuilder.build());
            lifecycleEventsHandler.stoppedFuture.get(OPERATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while stopping mqtt client", e);
        } catch (ExecutionException | TimeoutException e) {
            log.warn("Mqtt5 client did not stop cleanly", e);
        } finally {
            current.close();
        }
    }

    /** Logs client lifecycle transitions and exposes futures for the connect and stop events. */
    static final class MqttLifecycleEventsHandler implements Mqtt5ClientOptions.LifecycleEvents {
        /** Holds the outcome of the first connection attempt (success or failure). */
        final CompletableFuture<Void> connectedFuture = new CompletableFuture<>();
        /** Completed when the client reports that it has stopped. */
        final CompletableFuture<Void> stoppedFuture = new CompletableFuture<>();

        @Override
        public void onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn) {
            log.info("Mqtt5 client: Attempting connection...");
        }

        @Override
        public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn) {
            log.info("Mqtt5 client is connected successfully, client ID: {}",
                    onConnectionSuccessReturn.getNegotiatedSettings().getAssignedClientID());
            connectedFuture.complete(null);
        }

        @Override
        public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) {
            String errorString = CRT.awsErrorString(onConnectionFailureReturn.getErrorCode());
            log.warn("Mqtt5 client: Connection failed with error: {}", errorString);
            // No effect if a connection already succeeded: the future keeps its first outcome.
            connectedFuture.completeExceptionally(new Exception("Could not connect: " + errorString));
        }

        @Override
        public void onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn) {
            log.info("Mqtt5 client has disconnected, error: {}",
                    CRT.awsErrorString(onDisconnectionReturn.getErrorCode()));
            DisconnectPacket disconnectPacket = onDisconnectionReturn.getDisconnectPacket();
            if (disconnectPacket != null) {
                // Present only when the broker sent a DISCONNECT explaining why it closed the connection.
                log.info("Disconnection packet code: {}", disconnectPacket.getReasonCode());
                log.info("Disconnection packet reason: {}", disconnectPacket.getReasonString());
                if (disconnectPacket.getReasonCode() == DisconnectPacket.DisconnectReasonCode.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED) {
                    // Reconnecting cannot help if the broker refuses shared subscriptions.
                    client.stop(null);
                }
            }
        }

        @Override
        public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) {
            stoppedFuture.complete(null);
        }
    }

    /** Logs every received publish, decoding the payload as UTF-8 text. */
    static final class MqttPublishEventsHandler implements Mqtt5ClientOptions.PublishEvents {
        @Override
        public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
            PublishPacket publishPacket = publishReturn.getPublishPacket();
            byte[] payload = publishPacket.getPayload();
            // Decode explicitly as UTF-8 (RFC 8259 requires UTF-8 for JSON exchanged between
            // systems) instead of the platform default charset, which can differ between servers.
            // Guard against a missing payload, which new String(byte[]) would reject with an NPE.
            String strMsgJson = payload == null ? "" : new String(payload, StandardCharsets.UTF_8);
            log.info("Received message from mqtt broker for topic {}. json = {}", publishPacket.getTopic(), strMsgJson);
        }
    }
}
I am new to MQTT and AWS IoT Core. Could you explain why enabling a persistent session would solve the issue where the last instance does not receive all messages after the other instances are shut down?
When all the other instances have been shut down and only the last instance is left running, that instance should receive all the messages from the MQTT broker, right?
My guess is that you want a group of nodes to process messages with QoS 1 (at least once). If message 1 is consumed by instance A, but the instance fails before acknowledging it, instance B will never see that message without a persistent session. A persistent session keeps any unacknowledged messages. So with a shared subscription, after instance A fails, instance B (if it is in the same consumer group) receives the unacknowledged message 1 again.
Let's say I send a message to the MQTT broker after instance A is down. All messages sent after instance A went down should then be routed to instance B, right? However, instance B is not receiving all the messages that were sent after instance A went down.