IoT Core broker and an MQTT client running in a Docker container: the client reconnects every 20 minutes

0

We have a problem with an IoT Core broker and an MQTT client running in a Docker container: the client reconnects every 20 minutes, which looks like the interval of some scheduled procedure.

We found that the problem can be fixed by adding the --cap-add NET_ADMIN argument to the container's start command, but this solution is a security hole (and we are also not sure that Fargate + ECS will allow a container with NET_ADMIN to start).

Perhaps someone can tell us which routes/ports/protocols we need to open for the container so that the broker's scheduled procedure works correctly?

The callbacks are triggered through the MqttClientConnectionEvents object.


callbackhandler example

package org.testproject.gha.service.message;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.testproject.gha.entity.db.Assignment;
import org.testproject.gha.entity.db.Gateway;
import org.testproject.gha.entity.db.User;
import org.testproject.gha.entity.exception.BadRequestException;
import org.testproject.gha.entity.io.da.GoogleIntent;
import org.testproject.gha.entity.io.da.request.GoogleRequest;
import org.testproject.gha.entity.io.da.response.error.ErrorResponse;
import org.testproject.gha.entity.topic.Answer;
import org.testproject.gha.entity.topic.InternalResponses;
import org.testproject.gha.entity.topic.MqttAbstract;
import org.testproject.gha.entity.topic.req.MqttRequest;
import org.testproject.gha.service.OauthService;
import org.testproject.gha.service.db.mana.GatewayManager;
import org.testproject.gha.service.db.repo.UserRepository;
import org.testproject.gha.service.trait.GoogleErrors;
import org.testproject.gha.service.trait.Verbal;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.auth.credentials.CredentialsProvider;
import software.amazon.awssdk.crt.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
import software.amazon.awssdk.crt.mqtt.MqttClientConnectionEvents;
import software.amazon.awssdk.crt.mqtt.MqttMessage;
import software.amazon.awssdk.crt.mqtt.QualityOfService;
import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

@Log4j2
@Service
@RequiredArgsConstructor
public class CallbackHandler implements Verbal {
    private MqttClientConnection connection;
    private final CallbackRegistry registry;
    private final ExecutorService executor;
    private final GatewayManager gateways;
    private final UserRepository users;
    private final OauthService oauth;

    private final MqttClientConnectionEvents callbacks = new MqttClientConnectionEvents() {
        public void onConnectionInterrupted(int code) {
            log.warn("Interruptor <{}> : [{}] {}", code, CRT.awsErrorName(code), CRT.awsErrorString(code));
        }

        public void onConnectionResumed(boolean flag) {
            log.warn("Resuming: {} [true if the session has been resumed.]", flag);
            CompletableFuture.runAsync(() -> subscribe());
        }
    };

    @Value("${service.description}")
    private String description;
    @Value(value = "${aws.iot.endpoint}")
    private String endpoint;
    @Value(value = "${aws.iot.clientId}")
    private String clientId;

    @Value(value = "${aws.iot.access}")
    private String access;
    @Value(value = "${aws.iot.secret}")
    private String secret;

    public JSONObject execute(String serial, String command, GoogleRequest google) {
        try {
            switch (command) {
                /*Google cases*/
                case GATEWAY_SYNC: {
                    MqttRequest request = MqttAbstract.sync(serial, google);
                    CompletableFuture<JSONObject> future = async(request);
                    return registry.extract(new Answer(future, serial, request.getBody().getPayload().getRequestId()));
                }
                case GATEWAY_QUERY: {
                    MqttRequest request = MqttAbstract.query(serial, google);
                    CompletableFuture<JSONObject> future = async(request);
                    return registry.extract(new Answer(future, serial, request.getBody().getPayload().getRequestId()));
                }
                case GATEWAY_EXECUTE: {
                    MqttRequest request = MqttAbstract.execute(serial, google);
                    CompletableFuture<JSONObject> future = async(request);
                    return registry.extract(new Answer(future, serial, request.getBody().getPayload().getRequestId()));
                }

                /*Default behavior*/
                default: {
                    MqttRequest request = MqttAbstract.system(serial);
                    CompletableFuture<JSONObject> future = async(MqttAbstract.system(serial));
                    return registry.extract(new Answer(future, serial, request.getBody().getPayload().getRequestId()));
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            log.error("execute: {} - {}", e.getClass().getSimpleName(), e.getMessage());
            throw new RuntimeException(e);
        }
    }

    private CompletableFuture<JSONObject> async(MqttAbstract req) {
        final MqttMessage message = new MqttMessage(req.getTopic(), req.bytes(), QualityOfService.AT_LEAST_ONCE);

        executor.execute(() -> {
            log.info("Pub-ing to [{}]: {}", req.getTopic(), new String(message.getPayload()));
            this.publish(message);
        });

        return registry.waitForMqtt(req.getBody().getPayload().getRequestId());
    }

    private void subscribe(InternalResponses topic) {
        try {
            this.connection.subscribe(topic.getTopic(), QualityOfService.AT_LEAST_ONCE, topic.getHandler()).get();
        } catch (Exception exc) {
            log.error("subscribe [{}]: {}", exc.getClass().getSimpleName(), exc.getMessage());
        }
    }

    private void subscribe() {
        log.info("Starting subscribing...");
        try {
            this.subscribe(new InternalResponses(gateways, this.registry));
        } catch (Exception error) {
            log.error("onApplicationEvent [{}]: {}", error.getClass().getSimpleName(), error.getMessage());
        }
    }

    public void publish(MqttMessage message) {
        try {
            log.info("Publish to {}", message.getTopic());
            executor.execute(() -> connection.publish(message));
        } catch (Throwable e) {
            log.error("publish: {} - {}", e.getClass().getSimpleName(), e.getMessage());
        }
    }

    @PostConstruct
    public void construct() {
//        AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder
//                .newDefaultBuilder()
//                .withEndpoint(endpoint)
//                .withClientId(clientId)
//                .withConnectionEventCallbacks(callbacks)
//                .withProtocolOperationTimeoutMs(MINUTE)
//                .withCleanSession(true)
//                .withWebsockets(true)
//                .withWebsocketSigningRegion("eu-north-1")
//                .withWebsocketCredentialsProvider(provider)

//        AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder.newMtlsBuilderFromPath(
//                certFile.getAbsolutePath(), keyFile.getAbsolutePath())

        /*File certFile = new File("/cert.crt");
        File keyFile = new File("/pri.key");*/

        final CredentialsProvider provider = new StaticCredentialsProvider.StaticCredentialsProviderBuilder()
                .withAccessKeyId(access.getBytes(StandardCharsets.UTF_8))
                .withSecretAccessKey(secret.getBytes(StandardCharsets.UTF_8))
                .build();

        try (AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder
                .newDefaultBuilder()
                .withEndpoint(endpoint)
                .withClientId(clientId)
                .withConnectionEventCallbacks(callbacks)
                .withProtocolOperationTimeoutMs(MINUTE)
                .withCleanSession(true)
                .withWebsockets(true)
                .withWebsocketSigningRegion("eu-north-1")
                .withWebsocketCredentialsProvider(provider)) {
            this.connection = builder.build();
            this.connection.connect()
                           .whenCompleteAsync((connectResult, error) -> this.subscribe());//should be whenCompleteAsync
            log.info("{} to [{}] as {}", description, endpoint, clientId);
        } catch (Exception e) {
            log.error("postConstruct: {} - {}", e.getClass().getSimpleName(), e.getMessage());
        }
    }

    @PreDestroy
    public void destroy() {
        log.info("Shutting down [{}]", description);
    }

    public ResponseEntity<?> intent(GoogleRequest request, String bearer) {
        final String intent = request.getInputs().get(0).getIntent();
        final String userId = String.valueOf(oauth.authenticate(request, bearer));

        log.info("{} Start [{}] for {} {}", LOG_EDG, intent, userId, LOG_EDG);
        if (!GoogleIntent.SYNC.equals(intent)) {
            log.info("Origin Req {}", JSON.toJSONString(request, true));
        }

        final List<String> list = this.linkedTo(userId);
        log.info("Selected {} for {}", list, userId);

        if (list.isEmpty()) {
            log.warn("User [id={}] dont have any gateways", userId);
            return ResponseEntity.badRequest()
                                 .body(new ErrorResponse(GoogleErrors.DEVICE_NOT_FOUND, request.getRequestId()));
        }
        final String serial = list.get(0);

        try {
            switch (intent) {
                case GoogleIntent.SYNC:
                    return extract(serial, GATEWAY_SYNC, request, userId);

                case GoogleIntent.QUERY:
                    return extract(serial, GATEWAY_QUERY, request, userId);

                case GoogleIntent.EXECUTE:
                    return extract(serial, GATEWAY_EXECUTE, request, userId);

                case GoogleIntent.DISCONNECT:
                    log.info("successful handle of DISCONNECT: {}", request);
                    return ResponseEntity.ok("{}");
            }
        } catch (RuntimeException error) {
            log.error("intent: {}, exception: {} {}", intent, error.getClass().getSimpleName(), error.getMessage());
            log.error("{} {} [{}] for {} {}", LOG_EDG, LOG_END, intent, userId, LOG_EDG);
            if (error instanceof NullPointerException) {
                error.printStackTrace();
            }

            return ResponseEntity.badRequest()
                                 .body(new ErrorResponse(GoogleErrors.PROTOCOL_ERROR, request.getRequestId()));
        }
        log.error("unknown intent: {}", intent);
        return ResponseEntity.badRequest().body(new ErrorResponse(GoogleErrors.NOT_SUPPORTED, request.getRequestId()));
    }

    public List<String> linkedTo(String userId) {
        log.info("Search for [{}] gateways...", userId);
        long id = Long.parseLong(userId);
        if (users.existsById(id)) {
            final User user = users.findById(id).get();
            return user.getAssignments()
                       .stream()
                       .map(Assignment::getGateway)
                       .map(Gateway::getSerialNumber)
                       .collect(Collectors.toList());
        } else {
            throw new BadRequestException(String.format("User [ID=%s] dont exist", userId));
        }
    }

    private ResponseEntity<?> extract(String serial, String command, GoogleRequest google, String agentUserId) {
        final JSONObject answer = this.execute(serial, command, google).getJSONObject("data");

        if (answer.containsKey("payload")) {
            if (answer.getJSONObject("payload").containsKey("payload")) {
                answer.getJSONObject("payload").getJSONObject("payload").put("agentUserId", agentUserId);
            }
        }

        final JSONObject result = answer.getJSONObject("payload");
        return ResponseEntity.ok(result);
    }
}

docker file example

#FROM amazoncorretto:11-alpine-jdk
FROM openjdk:11-jdk
# MAINTAINER is deprecated; the maintainer label carries the same information.
LABEL maintainer="testproject.com"
# Application artifacts produced by the build.
COPY build/libs/* /
# EXPOSE only documents ports the container listens on; it does not affect outbound
# connections. NOTE(review): 80, 443 and 8883 look unnecessary if the service only
# listens on 8080 — confirm before removing.
EXPOSE 80
EXPOSE 443
EXPOSE 8883
EXPOSE 8080
# Start script; made executable explicitly in case the mode bit is lost on checkout.
COPY app.sh /app.sh
RUN ["chmod", "+x", "/app.sh"]
ENTRYPOINT ["/app.sh"]

logs sample

2023-07-26T11:01:01 26 WA 11:01:01 CallbackHandler : Interruptor <1051> : [AWS_IO_SOCKET_CLOSED] socket is closed.
2023-07-26T11:01:02 26 WA 11:01:02 CallbackHandler : Resuming: false [true if the session has been resumed.]

  • There shouldn't be a need to expose ports, as an MQTT connection to AWS IoT Core is outbound only. What value is defined for MINUTE, which is used for MQTT PINGREQ (keepalive)? Also, what is the network configuration from the container to the docker host? That will help diagnose!

  • int MINUTE = 60000;
    

    The container uses network mode - AWSVPC. SG allows all outgoing traffic, and incoming traffic only on tcp port 8080

    Also, could you specify which docker host you are asking about?

No Answers

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions