Skip to content


RoundRobinLoadBalancer: re-subscribe when all hosts become unhealthy
Browse files Browse the repository at this point in the history


It’s possible for the RRLB to get into a state where all available
hosts are unhealthy. Possible scenarios:
- DNS returned incorrect results;
- All hosts were restarted and got different addresses.
RRLB will be in a dead state until the TTL expires, which may take some time.

- When RRLB detects that all hosts are unhealthy, it re-subscribes to the
SD events publisher to trigger a new resolution;
- Add `RoundRobinLoadBalancerFactory.Builder#healthCheckResubscribeInterval`
option to configure the new feature;
- Wrap `backgroundExecutor` with `NormalizedTimeSourceExecutor` to make
sure `currentTime` is always positive;
- Test behavior when the new feature is enabled/disabled;
- Add `DurationUtils.ensureNonNegative(...)` utility for validation;
- Make `TestExecutor(long)` constructor public to test `NormalizedTimeSourceExecutor`;
- Add `SequentialPublisherSubscriberFunction.numberOfSubscribersSeen()` to
verify how many subscribers the `TestPublisher` has already seen.


RRLB forces SD to re-resolve addresses by cancelling the current subscription
and re-subscribing to the publisher when it detects that all hosts have become unhealthy.
idelpivnitskiy authored Feb 24, 2023
1 parent 38d5166 commit b4543f4
Showing 10 changed files with 631 additions and 211 deletions.
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
import io.servicetalk.concurrent.PublisherSource.Subscription;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.annotation.Nullable;

@@ -32,6 +33,7 @@ public final class SequentialPublisherSubscriberFunction<T>
implements Function<Subscriber<? super T>, Subscriber<? super T>> {

private final AtomicBoolean subscribed = new AtomicBoolean();
private final AtomicInteger numberOfSubscribersSeen = new AtomicInteger();
private volatile Subscriber<? super T> subscriber;

@@ -41,6 +43,7 @@ public Subscriber<? super T> apply(final Subscriber<? super T> subscriber) {
throw new IllegalStateException("Duplicate subscriber: " + subscriber);
this.subscriber = subscriber;
return new DelegatingPublisherSubscriber<T>(subscriber) {
public void onSubscribe(final Subscription s) {
@@ -99,4 +102,13 @@ public Subscriber<? super T> subscriber() {
public boolean isSubscribed() {
return subscribed.get();

* Returns total number of observed {@link Subscriber}s.
* @return total number of observed {@link Subscriber}s.
public int numberOfSubscribersSeen() {
return numberOfSubscribersSeen.get();
Original file line number Diff line number Diff line change
@@ -59,7 +59,12 @@ public TestExecutor() {

TestExecutor(final long epochNanos) {
* Create a new instance.
* @param epochNanos initial value for {@link #currentTime(TimeUnit)} in nanoseconds
public TestExecutor(final long epochNanos) {
currentNanos = epochNanos;
nanoOffset = epochNanos - Long.MIN_VALUE;
1 change: 1 addition & 0 deletions servicetalk-loadbalancer/build.gradle
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ dependencies {
implementation project(":servicetalk-annotations")
implementation project(":servicetalk-concurrent-api-internal")
implementation project(":servicetalk-concurrent-internal")
implementation project(":servicetalk-utils-internal")
implementation ""
implementation "org.slf4j:slf4j-api"

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package io.servicetalk.loadbalancer;

import io.servicetalk.concurrent.api.DelegatingExecutor;
import io.servicetalk.concurrent.api.Executor;

import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

* An {@link Executor} that always starts counting {@link #currentTime(TimeUnit)} from {@code 0}.
final class NormalizedTimeSourceExecutor extends DelegatingExecutor {

private final long offset;

NormalizedTimeSourceExecutor(final Executor delegate) {
offset = delegate.currentTime(NANOSECONDS);

public long currentTime(final TimeUnit unit) {
return delegate().currentTime(unit) - offset;

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.LoadBalancerFactory;
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.api.DefaultThreadFactory;
import io.servicetalk.concurrent.api.Executor;
@@ -35,8 +36,10 @@
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.DurationUtils.ensureNonNegative;
import static io.servicetalk.utils.internal.DurationUtils.ensurePositive;
import static io.servicetalk.utils.internal.DurationUtils.isPositive;
import static java.time.Duration.ofSeconds;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

@@ -78,6 +81,7 @@ public final class RoundRobinLoadBalancerFactory<ResolvedAddress, C extends Load

private static final Duration DEFAULT_HEALTH_CHECK_INTERVAL = ofSeconds(5);
private static final Duration DEFAULT_HEALTH_CHECK_JITTER = ofSeconds(3);
static final Duration DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL = ofSeconds(10);
static final int DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD = 5; // higher than default for AutoRetryStrategy

private final int linearSearchSpace;
@@ -96,7 +100,7 @@ public <T extends C> LoadBalancer<T> newLoadBalancer(
final String targetResource,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new RoundRobinLoadBalancer<>(requireNonNull(targetResource), eventPublisher, connectionFactory,
return new RoundRobinLoadBalancer<>(targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig);

@@ -105,7 +109,7 @@ public LoadBalancer<C> newLoadBalancer(
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, C> connectionFactory,
final String targetResource) {
return new RoundRobinLoadBalancer<>(requireNonNull(targetResource), eventPublisher, connectionFactory,
return new RoundRobinLoadBalancer<>(targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig);

@@ -128,6 +132,10 @@ public static final class Builder<ResolvedAddress, C extends LoadBalancedConnect
private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL;
private Duration healthCheckJitter = DEFAULT_HEALTH_CHECK_JITTER;
private int healthCheckFailedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
private long healthCheckResubscribeLowerBound =
private long healthCheckResubscribeUpperBound =;;

* Creates a new instance with default settings.
@@ -174,7 +182,7 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> linearSearchSpa
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> backgroundExecutor(
Executor backgroundExecutor) {
this.backgroundExecutor = requireNonNull(backgroundExecutor);
this.backgroundExecutor = new NormalizedTimeSourceExecutor(backgroundExecutor);
return this;

@@ -184,6 +192,7 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> backgroundExecu
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
* and always consider all hosts for establishing new connections.
* @param interval interval at which a background health check will be scheduled.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
@@ -192,7 +201,7 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> backgroundExecu
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckInterval(Duration interval) {
return healthCheckInterval(interval,
interval.compareTo(DEFAULT_HEALTH_CHECK_INTERVAL) <= 0 ? interval.dividedBy(2) :
interval.compareTo(DEFAULT_HEALTH_CHECK_INTERVAL) < 0 ? interval.dividedBy(2) :

@@ -202,27 +211,59 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckInte
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
* and always consider all hosts for establishing new connections.
* @param interval interval at which a background health check will be scheduled.
* @param jitter the amount of jitter to apply to each retry {@code interval}.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckInterval(Duration interval,
Duration jitter) {
if (interval.isNegative() || interval.isZero()) {
throw new IllegalArgumentException("Health check interval should be greater than 0");
if (jitter.isNegative() || jitter.isZero()) {
throw new IllegalArgumentException("Jitter interval should be greater than 0");
if (interval.minus(jitter).isNegative() || {
throw new IllegalArgumentException("Jitter plus/minus interval underflow/overflow");
validate(interval, jitter);
this.healthCheckInterval = interval;
this.healthCheckJitter = jitter;
return this;

* Configure an interval for re-subscribing to the original events stream in case all existing hosts become
* unhealthy.
* <p>
* In situations when there is a latency between {@link ServiceDiscoverer} propagating the updated state and all
* known hosts become unhealthy, which could happen due to intermediate caching layers, re-subscribe to the
* events stream can help to exit from a dead state.
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
* and always consider all hosts for establishing new connections.
* @param interval interval at which re-subscribes will be scheduled.
* @param jitter the amount of jitter to apply to each re-subscribe {@code interval}.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckResubscribeInterval(
Duration interval, Duration jitter) {
validate(interval, jitter);
this.healthCheckResubscribeLowerBound = interval.minus(jitter).toNanos();
this.healthCheckResubscribeUpperBound =;
return this;

private static void validate(Duration interval, Duration jitter) {
ensurePositive(interval, "interval");
ensureNonNegative(jitter, "jitter");
final Duration lowerBound = interval.minus(jitter);
if (!isPositive(lowerBound)) {
throw new IllegalArgumentException("interval (" + interval + ") minus jitter (" + jitter +
") must be greater than 0, current=" + lowerBound);
final Duration upperBound =;
if (!isPositive(upperBound)) {
throw new IllegalArgumentException("interval (" + interval + ") plus jitter (" + jitter +
") must not overflow, current=" + upperBound);

* Configure a threshold for consecutive connection failures to a host. When the {@link LoadBalancer}
* consecutively fails to open connections in the amount greater or equal to the specified value,
@@ -231,6 +272,7 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckInte
* load balancing selection.
* <p>
* Use a negative value of the argument to disable health checking.
* @param threshold number of consecutive connection failures to consider a host unhealthy and eligible for
* background health checking. Use negative value to disable the health checking mechanism.
* @return {@code this}.
@@ -258,17 +300,18 @@ public RoundRobinLoadBalancerFactory<ResolvedAddress, C> build() {

HealthCheckConfig healthCheckConfig = new HealthCheckConfig(
this.backgroundExecutor == null ? SharedExecutor.getInstance() : this.backgroundExecutor,
healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold);
healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold,
healthCheckResubscribeLowerBound, healthCheckResubscribeUpperBound);

return new RoundRobinLoadBalancerFactory<>(linearSearchSpace, healthCheckConfig);

static final class SharedExecutor {
private static final Executor INSTANCE = Executors.from(
private static final Executor INSTANCE = new NormalizedTimeSourceExecutor(Executors.from(
new ThreadPoolExecutor(1, 1, 60, SECONDS,
new LinkedBlockingQueue<>(),
new DefaultThreadFactory("round-robin-load-balancer-executor")));
new DefaultThreadFactory("round-robin-load-balancer-executor"))));

private SharedExecutor() {
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package io.servicetalk.loadbalancer;

import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.TestExecutor;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static java.lang.Long.MAX_VALUE;
import static java.lang.Long.MIN_VALUE;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static;

class NormalizedTimeSourceExecutorTest {

private TestExecutor testExecutor;
private Executor executor;

void tearDown() throws Exception {

void setUp(long initialValue) {
testExecutor = new TestExecutor(initialValue);
executor = new NormalizedTimeSourceExecutor(testExecutor);
assertThat("Unexpected initial value", executor.currentTime(NANOSECONDS), is(0L));

void advanceAndVerify(long advanceByNanos, long expected) {
testExecutor.advanceTimeByNoExecuteTasks(advanceByNanos, NANOSECONDS);
assertThat(executor.currentTime(NANOSECONDS), is(expected));

@ParameterizedTest(name = "{displayName} [{index}]: initialValue={0}")
@ValueSource(longs = {MIN_VALUE, -100, 0, 100, MAX_VALUE})
void minValue(long initialValue) {
advanceAndVerify(10, 10);
advanceAndVerify(MAX_VALUE - 10, MAX_VALUE);
advanceAndVerify(10, MAX_VALUE + 10);
advanceAndVerify(MAX_VALUE - 8, 0);

0 comments on commit b4543f4

Please sign in to comment.