diff --git a/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/SequentialPublisherSubscriberFunction.java b/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/SequentialPublisherSubscriberFunction.java index 57b762a669..9f55b112a5 100644 --- a/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/SequentialPublisherSubscriberFunction.java +++ b/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/SequentialPublisherSubscriberFunction.java @@ -19,6 +19,7 @@ import io.servicetalk.concurrent.PublisherSource.Subscription; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import javax.annotation.Nullable; @@ -32,6 +33,7 @@ public final class SequentialPublisherSubscriberFunction implements Function, Subscriber> { private final AtomicBoolean subscribed = new AtomicBoolean(); + private final AtomicInteger numberOfSubscribers = new AtomicInteger(); @Nullable private volatile Subscriber subscriber; @@ -41,6 +43,7 @@ public Subscriber apply(final Subscriber subscriber) { throw new IllegalStateException("Duplicate subscriber: " + subscriber); } this.subscriber = subscriber; + numberOfSubscribers.incrementAndGet(); return new DelegatingPublisherSubscriber(subscriber) { @Override public void onSubscribe(final Subscription s) { @@ -99,4 +102,13 @@ public Subscriber subscriber() { public boolean isSubscribed() { return subscribed.get(); } + + /** + * Returns total number of observed {@link Subscriber}s. + * + * @return total number of observed {@link Subscriber}s. 
+ */ + public int numberOfSubscribers() { + return numberOfSubscribers.get(); + } } diff --git a/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestExecutor.java b/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestExecutor.java index f04ca758d1..42bb250298 100644 --- a/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestExecutor.java +++ b/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestExecutor.java @@ -59,7 +59,12 @@ public TestExecutor() { this(ThreadLocalRandom.current().nextLong()); } - TestExecutor(final long epochNanos) { + /** + * Create a new instance. + * + * @param epochNanos initial value for {@link #currentTime(TimeUnit)} in nanoseconds + */ + public TestExecutor(final long epochNanos) { currentNanos = epochNanos; nanoOffset = epochNanos - Long.MIN_VALUE; } diff --git a/servicetalk-loadbalancer/build.gradle b/servicetalk-loadbalancer/build.gradle index 09b684b8a6..6b71f98445 100644 --- a/servicetalk-loadbalancer/build.gradle +++ b/servicetalk-loadbalancer/build.gradle @@ -26,6 +26,7 @@ dependencies { implementation project(":servicetalk-annotations") implementation project(":servicetalk-concurrent-api-internal") implementation project(":servicetalk-concurrent-internal") + implementation project(":servicetalk-utils-internal") implementation "com.google.code.findbugs:jsr305" implementation "org.slf4j:slf4j-api" diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NormalizedTimeSourceExecutor.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NormalizedTimeSourceExecutor.java new file mode 100644 index 0000000000..cee6fef96a --- /dev/null +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NormalizedTimeSourceExecutor.java @@ -0,0 +1,41 @@ +/* + * Copyright © 2023 Apple Inc. 
and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.concurrent.api.DelegatingExecutor; +import io.servicetalk.concurrent.api.Executor; + +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * An {@link Executor} that always starts counting {@link #currentTime(TimeUnit)} from {@code 0}. + */ +final class NormalizedTimeSourceExecutor extends DelegatingExecutor { + + private final long offset; + + NormalizedTimeSourceExecutor(final Executor delegate) { + super(delegate); + offset = delegate.currentTime(NANOSECONDS); + } + + @Override + public long currentTime(final TimeUnit unit) { + return unit.convert(delegate().currentTime(NANOSECONDS) - offset, NANOSECONDS); // offset is in nanoseconds + } +} diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index fcd824ada0..f5c3c74cfe 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -54,6 +54,7 @@ import java.util.Spliterator; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import
java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; import java.util.function.Function; @@ -84,7 +85,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; import static java.util.stream.Collectors.toList; @@ -102,10 +103,15 @@ final class RoundRobinLoadBalancer usedHostsUpdater = - newUpdater(RoundRobinLoadBalancer.class, List.class, "usedHosts"); + AtomicReferenceFieldUpdater.newUpdater(RoundRobinLoadBalancer.class, List.class, "usedHosts"); @SuppressWarnings("rawtypes") private static final AtomicIntegerFieldUpdater indexUpdater = - newUpdater(RoundRobinLoadBalancer.class, "index"); + AtomicIntegerFieldUpdater.newUpdater(RoundRobinLoadBalancer.class, "index"); + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater nextResubscribeTimeUpdater = + AtomicLongFieldUpdater.newUpdater(RoundRobinLoadBalancer.class, "nextResubscribeTime"); + + private static final long RESUBSCRIBING = -1L; /** * With a relatively small number of connections we can minimize connection creation under moderate concurrency by @@ -124,15 +130,20 @@ final class RoundRobinLoadBalancer> usedHosts = emptyList(); private final String targetResource; + private final Publisher>> eventPublisher; + private final Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); private final Publisher eventStream; private final SequentialCancellable discoveryCancellable = new SequentialCancellable(); private final ConnectionFactory connectionFactory; private final int linearSearchSpace; + @Nullable + private final HealthCheckConfig healthCheckConfig; private final ListenableAsyncCloseable asyncCloseable; /** @@ 
-154,188 +165,288 @@ final class RoundRobinLoadBalancer eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); + this.eventPublisher = requireNonNull(eventPublisher); this.eventStream = fromSource(eventStreamProcessor); this.connectionFactory = requireNonNull(connectionFactory); this.linearSearchSpace = linearSearchSpace; + this.healthCheckConfig = healthCheckConfig; + this.asyncCloseable = toAsyncCloseable(graceful -> { + discoveryCancellable.cancel(); + eventStreamProcessor.onComplete(); + final CompositeCloseable compositeCloseable; + for (;;) { + List> currentList = usedHosts; + if (isClosedList(currentList) || + usedHostsUpdater.compareAndSet(this, currentList, new ClosedList<>(currentList))) { + compositeCloseable = newCompositeCloseable().appendAll(currentList).appendAll(connectionFactory); + LOGGER.debug("Load balancer for {} is closing {}gracefully. Last seen addresses (size={}): {}.", + targetResource, graceful ? "" : "non", currentList.size(), currentList); + break; + } + } + return (graceful ? compositeCloseable.closeAsyncGracefully() : compositeCloseable.closeAsync()) + .beforeOnError(t -> { + if (!graceful) { + usedHosts = new ClosedList<>(emptyList()); + } + }) + .beforeOnComplete(() -> usedHosts = new ClosedList<>(emptyList())); + }); + subscribeToEvents(false); + } - toSource(eventPublisher).subscribe( - new Subscriber>>() { + private void subscribeToEvents(boolean resubscribe) { + if (resubscribe) { + discoveryCancellable.cancelCurrent(); + } + toSource(eventPublisher).subscribe(new EventSubscriber(resubscribe)); + if (healthCheckConfig != null) { + assert healthCheckConfig.executor instanceof NormalizedTimeSourceExecutor; + nextResubscribeTime = nextResubscribeTime(healthCheckConfig); + } + } - @Override - public void onSubscribe(final Subscription s) { - // We request max value here to make sure we do not access Subscription concurrently - // (requestN here and cancel from discoveryCancellable). 
If we request-1 in onNext we would have to wrap - // the Subscription in a ConcurrentSubscription which is costly. - // Since, we synchronously process onNexts we do not really care about flow control. - s.request(Long.MAX_VALUE); - discoveryCancellable.nextCancellable(s); + private static long nextResubscribeTime(final HealthCheckConfig config) { + final long lower = config.healthCheckResubscribeLowerBound; + final long upper = config.healthCheckResubscribeUpperBound; + return config.executor.currentTime(NANOSECONDS) + + (lower == upper ? lower : ThreadLocalRandom.current().nextLong(lower, upper)); + } + + private static boolean allUnhealthy( + final List> usedHosts) { + boolean allUnhealthy = !usedHosts.isEmpty(); + for (Host host : usedHosts) { + if (!Host.isUnhealthy(host.connState)) { + allUnhealthy = false; + break; } + } + return allUnhealthy; + } - @Override - public void onNext(final Collection> events) { - for (ServiceDiscovererEvent event : events) { - final ServiceDiscovererEvent.Status eventStatus = event.status(); - LOGGER.debug("Load balancer for {}: received new ServiceDiscoverer event {}. 
Inferred status: {}.", - targetResource, event, eventStatus); + private static boolean onlyAvailable( + final Collection> events) { + boolean onlyAvailable = !events.isEmpty(); + for (ServiceDiscovererEvent event : events) { + if (!AVAILABLE.equals(event.status())) { + onlyAvailable = false; + break; + } + } + return onlyAvailable; + } - @SuppressWarnings("unchecked") - final List> usedAddresses = - usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, oldHosts -> { - if (isClosedList(oldHosts)) { - return oldHosts; - } - final ResolvedAddress addr = requireNonNull(event.address()); - @SuppressWarnings("unchecked") - final List> oldHostsTyped = - (List>) oldHosts; - - if (AVAILABLE.equals(eventStatus)) { - return addHostToList(oldHostsTyped, addr); - } else if (EXPIRED.equals(eventStatus)) { - if (oldHostsTyped.isEmpty()) { - return emptyList(); - } else { - return markHostAsExpired(oldHostsTyped, addr); - } - } else if (UNAVAILABLE.equals(eventStatus)) { - return listWithHostRemoved(oldHostsTyped, host -> { - boolean match = host.address.equals(addr); - if (match) { - host.markClosed(); - } - return match; - }); + private static boolean notAvailable( + final Host host, + final Collection> events) { + boolean available = false; + for (ServiceDiscovererEvent event : events) { + if (host.address.equals(event.address())) { + available = true; + break; + } + } + return !available; + } + + private final class EventSubscriber + implements Subscriber>> { + + private boolean firstEventsAfterResubscribe; + + EventSubscriber(boolean resubscribe) { + this.firstEventsAfterResubscribe = resubscribe; + } + + @Override + public void onSubscribe(final Subscription s) { + // We request max value here to make sure we do not access Subscription concurrently + // (requestN here and cancel from discoveryCancellable). If we request-1 in onNext we would have to wrap + // the Subscription in a ConcurrentSubscription which is costly. 
+ // Since, we synchronously process onNexts we do not really care about flow control. + s.request(Long.MAX_VALUE); + discoveryCancellable.nextCancellable(s); + } + + @Override + public void onNext(@Nullable final Collection> events) { + if (events == null) { + LOGGER.debug("Load balancer for {}: unexpectedly received null instead of events.", targetResource); + return; + } + for (ServiceDiscovererEvent event : events) { + final ServiceDiscovererEvent.Status eventStatus = event.status(); + LOGGER.debug("Load balancer for {}: received new ServiceDiscoverer event {}. Inferred status: {}.", + targetResource, event, eventStatus); + + @SuppressWarnings("unchecked") + final List> usedAddresses = + usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, oldHosts -> { + if (isClosedList(oldHosts)) { + return oldHosts; + } + final ResolvedAddress addr = requireNonNull(event.address()); + @SuppressWarnings("unchecked") + final List> oldHostsTyped = + (List>) oldHosts; + + if (AVAILABLE.equals(eventStatus)) { + return addHostToList(oldHostsTyped, addr); + } else if (EXPIRED.equals(eventStatus)) { + if (oldHostsTyped.isEmpty()) { + return emptyList(); } else { - LOGGER.error("Load balancer for {}: Unexpected Status in event:" + - " {} (mapped to {}). Leaving usedHosts unchanged: {}", - targetResource, event, eventStatus, oldHosts); - return oldHosts; + return markHostAsExpired(oldHostsTyped, addr); } - }); + } else if (UNAVAILABLE.equals(eventStatus)) { + return listWithHostRemoved(oldHostsTyped, host -> { + boolean match = host.address.equals(addr); + if (match) { + host.markClosed(); + } + return match; + }); + } else { + LOGGER.error("Load balancer for {}: Unexpected Status in event:" + + " {} (mapped to {}). 
Leaving usedHosts unchanged: {}", + targetResource, event, eventStatus, oldHosts); + return oldHosts; + } + }); - LOGGER.debug("Load balancer for {}: now using {} addresses: {}.", - targetResource, usedAddresses.size(), usedAddresses); + LOGGER.debug("Load balancer for {}: now using addresses (size={}): {}.", + targetResource, usedAddresses.size(), usedAddresses); - if (AVAILABLE.equals(eventStatus)) { - if (usedAddresses.size() == 1) { - eventStreamProcessor.onNext(LOAD_BALANCER_READY_EVENT); - } - } else if (usedAddresses.isEmpty()) { - eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT); + if (AVAILABLE.equals(eventStatus)) { + if (usedAddresses.size() == 1) { + eventStreamProcessor.onNext(LOAD_BALANCER_READY_EVENT); } + } else if (usedAddresses.isEmpty()) { + eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT); } } - private List> markHostAsExpired( - final List> oldHostsTyped, final ResolvedAddress addr) { - for (Host host : oldHostsTyped) { - if (host.address.equals(addr)) { - // Host removal will be handled by the Host's onClose::afterFinally callback - host.markExpired(); - break; // because duplicates are not allowed, we can stop iteration - } + if (firstEventsAfterResubscribe) { + // We can enter this path only if we re-subscribed because all previous hosts were UNHEALTHY. + if (events.isEmpty()) { + return; // Wait for the next collection of events. 
} - return oldHostsTyped; - } + firstEventsAfterResubscribe = false; - private Host createHost(ResolvedAddress addr) { - Host host = new Host<>(targetResource, addr, healthCheckConfig); - host.onClose().afterFinally(() -> - usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, previousHosts -> { - @SuppressWarnings("unchecked") - List> previousHostsTyped = - (List>) previousHosts; - return listWithHostRemoved(previousHostsTyped, current -> current == host); - } - )).subscribe(); - return host; + if (!onlyAvailable(events)) { + // Looks like the current ServiceDiscoverer maintains a state between re-subscribes. It already + // assigned correct states to all hosts. Even if some of them were left UNHEALTHY, we should keep + // running health-checks. + return; + } + // Looks like the current ServiceDiscoverer doesn't maintain a state between re-subscribes and always + // starts from an empty state propagating only AVAILABLE events. To be in sync with the + // ServiceDiscoverer we should clean up and close gracefully all hosts that are not present in the + // initial collection of events, regardless of their current state. 
+ final List> currentHosts = usedHosts; + for (Host host : currentHosts) { + if (notAvailable(host, events)) { + host.closeAsyncGracefully().subscribe(); + } + } } + } - private List> addHostToList( - List> oldHostsTyped, ResolvedAddress addr) { - if (oldHostsTyped.isEmpty()) { - return singletonList(createHost(addr)); + private List> markHostAsExpired( + final List> oldHostsTyped, final ResolvedAddress addr) { + for (Host host : oldHostsTyped) { + if (host.address.equals(addr)) { + // Host removal will be handled by the Host's onClose::afterFinally callback + host.markExpired(); + break; // because duplicates are not allowed, we can stop iteration } + } + return oldHostsTyped; + } - // duplicates are not allowed - for (Host host : oldHostsTyped) { - if (host.address.equals(addr)) { - if (!host.markActiveIfNotClosed()) { - // If the host is already in CLOSED state, we should create a new entry. - // For duplicate ACTIVE events or for repeated activation due to failed CAS - // of replacing the usedHosts array the marking succeeds so we will not add a new entry. 
- break; - } - return oldHostsTyped; - } - } + private Host createHost(ResolvedAddress addr) { + Host host = new Host<>(targetResource, addr, healthCheckConfig); + host.onClose().afterFinally(() -> + usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, previousHosts -> { + @SuppressWarnings("unchecked") + List> previousHostsTyped = + (List>) previousHosts; + return listWithHostRemoved(previousHostsTyped, current -> current == host); + } + )).subscribe(); + return host; + } - final List> newHosts = new ArrayList<>(oldHostsTyped.size() + 1); - newHosts.addAll(oldHostsTyped); - newHosts.add(createHost(addr)); - return newHosts; + private List> addHostToList( + List> oldHostsTyped, ResolvedAddress addr) { + if (oldHostsTyped.isEmpty()) { + return singletonList(createHost(addr)); } - private List> listWithHostRemoved( - List> oldHostsTyped, Predicate> hostPredicate) { - if (oldHostsTyped.isEmpty()) { - // this can happen when an expired host is removed during closing of the RoundRobinLoadBalancer, - // but all of its connections have already been closed + // duplicates are not allowed + for (Host host : oldHostsTyped) { + if (host.address.equals(addr)) { + if (!host.markActiveIfNotClosed()) { + // If the host is already in CLOSED state, we should create a new entry. + // For duplicate ACTIVE events or for repeated activation due to failed CAS + // of replacing the usedHosts array the marking succeeds so we will not add a new entry. + break; + } return oldHostsTyped; } - final List> newHosts = new ArrayList<>(oldHostsTyped.size() - 1); - for (int i = 0; i < oldHostsTyped.size(); ++i) { - final Host current = oldHostsTyped.get(i); - if (hostPredicate.test(current)) { - for (int x = i + 1; x < oldHostsTyped.size(); ++x) { - newHosts.add(oldHostsTyped.get(x)); - } - return newHosts.isEmpty() ? 
emptyList() : newHosts; - } else { - newHosts.add(current); + } + + final List> newHosts = new ArrayList<>(oldHostsTyped.size() + 1); + newHosts.addAll(oldHostsTyped); + newHosts.add(createHost(addr)); + return newHosts; + } + + private List> listWithHostRemoved( + List> oldHostsTyped, Predicate> hostPredicate) { + if (oldHostsTyped.isEmpty()) { + // this can happen when an expired host is removed during closing of the RoundRobinLoadBalancer, + // but all of its connections have already been closed + return oldHostsTyped; + } + final List> newHosts = new ArrayList<>(oldHostsTyped.size() - 1); + for (int i = 0; i < oldHostsTyped.size(); ++i) { + final Host current = oldHostsTyped.get(i); + if (hostPredicate.test(current)) { + for (int x = i + 1; x < oldHostsTyped.size(); ++x) { + newHosts.add(oldHostsTyped.get(x)); } + return newHosts.isEmpty() ? emptyList() : newHosts; + } else { + newHosts.add(current); } - return newHosts; } + return newHosts; + } - @Override - public void onError(final Throwable t) { - List> hosts = usedHosts; + @Override + public void onError(final Throwable t) { + List> hosts = usedHosts; + if (healthCheckConfig == null) { + // Terminate processor only if we will never re-subscribe eventStreamProcessor.onError(t); - LOGGER.error( - "Load balancer for {}: service discoverer {} emitted an error. Last seen addresses (size {}): {}", - targetResource, eventPublisher, hosts.size(), hosts, t); } + LOGGER.error( + "Load balancer for {}: service discoverer {} emitted an error. Last seen addresses (size={}): {}.", + targetResource, eventPublisher, hosts.size(), hosts, t); + } - @Override - public void onComplete() { - List> hosts = usedHosts; + @Override + public void onComplete() { + List> hosts = usedHosts; + if (healthCheckConfig == null) { + // Terminate processor only if we will never re-subscribe eventStreamProcessor.onComplete(); - LOGGER.error("Load balancer for {}: service discoverer {} completed. 
Last seen addresses (size {}): {}", - targetResource, eventPublisher, hosts.size(), hosts); } - }); - asyncCloseable = toAsyncCloseable(graceful -> { - discoveryCancellable.cancel(); - eventStreamProcessor.onComplete(); - final CompositeCloseable compositeCloseable; - for (;;) { - List> currentList = usedHosts; - if (isClosedList(currentList) || - usedHostsUpdater.compareAndSet(this, currentList, new ClosedList<>(currentList))) { - compositeCloseable = newCompositeCloseable().appendAll(currentList).appendAll(connectionFactory); - break; - } - } - return (graceful ? compositeCloseable.closeAsyncGracefully() : compositeCloseable.closeAsync()) - .beforeOnError(t -> { - if (!graceful) { - usedHosts = new ClosedList<>(emptyList()); - } - }) - .beforeOnComplete(() -> usedHosts = new ClosedList<>(emptyList())); - }); + LOGGER.error("Load balancer for {}: service discoverer completed. Last seen addresses (size={}): {}.", + targetResource, hosts.size(), hosts); + } } private static Single failedLBClosed(String targetResource) { @@ -423,6 +534,14 @@ private Single selectConnection0(final Predicate selector, @Nullable final } } if (pickedHost == null) { + if (healthCheckConfig != null && allUnhealthy(usedHosts)) { + final long currNextResubscribeTime = nextResubscribeTime; + if (currNextResubscribeTime >= 0 && + healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime && + nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) { + subscribeToEvents(true); + } + } return failed(StacklessNoAvailableHostException.newInstance("Failed to pick an active host for " + targetResource + ". 
Either all are busy, expired, or unhealthy: " + usedHosts, RoundRobinLoadBalancer.class, "selectConnection0(...)")); @@ -503,13 +622,18 @@ static final class HealthCheckConfig { private final Duration healthCheckInterval; private final Duration jitter; private final int failedThreshold; + private final long healthCheckResubscribeLowerBound; + private final long healthCheckResubscribeUpperBound; HealthCheckConfig(final Executor executor, final Duration healthCheckInterval, final Duration healthCheckJitter, - final int failedThreshold) { + final int failedThreshold, final long healthCheckResubscribeLowerBound, + final long healthCheckResubscribeUpperBound) { this.executor = executor; this.healthCheckInterval = healthCheckInterval; this.failedThreshold = failedThreshold; this.jitter = healthCheckJitter; + this.healthCheckResubscribeLowerBound = healthCheckResubscribeLowerBound; + this.healthCheckResubscribeUpperBound = healthCheckResubscribeUpperBound; } } @@ -530,7 +654,7 @@ private enum State { @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater connStateUpdater = - AtomicReferenceFieldUpdater.newUpdater(Host.class, ConnState.class, "connState"); + newUpdater(Host.class, ConnState.class, "connState"); private final String targetResource; final Addr address; @@ -540,7 +664,7 @@ private enum State { private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE; Host(String targetResource, Addr address, @Nullable HealthCheckConfig healthCheckConfig) { - this.targetResource = requireNonNull(targetResource); + this.targetResource = targetResource; this.address = requireNonNull(address); this.healthCheckConfig = healthCheckConfig; this.closeable = toAsyncCloseable(graceful -> @@ -563,7 +687,7 @@ boolean markActiveIfNotClosed() { void markClosed() { final ConnState oldState = closeConnState(); final Object[] toRemove = oldState.connections; - cancelIfHealthCheck(oldState.state); + cancelIfHealthCheck(oldState); LOGGER.debug("Load balancer for 
{}: closing {} connection(s) gracefully to the closed address: {}.", targetResource, toRemove.length, address); for (Object conn : toRemove) { @@ -596,7 +720,7 @@ void markExpired() { if (connStateUpdater.compareAndSet(this, oldState, new ConnState(oldState.connections, nextState))) { - cancelIfHealthCheck(oldState.state); + cancelIfHealthCheck(oldState); if (nextState == State.CLOSED) { // Trigger the callback to remove the host from usedHosts array. this.closeAsync().subscribe(); @@ -614,13 +738,13 @@ void markHealthy(final HealthCheck originalHealthCheckState) { // In an unlikely scenario that the following connection attempts fail indefinitely, a health check task // would leak and would not be cancelled. Therefore, we cancel it here and allow failures to trigger a new // health check. - Object oldState = connStateUpdater.getAndUpdate(this, previous -> { - if (HealthCheck.class.equals(previous.state.getClass())) { + ConnState oldState = connStateUpdater.getAndUpdate(this, previous -> { + if (Host.isUnhealthy(previous)) { return new ConnState(previous.connections, STATE_ACTIVE_NO_FAILURES); } return previous; - }).state; - if (oldState != originalHealthCheckState) { + }); + if (oldState.state != originalHealthCheckState) { cancelIfHealthCheck(oldState); } } @@ -632,7 +756,7 @@ void markUnhealthy(final Throwable cause, final ConnectionFactory 0 || cause instanceof ConnectionLimitReachedException) { - LOGGER.debug("Load balancer for {}: failed to open a new connection to the host on address {}. {}", + LOGGER.debug("Load balancer for {}: failed to open a new connection to the host on address {}. 
{}.", targetResource, address, previous, cause); break; } @@ -669,6 +793,10 @@ boolean isActiveAndHealthy() { return ActiveState.class.equals(connState.state.getClass()); } + static boolean isUnhealthy(final ConnState connState) { + return HealthCheck.class.equals(connState.state.getClass()); + } + boolean addConnection(C connection) { int addAttempt = 0; for (;;) { @@ -783,7 +911,7 @@ public Completable onClosing() { private Completable doClose(final Function closeFunction) { return Completable.defer(() -> { final ConnState oldState = closeConnState(); - cancelIfHealthCheck(oldState.state); + cancelIfHealthCheck(oldState); final Object[] connections = oldState.connections; return (connections.length == 0 ? completed() : from(connections).flatMapCompletableDelayError(conn -> closeFunction.apply((C) conn))) @@ -791,10 +919,10 @@ private Completable doClose(final Function closeFunction }); } - private void cancelIfHealthCheck(Object o) { - if (HealthCheck.class.equals(o.getClass())) { + private void cancelIfHealthCheck(ConnState connState) { + if (Host.isUnhealthy(connState)) { @SuppressWarnings("unchecked") - HealthCheck healthCheck = (HealthCheck) o; + HealthCheck healthCheck = (HealthCheck) connState.state; LOGGER.debug("Load balancer for {}: health check cancelled for {}.", targetResource, healthCheck.host); healthCheck.cancel(); } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index 23bfd7752e..d9f594715d 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -19,6 +19,7 @@ import io.servicetalk.client.api.LoadBalancedConnection; import io.servicetalk.client.api.LoadBalancer; import 
io.servicetalk.client.api.LoadBalancerFactory; +import io.servicetalk.client.api.ServiceDiscoverer; import io.servicetalk.client.api.ServiceDiscovererEvent; import io.servicetalk.concurrent.api.DefaultThreadFactory; import io.servicetalk.concurrent.api.Executor; @@ -35,8 +36,10 @@ import java.util.function.Predicate; import javax.annotation.Nullable; +import static io.servicetalk.utils.internal.DurationUtils.ensureNonNegative; +import static io.servicetalk.utils.internal.DurationUtils.ensurePositive; +import static io.servicetalk.utils.internal.DurationUtils.isPositive; import static java.time.Duration.ofSeconds; -import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.SECONDS; /** @@ -78,6 +81,7 @@ public final class RoundRobinLoadBalancerFactory LoadBalancer newLoadBalancer( final String targetResource, final Publisher>> eventPublisher, final ConnectionFactory connectionFactory) { - return new RoundRobinLoadBalancer<>(requireNonNull(targetResource), eventPublisher, connectionFactory, + return new RoundRobinLoadBalancer<>(targetResource, eventPublisher, connectionFactory, linearSearchSpace, healthCheckConfig); } @@ -105,7 +109,7 @@ public LoadBalancer newLoadBalancer( final Publisher>> eventPublisher, final ConnectionFactory connectionFactory, final String targetResource) { - return new RoundRobinLoadBalancer<>(requireNonNull(targetResource), eventPublisher, connectionFactory, + return new RoundRobinLoadBalancer<>(targetResource, eventPublisher, connectionFactory, linearSearchSpace, healthCheckConfig); } @@ -128,6 +132,10 @@ public static final class Builder linearSearchSpa */ public RoundRobinLoadBalancerFactory.Builder backgroundExecutor( Executor backgroundExecutor) { - this.backgroundExecutor = requireNonNull(backgroundExecutor); + this.backgroundExecutor = new NormalizedTimeSourceExecutor(backgroundExecutor); return this; } @@ -184,6 +192,7 @@ public RoundRobinLoadBalancerFactory.Builder backgroundExecu *

* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism * and always consider all hosts for establishing new connections. + * * @param interval interval at which a background health check will be scheduled. * @return {@code this}. * @see #healthCheckFailedConnectionsThreshold(int) @@ -192,7 +201,7 @@ public RoundRobinLoadBalancerFactory.Builder backgroundExecu @Deprecated public RoundRobinLoadBalancerFactory.Builder healthCheckInterval(Duration interval) { return healthCheckInterval(interval, - interval.compareTo(DEFAULT_HEALTH_CHECK_INTERVAL) <= 0 ? interval.dividedBy(2) : + interval.compareTo(DEFAULT_HEALTH_CHECK_INTERVAL) < 0 ? interval.dividedBy(2) : DEFAULT_HEALTH_CHECK_JITTER); } @@ -202,6 +211,7 @@ public RoundRobinLoadBalancerFactory.Builder healthCheckInte *

* {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism * and always consider all hosts for establishing new connections. + * * @param interval interval at which a background health check will be scheduled. * @param jitter the amount of jitter to apply to each retry {@code interval}. * @return {@code this}. @@ -209,20 +219,51 @@ public RoundRobinLoadBalancerFactory.Builder healthCheckInte */ public RoundRobinLoadBalancerFactory.Builder healthCheckInterval(Duration interval, Duration jitter) { - if (interval.isNegative() || interval.isZero()) { - throw new IllegalArgumentException("Health check interval should be greater than 0"); - } - if (jitter.isNegative() || jitter.isZero()) { - throw new IllegalArgumentException("Jitter interval should be greater than 0"); - } - if (interval.minus(jitter).isNegative() || interval.plus(jitter).isNegative()) { - throw new IllegalArgumentException("Jitter plus/minus interval underflow/overflow"); - } + validate(interval, jitter); this.healthCheckInterval = interval; this.healthCheckJitter = jitter; return this; } + /** + * Configure an interval for re-subscribing to the original events stream in case all existing hosts become + * unhealthy. + *

+ * In situations when there is a latency between {@link ServiceDiscoverer} propagating the updated state and all + * known hosts becoming unhealthy, which could happen due to intermediate caching layers, re-subscribing to the + * events stream can help to exit from a dead state. + *

+ * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism + * and always consider all hosts for establishing new connections. + * + * @param interval interval at which re-subscribes will be scheduled. + * @param jitter the amount of jitter to apply to each re-subscribe {@code interval}. + * @return {@code this}. + * @see #healthCheckFailedConnectionsThreshold(int) + */ + public RoundRobinLoadBalancerFactory.Builder healthCheckResubscribeInterval( + Duration interval, Duration jitter) { + validate(interval, jitter); + this.healthCheckResubscribeLowerBound = interval.minus(jitter).toNanos(); + this.healthCheckResubscribeUpperBound = interval.plus(jitter).toNanos(); + return this; + } + + private static void validate(Duration interval, Duration jitter) { + ensurePositive(interval, "interval"); + ensureNonNegative(jitter, "jitter"); + final Duration lowerBound = interval.minus(jitter); + if (!isPositive(lowerBound)) { + throw new IllegalArgumentException("interval (" + interval + ") minus jitter (" + jitter + + ") must be greater than 0, current=" + lowerBound); + } + final Duration upperBound = interval.plus(jitter); + if (!isPositive(upperBound)) { + throw new IllegalArgumentException("interval (" + interval + ") plus jitter (" + jitter + + ") must not overflow, current=" + upperBound); + } + } + /** * Configure a threshold for consecutive connection failures to a host. When the {@link LoadBalancer} * consecutively fails to open connections in the amount greater or equal to the specified value, @@ -231,6 +272,7 @@ public RoundRobinLoadBalancerFactory.Builder healthCheckInte * load balancing selection. *

* Use a negative value of the argument to disable health checking. + * * @param threshold number of consecutive connection failures to consider a host unhealthy and eligible for * background health checking. Use negative value to disable the health checking mechanism. * @return {@code this}. @@ -258,17 +300,18 @@ public RoundRobinLoadBalancerFactory build() { HealthCheckConfig healthCheckConfig = new HealthCheckConfig( this.backgroundExecutor == null ? SharedExecutor.getInstance() : this.backgroundExecutor, - healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold); + healthCheckInterval, healthCheckJitter, healthCheckFailedConnectionsThreshold, + healthCheckResubscribeLowerBound, healthCheckResubscribeUpperBound); return new RoundRobinLoadBalancerFactory<>(linearSearchSpace, healthCheckConfig); } } static final class SharedExecutor { - private static final Executor INSTANCE = Executors.from( + private static final Executor INSTANCE = new NormalizedTimeSourceExecutor(Executors.from( new ThreadPoolExecutor(1, 1, 60, SECONDS, new LinkedBlockingQueue<>(), - new DefaultThreadFactory("round-robin-load-balancer-executor"))); + new DefaultThreadFactory("round-robin-load-balancer-executor")))); private SharedExecutor() { } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/NormalizedTimeSourceExecutorTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/NormalizedTimeSourceExecutorTest.java new file mode 100644 index 0000000000..e81e0f770b --- /dev/null +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/NormalizedTimeSourceExecutorTest.java @@ -0,0 +1,62 @@ +/* + * Copyright © 2023 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.loadbalancer; + +import io.servicetalk.concurrent.api.Executor; +import io.servicetalk.concurrent.api.TestExecutor; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static java.lang.Long.MAX_VALUE; +import static java.lang.Long.MIN_VALUE; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +@SuppressWarnings("NumericOverflow") +class NormalizedTimeSourceExecutorTest { + + private TestExecutor testExecutor; + private Executor executor; + + @AfterEach + void tearDown() throws Exception { + executor.closeAsync().toFuture().get(); + } + + void setUp(long initialValue) { + testExecutor = new TestExecutor(initialValue); + executor = new NormalizedTimeSourceExecutor(testExecutor); + assertThat("Unexpected initial value", executor.currentTime(NANOSECONDS), is(0L)); + } + + void advanceAndVerify(long advanceByNanos, long expected) { + testExecutor.advanceTimeByNoExecuteTasks(advanceByNanos, NANOSECONDS); + assertThat(executor.currentTime(NANOSECONDS), is(expected)); + } + + @ParameterizedTest(name = "{displayName} [{index}]: initialValue={0}") + @ValueSource(longs = {MIN_VALUE, -100, 0, 100, MAX_VALUE}) + void minValue(long initialValue) { + setUp(initialValue); + advanceAndVerify(10, 10); + advanceAndVerify(MAX_VALUE - 10, MAX_VALUE); + advanceAndVerify(10, MAX_VALUE + 10); + advanceAndVerify(MAX_VALUE - 8, 0); + } +} 
diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java index f7492f4de1..058b739b83 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java @@ -32,6 +32,7 @@ import io.servicetalk.concurrent.api.LegacyTestSingle; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.SequentialPublisherSubscriberFunction; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.TestExecutor; import io.servicetalk.concurrent.api.TestPublisher; @@ -52,6 +53,7 @@ import java.util.AbstractMap; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -93,12 +95,17 @@ import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static io.servicetalk.concurrent.internal.TestTimeoutConstants.DEFAULT_TIMEOUT_SECONDS; import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; +import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL; import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerTest.UnhealthyHostConnectionFactory.UNHEALTHY_HOST_EXCEPTION; +import static java.lang.Long.MAX_VALUE; +import static java.time.Duration.ZERO; import static java.time.Duration.ofMillis; +import static java.time.Duration.ofNanos; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; import static 
org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; @@ -132,8 +139,12 @@ abstract class RoundRobinLoadBalancerTest { new TestSingleSubscriber<>(); private final List connectionsCreated = new CopyOnWriteArrayList<>(); private final Queue connectionRealizers = new ConcurrentLinkedQueue<>(); - - final TestPublisher>> serviceDiscoveryPublisher = new TestPublisher<>(); + private final SequentialPublisherSubscriberFunction>> + sequentialPublisherSubscriberFunction = new SequentialPublisherSubscriberFunction<>(); + final TestPublisher>> serviceDiscoveryPublisher = + new TestPublisher.Builder>>() + .sequentialSubscribers(sequentialPublisherSubscriberFunction) + .build(); private DelegatingConnectionFactory connectionFactory = new DelegatingConnectionFactory(this::newRealizedConnectionSingle); @@ -394,7 +405,7 @@ void closeClosesConnectionFactory() throws Exception { assertTrue(connectionFactory.isClosed(), "ConnectionFactory not closed."); } - @ParameterizedTest(name = "closeFromLb={0}") + @ParameterizedTest(name = "{displayName} [{index}]: closeFromLb={0}") @ValueSource(booleans = {true, false}) void closeGracefulThenClose(boolean closeFromLb) throws ExecutionException, InterruptedException { @@ -504,18 +515,14 @@ void disabledHealthCheckDoesntRun() throws Exception { sendServiceDiscoveryEvents(upEvent("address-1")); for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { - Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any(), null) - .toFuture().get()); - assertThat(e.getCause(), is(UNHEALTHY_HOST_EXCEPTION)); + assertSelectThrows(is(UNHEALTHY_HOST_EXCEPTION)); } assertThat(testExecutor.scheduledTasksPending(), equalTo(0)); for (int i = 0; i < timeAdvancementsTillHealthy - 1; ++i) { unhealthyHostConnectionFactory.advanceTime(testExecutor); - Exception e = 
assertThrows(ExecutionException.class, () -> lb.selectConnection(any(), null) - .toFuture().get()); - assertThat(e.getCause(), is(UNHEALTHY_HOST_EXCEPTION)); + assertSelectThrows(is(UNHEALTHY_HOST_EXCEPTION)); } unhealthyHostConnectionFactory.advanceTime(testExecutor); @@ -541,17 +548,13 @@ void connectionLimitReachedExceptionDoesNotMarkHostAsUnhealthy() throws Exceptio sendServiceDiscoveryEvents(upEvent("address-1")); for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { - Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any(), null) - .toFuture().get()); - assertThat(e.getCause(), is(exception)); + assertSelectThrows(is(exception)); } assertThat(testExecutor.scheduledTasksPending(), equalTo(0)); for (int i = 0; i < timeAdvancementsTillHealthy - 1; ++i) { unhealthyHostConnectionFactory.advanceTime(testExecutor); - Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any(), null) - .toFuture().get()); - assertThat(e.getCause(), is(exception)); + assertSelectThrows(is(exception)); } unhealthyHostConnectionFactory.advanceTime(testExecutor); @@ -575,9 +578,7 @@ void hostUnhealthyIsHealthChecked() throws Exception { sendServiceDiscoveryEvents(upEvent("address-1")); for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { - Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any(), null) - .toFuture().get()); - assertThat(e.getCause(), is(UNHEALTHY_HOST_EXCEPTION)); + assertSelectThrows(is(UNHEALTHY_HOST_EXCEPTION)); } for (int i = 0; i < timeAdvancementsTillHealthy; ++i) { @@ -622,8 +623,7 @@ public Completable timer(final long delay, final TimeUnit unit) { sendServiceDiscoveryEvents(upEvent("address-1")); // Trigger first health check: - Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any(), null).toFuture().get()); - assertThat(e.getCause(), is(UNHEALTHY_HOST_EXCEPTION)); + 
assertSelectThrows(is(UNHEALTHY_HOST_EXCEPTION)); // Execute two health checks: first will fail due to connectionFactory, // second - due to an unexpected error from executor: for (int i = 0; i < 2; ++i) { @@ -631,8 +631,7 @@ public Completable timer(final long delay, final TimeUnit unit) { } // Trigger yet another health check: - e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any(), null).toFuture().get()); - assertThat(e.getCause(), is(UNHEALTHY_HOST_EXCEPTION)); + assertSelectThrows(is(UNHEALTHY_HOST_EXCEPTION)); // Execute two health checks: first will fail due to connectionFactory, second succeeds: for (int i = 0; i < 2; ++i) { unhealthyHostConnectionFactory.advanceTime(testExecutor); @@ -662,9 +661,7 @@ void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { // Imitate concurrency by running multiple threads attempting to establish connections. ExecutorService executor = Executors.newFixedThreadPool(3); try { - final Runnable runnable = () -> - assertThrows(ExecutionException.class, () -> lb.selectConnection(any(), null).toFuture().get()); - + final Runnable runnable = () -> assertSelectThrows(is(UNHEALTHY_HOST_EXCEPTION)); for (int i = 0; i < 1000; i++) { executor.submit(runnable); } @@ -694,9 +691,7 @@ void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { unhealthyHostConnectionFactory.advanceTime(testExecutor); // Assert still unhealthy - Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any(), null) - .toFuture().get()); - assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); + assertSelectThrows(instanceOf(NoAvailableHostException.class)); } } finally { // Shutdown the concurrent validation of unhealthiness. 
@@ -710,6 +705,101 @@ void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); } + @ParameterizedTest(name = "{displayName} [{index}]: sdReturnsDelta={0}") + @ValueSource(booleans = {false, true}) + void resubscribeToEventsWhenAllHostsAreUnhealthy(boolean sdReturnsDelta) throws Exception { + serviceDiscoveryPublisher.onComplete(); + assertThat(sequentialPublisherSubscriberFunction.isSubscribed(), is(false)); + assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribers(), is(1)); + + DelegatingConnectionFactory alwaysFail12ConnectionFactory = new DelegatingConnectionFactory(address -> { + switch (address) { + case "address-1": + case "address-2": + return Single.failed(UNHEALTHY_HOST_EXCEPTION); + default: + return Single.succeeded(newConnection(address)); + } + }); + lb = defaultLb(alwaysFail12ConnectionFactory); + + assertThat(sequentialPublisherSubscriberFunction.isSubscribed(), is(true)); + assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribers(), is(2)); + + assertAddresses(lb.usedAddresses(), EMPTY_ARRAY); + sendServiceDiscoveryEvents(upEvent("address-1")); + sendServiceDiscoveryEvents(upEvent("address-2")); + assertAddresses(lb.usedAddresses(), "address-1", "address-2"); + + // Force all usedAddresses into UNHEALTHY state + for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD * lb.usedAddresses().size(); ++i) { + assertSelectThrows(is(UNHEALTHY_HOST_EXCEPTION)); + } + assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribers(), is(2)); + // Assert the next select attempt after resubscribe interval triggers re-subscribe + testExecutor.advanceTimeBy(DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL.toMillis() * 2, MILLISECONDS); + assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribers(), is(2)); + assertSelectThrows(instanceOf(NoAvailableHostException.class)); +
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribers(), is(3)); + + // Verify state after re-subscribe + assertAddresses(lb.usedAddresses(), "address-1", "address-2"); + if (sdReturnsDelta) { + sendServiceDiscoveryEvents(upEvent("address-3"), upEvent("address-4"), downEvent("address-1")); + assertAddresses(lb.usedAddresses(), "address-2", "address-3", "address-4"); + sendServiceDiscoveryEvents(downEvent("address-2")); + } else { + sendServiceDiscoveryEvents(upEvent("address-3"), upEvent("address-4")); + } + assertAddresses(lb.usedAddresses(), "address-3", "address-4"); + + // Verify the LB is recovered + Map> expected = new HashMap<>(); + expected.put("address-3", is("address-3")); + expected.put("address-4", is("address-4")); + String selected1 = lb.selectConnection(any(), null).toFuture().get().address(); + assertThat(selected1, is(anyOf(expected.values()))); + expected.remove(selected1); + assertThat(lb.selectConnection(any(), null).toFuture().get().address(), is(anyOf(expected.values()))); + assertConnectionCount(lb.usedAddresses(), connectionsCount("address-3", 1), connectionsCount("address-4", 1)); + } + + @Test + void resubscribeToEventsNotTriggeredWhenDisabled() throws Exception { + serviceDiscoveryPublisher.onComplete(); + assertThat(sequentialPublisherSubscriberFunction.isSubscribed(), is(false)); + assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribers(), is(1)); + + DelegatingConnectionFactory alwaysFailConnectionFactory = + new DelegatingConnectionFactory(address -> Single.failed(UNHEALTHY_HOST_EXCEPTION)); + lb = (RoundRobinLoadBalancer) + new RoundRobinLoadBalancerFactory.Builder() + .healthCheckInterval(ofMillis(50), ofMillis(10)) + // Set resubscribe interval to very large number + .healthCheckResubscribeInterval(ofNanos(MAX_VALUE), ZERO) + .backgroundExecutor(testExecutor) + .build() + .newLoadBalancer(serviceDiscoveryPublisher, alwaysFailConnectionFactory, "test-service"); + + 
assertThat(sequentialPublisherSubscriberFunction.isSubscribed(), is(true)); + assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribers(), is(2)); + + assertAddresses(lb.usedAddresses(), EMPTY_ARRAY); + sendServiceDiscoveryEvents(upEvent("address-1")); + sendServiceDiscoveryEvents(upEvent("address-2")); + assertAddresses(lb.usedAddresses(), "address-1", "address-2"); + + // Force all usedAddresses into UNHEALTHY state + for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD * lb.usedAddresses().size(); ++i) { + assertSelectThrows(is(UNHEALTHY_HOST_EXCEPTION)); + } + assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribers(), is(2)); + testExecutor.advanceTimeBy(DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL.toMillis() * 2, MILLISECONDS); + assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribers(), is(2)); + assertSelectThrows(instanceOf(NoAvailableHostException.class)); + assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribers(), is(2)); + } + @Test void handleAllDiscoveryEvents() throws Exception { serviceDiscoveryPublisher.onComplete(); @@ -832,6 +922,11 @@ static void assertConnectionCount( assertThat(addresses, iterableMatcher); } + private void assertSelectThrows(Matcher matcher) { + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any(), null).toFuture().get()); + assertThat(e.getCause(), matcher); + } + Map.Entry connectionsCount(String addr, int count) { return new AbstractMap.SimpleImmutableEntry<>(addr, count); } diff --git a/servicetalk-utils-internal/src/main/java/io/servicetalk/utils/internal/DurationUtils.java b/servicetalk-utils-internal/src/main/java/io/servicetalk/utils/internal/DurationUtils.java index 0d610a2b41..3c3fa25867 100644 --- a/servicetalk-utils-internal/src/main/java/io/servicetalk/utils/internal/DurationUtils.java +++ b/servicetalk-utils-internal/src/main/java/io/servicetalk/utils/internal/DurationUtils.java @@ -67,6 +67,22 @@ public static 
Duration ensurePositive(final Duration duration, final String name return duration; } + /** + * Ensures the duration is non-negative. + * + * @param duration the {@link Duration} to validate + * @param name name of the {@link Duration} variable + * @return the passed duration if all checks pass + * @throws NullPointerException if the passed duration is {@code null} + * @throws IllegalArgumentException if the passed duration is not greater or equal to {@link Duration#ZERO} + */ + public static Duration ensureNonNegative(final Duration duration, final String name) { + if (duration.isNegative()) { + throw new IllegalArgumentException(name + ": " + duration + " (expected >= 0)"); + } + return duration; + } + /** * Checks if the duration is considered "infinite". * diff --git a/servicetalk-utils-internal/src/test/java/io/servicetalk/utils/internal/DurationUtilsTest.java b/servicetalk-utils-internal/src/test/java/io/servicetalk/utils/internal/DurationUtilsTest.java index 6282bbeb9b..11e8c47f39 100644 --- a/servicetalk-utils-internal/src/test/java/io/servicetalk/utils/internal/DurationUtilsTest.java +++ b/servicetalk-utils-internal/src/test/java/io/servicetalk/utils/internal/DurationUtilsTest.java @@ -19,6 +19,7 @@ import java.time.Duration; +import static io.servicetalk.utils.internal.DurationUtils.ensureNonNegative; import static io.servicetalk.utils.internal.DurationUtils.ensurePositive; import static io.servicetalk.utils.internal.DurationUtils.isInfinite; import static io.servicetalk.utils.internal.DurationUtils.isPositive; @@ -26,6 +27,7 @@ import static java.time.Duration.ofSeconds; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; class DurationUtilsTest { @@ -49,9 +51,21 @@ void testIsPositiveNanos() { @Test void testEnsurePositive() { assertThrows(NullPointerException.class, () -> 
ensurePositive(null, "duration")); - assertThrows(IllegalArgumentException.class, () -> ensurePositive(Duration.ZERO, "duration")); - assertThrows(IllegalArgumentException.class, () -> ensurePositive(ofNanos(1L).negated(), "duration")); assertThrows(IllegalArgumentException.class, () -> ensurePositive(ofSeconds(1L).negated(), "duration")); + assertThrows(IllegalArgumentException.class, () -> ensurePositive(ofNanos(1L).negated(), "duration")); + assertThrows(IllegalArgumentException.class, () -> ensurePositive(Duration.ZERO, "duration")); + assertDoesNotThrow(() -> ensurePositive(ofNanos(1L), "duration")); + assertDoesNotThrow(() -> ensurePositive(ofSeconds(1L), "duration")); + } + + @Test + void testEnsureNonNegative() { + assertThrows(NullPointerException.class, () -> ensureNonNegative(null, "duration")); + assertThrows(IllegalArgumentException.class, () -> ensureNonNegative(ofSeconds(1L).negated(), "duration")); + assertThrows(IllegalArgumentException.class, () -> ensureNonNegative(ofNanos(1L).negated(), "duration")); + assertDoesNotThrow(() -> ensureNonNegative(Duration.ZERO, "duration")); + assertDoesNotThrow(() -> ensureNonNegative(ofNanos(1L), "duration")); + assertDoesNotThrow(() -> ensureNonNegative(ofSeconds(1L), "duration")); } @Test