Skip to content

Commit

Permalink
Avoid race b/w ServiceDiscoverer events after cancel and re-subscribe
Browse files Browse the repository at this point in the history
Motivation:

#2514 added the ability for `LoadBalancer` to cancel the discovery stream and
re-subscribe in an attempt to trigger a new resolution when its internal
state becomes `UNHEALTHY`. However, it did not account for the possibility
of new events racing with cancellation, or of a Publisher not stopping
updates immediately. As a result, it's possible to end up in a corrupted
state if the old `EventSubscriber` receives new events after the new
`EventSubscriber` has received a new "state of the world".

Modifications:

- Track the current `EventSubscriber` and process events only if the
subscriber receiving them is still the current one.
- Enhance `resubscribeToEventsWhenAllHostsAreUnhealthy()` to simulate
the described behavior.

Result:

The new "state of the world" cannot be corrupted by events emitted for the
old `EventSubscriber`.
  • Loading branch information
idelpivnitskiy committed Jul 12, 2024
1 parent 9a364b6 commit d48d8f5
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
private static final long RESUBSCRIBING = -1L;

private volatile long nextResubscribeTime = RESUBSCRIBING;
@Nullable
private volatile EventSubscriber currentSubscriber;

// writes are protected by `sequentialExecutor` but the field can be read by any thread.
private volatile HostSelector<ResolvedAddress, C> hostSelector;
Expand Down Expand Up @@ -180,10 +182,13 @@ private void subscribeToEvents(boolean resubscribe) {
// This method is invoked only when we are in RESUBSCRIBING state. Only one thread can own this state.
assert nextResubscribeTime == RESUBSCRIBING;
if (resubscribe) {
assert healthCheckConfig != null : "Resubscribe can happen only when health-checking is configured";
LOGGER.debug("{}: resubscribing to the ServiceDiscoverer event publisher.", this);
discoveryCancellable.cancelCurrent();
}
toSource(eventPublisher).subscribe(new EventSubscriber(resubscribe));
final EventSubscriber eventSubscriber = new EventSubscriber(resubscribe);
this.currentSubscriber = eventSubscriber;
toSource(eventPublisher).subscribe(eventSubscriber);
if (healthCheckConfig != null) {
assert healthCheckConfig.executor instanceof NormalizedTimeSourceExecutor;
nextResubscribeTime = nextResubscribeTime(healthCheckConfig, this);
Expand Down Expand Up @@ -273,16 +278,27 @@ public void onSubscribe(final Subscription s) {
@Override
public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
if (events == null || events.isEmpty()) {
LOGGER.debug("{}: unexpectedly received null or empty list instead of events.",
DefaultLoadBalancer.this);
LOGGER.debug("{}: unexpectedly received null or empty collection instead of events: {}",
DefaultLoadBalancer.this, events);
return;
}
sequentialExecutor.execute(() -> sequentialOnNext(events));
}

private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
if (isClosed || events.isEmpty()) {
// nothing to do if the load balancer is closed or there are no events.
assert !events.isEmpty();
if (isClosed) {
// nothing to do if the load balancer is closed.
return;
}

// According to Reactive Streams Rule 1.8
// (https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.8) new events will
// stop eventually but not guaranteed to stop immediately after cancellation or could race with cancel.
// Therefore, we should check that this is the current Subscriber before processing new events.
if (healthCheckConfig != null && currentSubscriber != this) {
LOGGER.debug("{}: received new events after cancelling previous subscription, discarding: {}",
DefaultLoadBalancer.this, events);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -678,14 +679,27 @@ void resubscribeToEventsWhenAllHostsAreUnhealthy() throws Exception {
assertSelectThrows(is(UNHEALTHY_HOST_EXCEPTION));
}
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(2));
// Assert the next select attempt after resubscribe internal triggers re-subscribe
// Advance time to allow re-subscribe on the next select attempt
testExecutor.advanceTimeBy(DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL.toMillis() * 2, MILLISECONDS);
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(2));

// Grab previous Subscriber to simulate more events after cancellation
Subscriber<? super Collection<ServiceDiscovererEvent<String>>> oldSubscriber =
sequentialPublisherSubscriberFunction.subscriber();
assertThat(oldSubscriber, is(notNullValue()));

// Assert the next select attempt triggers re-subscribe
assertSelectThrows(instanceOf(NoActiveHostException.class));
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(3));

// Verify state after re-subscribe
assertAddresses(lb.usedAddresses(), "address-1", "address-2");

// Events for the oldSubscriber must be discarded
oldSubscriber.onNext(Arrays.asList(upEvent("address-10"), upEvent("address-11")));
assertAddresses(lb.usedAddresses(), "address-1", "address-2");

// Events for the new Subscriber change the state
sendServiceDiscoveryEvents(upEvent("address-2"), upEvent("address-3"), upEvent("address-4"));
assertAddresses(lb.usedAddresses(), "address-2", "address-3", "address-4");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,26 +116,27 @@ abstract TestableLoadBalancer<String, TestLoadBalancedConnection> newTestLoadBal
TestPublisher<Collection<ServiceDiscovererEvent<String>>> serviceDiscoveryPublisher,
TestConnectionFactory connectionFactory);

void sendServiceDiscoveryEvents(final ServiceDiscovererEvent... events) {
@SafeVarargs
final void sendServiceDiscoveryEvents(final ServiceDiscovererEvent<String>... events) {
sendServiceDiscoveryEvents(serviceDiscoveryPublisher, events);
}

@SuppressWarnings("unchecked")
private void sendServiceDiscoveryEvents(
TestPublisher<Collection<ServiceDiscovererEvent<String>>> serviceDiscoveryPublisher,
final ServiceDiscovererEvent... events) {
final ServiceDiscovererEvent<String>... events) {
serviceDiscoveryPublisher.onNext(Arrays.asList(events));
}

ServiceDiscovererEvent upEvent(final String address) {
ServiceDiscovererEvent<String> upEvent(final String address) {
return new DefaultServiceDiscovererEvent<>(address, AVAILABLE);
}

ServiceDiscovererEvent downEvent(final String address) {
ServiceDiscovererEvent<String> downEvent(final String address) {
return new DefaultServiceDiscovererEvent<>(address, eagerConnectionShutdown() ? UNAVAILABLE : EXPIRED);
}

ServiceDiscovererEvent downEvent(final String address, ServiceDiscovererEvent.Status status) {
ServiceDiscovererEvent<String> downEvent(final String address, ServiceDiscovererEvent.Status status) {
return new DefaultServiceDiscovererEvent<>(address, status);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ final class RoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedConnec
@SuppressWarnings("unused")
private volatile int index;
private volatile List<Host<ResolvedAddress, C>> usedHosts = emptyList();
@Nullable
private volatile EventSubscriber currentSubscriber;

private final String id;
private final String targetResource;
Expand Down Expand Up @@ -207,10 +209,13 @@ private void subscribeToEvents(boolean resubscribe) {
// This method is invoked only when we are in RESUBSCRIBING state. Only one thread can own this state.
assert nextResubscribeTime == RESUBSCRIBING;
if (resubscribe) {
assert healthCheckConfig != null : "Resubscribe can happen only when health-checking is configured";
LOGGER.debug("{}: resubscribing to the ServiceDiscoverer event publisher.", this);
discoveryCancellable.cancelCurrent();
}
toSource(eventPublisher).subscribe(new EventSubscriber(resubscribe));
final EventSubscriber eventSubscriber = new EventSubscriber(resubscribe);
this.currentSubscriber = eventSubscriber;
toSource(eventPublisher).subscribe(eventSubscriber);
if (healthCheckConfig != null) {
assert healthCheckConfig.executor instanceof NormalizedTimeSourceExecutor;
nextResubscribeTime = nextResubscribeTime(healthCheckConfig, this);
Expand Down Expand Up @@ -276,6 +281,15 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
LOGGER.debug("{}: unexpectedly received null instead of events.", RoundRobinLoadBalancer.this);
return;
}
// According to Reactive Streams Rule 1.8
// (https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.8) new events will
// stop eventually but not guaranteed to stop immediately after cancellation or could race with cancel.
// Therefore, we should check that this is the current Subscriber before processing new events.
if (healthCheckConfig != null && currentSubscriber != this) {
LOGGER.debug("{}: received new events after cancelling previous subscription, discarding: {}",
RoundRobinLoadBalancer.this, events);
return;
}
for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
final ServiceDiscovererEvent.Status eventStatus = event.status();
LOGGER.debug("{}: received new ServiceDiscoverer event {}. Inferred status: {}.",
Expand Down

0 comments on commit d48d8f5

Please sign in to comment.