Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid race b/w ServiceDiscoverer events after cancel and re-subscribe #3005

Merged
merged 2 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
private static final long RESUBSCRIBING = -1L;

private volatile long nextResubscribeTime = RESUBSCRIBING;
@Nullable
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
private volatile EventSubscriber currentSubscriber;

// writes are protected by `sequentialExecutor` but the field can be read by any thread.
private volatile HostSelector<ResolvedAddress, C> hostSelector;
Expand Down Expand Up @@ -181,10 +183,13 @@ private void subscribeToEvents(boolean resubscribe) {
// This method is invoked only when we are in RESUBSCRIBING state. Only one thread can own this state.
assert nextResubscribeTime == RESUBSCRIBING;
if (resubscribe) {
assert healthCheckConfig != null : "Resubscribe can happen only when health-checking is configured";
LOGGER.debug("{}: resubscribing to the ServiceDiscoverer event publisher.", this);
discoveryCancellable.cancelCurrent();
}
toSource(eventPublisher).subscribe(new EventSubscriber(resubscribe));
final EventSubscriber eventSubscriber = new EventSubscriber(resubscribe);
this.currentSubscriber = eventSubscriber;
toSource(eventPublisher).subscribe(eventSubscriber);
if (healthCheckConfig != null) {
assert healthCheckConfig.executor instanceof NormalizedTimeSourceExecutor;
nextResubscribeTime = nextResubscribeTime(healthCheckConfig, this);
Expand Down Expand Up @@ -274,16 +279,27 @@ public void onSubscribe(final Subscription s) {
@Override
public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
if (events == null || events.isEmpty()) {
LOGGER.debug("{}: unexpectedly received null or empty list instead of events.",
DefaultLoadBalancer.this);
LOGGER.debug("{}: unexpectedly received null or empty collection instead of events: {}",
DefaultLoadBalancer.this, events);
return;
}
sequentialExecutor.execute(() -> sequentialOnNext(events));
}

private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
if (isClosed || events.isEmpty()) {
// nothing to do if the load balancer is closed or there are no events.
assert !events.isEmpty();
if (isClosed) {
// nothing to do if the load balancer is closed.
return;
}

// According to Reactive Streams Rule 1.8
// (https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.8) new events will
// stop eventually but not guaranteed to stop immediately after cancellation or could race with cancel.
// Therefore, we should check that this is the current Subscriber before processing new events.
if (currentSubscriber != this) {
LOGGER.debug("{}: received new events after cancelling previous subscription, discarding: {}",
DefaultLoadBalancer.this, events);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -678,14 +679,27 @@ void resubscribeToEventsWhenAllHostsAreUnhealthy() throws Exception {
assertSelectThrows(is(UNHEALTHY_HOST_EXCEPTION));
}
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(2));
// Assert the next select attempt after resubscribe internal triggers re-subscribe
// Advance time to allow re-subscribe on the next select attempt
testExecutor.advanceTimeBy(DEFAULT_HEALTH_CHECK_RESUBSCRIBE_INTERVAL.toMillis() * 2, MILLISECONDS);
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(2));

// Grab previous Subscriber to simulate more events after cancellation
Subscriber<? super Collection<ServiceDiscovererEvent<String>>> oldSubscriber =
sequentialPublisherSubscriberFunction.subscriber();
assertThat(oldSubscriber, is(notNullValue()));

// Assert the next select attempt triggers re-subscribe
assertSelectThrows(instanceOf(NoActiveHostException.class));
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(3));

// Verify state after re-subscribe
assertAddresses(lb.usedAddresses(), "address-1", "address-2");

// Events for the oldSubscriber must be discarded
oldSubscriber.onNext(Arrays.asList(upEvent("address-10"), upEvent("address-11")));
assertAddresses(lb.usedAddresses(), "address-1", "address-2");

// Events for the new Subscriber change the state
sendServiceDiscoveryEvents(upEvent("address-2"), upEvent("address-3"), upEvent("address-4"));
assertAddresses(lb.usedAddresses(), "address-2", "address-3", "address-4");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,26 +116,27 @@ abstract TestableLoadBalancer<String, TestLoadBalancedConnection> newTestLoadBal
TestPublisher<Collection<ServiceDiscovererEvent<String>>> serviceDiscoveryPublisher,
TestConnectionFactory connectionFactory);

void sendServiceDiscoveryEvents(final ServiceDiscovererEvent... events) {
@SafeVarargs
final void sendServiceDiscoveryEvents(final ServiceDiscovererEvent<String>... events) {
sendServiceDiscoveryEvents(serviceDiscoveryPublisher, events);
}

@SuppressWarnings("unchecked")
private void sendServiceDiscoveryEvents(
TestPublisher<Collection<ServiceDiscovererEvent<String>>> serviceDiscoveryPublisher,
final ServiceDiscovererEvent... events) {
final ServiceDiscovererEvent<String>... events) {
serviceDiscoveryPublisher.onNext(Arrays.asList(events));
}

ServiceDiscovererEvent upEvent(final String address) {
ServiceDiscovererEvent<String> upEvent(final String address) {
return new DefaultServiceDiscovererEvent<>(address, AVAILABLE);
}

ServiceDiscovererEvent downEvent(final String address) {
ServiceDiscovererEvent<String> downEvent(final String address) {
return new DefaultServiceDiscovererEvent<>(address, eagerConnectionShutdown() ? UNAVAILABLE : EXPIRED);
}

ServiceDiscovererEvent downEvent(final String address, ServiceDiscovererEvent.Status status) {
ServiceDiscovererEvent<String> downEvent(final String address, ServiceDiscovererEvent.Status status) {
return new DefaultServiceDiscovererEvent<>(address, status);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ final class RoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedConnec
@SuppressWarnings("unused")
private volatile int index;
private volatile List<Host<ResolvedAddress, C>> usedHosts = emptyList();
@Nullable
private volatile EventSubscriber currentSubscriber;

private final String id;
private final String targetResource;
Expand Down Expand Up @@ -207,10 +209,13 @@ private void subscribeToEvents(boolean resubscribe) {
// This method is invoked only when we are in RESUBSCRIBING state. Only one thread can own this state.
assert nextResubscribeTime == RESUBSCRIBING;
if (resubscribe) {
assert healthCheckConfig != null : "Resubscribe can happen only when health-checking is configured";
LOGGER.debug("{}: resubscribing to the ServiceDiscoverer event publisher.", this);
discoveryCancellable.cancelCurrent();
}
toSource(eventPublisher).subscribe(new EventSubscriber(resubscribe));
final EventSubscriber eventSubscriber = new EventSubscriber(resubscribe);
this.currentSubscriber = eventSubscriber;
toSource(eventPublisher).subscribe(eventSubscriber);
if (healthCheckConfig != null) {
assert healthCheckConfig.executor instanceof NormalizedTimeSourceExecutor;
nextResubscribeTime = nextResubscribeTime(healthCheckConfig, this);
Expand Down Expand Up @@ -276,6 +281,15 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
LOGGER.debug("{}: unexpectedly received null instead of events.", RoundRobinLoadBalancer.this);
return;
}
// According to Reactive Streams Rule 1.8
// (https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.8) new events will
// stop eventually but not guaranteed to stop immediately after cancellation or could race with cancel.
// Therefore, we should check that this is the current Subscriber before processing new events.
if (currentSubscriber != this) {
LOGGER.debug("{}: received new events after cancelling previous subscription, discarding: {}",
RoundRobinLoadBalancer.this, events);
return;
}
for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
final ServiceDiscovererEvent.Status eventStatus = event.status();
LOGGER.debug("{}: received new ServiceDiscoverer event {}. Inferred status: {}.",
Expand Down
Loading