MQTT client running in a Docker container reconnects to the AWS IoT Core broker every 20 minutes
We have a problem with an IoT Core broker and an MQTT client running in a Docker container: the client reconnects every 20 minutes, which looks like the interval of some scheduled procedure.
We found that this problem can be fixed by adding the `--cap-add NET_ADMIN` argument to the container's start command, but this solution is a security hole (and we are also not sure that ECS on Fargate will allow a container with NET_ADMIN to start).
Can someone tell us which routes, ports, or protocols we need to open for the container so that the broker's scheduled procedure works correctly?
The connection callbacks are triggered through the `MqttClientConnectionEvents` object.
CallbackHandler example:
package org.testproject.gha.service.message;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.testproject.gha.entity.db.Assignment;
import org.testproject.gha.entity.db.Gateway;
import org.testproject.gha.entity.db.User;
import org.testproject.gha.entity.exception.BadRequestException;
import org.testproject.gha.entity.io.da.GoogleIntent;
import org.testproject.gha.entity.io.da.request.GoogleRequest;
import org.testproject.gha.entity.io.da.response.error.ErrorResponse;
import org.testproject.gha.entity.topic.Answer;
import org.testproject.gha.entity.topic.InternalResponses;
import org.testproject.gha.entity.topic.MqttAbstract;
import org.testproject.gha.entity.topic.req.MqttRequest;
import org.testproject.gha.service.OauthService;
import org.testproject.gha.service.db.mana.GatewayManager;
import org.testproject.gha.service.db.repo.UserRepository;
import org.testproject.gha.service.trait.GoogleErrors;
import org.testproject.gha.service.trait.Verbal;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.auth.credentials.CredentialsProvider;
import software.amazon.awssdk.crt.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
import software.amazon.awssdk.crt.mqtt.MqttClientConnectionEvents;
import software.amazon.awssdk.crt.mqtt.MqttMessage;
import software.amazon.awssdk.crt.mqtt.QualityOfService;
import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@Log4j2
@Service
@RequiredArgsConstructor
public class CallbackHandler implements Verbal {
private MqttClientConnection connection;
private final CallbackRegistry registry;
private final ExecutorService executor;
private final GatewayManager gateways;
private final UserRepository users;
private final OauthService oauth;
private final MqttClientConnectionEvents callbacks = new MqttClientConnectionEvents() {
public void onConnectionInterrupted(int code) {
log.warn("Interruptor <{}> : [{}] {}", code, CRT.awsErrorName(code), CRT.awsErrorString(code));
}
public void onConnectionResumed(boolean flag) {
log.warn("Resuming: {} [true if the session has been resumed.]", flag);
CompletableFuture.runAsync(() -> subscribe());
}
};
@Value("${service.description}")
private String description;
@Value(value = "${aws.iot.endpoint}")
private String endpoint;
@Value(value = "${aws.iot.clientId}")
private String clientId;
@Value(value = "${aws.iot.access}")
private String access;
@Value(value = "${aws.iot.secret}")
private String secret;
public JSONObject execute(String serial, String command, GoogleRequest google) {
try {
switch (command) {
/*Google cases*/
case GATEWAY_SYNC: {
MqttRequest request = MqttAbstract.sync(serial, google);
CompletableFuture<JSONObject> future = async(request);
return registry.extract(new Answer(future, serial, request.getBody().getPayload().getRequestId()));
}
case GATEWAY_QUERY: {
MqttRequest request = MqttAbstract.query(serial, google);
CompletableFuture<JSONObject> future = async(request);
return registry.extract(new Answer(future, serial, request.getBody().getPayload().getRequestId()));
}
case GATEWAY_EXECUTE: {
MqttRequest request = MqttAbstract.execute(serial, google);
CompletableFuture<JSONObject> future = async(request);
return registry.extract(new Answer(future, serial, request.getBody().getPayload().getRequestId()));
}
/*Default behavior*/
default: {
MqttRequest request = MqttAbstract.system(serial);
CompletableFuture<JSONObject> future = async(MqttAbstract.system(serial));
return registry.extract(new Answer(future, serial, request.getBody().getPayload().getRequestId()));
}
}
} catch (InterruptedException | ExecutionException e) {
log.error("execute: {} - {}", e.getClass().getSimpleName(), e.getMessage());
throw new RuntimeException(e);
}
}
private CompletableFuture<JSONObject> async(MqttAbstract req) {
final MqttMessage message = new MqttMessage(req.getTopic(), req.bytes(), QualityOfService.AT_LEAST_ONCE);
executor.execute(() -> {
log.info("Pub-ing to [{}]: {}", req.getTopic(), new String(message.getPayload()));
this.publish(message);
});
return registry.waitForMqtt(req.getBody().getPayload().getRequestId());
}
private void subscribe(InternalResponses topic) {
try {
this.connection.subscribe(topic.getTopic(), QualityOfService.AT_LEAST_ONCE, topic.getHandler()).get();
} catch (Exception exc) {
log.error("subscribe [{}]: {}", exc.getClass().getSimpleName(), exc.getMessage());
}
}
private void subscribe() {
log.info("Starting subscribing...");
try {
this.subscribe(new InternalResponses(gateways, this.registry));
} catch (Exception error) {
log.error("onApplicationEvent [{}]: {}", error.getClass().getSimpleName(), error.getMessage());
}
}
public void publish(MqttMessage message) {
try {
log.info("Publish to {}", message.getTopic());
executor.execute(() -> connection.publish(message));
} catch (Throwable e) {
log.error("publish: {} - {}", e.getClass().getSimpleName(), e.getMessage());
}
}
@PostConstruct
public void construct() {
// AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder
// .newDefaultBuilder()
// .withEndpoint(endpoint)
// .withClientId(clientId)
// .withConnectionEventCallbacks(callbacks)
// .withProtocolOperationTimeoutMs(MINUTE)
// .withCleanSession(true)
// .withWebsockets(true)
// .withWebsocketSigningRegion("eu-north-1")
// .withWebsocketCredentialsProvider(provider)
// AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder.newMtlsBuilderFromPath(
// certFile.getAbsolutePath(), keyFile.getAbsolutePath())
/*File certFile = new File("/cert.crt");
File keyFile = new File("/pri.key");*/
final CredentialsProvider provider = new StaticCredentialsProvider.StaticCredentialsProviderBuilder()
.withAccessKeyId(access.getBytes(StandardCharsets.UTF_8))
.withSecretAccessKey(secret.getBytes(StandardCharsets.UTF_8))
.build();
try (AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder
.newDefaultBuilder()
.withEndpoint(endpoint)
.withClientId(clientId)
.withConnectionEventCallbacks(callbacks)
.withProtocolOperationTimeoutMs(MINUTE)
.withCleanSession(true)
.withWebsockets(true)
.withWebsocketSigningRegion("eu-north-1")
.withWebsocketCredentialsProvider(provider)) {
this.connection = builder.build();
this.connection.connect()
.whenCompleteAsync((connectResult, error) -> this.subscribe());//should be whenCompleteAsync
log.info("{} to [{}] as {}", description, endpoint, clientId);
} catch (Exception e) {
log.error("postConstruct: {} - {}", e.getClass().getSimpleName(), e.getMessage());
}
}
@PreDestroy
public void destroy() {
log.info("Shutting down [{}]", description);
}
public ResponseEntity<?> intent(GoogleRequest request, String bearer) {
final String intent = request.getInputs().get(0).getIntent();
final String userId = String.valueOf(oauth.authenticate(request, bearer));
log.info("{} Start [{}] for {} {}", LOG_EDG, intent, userId, LOG_EDG);
if (!GoogleIntent.SYNC.equals(intent)) {
log.info("Origin Req {}", JSON.toJSONString(request, true));
}
final List<String> list = this.linkedTo(userId);
log.info("Selected {} for {}", list, userId);
if (list.isEmpty()) {
log.warn("User [id={}] dont have any gateways", userId);
return ResponseEntity.badRequest()
.body(new ErrorResponse(GoogleErrors.DEVICE_NOT_FOUND, request.getRequestId()));
}
final String serial = list.get(0);
try {
switch (intent) {
case GoogleIntent.SYNC:
return extract(serial, GATEWAY_SYNC, request, userId);
case GoogleIntent.QUERY:
return extract(serial, GATEWAY_QUERY, request, userId);
case GoogleIntent.EXECUTE:
return extract(serial, GATEWAY_EXECUTE, request, userId);
case GoogleIntent.DISCONNECT:
log.info("successful handle of DISCONNECT: {}", request);
return ResponseEntity.ok("{}");
}
} catch (RuntimeException error) {
log.error("intent: {}, exception: {} {}", intent, error.getClass().getSimpleName(), error.getMessage());
log.error("{} {} [{}] for {} {}", LOG_EDG, LOG_END, intent, userId, LOG_EDG);
if (error instanceof NullPointerException) {
error.printStackTrace();
}
return ResponseEntity.badRequest()
.body(new ErrorResponse(GoogleErrors.PROTOCOL_ERROR, request.getRequestId()));
}
log.error("unknown intent: {}", intent);
return ResponseEntity.badRequest().body(new ErrorResponse(GoogleErrors.NOT_SUPPORTED, request.getRequestId()));
}
public List<String> linkedTo(String userId) {
log.info("Search for [{}] gateways...", userId);
long id = Long.parseLong(userId);
if (users.existsById(id)) {
final User user = users.findById(id).get();
return user.getAssignments()
.stream()
.map(Assignment::getGateway)
.map(Gateway::getSerialNumber)
.collect(Collectors.toList());
} else {
throw new BadRequestException(String.format("User [ID=%s] dont exist", userId));
}
}
private ResponseEntity<?> extract(String serial, String command, GoogleRequest google, String agentUserId) {
final JSONObject answer = this.execute(serial, command, google).getJSONObject("data");
if (answer.containsKey("payload")) {
if (answer.getJSONObject("payload").containsKey("payload")) {
answer.getJSONObject("payload").getJSONObject("payload").put("agentUserId", agentUserId);
}
}
final JSONObject result = answer.getJSONObject("payload");
return ResponseEntity.ok(result);
}
}
Dockerfile example:
# Alternative base image (currently unused):
#FROM amazoncorretto:11-alpine-jdk
FROM openjdk:11-jdk
# MAINTAINER is deprecated; record the maintainer as an image label instead.
LABEL maintainer="testproject.com"
COPY build/libs/* /
# EXPOSE only documents ports the container listens on. Outbound connections,
# such as the MQTT-over-WebSocket link to AWS IoT Core, need no EXPOSE entry.
EXPOSE 80
EXPOSE 443
EXPOSE 8883
EXPOSE 8080
COPY app.sh /app.sh
RUN ["chmod", "+x", "/app.sh"]
ENTRYPOINT ["/app.sh"]
Log sample:
2023-07-26T11:01:01 26 WA 11:01:01 CallbackHandler : Interruptor <1051> : [AWS_IO_SOCKET_CLOSED] socket is closed.
2023-07-26T11:01:02 26 WA 11:01:02 CallbackHandler : Resuming: false [true if the session has been resumed.]
- 最新
- 投票最多
- 评论最多
相关内容
- AWS 官方已更新 1 年前
- AWS 官方已更新 3 年前
- AWS 官方已更新 1 年前
- AWS 官方已更新 1 年前
There shouldn't be any need to expose ports, as an MQTT connection to AWS IoT Core is outbound only. What value is defined for `MINUTE`, which is used for the MQTT PINGREQ (keep-alive)? Also, what is the network configuration from the container to the Docker host? That will help with the diagnosis!

The container uses the `awsvpc` network mode. The security group allows all outbound traffic, and inbound traffic only on TCP port 8080.
Also, could you specify which Docker host you are asking about?