Skip to content

Commit

Permalink
LoadBalancer should always consider the first events as initial state
Browse files Browse the repository at this point in the history
Motivation:

Currently, both implementations of the `LoadBalancer` assume 2
possibilities for the first events collection after re-subscribe:
"state of the world" or "delta" and tries to infer which one it is based
on what event statuses are present in the collection. However, it
doesn't provide any guarantees because even if SD impl keeps returning
deltas after a re-subscribe, a delta can contain only `AVAILABLE`
events, but it will be incorrectly interpreted as "state of the world".

Because of Reactive Streams rule clarified in apple#3002, `LoadBalancer`
implementations should always expect "state of the world" for the
first events after re-subscribe.

Modifications:

- Remove inference of the first collection of events after resubscribe
and always expect a new subscriber to start from "state of the world".
- Update `resubscribeToEventsWhenAllHostsAreUnhealthy()` test to reflect
behavior change.

Result:

Both `LoadBalancer` implementations follow `ServiceDiscoverer` contract
and expect the first collection of events after re-subscribe to
represent a "state of the world".
  • Loading branch information
idelpivnitskiy committed Jul 12, 2024
1 parent 821619a commit 9a364b6
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,29 +240,15 @@ private static <R, C extends LoadBalancedConnection> long nextResubscribeTime(
return result;
}

private static <ResolvedAddress> boolean onlyAvailable(
final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
boolean onlyAvailable = !events.isEmpty();
for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
if (!AVAILABLE.equals(event.status())) {
onlyAvailable = false;
break;
}
}
return onlyAvailable;
}

private static <ResolvedAddress, C extends LoadBalancedConnection> boolean notAvailable(
private static <ResolvedAddress, C extends LoadBalancedConnection> boolean contains(
final Host<ResolvedAddress, C> host,
final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
boolean available = false;
for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
if (host.address().equals(event.address())) {
available = true;
break;
return true;
}
}
return !available;
return false;
}

private final class EventSubscriber
Expand Down Expand Up @@ -395,18 +381,11 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
}
firstEventsAfterResubscribe = false;

if (!onlyAvailable(events)) {
// Looks like the current ServiceDiscoverer maintains a state between re-subscribes. It already
// assigned correct states to all hosts. Even if some of them were left UNHEALTHY, we should keep
// running health-checks.
return;
}
// Looks like the current ServiceDiscoverer doesn't maintain a state between re-subscribes and always
// starts from an empty state propagating only AVAILABLE events. To be in sync with the
// ServiceDiscoverer we should clean up and close gracefully all hosts that are not present in the
// initial collection of events, regardless of their current state.
// New Subscription to the ServiceDiscoverer always starts from a state of the world. To be in sync with
// the ServiceDiscoverer state, we should clean up and close gracefully all hosts that are not present
// in the initial collection of events, regardless of their current state.
for (Host<ResolvedAddress, C> host : nextHosts) {
if (notAvailable(host, events)) {
if (!contains(host, events)) {
host.closeAsyncGracefully().subscribe();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,9 +648,8 @@ void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception {
assertThat(selectedConnection, equalTo(properConnection.toFuture().get()));
}

@ParameterizedTest(name = "{displayName} [{index}]: sdReturnsDelta={0}")
@ValueSource(booleans = {false, true})
void resubscribeToEventsWhenAllHostsAreUnhealthy(boolean sdReturnsDelta) throws Exception {
@Test
void resubscribeToEventsWhenAllHostsAreUnhealthy() throws Exception {
serviceDiscoveryPublisher.onComplete();
assertThat(sequentialPublisherSubscriberFunction.isSubscribed(), is(false));
assertThat(sequentialPublisherSubscriberFunction.numberOfSubscribersSeen(), is(1));
Expand Down Expand Up @@ -687,17 +686,12 @@ void resubscribeToEventsWhenAllHostsAreUnhealthy(boolean sdReturnsDelta) throws

// Verify state after re-subscribe
assertAddresses(lb.usedAddresses(), "address-1", "address-2");
if (sdReturnsDelta) {
sendServiceDiscoveryEvents(upEvent("address-3"), upEvent("address-4"), downEvent("address-1"));
assertAddresses(lb.usedAddresses(), "address-2", "address-3", "address-4");
sendServiceDiscoveryEvents(downEvent("address-2"));
} else {
sendServiceDiscoveryEvents(upEvent("address-3"), upEvent("address-4"));
}
assertAddresses(lb.usedAddresses(), "address-3", "address-4");
sendServiceDiscoveryEvents(upEvent("address-2"), upEvent("address-3"), upEvent("address-4"));
assertAddresses(lb.usedAddresses(), "address-2", "address-3", "address-4");

// Verify the LB is recovered
Map<String, Matcher<? super String>> expected = new HashMap<>();
// While "address-2" is still in usedAddresses, it's kept UNHEALTHY and can not be selected
expected.put("address-3", is("address-3"));
expected.put("address-4", is("address-4"));
String selected1 = lb.selectConnection(any(), null).toFuture().get().address();
Expand All @@ -707,7 +701,7 @@ void resubscribeToEventsWhenAllHostsAreUnhealthy(boolean sdReturnsDelta) throws
// These asserts are flaky for p2c because we don't have deterministic selection.
expected.remove(selected1);
assertThat(lb.selectConnection(any(), null).toFuture().get().address(), is(anyOf(expected.values())));
assertConnectionCount(lb.usedAddresses(),
assertConnectionCount(lb.usedAddresses(), connectionsCount("address-2", 0),
connectionsCount("address-3", 1), connectionsCount("address-4", 1));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,29 +240,15 @@ private static <ResolvedAddress, C extends LoadBalancedConnection> boolean allUn
return allUnhealthy;
}

private static <ResolvedAddress> boolean onlyAvailable(
final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
boolean onlyAvailable = !events.isEmpty();
for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
if (!AVAILABLE.equals(event.status())) {
onlyAvailable = false;
break;
}
}
return onlyAvailable;
}

private static <ResolvedAddress, C extends LoadBalancedConnection> boolean notAvailable(
private static <ResolvedAddress, C extends LoadBalancedConnection> boolean contains(
final Host<ResolvedAddress, C> host,
final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
boolean available = false;
for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
if (host.address.equals(event.address())) {
available = true;
break;
return true;
}
}
return !available;
return false;
}

private final class EventSubscriber
Expand Down Expand Up @@ -349,19 +335,12 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
}
firstEventsAfterResubscribe = false;

if (!onlyAvailable(events)) {
// Looks like the current ServiceDiscoverer maintains a state between re-subscribes. It already
// assigned correct states to all hosts. Even if some of them were left UNHEALTHY, we should keep
// running health-checks.
return;
}
// Looks like the current ServiceDiscoverer doesn't maintain a state between re-subscribes and always
// starts from an empty state propagating only AVAILABLE events. To be in sync with the
// ServiceDiscoverer we should clean up and close gracefully all hosts that are not present in the
// initial collection of events, regardless of their current state.
// New Subscription to the ServiceDiscoverer always starts from a state of the world. To be in sync with
// the ServiceDiscoverer state, we should clean up and close gracefully all hosts that are not present
// in the initial collection of events, regardless of their current state.
final List<Host<ResolvedAddress, C>> currentHosts = usedHosts;
for (Host<ResolvedAddress, C> host : currentHosts) {
if (notAvailable(host, events)) {
if (!contains(host, events)) {
host.closeAsyncGracefully().subscribe();
}
}
Expand Down

0 comments on commit 9a364b6

Please sign in to comment.