Skip to content

Commit

Permalink
Restarting the spring context in case of MQTT connection loss
Browse files Browse the repository at this point in the history
  • Loading branch information
dvdgeisler committed Dec 7, 2022
1 parent 94d98f0 commit 53e9d56
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
Expand All @@ -20,13 +21,13 @@
HassLightDeviceEventHandler.class,
HassMotionSensorDeviceEventHandler.class,
HassOutletDeviceEventHandler.class})
public class DirigeraClientMqttApplication {
public class DirigeraClientMqttApplication implements MqttCallback {
private final static Logger log = LoggerFactory.getLogger(DirigeraClientMqttApplication.class);

private final static int EXIT_SUCCESS = 0;
private final static int EXIT_ERROR = 1;
private final DirigeraApi api;
private final ConfigurableApplicationContext context;
private ConfigurableApplicationContext context;

public DirigeraClientMqttApplication(
final DirigeraApi api,
Expand Down Expand Up @@ -62,28 +63,13 @@ public MqttClient getMqttClient(@Value("${dirigera.mqtt.hostname:localhost}") fi
options.setKeepAliveInterval(keepAliveInterval);
options.setMaxReconnectDelay(reconnectDelay);
options.setAutomaticReconnect(reconnect);
//options.setCleanSession(true);
options.setConnectionTimeout(timeout);

if (!username.isEmpty() && !password.isEmpty()) {
options.setUserName(username);
options.setPassword(password.toCharArray());
}
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(final Throwable cause) {
log.error("Connection lost to MQTT broker: {}", cause.getMessage());
exit(EXIT_ERROR);
}

@Override
public void messageArrived(final String topic, final MqttMessage message) {
}

@Override
public void deliveryComplete(final IMqttDeliveryToken token) {
}
});
client.setCallback(this);

try {
client.connect(options);
Expand All @@ -102,17 +88,42 @@ public static void main(String[] args) {
}

public void exit(int status) {
final ApplicationArguments args;

log.info("Close WebSocket");
this.api.websocket.stop();

if (context != null) {
log.info("Close Spring Boot context");
this.context.close();
if (context != null && status == EXIT_ERROR) {
log.info("Attempt to restart application");

args = this.context.getBean(ApplicationArguments.class);

Thread thread = new Thread(() -> {
log.info("Close Spring Boot context");
this.context.close();
main(args.getSourceArgs());
});

thread.setDaemon(false);
thread.start();
} else {
log.info("Exit application");
Runtime.getRuntime().halt(status);
}
}

log.info("Exit application");
Runtime.getRuntime().halt(status);
@Override
public void connectionLost(final Throwable cause) {
log.error("Connection lost to MQTT broker: {}", cause.getMessage());
exit(EXIT_ERROR);
}

@Override
public void messageArrived(final String topic, final MqttMessage message) {
log.debug("Received message from MQTT: topic={}, message={}", topic, message);
}

@Override
public void deliveryComplete(final IMqttDeliveryToken token) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected <T> void publish(final String topic, final T payload, final int qos, f
log.debug("Publish to MQTT: topic={}, payload={}, qos={}, retained={}", topic, json, qos, retained);
this.mqtt.publish(topic, json.getBytes(StandardCharsets.UTF_8), qos, retained);
} catch (MqttException e) {
log.error(e.getMessage());
log.error("Error while publishing to MQTT topic: {}", e.getMessage());
}
}

Expand All @@ -67,7 +67,7 @@ protected <T> void subscribe(final String topic, final Class<T> payloadType, fin
consumer.accept(this.fromJSON(message.getPayload(), payloadType));
});
} catch (MqttException e) {
log.error(e.getMessage());
log.error("Error while subscribing to MQTT topic: {}", e.getMessage());
}
}

Expand All @@ -76,7 +76,7 @@ protected void unsubscribe(final String topic) {
try {
this.mqtt.unsubscribe(topic);
} catch (MqttException e) {
log.error(e.getMessage());
log.error("Error while unsubscribing from MQTT topic: {}", e.getMessage());
}
}

Expand Down

0 comments on commit 53e9d56

Please sign in to comment.