Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RoundRobinLoadBalancer: re-subscribe when all hosts become unhealthy #2514

Merged
merged 4 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.concurrent.PublisherSource.Subscription;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.annotation.Nullable;

Expand All @@ -32,6 +33,7 @@ public final class SequentialPublisherSubscriberFunction<T>
implements Function<Subscriber<? super T>, Subscriber<? super T>> {

private final AtomicBoolean subscribed = new AtomicBoolean();
private final AtomicInteger numberOfSubscribersSeen = new AtomicInteger();
@Nullable
private volatile Subscriber<? super T> subscriber;

Expand All @@ -41,6 +43,7 @@ public Subscriber<? super T> apply(final Subscriber<? super T> subscriber) {
throw new IllegalStateException("Duplicate subscriber: " + subscriber);
}
this.subscriber = subscriber;
numberOfSubscribersSeen.incrementAndGet();
return new DelegatingPublisherSubscriber<T>(subscriber) {
@Override
public void onSubscribe(final Subscription s) {
Expand Down Expand Up @@ -99,4 +102,13 @@ public Subscriber<? super T> subscriber() {
public boolean isSubscribed() {
    // Read the flag once and hand back that snapshot.
    final boolean currentlySubscribed = subscribed.get();
    return currentlySubscribed;
}

/**
 * Reports how many {@link Subscriber}s have been observed in total.
 *
 * @return the current value of the observed-{@link Subscriber} counter.
 */
public int numberOfSubscribersSeen() {
    final int seenSoFar = numberOfSubscribersSeen.get();
    return seenSoFar;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ public TestExecutor() {
this(ThreadLocalRandom.current().nextLong());
}

TestExecutor(final long epochNanos) {
/**
* Create a new instance.
*
* @param epochNanos initial value for {@link #currentTime(TimeUnit)} in nanoseconds
*/
public TestExecutor(final long epochNanos) {
currentNanos = epochNanos;
nanoOffset = epochNanos - Long.MIN_VALUE;
}
Expand Down
1 change: 1 addition & 0 deletions servicetalk-loadbalancer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
implementation project(":servicetalk-annotations")
implementation project(":servicetalk-concurrent-api-internal")
implementation project(":servicetalk-concurrent-internal")
implementation project(":servicetalk-utils-internal")
implementation "com.google.code.findbugs:jsr305"
implementation "org.slf4j:slf4j-api"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.concurrent.api.DelegatingExecutor;
import io.servicetalk.concurrent.api.Executor;

import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
 * An {@link Executor} that always starts counting {@link #currentTime(TimeUnit)} from {@code 0}.
 * <p>
 * The delegate's time is captured once, in nanoseconds, when this instance is created and is subtracted from
 * every later reading.
 */
final class NormalizedTimeSourceExecutor extends DelegatingExecutor {

    /**
     * The delegate's {@link Executor#currentTime(TimeUnit)} reading, in nanoseconds, taken at construction.
     */
    private final long offset;

    /**
     * Create a new instance.
     *
     * @param delegate the {@link Executor} to delegate to; its current time becomes the zero point.
     */
    NormalizedTimeSourceExecutor(final Executor delegate) {
        super(delegate);
        offset = delegate.currentTime(NANOSECONDS);
    }

    /**
     * Returns the delegate's current time relative to the moment this instance was created.
     *
     * @param unit the {@link TimeUnit} in which the result is expressed.
     * @return the normalized time, converted to {@code unit}.
     */
    @Override
    public long currentTime(final TimeUnit unit) {
        // offset is expressed in nanoseconds, so the subtraction must also happen in nanoseconds; subtracting it
        // from a reading taken in any other unit would mix units. Plain subtraction is intentional: the
        // difference is allowed to wrap around.
        final long elapsedNanos = delegate().currentTime(NANOSECONDS) - offset;
        return unit.convert(elapsedNanos, NANOSECONDS);
    }
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.LoadBalancerFactory;
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.api.DefaultThreadFactory;
import io.servicetalk.concurrent.api.Executor;
Expand All @@ -35,8 +36,10 @@
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.utils.internal.DurationUtils.ensureNonNegative;
import static io.servicetalk.utils.internal.DurationUtils.ensurePositive;
import static io.servicetalk.utils.internal.DurationUtils.isPositive;
import static java.time.Duration.ofSeconds;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
Expand Down Expand Up @@ -78,6 +81,7 @@ public final class RoundRobinLoadBalancerFactory<ResolvedAddress, C extends Load

private static final Duration DEFAULT_HEALTH_CHECK_INTERVAL = ofSeconds(5);
private static final Duration DEFAULT_HEALTH_CHECK_JITTER = ofSeconds(3);
static final Duration DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL = ofSeconds(10);
static final int DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD = 5; // higher than default for AutoRetryStrategy

private final int linearSearchSpace;
Expand All @@ -96,7 +100,7 @@ public <T extends C> LoadBalancer<T> newLoadBalancer(
final String targetResource,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new RoundRobinLoadBalancer<>(requireNonNull(targetResource), eventPublisher, connectionFactory,
return new RoundRobinLoadBalancer<>(targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig);
}

Expand All @@ -105,7 +109,7 @@ public LoadBalancer<C> newLoadBalancer(
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, C> connectionFactory,
final String targetResource) {
return new RoundRobinLoadBalancer<>(requireNonNull(targetResource), eventPublisher, connectionFactory,
return new RoundRobinLoadBalancer<>(targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig);
}

Expand All @@ -128,6 +132,10 @@ public static final class Builder<ResolvedAddress, C extends LoadBalancedConnect
private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL;
private Duration healthCheckJitter = DEFAULT_HEALTH_CHECK_JITTER;
private int healthCheckFailedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
private long healthCheckResubscribeLowerBound =
DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL.minus(DEFAULT_HEALTH_CHECK_JITTER).toNanos();
private long healthCheckResubscribeUpperBound =
DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL.plus(DEFAULT_HEALTH_CHECK_JITTER).toNanos();;

/**
* Creates a new instance with default settings.
Expand Down Expand Up @@ -174,7 +182,7 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> linearSearchSpa
*/
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> backgroundExecutor(
Executor backgroundExecutor) {
this.backgroundExecutor = requireNonNull(backgroundExecutor);
this.backgroundExecutor = new NormalizedTimeSourceExecutor(backgroundExecutor);
return this;
}

Expand All @@ -184,6 +192,7 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> backgroundExecu
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
* and always consider all hosts for establishing new connections.
*
* @param interval interval at which a background health check will be scheduled.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
Expand All @@ -192,7 +201,7 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> backgroundExecu
@Deprecated
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckInterval(Duration interval) {
return healthCheckInterval(interval,
interval.compareTo(DEFAULT_HEALTH_CHECK_INTERVAL) <= 0 ? interval.dividedBy(2) :
interval.compareTo(DEFAULT_HEALTH_CHECK_INTERVAL) < 0 ? interval.dividedBy(2) :
DEFAULT_HEALTH_CHECK_JITTER);
}

Expand All @@ -202,27 +211,59 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckInte
* <p>
* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
* and always consider all hosts for establishing new connections.
*
* @param interval interval at which a background health check will be scheduled.
* @param jitter the amount of jitter to apply to each retry {@code interval}.
* @return {@code this}.
* @see #healthCheckFailedConnectionsThreshold(int)
*/
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckInterval(Duration interval,
Duration jitter) {
if (interval.isNegative() || interval.isZero()) {
throw new IllegalArgumentException("Health check interval should be greater than 0");
}
if (jitter.isNegative() || jitter.isZero()) {
throw new IllegalArgumentException("Jitter interval should be greater than 0");
}
if (interval.minus(jitter).isNegative() || interval.plus(jitter).isNegative()) {
throw new IllegalArgumentException("Jitter plus/minus interval underflow/overflow");
}
validate(interval, jitter);
this.healthCheckInterval = interval;
this.healthCheckJitter = jitter;
return this;
}

/**
 * Configure an interval for re-subscribing to the original events stream in case all existing hosts become
 * unhealthy.
 * <p>
 * When there is a delay between all known hosts becoming unhealthy and the {@link ServiceDiscoverer}
 * propagating the updated state (which could happen due to intermediate caching layers), re-subscribing to
 * the events stream can help to exit from a dead state.
 * <p>
 * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism
 * and always consider all hosts for establishing new connections.
 *
 * @param interval interval at which re-subscribes will be scheduled.
 * @param jitter the amount of jitter to apply to each re-subscribe {@code interval}.
 * @return {@code this}.
 * @see #healthCheckFailedConnectionsThreshold(int)
 */
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckResubscribeInterval(
        Duration interval, Duration jitter) {
    validate(interval, jitter);

    // Only the two bounds of the [interval - jitter, interval + jitter] range are stored, in nanoseconds.
    final Duration shortest = interval.minus(jitter);
    this.healthCheckResubscribeLowerBound = shortest.toNanos();

    final Duration longest = interval.plus(jitter);
    this.healthCheckResubscribeUpperBound = longest.toNanos();

    return this;
}

/**
 * Verifies that an {@code interval}/{@code jitter} pair describes a usable range.
 *
 * @param interval the base interval; checked with {@code ensurePositive}.
 * @param jitter the variation around {@code interval}; checked with {@code ensureNonNegative}.
 * @throws IllegalArgumentException if {@code interval - jitter} or {@code interval + jitter} is not positive.
 */
private static void validate(Duration interval, Duration jitter) {
    ensurePositive(interval, "interval");
    ensureNonNegative(jitter, "jitter");

    // Shortest possible delay once jitter is applied.
    final Duration shortest = interval.minus(jitter);
    if (!isPositive(shortest)) {
        final String message = "interval (" + interval + ") minus jitter (" + jitter +
                ") must be greater than 0, current=" + shortest;
        throw new IllegalArgumentException(message);
    }

    // Longest possible delay once jitter is applied.
    final Duration longest = interval.plus(jitter);
    if (!isPositive(longest)) {
        final String message = "interval (" + interval + ") plus jitter (" + jitter +
                ") must not overflow, current=" + longest;
        throw new IllegalArgumentException(message);
    }
}

/**
* Configure a threshold for consecutive connection failures to a host. When the {@link LoadBalancer}
* consecutively fails to open connections in the amount greater or equal to the specified value,
Expand All @@ -231,6 +272,7 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckInte
* load balancing selection.
* <p>
* Use a negative value of the argument to disable health checking.
*
* @param threshold number of consecutive connection failures to consider a host unhealthy and eligible for
* background health checking. Use negative value to disable the health checking mechanism.
* @return {@code this}.
Expand Down Expand Up @@ -258,17 +300,18 @@ public RoundRobinLoadBalancerFactory<ResolvedAddress, C> build() {

HealthCheckConfig healthCheckConfig = new HealthCheckConfig(
this.backgroundExecutor == null ? SharedExecutor.getInstance() : this.backgroundExecutor,
healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold);
healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold,
healthCheckResubscribeLowerBound, healthCheckResubscribeUpperBound);

return new RoundRobinLoadBalancerFactory<>(linearSearchSpace, healthCheckConfig);
}
}

static final class SharedExecutor {
private static final Executor INSTANCE = Executors.from(
private static final Executor INSTANCE = new NormalizedTimeSourceExecutor(Executors.from(
new ThreadPoolExecutor(1, 1, 60, SECONDS,
new LinkedBlockingQueue<>(),
new DefaultThreadFactory("round-robin-load-balancer-executor")));
new DefaultThreadFactory("round-robin-load-balancer-executor"))));

private SharedExecutor() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.TestExecutor;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static java.lang.Long.MAX_VALUE;
import static java.lang.Long.MIN_VALUE;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@SuppressWarnings("NumericOverflow")
class NormalizedTimeSourceExecutorTest {

private TestExecutor testExecutor;
private Executor executor;

@AfterEach
void tearDown() throws Exception {
testExecutor.closeAsync().toFuture().get();
executor.closeAsync().toFuture().get();
idelpivnitskiy marked this conversation as resolved.
Show resolved Hide resolved
}

void setUp(long initialValue) {
testExecutor = new TestExecutor(initialValue);
executor = new NormalizedTimeSourceExecutor(testExecutor);
assertThat("Unexpected initial value", executor.currentTime(NANOSECONDS), is(0L));
}

void advanceAndVerify(long advanceByNanos, long expected) {
testExecutor.advanceTimeByNoExecuteTasks(advanceByNanos, NANOSECONDS);
assertThat(executor.currentTime(NANOSECONDS), is(expected));
}

@ParameterizedTest(name = "{displayName} [{index}]: initialValue={0}")
@ValueSource(longs = {MIN_VALUE, -100, 0, 100, MAX_VALUE})
void minValue(long initialValue) {
setUp(initialValue);
advanceAndVerify(10, 10);
advanceAndVerify(MAX_VALUE - 10, MAX_VALUE);
advanceAndVerify(10, MAX_VALUE + 10);
advanceAndVerify(MAX_VALUE - 8, 0);
}
}
Loading