Skip to content

Commit

Permalink
RoundRobinLoadBalancer: re-subscribe when all hosts become unhealthy
Browse files Browse the repository at this point in the history
Motivation:

It’s possible for the RRLB to get into a state where all available
hosts become unhealthy. Possible scenarios:
- DNS returned incorrect result;
- All hosts were restarted and got different addresses.
RRLB will be in a dead state until the TTL expires, which may take some time.

Modifications:
- When RRLB detects that all hosts are unhealthy, it re-subscribes to the
SD events publisher to trigger a new resolution;
- Add `RoundRobinLoadBalancerFactory.Builder#healthCheckResubscribeInterval`
option to configure new feature;
- Wrap `backgroundExecutor` with `NormalizedTimeSourceExecutor` to make
sure `currentTime` is always positive;
- Test behavior when the new feature is enabled/disabled;
- Add `DurationUtils.ensureNonNegative(...)` utility for validation;
- Make `TestExecutor(long)` constructor public to test
`NormalizedTimeSourceExecutor`;
- Add `SequentialPublisherSubscriberFunction.numberOfSubscribers()` to
verify how many subscribers the `TestPublisher` already saw;

Result:

RRLB forces SD to re-resolve addresses by cancelling the current subscription
and re-subscribing to the publisher when it detects that all hosts become
unhealthy.
  • Loading branch information
idelpivnitskiy committed Feb 22, 2023
1 parent 02ebab9 commit f7fc25f
Show file tree
Hide file tree
Showing 10 changed files with 628 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.concurrent.PublisherSource.Subscription;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.annotation.Nullable;

Expand All @@ -32,6 +33,7 @@ public final class SequentialPublisherSubscriberFunction<T>
implements Function<Subscriber<? super T>, Subscriber<? super T>> {

private final AtomicBoolean subscribed = new AtomicBoolean();
private final AtomicInteger numberOfSubscribers = new AtomicInteger();
@Nullable
private volatile Subscriber<? super T> subscriber;

Expand All @@ -41,6 +43,7 @@ public Subscriber<? super T> apply(final Subscriber<? super T> subscriber) {
throw new IllegalStateException("Duplicate subscriber: " + subscriber);
}
this.subscriber = subscriber;
numberOfSubscribers.incrementAndGet();
return new DelegatingPublisherSubscriber<T>(subscriber) {
@Override
public void onSubscribe(final Subscription s) {
Expand Down Expand Up @@ -99,4 +102,13 @@ public Subscriber<? super T> subscriber() {
public boolean isSubscribed() {
return subscribed.get();
}

/**
 * Reports how many {@link Subscriber}s this function has observed so far.
 *
 * @return the running count of observed {@link Subscriber}s.
 */
public int numberOfSubscribers() {
    final int observed = numberOfSubscribers.get();
    return observed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ public TestExecutor() {
this(ThreadLocalRandom.current().nextLong());
}

TestExecutor(final long epochNanos) {
/**
 * Creates a new instance whose clock starts at the supplied value.
 *
 * @param epochNanos initial value for {@link #currentTime(TimeUnit)} in nanoseconds
 */
public TestExecutor(final long epochNanos) {
    // The two fields are independent of each other; both are derived from the supplied starting value.
    nanoOffset = epochNanos - Long.MIN_VALUE;
    currentNanos = epochNanos;
}
Expand Down
1 change: 1 addition & 0 deletions servicetalk-loadbalancer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
implementation project(":servicetalk-annotations")
implementation project(":servicetalk-concurrent-api-internal")
implementation project(":servicetalk-concurrent-internal")
implementation project(":servicetalk-utils-internal")
implementation "com.google.code.findbugs:jsr305"
implementation "org.slf4j:slf4j-api"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.concurrent.api.DelegatingExecutor;
import io.servicetalk.concurrent.api.Executor;

import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
 * An {@link Executor} that always starts counting {@link #currentTime(TimeUnit)} from {@code 0}.
 * <p>
 * The delegate's time is sampled once, in nanoseconds, when this wrapper is created and is subtracted from every
 * later reading, so the reported value is the time elapsed since construction.
 */
final class NormalizedTimeSourceExecutor extends DelegatingExecutor {

    /** The delegate's {@code currentTime} at construction, in nanoseconds. */
    private final long offset;

    /**
     * Creates a new instance.
     *
     * @param delegate the {@link Executor} to wrap; its current time becomes the zero point of this instance.
     */
    NormalizedTimeSourceExecutor(final Executor delegate) {
        super(delegate);
        offset = delegate.currentTime(NANOSECONDS);
    }

    /**
     * Returns the time elapsed since this instance was created.
     *
     * @param unit the {@link TimeUnit} in which the result is expressed.
     * @return the elapsed time converted to {@code unit}.
     */
    @Override
    public long currentTime(final TimeUnit unit) {
        // The offset is stored in nanoseconds, so the subtraction must happen in nanoseconds as well. Subtracting
        // it from a value expressed in a coarser unit would mix units and produce a meaningless result.
        final long elapsedNanos = delegate().currentTime(NANOSECONDS) - offset;
        return unit.convert(elapsedNanos, NANOSECONDS);
    }
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.LoadBalancerFactory;
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.api.DefaultThreadFactory;
import io.servicetalk.concurrent.api.Executor;
Expand All @@ -35,8 +36,10 @@
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.DurationUtils.ensureNonNegative;
import static io.servicetalk.utils.internal.DurationUtils.ensurePositive;
import static io.servicetalk.utils.internal.DurationUtils.isPositive;
import static java.time.Duration.ofSeconds;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
Expand Down Expand Up @@ -78,6 +81,7 @@ public final class RoundRobinLoadBalancerFactory<ResolvedAddress, C extends Load

private static final Duration DEFAULT_HEALTH_CHECK_INTERVAL = ofSeconds(5);
private static final Duration DEFAULT_HEALTH_CHECK_JITTER = ofSeconds(3);
static final Duration DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL = ofSeconds(10);
static final int DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD = 5; // higher than default for AutoRetryStrategy

private final int linearSearchSpace;
Expand All @@ -96,7 +100,7 @@ public <T extends C> LoadBalancer<T> newLoadBalancer(
final String targetResource,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new RoundRobinLoadBalancer<>(requireNonNull(targetResource), eventPublisher, connectionFactory,
return new RoundRobinLoadBalancer<>(targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig);
}

Expand All @@ -105,7 +109,7 @@ public LoadBalancer<C> newLoadBalancer(
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, C> connectionFactory,
final String targetResource) {
return new RoundRobinLoadBalancer<>(requireNonNull(targetResource), eventPublisher, connectionFactory,
return new RoundRobinLoadBalancer<>(targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig);
}

Expand All @@ -128,6 +132,10 @@ public static final class Builder<ResolvedAddress, C extends LoadBalancedConnect
private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL;
private Duration healthCheckJitter = DEFAULT_HEALTH_CHECK_JITTER;
private int healthCheckFailedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
// Lower bound of the re-subscribe interval in nanoseconds: default interval minus default jitter.
private long healthCheckResubscribeLowerBound =
        DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL.minus(DEFAULT_HEALTH_CHECK_JITTER).toNanos();
// Upper bound of the re-subscribe interval in nanoseconds: default interval plus default jitter.
private long healthCheckResubscribeUpperBound =
        DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL.plus(DEFAULT_HEALTH_CHECK_JITTER).toNanos();

/**
* Creates a new instance with default settings.
Expand Down Expand Up @@ -174,7 +182,7 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> linearSearchSpa
*/
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> backgroundExecutor(
Executor backgroundExecutor) {
this.backgroundExecutor = requireNonNull(backgroundExecutor);
this.backgroundExecutor = new NormalizedTimeSourceExecutor(backgroundExecutor);
return this;
}

Expand All @@ -184,6 +192,7 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> backgroundExecu
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
* and always consider all hosts for establishing new connections.
*
* @param interval interval at which a background health check will be scheduled.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
Expand All @@ -192,7 +201,7 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> backgroundExecu
@Deprecated
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckInterval(Duration interval) {
return healthCheckInterval(interval,
interval.compareTo(DEFAULT_HEALTH_CHECK_INTERVAL) <= 0 ? interval.dividedBy(2) :
interval.compareTo(DEFAULT_HEALTH_CHECK_INTERVAL) < 0 ? interval.dividedBy(2) :
DEFAULT_HEALTH_CHECK_JITTER);
}

Expand All @@ -202,27 +211,59 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckInte
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
* and always consider all hosts for establishing new connections.
*
* @param interval interval at which a background health check will be scheduled.
* @param jitter the amount of jitter to apply to each retry {@code interval}.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
*/
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckInterval(Duration interval,
Duration jitter) {
if (interval.isNegative() || interval.isZero()) {
throw new IllegalArgumentException("Health check interval should be greater than 0");
}
if (jitter.isNegative() || jitter.isZero()) {
throw new IllegalArgumentException("Jitter interval should be greater than 0");
}
if (interval.minus(jitter).isNegative() || interval.plus(jitter).isNegative()) {
throw new IllegalArgumentException("Jitter plus/minus interval underflow/overflow");
}
validate(interval, jitter);
this.healthCheckInterval = interval;
this.healthCheckJitter = jitter;
return this;
}

/**
 * Sets the interval for re-subscribing to the original events stream once all existing hosts have become
 * unhealthy.
 * <p>
 * When there is latency before the {@link ServiceDiscoverer} propagates an updated state (for example because of
 * intermediate caching layers) and all known hosts are already unhealthy, re-subscribing to the events stream
 * can help to leave that dead state.
 * <p>
 * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
 * and always consider all hosts for establishing new connections.
 *
 * @param interval interval at which re-subscribes will be scheduled.
 * @param jitter the amount of jitter to apply to each re-subscribe {@code interval}.
 * @return {@code this}.
 * @see #healthCheckFailedConnectionsThreshold(int)
 */
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckResubscribeInterval(
        Duration interval, Duration jitter) {
    validate(interval, jitter);
    // The configuration is kept as a [interval - jitter, interval + jitter] window expressed in nanoseconds.
    final Duration earliest = interval.minus(jitter);
    final Duration latest = interval.plus(jitter);
    this.healthCheckResubscribeLowerBound = earliest.toNanos();
    this.healthCheckResubscribeUpperBound = latest.toNanos();
    return this;
}

/**
 * Checks that {@code interval} and {@code jitter} describe a usable scheduling window.
 * <p>
 * The interval must be positive, the jitter must not be negative, and both {@code interval - jitter} and
 * {@code interval + jitter} must be positive.
 *
 * @param interval the base interval.
 * @param jitter the jitter applied around {@code interval}.
 * @throws IllegalArgumentException if {@code interval - jitter} or {@code interval + jitter} is not positive.
 */
private static void validate(Duration interval, Duration jitter) {
    ensurePositive(interval, "interval");
    ensureNonNegative(jitter, "jitter");

    final Duration shortest = interval.minus(jitter);
    if (!isPositive(shortest)) {
        final String message = "interval (" + interval + ") minus jitter (" + jitter +
                ") must be greater than 0, current=" + shortest;
        throw new IllegalArgumentException(message);
    }

    // NOTE(review): java.time.Duration#plus is documented to throw ArithmeticException on numeric overflow, so
    // this check may never observe an overflowed value - confirm whether it is reachable.
    final Duration longest = interval.plus(jitter);
    if (!isPositive(longest)) {
        final String message = "interval (" + interval + ") plus jitter (" + jitter +
                ") must not overflow, current=" + longest;
        throw new IllegalArgumentException(message);
    }
}

/**
* Configure a threshold for consecutive connection failures to a host. When the {@link LoadBalancer}
* consecutively fails to open connections in the amount greater or equal to the specified value,
Expand All @@ -231,6 +272,7 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckInte
* load balancing selection.
* <p>
* Use a negative value of the argument to disable health checking.
*
* @param threshold number of consecutive connection failures to consider a host unhealthy and eligible for
* background health checking. Use negative value to disable the health checking mechanism.
* @return {@code this}.
Expand Down Expand Up @@ -258,17 +300,18 @@ public RoundRobinLoadBalancerFactory<ResolvedAddress, C> build() {

HealthCheckConfig healthCheckConfig = new HealthCheckConfig(
this.backgroundExecutor == null ? SharedExecutor.getInstance() : this.backgroundExecutor,
healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold);
healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold,
healthCheckResubscribeLowerBound, healthCheckResubscribeUpperBound);

return new RoundRobinLoadBalancerFactory<>(linearSearchSpace, healthCheckConfig);
}
}

static final class SharedExecutor {
private static final Executor INSTANCE = Executors.from(
private static final Executor INSTANCE = new NormalizedTimeSourceExecutor(Executors.from(
new ThreadPoolExecutor(1, 1, 60, SECONDS,
new LinkedBlockingQueue<>(),
new DefaultThreadFactory("round-robin-load-balancer-executor")));
new DefaultThreadFactory("round-robin-load-balancer-executor"))));

private SharedExecutor() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.TestExecutor;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static java.lang.Long.MAX_VALUE;
import static java.lang.Long.MIN_VALUE;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

/**
 * Verifies that {@link NormalizedTimeSourceExecutor} reports nanosecond time starting from {@code 0}, whatever
 * initial clock value the wrapped {@link TestExecutor} was created with.
 */
@SuppressWarnings("NumericOverflow")
class NormalizedTimeSourceExecutorTest {

private TestExecutor testExecutor;
private Executor executor;

// Closes the wrapping executor after each test and waits for the close to complete.
@AfterEach
void tearDown() throws Exception {
executor.closeAsync().toFuture().get();
}

// Called explicitly from the parameterized test (it takes the initial clock value as an argument).
// Wraps a TestExecutor that starts at initialValue and checks that the wrapper reports 0 immediately.
void setUp(long initialValue) {
testExecutor = new TestExecutor(initialValue);
executor = new NormalizedTimeSourceExecutor(testExecutor);
assertThat("Unexpected initial value", executor.currentTime(NANOSECONDS), is(0L));
}

// Advances the underlying TestExecutor by advanceByNanos nanoseconds, then asserts the time the wrapper reports.
void advanceAndVerify(long advanceByNanos, long expected) {
testExecutor.advanceTimeByNoExecuteTasks(advanceByNanos, NANOSECONDS);
assertThat(executor.currentTime(NANOSECONDS), is(expected));
}

// Runs the same sequence of advances for each initial clock value listed in @ValueSource.
@ParameterizedTest(name = "{displayName} [{index}]: initialValue={0}")
@ValueSource(longs = {MIN_VALUE, -100, 0, 100, MAX_VALUE})
void minValue(long initialValue) {
setUp(initialValue);
// 0 + 10 = 10
advanceAndVerify(10, 10);
// 10 + (MAX_VALUE - 10) = MAX_VALUE
advanceAndVerify(MAX_VALUE - 10, MAX_VALUE);
// MAX_VALUE + 10 overflows long arithmetic; the expected value is the wrapped (negative) result.
advanceAndVerify(10, MAX_VALUE + 10);
// Total advance is now 2 * MAX_VALUE + 2 = 2^64, which wraps around to exactly 0.
advanceAndVerify(MAX_VALUE - 8, 0);
}
}
Loading

0 comments on commit f7fc25f

Please sign in to comment.