Skip to content

Commit

Permalink
Merge pull request #955 from sac-fork/refactor/incremental-backoff-ji…
Browse files Browse the repository at this point in the history
…tter

Fix incremental backoff jitter
  • Loading branch information
owenpearson authored Jul 5, 2023
2 parents 41b42f4 + eb6c487 commit 63e5b9a
Show file tree
Hide file tree
Showing 10 changed files with 267 additions and 230 deletions.
4 changes: 2 additions & 2 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import io.ably.lib.util.CollectionUtils;
import io.ably.lib.util.EventEmitter;
import io.ably.lib.util.Log;
import io.ably.lib.util.TimerUtil;
import io.ably.lib.util.ReconnectionStrategy;

/**
* Enables messages to be published and subscribed to.
Expand Down Expand Up @@ -502,7 +502,7 @@ synchronized private void reattachAfterTimeout() {
reattachTimer = currentReattachTimer;

this.retryCount++;
int retryDelay = TimerUtil.getRetryTime(ably.options.channelRetryTimeout, retryCount);
int retryDelay = ReconnectionStrategy.getRetryTime(ably.options.channelRetryTimeout, retryCount);

final Timer inProgressTimer = currentReattachTimer;
reattachTimer.schedule(new TimerTask() {
Expand Down
26 changes: 12 additions & 14 deletions lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import io.ably.lib.types.ProtocolSerializer;
import io.ably.lib.util.Log;
import io.ably.lib.util.PlatformAgentProvider;
import io.ably.lib.util.TimerUtil;
import io.ably.lib.util.ReconnectionStrategy;

public class ConnectionManager implements ConnectListener {
final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -134,7 +134,7 @@ public abstract class State {
public final boolean sendEvents;

final boolean terminal;
public final long timeout;
public long timeout;

State(ConnectionState state, boolean queueEvents, boolean sendEvents, boolean terminal, long timeout, ErrorInfo defaultErrorInfo) {
this.state = state;
Expand Down Expand Up @@ -282,7 +282,7 @@ void enact(StateIndication stateIndication, ConnectionStateChange change) {

class Disconnected extends State {
Disconnected() {
super(ConnectionState.disconnected, true, false, false, Defaults.TIMEOUT_DISCONNECT, REASON_DISCONNECTED);
super(ConnectionState.disconnected, true, false, false, ably.options.disconnectedRetryTimeout, REASON_DISCONNECTED);
}

@Override
Expand Down Expand Up @@ -836,8 +836,13 @@ private synchronized ConnectionStateChange setState(ITransport transport, StateI
return null;
}

if (stateIndication.state == ConnectionState.suspended || stateIndication.state == ConnectionState.connected) {
this.disconnectedRetryCount = 0;
if (stateIndication.state == ConnectionState.connected || stateIndication.state == ConnectionState.suspended) {
this.disconnectedRetryAttempt = 0;
}

if (stateIndication.state == ConnectionState.disconnected) {
states.get(ConnectionState.disconnected).timeout =
ReconnectionStrategy.getRetryTime(ably.options.disconnectedRetryTimeout, ++disconnectedRetryAttempt);
}

/* update currentState */
Expand All @@ -849,14 +854,7 @@ private synchronized ConnectionStateChange setState(ITransport transport, StateI
reason = newState.defaultErrorInfo;
}
Log.v(TAG, "setState(): setting " + newState.state + "; reason " + reason);

long retryDelay = newState.timeout;
if (newState.state == ConnectionState.disconnected) {
this.disconnectedRetryCount++;
retryDelay = TimerUtil.getRetryTime((int) newState.timeout, this.disconnectedRetryCount);
}

ConnectionStateChange change = new ConnectionStateChange(currentState.state, newConnectionState, retryDelay, reason);
ConnectionStateChange change = new ConnectionStateChange(currentState.state, newConnectionState, newState.timeout, reason);
currentState = newState;
stateError = reason;

Expand Down Expand Up @@ -1910,7 +1908,7 @@ private boolean isFatalError(ErrorInfo err) {
private CMConnectivityListener connectivityListener;
private long connectionStateTtl = Defaults.connectionStateTtl;
long maxIdleInterval = Defaults.maxIdleInterval;
private int disconnectedRetryCount = 0;
private int disconnectedRetryAttempt = 0;

/* for debug/test only */
private final RawProtocolListener protocolListener;
Expand Down
9 changes: 9 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ public ClientOptions(String key) throws AblyException {
*/
public long realtimeRequestTimeout = Defaults.realtimeRequestTimeout;

/**
* When the connection enters the disconnected state, after this timeout,
* if the state is still disconnected, the client library will attempt to reconnect automatically.
* The default is 15 seconds (TO3l1).
* <p>
* Spec: TO3l1
*/
public long disconnectedRetryTimeout = Defaults.TIMEOUT_DISCONNECT;

/**
* An array of fallback hosts to be used in the case of an error necessitating the use of an alternative host.
* If you have been provided a set of custom fallback hosts by Ably, please specify them here.
Expand Down
38 changes: 38 additions & 0 deletions lib/src/main/java/io/ably/lib/util/ReconnectionStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.ably.lib.util;

public class ReconnectionStrategy {

/**
* Spec: RTB1a
*
* @param count The retry count
* @return The backoff coefficient
*/
private static float getBackoffCoefficient(int count) {
return Math.min((count + 2) / 3f, 2f);
}

/**
* Spec: RTB1b
*
* @return The jitter coefficient
*/
private static double getJitterCoefficient() {
return 1 - Math.random() * 0.2;
}

/**
* Spec: RTB1
*
* @param initialTimeout The initial timeout value
* @param retryAttempt integer indicating retryAttempt
* @return RetryTimeout value for given timeout and retryAttempt.
* If x is the value returned then,
* Upper bound = min((retryAttempt + 2) / 3, 2) * initialTimeout,
* Lower bound = 0.8 * Upper bound,
* Lower bound < x < Upper bound
*/
public static int getRetryTime(long initialTimeout, int retryAttempt) {
return Double.valueOf(initialTimeout * getJitterCoefficient() * getBackoffCoefficient(retryAttempt)).intValue();
}
}
34 changes: 0 additions & 34 deletions lib/src/main/java/io/ably/lib/util/TimerUtil.java

This file was deleted.

13 changes: 13 additions & 0 deletions lib/src/test/java/io/ably/lib/test/common/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,16 @@
import io.ably.lib.util.Base64Coder;
import io.ably.lib.util.Log;
import io.ably.lib.util.Serialisation;
import org.hamcrest.Matcher;

import static junit.framework.Assert.assertTrue;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;

public class Helpers {

Expand Down Expand Up @@ -809,6 +814,14 @@ public static void assertMessagesEqual(BaseMessage expected, BaseMessage actual)
}
}

public static void assertTimeoutBetween(int timeout, Double min, Double max) {
assertThat(String.format("timeout %d should be between %f and %f", timeout, min, max ), (double) timeout, between(min, max));
}

public static Matcher<Double> between(Double min, Double max) {
return allOf(greaterThanOrEqualTo(min), lessThanOrEqualTo(max));
}

public static class AsyncWaiter<T> implements Callback<T> {
@Override
public synchronized void onSuccess(T result) {
Expand Down
Loading

0 comments on commit 63e5b9a

Please sign in to comment.