From d48d8f59053a567670f803b82ac6e23d4719e39f Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 12 Jul 2024 09:53:13 -0700 Subject: [PATCH] Avoid race b/w ServiceDiscoverer events after cancel and re-subscribe Motivation: #2514 added an ability for `LoadBalancer` to cancel discovery stream and re-subscribe in attempt to trigger a new resolution when it's internal state became `UNHEALTHY`. However, it did not account for a possibility of new events racing with cancellation or a Publisher not stopping updates immediately. As a result, it's possible to end up in a corrupted state if old `EventSubscriber` receives new events after a new `EventSubscriber` received a new "state of the world". Modifications: - Track the current `EventSubscriber` and process events only if it's current. - Enhance `resubscribeToEventsWhenAllHostsAreUnhealthy()` to simulate described behavior. Result: New "state of the world" can not be corrupted by events emitted for the old `EventSubscriber`. --- .../loadbalancer/DefaultLoadBalancer.java | 26 +++++++++++++++---- .../loadbalancer/LoadBalancerTest.java | 16 +++++++++++- .../LoadBalancerTestScaffold.java | 11 ++++---- .../loadbalancer/RoundRobinLoadBalancer.java | 16 +++++++++++- 4 files changed, 57 insertions(+), 12 deletions(-) diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java index ad9a60d33f..ebae9b2eea 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java @@ -90,6 +90,8 @@ final class DefaultLoadBalancer hostSelector; @@ -180,10 +182,13 @@ private void subscribeToEvents(boolean resubscribe) { // This method is invoked only when we are in RESUBSCRIBING state. Only one thread can own this state. assert nextResubscribeTime == RESUBSCRIBING; if (resubscribe) { + assert healthCheckConfig != null : "Resubscribe can happen only when health-checking is configured"; LOGGER.debug("{}: resubscribing to the ServiceDiscoverer event publisher.", this); discoveryCancellable.cancelCurrent(); } - toSource(eventPublisher).subscribe(new EventSubscriber(resubscribe)); + final EventSubscriber eventSubscriber = new EventSubscriber(resubscribe); + this.currentSubscriber = eventSubscriber; + toSource(eventPublisher).subscribe(eventSubscriber); if (healthCheckConfig != null) { assert healthCheckConfig.executor instanceof NormalizedTimeSourceExecutor; nextResubscribeTime = nextResubscribeTime(healthCheckConfig, this); @@ -273,16 +278,27 @@ public void onSubscribe(final Subscription s) { @Override public void onNext(@Nullable final Collection> events) { if (events == null || events.isEmpty()) { - LOGGER.debug("{}: unexpectedly received null or empty list instead of events.", - DefaultLoadBalancer.this); + LOGGER.debug("{}: unexpectedly received null or empty collection instead of events: {}", + DefaultLoadBalancer.this, events); return; } sequentialExecutor.execute(() -> sequentialOnNext(events)); } private void sequentialOnNext(Collection> events) { - if (isClosed || events.isEmpty()) { - // nothing to do if the load balancer is closed or there are no events. + assert !events.isEmpty(); + if (isClosed) { + // nothing to do if the load balancer is closed. + return; + } + + // According to Reactive Streams Rule 1.8 + // (https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.8) new events will + // stop eventually but not guaranteed to stop immediately after cancellation or could race with cancel. + // Therefore, we should check that this is the current Subscriber before processing new events. + if (healthCheckConfig != null && currentSubscriber != this) { + LOGGER.debug("{}: received new events after cancelling previous subscription, discarding: {}", + DefaultLoadBalancer.this, events); return; } diff --git a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/LoadBalancerTest.java b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/LoadBalancerTest.java index 7584850f5c..152872bdfe 100644 --- a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/LoadBalancerTest.java +++ b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/LoadBalancerTest.java @@ -39,6 +39,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -678,14 +679,27 @@ void resubscribeToEventsWhenAllHostsAreUnhealthy() throws Exception { assertSelectThrows(is(UNHEALTHY_HOST_EXCEPTION)); } assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(2)); - // Assert the next select attempt after resubscribe internal triggers re-subscribe + // Advance time to allow re-subscribe on the next select attempt testExecutor.advanceTimeBy(DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL.toMillis() * 2, MILLISECONDS); assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(2)); + + // Grab previous Subscriber to simulate more events after cancellation + Subscriber>> oldSubscriber = + sequentialPublisherSubscriberFunction.subscriber(); + assertThat(oldSubscriber, is(notNullValue())); + + // Assert the next select attempt triggers re-subscribe assertSelectThrows(instanceOf(NoActiveHostException.class)); assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(3)); // Verify state after re-subscribe assertAddresses(lb.usedAddresses(), "address-1", "address-2"); + + // Events for the oldSubscriber must be discarded + oldSubscriber.onNext(Arrays.asList(upEvent("address-10"), upEvent("address-11"))); + assertAddresses(lb.usedAddresses(), "address-1", "address-2"); + + // Events for the new Subscriber change the state sendServiceDiscoveryEvents(upEvent("address-2"), upEvent("address-3"), upEvent("address-4")); assertAddresses(lb.usedAddresses(), "address-2", "address-3", "address-4"); diff --git a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/LoadBalancerTestScaffold.java b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/LoadBalancerTestScaffold.java index 71b13e0dee..828d9e8712 100644 --- a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/LoadBalancerTestScaffold.java +++ b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/LoadBalancerTestScaffold.java @@ -116,26 +116,27 @@ abstract TestableLoadBalancer newTestLoadBal TestPublisher>> serviceDiscoveryPublisher, TestConnectionFactory connectionFactory); - void sendServiceDiscoveryEvents(final ServiceDiscovererEvent... events) { + @SafeVarargs + final void sendServiceDiscoveryEvents(final ServiceDiscovererEvent... events) { sendServiceDiscoveryEvents(serviceDiscoveryPublisher, events); } @SuppressWarnings("unchecked") private void sendServiceDiscoveryEvents( TestPublisher>> serviceDiscoveryPublisher, - final ServiceDiscovererEvent... events) { + final ServiceDiscovererEvent... events) { serviceDiscoveryPublisher.onNext(Arrays.asList(events)); } - ServiceDiscovererEvent upEvent(final String address) { + ServiceDiscovererEvent upEvent(final String address) { return new DefaultServiceDiscovererEvent<>(address, AVAILABLE); } - ServiceDiscovererEvent downEvent(final String address) { + ServiceDiscovererEvent downEvent(final String address) { return new DefaultServiceDiscovererEvent<>(address, eagerConnectionShutdown() ? UNAVAILABLE : EXPIRED); } - ServiceDiscovererEvent downEvent(final String address, ServiceDiscovererEvent.Status status) { + ServiceDiscovererEvent downEvent(final String address, ServiceDiscovererEvent.Status status) { return new DefaultServiceDiscovererEvent<>(address, status); } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 6bf279bb31..e76753d9d4 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -135,6 +135,8 @@ final class RoundRobinLoadBalancer> usedHosts = emptyList(); + @Nullable + private volatile EventSubscriber currentSubscriber; private final String id; private final String targetResource; @@ -207,10 +209,13 @@ private void subscribeToEvents(boolean resubscribe) { // This method is invoked only when we are in RESUBSCRIBING state. Only one thread can own this state. assert nextResubscribeTime == RESUBSCRIBING; if (resubscribe) { + assert healthCheckConfig != null : "Resubscribe can happen only when health-checking is configured"; LOGGER.debug("{}: resubscribing to the ServiceDiscoverer event publisher.", this); discoveryCancellable.cancelCurrent(); } - toSource(eventPublisher).subscribe(new EventSubscriber(resubscribe)); + final EventSubscriber eventSubscriber = new EventSubscriber(resubscribe); + this.currentSubscriber = eventSubscriber; + toSource(eventPublisher).subscribe(eventSubscriber); if (healthCheckConfig != null) { assert healthCheckConfig.executor instanceof NormalizedTimeSourceExecutor; nextResubscribeTime = nextResubscribeTime(healthCheckConfig, this); @@ -276,6 +281,15 @@ public void onNext(@Nullable final Collection event : events) { final ServiceDiscovererEvent.Status eventStatus = event.status(); LOGGER.debug("{}: received new ServiceDiscoverer event {}. Inferred status: {}.",