Skip to content

Commit

Permalink
feat: high-prority messages cause re-connect (#4298)
Browse files Browse the repository at this point in the history
* feat: high-prority messages cause re-connect

* feat: added support to disconnect after prority message has been confirmed

* feat: implimented high-prority re-connect within the stratgy

* fix: renamed onPublish to onPublishRequested

* fix: created on publish state and moved logic into state machine

* fix: spelling in metatype

* fix: fixed state refrence

* fix: will only re-connect after seccond prorirty message (so LWT is ignored)

* fix: created new state to handle DC message

* tests: added test coverage for new feature

* tests: added test coverage for new feature

* fix: passed Options to ScheduleStrategy

* fix: priority in test

* tests: added a specific DataServiceOptions Test

* tests: added more coverage DataServiceOptions Test

* tests: added other ScheduleStratagy Constructor to tests

* tests: improved coverage on DataServiceOptionsTest

* tests: tried to add coverage to if

* tests: tried to add coverage to if
  • Loading branch information
GregoryIvo authored Jan 2, 2023
1 parent 36486eb commit 1bbe279
Show file tree
Hide file tree
Showing 8 changed files with 481 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,22 @@
default="0 0 0 ? * * *"
description="A CRON expression that specifies the instants when the gateway should perform a connection attempt. This parameter is only used if Enable Connection Schedule is set to true. The default expression schedules a connection every day at midnight." />

<AD id="connection.schedule.priority.override.enable"
name="Allow priority message to overide connection schedule"
type="Boolean"
cardinality="0"
required="true"
default="false"
description="Allows messages beyond a specified priority to force a connection and be sent regardless of connection schedule." />

<AD id="connection.schedule.priority.override.threshold"
name="Message schedule priority override threshold"
type="Integer"
cardinality="0"
required="true"
default="1"
description="A message with a priority equal to or less than this threshold will cause the framework to automatically re-connect and send regardless of the connection schedule." />

<AD id="connection.schedule.inactivity.interval.seconds"
name="Connection Schedule Disconnect Inactivity Interval Seconds"
type="Long"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,9 @@ public void shutdown() {
connectionManager.stopConnectionTask();
}

@Override
public void onPublishRequested(String topic, byte[] payload, int qos, boolean retain, int priority) {
// do nothing
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ interface AutoConnectStrategy extends DataServiceListener {

public void shutdown();

public void onPublishRequested(String topic, byte[] payload, int qos, boolean retain, int priority);

interface ConnectionManager {

void startConnectionTask();
Expand All @@ -26,6 +28,8 @@ interface ConnectionManager {

void disconnect();

DataMessage getNextMessage();

boolean hasInFlightMessages();

boolean isConnected();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,10 @@ public void unsubscribe(String topic) throws KuraException {
@Override
public int publish(String topic, byte[] payload, int qos, boolean retain, int priority) throws KuraStoreException {

if (this.autoConnectStrategy.isPresent()) {
this.autoConnectStrategy.get().onPublishRequested(topic, payload, qos, retain, priority);
}

logger.info("Storing message on topic: {}, priority: {}", topic, priority);

DataMessage dataMsg = this.store.store(topic, payload, qos, retain, priority);
Expand Down Expand Up @@ -581,8 +585,7 @@ private void createAutoConnectStrategy() {
if (!this.dataServiceOptions.isConnectionScheduleEnabled() || !schedule.isPresent()) {
strategy = new AlwaysConnectedStrategy(this);
} else {
strategy = new ScheduleStrategy(schedule.get(),
this.dataServiceOptions.getConnectionScheduleDisconnectDelay() * 1000, this);
strategy = new ScheduleStrategy(schedule.get(), this.dataServiceOptions, this);
}

this.autoConnectStrategy = Optional.of(strategy);
Expand Down Expand Up @@ -942,4 +945,16 @@ public void stopConnectionTask() {
public boolean hasInFlightMessages() {
return !inFlightMsgIds.isEmpty();
}

@Override
public DataMessage getNextMessage() {
DataMessage message = null;
try {
message = DataServiceImpl.this.store.getNextMessage();
} catch (Exception e) {
logger.error("Probably an unrecoverable exception", e);
}
return message;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class DataServiceOptions {
private static final String CONNECTION_SCHEDULE_ENABLED = "connection.schedule.enabled";
private static final String CONNECTION_SCHECULE_EXPRESSION = "connection.schedule.expression";
private static final String CONNECTION_SCHEDULE_INACTIVITY_INTERVAL_SECONDS = "connection.schedule.inactivity.interval.seconds";
private static final String CONNECTION_SCHEDULE_PRIORITY_OVERRIDE_ENABLE = "connection.schedule.priority.override.enable";
private static final String CONNECTION_SCHEDULE_PRIORITY_OVERRIDE_THRESHOLD = "connection.schedule.priority.override.threshold";

private static final boolean AUTOCONNECT_PROP_DEFAULT = false;
private static final int CONNECT_DELAY_DEFAULT = 60;
Expand All @@ -67,6 +69,8 @@ public class DataServiceOptions {
private static final int RECOVERY_MAX_FAILURES_DEFAULT = 10;
private static final boolean CONNECTION_SCHEDULE_ENABLED_DEFAULT = false;
private static final long CONNECTION_SCHEDULE_INACTIVITY_INTERVAL_SECONDS_DEFAULT = 60;
private static final boolean CONNECTION_SCHEDULE_PRIORITY_OVERRIDE_ENABLE_DEFAULT = false;
private static final int CONNECTION_SCHEDULE_PRIORITY_OVERRIDE_THRESHOLD_DEFAULT = 1;

private static final int CONNECT_CRITICAL_COMPONENT_TIMEOUT_MULTIPLIER = 5000;

Expand Down Expand Up @@ -187,4 +191,17 @@ long getConnectionScheduleDisconnectDelay() {
return (long) this.properties.getOrDefault(CONNECTION_SCHEDULE_INACTIVITY_INTERVAL_SECONDS,
CONNECTION_SCHEDULE_INACTIVITY_INTERVAL_SECONDS_DEFAULT);
}

Boolean isConnectionSchedulePriorityOverrideEnabled() {
return (Boolean) this.properties.getOrDefault(CONNECTION_SCHEDULE_PRIORITY_OVERRIDE_ENABLE,
CONNECTION_SCHEDULE_PRIORITY_OVERRIDE_ENABLE_DEFAULT) && isConnectionScheduleEnabled()
&& getConnectionScheduleExpression().isPresent();
}

int getConnectionSchedulePriorityOverridePriority() {

return (Integer) this.properties.getOrDefault(CONNECTION_SCHEDULE_PRIORITY_OVERRIDE_THRESHOLD,
CONNECTION_SCHEDULE_PRIORITY_OVERRIDE_THRESHOLD_DEFAULT);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,25 @@ public class ScheduleStrategy implements AutoConnectStrategy {
private State state;

private Optional<ScheduledFuture<?>> timeout = Optional.empty();
private DataServiceOptions dataServiceOptions;

public ScheduleStrategy(final CronExpression expression, final long disconnectTimeoutMs,
public ScheduleStrategy(final CronExpression expression, DataServiceOptions dataServiceOptions,
final ConnectionManager connectionManager) {
this(expression, disconnectTimeoutMs, connectionManager, Executors.newSingleThreadScheduledExecutor(),
Date::new);
this(expression, dataServiceOptions.getConnectionScheduleDisconnectDelay() * 1000, connectionManager,
Executors.newSingleThreadScheduledExecutor(),
Date::new, dataServiceOptions);
}

public ScheduleStrategy(final CronExpression expression, final long disconnectTimeoutMs,
final ConnectionManager connectionManager, final ScheduledExecutorService executor,
final Supplier<Date> currentTimeProvider) {
final Supplier<Date> currentTimeProvider, DataServiceOptions dataServiceOptions) {
this.expression = expression;
this.disconnectTimeoutMs = disconnectTimeoutMs;
this.connectionManager = connectionManager;
this.state = new AwaitConnectTime();
this.executor = executor;
this.currentTimeProvider = currentTimeProvider;
this.dataServiceOptions = dataServiceOptions;

updateState(State::onEnterState);
executor.scheduleWithFixedDelay(new TimeShiftDetector(60000), 0, 1, TimeUnit.MINUTES);
Expand All @@ -80,14 +83,25 @@ public default State onConnectionLost() {
public default State onTimeout() {
return this;
}

public default State onPublish(String topic, byte[] payload, int qos, boolean retain, int priority) {
return this;
}
}

private class AwaitConnectTime implements State {

@Override
public State onEnterState() {
connectionManager.stopConnectionTask();
connectionManager.disconnect();

DataMessage dm = connectionManager.getNextMessage();

if (dm != null
&& dm.getPriority() <= dataServiceOptions.getConnectionSchedulePriorityOverridePriority()) {
logger.info(
"Priority message sent while disconnecting. Initiating Connection to send message with a high priority.");
return new AwaitConnect();
}

final Date now = currentTimeProvider.get();

Expand All @@ -108,6 +122,20 @@ public State onTimeout() {
return new AwaitConnect();
}

@Override
public State onPublish(String topic, byte[] payload, int qos, boolean retain, int priority) {

if (dataServiceOptions.isConnectionSchedulePriorityOverrideEnabled()
&& priority <= dataServiceOptions.getConnectionSchedulePriorityOverridePriority()
&& !connectionManager.isConnected()) {
logger.info("Initiating Connection to send message with a high priority.");

return new AwaitConnect();
}

return this;
}

}

private class AwaitConnect implements State {
Expand Down Expand Up @@ -160,11 +188,31 @@ public State onTimeout() {
if (connectionManager.hasInFlightMessages()) {
return this;
} else {
return new AwaitConnectTime();
return new AwaitDisconnect();
}
}
}

private class AwaitDisconnect implements State {

@Override
public State onEnterState() {
connectionManager.stopConnectionTask();
connectionManager.disconnect();
return this;
}

@Override
public State onConnectionLost() {
return new AwaitConnectTime();
}

@Override
public State onMessageEvent() {
return this;
}
}

private void rescheduleTimeout(final long timeoutMs) {
final Optional<ScheduledFuture<?>> currentFuture = this.timeout;

Expand Down Expand Up @@ -270,4 +318,9 @@ public void onMessageConfirmed(int messageId, String topic) {
updateState(State::onMessageEvent);
}

@Override
public void onPublishRequested(String topic, byte[] payload, int qos, boolean retain, int priority) {
this.updateState(c -> this.state.onPublish(topic, payload, qos, retain, priority));
}

}
Loading

0 comments on commit 1bbe279

Please sign in to comment.