Skip to content

Commit

Permalink
Adjusting ServiceDiscovererEvent contract (#1906)
Browse files Browse the repository at this point in the history
Motivation:

Current API of `ServiceDiscovererEvent` is capable of representing the
boolean status of the host being either available or not available. As
seen in the implementation of RoundRobinLoadBalancer`, the resulting
states from the Service Discovery system can have more meanings than a
simple boolean. To prepare for enhanecements, the contract has been
adjusted to allow more states and make it possible for further additions
if those are needed by particular implementations.

Modifications:

- Deprecated `ServiceDiscovererEvent#isAvailable` method and added a
  default implementation to allow removal in child classes,
- Added `ServiceDiscovererEvent.Status` class with `AVAILABLE`, `EXPIRED`, and `UNAVAILABLE` statuses,
- Added `ServiceDiscovererEvent#status` method with default
  implementation to prevent breaking the existing implementations,
- Replaced existing uses of `isAvailable` with comparisons of `status`
  against the constants in `ServiceDiscoveryStatus`,
- `DefaultDnsClient` is capable of returning either of `UNAVAILABLE` or `EXPIRED` statuses for missing records, according to setting in `DefaultDnsServiceDiscovererBuilder#missingRecordStatus`.
- `RoundRobinLoadBalancer` is able to act accordingly to received status,
- `RoundRobinLoadBalancer#eagerConnectionShutdown` setting has been adjusted to allow no influence over received status and provides functionality when necessary for mapping `UNAVAILABLE` to `EXPIRED` and vice-versa.

Result:

`ServiceDiscovererEvent`s are prepared for more advanced, non-binary
statuses. An example is the now introduced `EXPIRED` event, which has a
different meaning than `UNAVAILABLE`. If the `ServiceDiscoverer` decides
to emit such a status, the subscribers can provide additional
functionality in that case such as the behavior provided by `RoundRobinLoadBalancer`.
  • Loading branch information
Dariusz Jedrzejczyk committed Nov 15, 2021
1 parent 6f50031 commit ad81b3f
Show file tree
Hide file tree
Showing 24 changed files with 720 additions and 323 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.EXPIRED;
import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverCompleteFromSource;
Expand Down Expand Up @@ -106,9 +108,13 @@ public DefaultPartitionedClientGroup(final Function<PartitionAttributes, Client>
this.unknownPartitionClient = unknownPartitionClient;
this.partitionMap = partitionMapFactory.newPartitionMap(event ->
new Partition<>(event, closedPartitionClient.apply(event)));
toSource(psdEvents.groupToMany(event -> event.isAvailable() ?
partitionMap.add(event.partitionAddress()).iterator() :
partitionMap.remove(event.partitionAddress()).iterator(), psdMaxQueueSize))
toSource(psdEvents
.groupToMany(event -> UNAVAILABLE.equals(event.status()) ?
partitionMap.remove(event.partitionAddress()).iterator()
// EXPIRED events neither add or remove new partitions so it's safe to call add
// as it will just return current partitions.
: partitionMap.add(event.partitionAddress()).iterator(),
psdMaxQueueSize))
.subscribe(new GroupedByPartitionSubscriber(clientFactory));
}

Expand Down Expand Up @@ -166,11 +172,12 @@ public Publisher<Collection<ServiceDiscovererEvent<R>>> discover(final U ignored

@Override
public boolean test(PSDE evt) {
if (EXPIRED.equals(evt.status())) {
return false;
}
MutableInt counter = addressCount.computeIfAbsent(evt.address(), __ -> new MutableInt());
boolean acceptEvent;
if (evt.isAvailable()) {
acceptEvent = ++counter.value == 1;
} else {
if (UNAVAILABLE.equals(evt.status())) {
acceptEvent = --counter.value == 0;
if (acceptEvent) {
// If address is unavailable and no more add events are pending stop tracking and
Expand All @@ -181,6 +188,8 @@ public boolean test(PSDE evt) {
partition.closeNow();
}
}
} else {
acceptEvent = ++counter.value == 1;
}
return acceptEvent;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.RandomAccess;
import javax.annotation.Nullable;

import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.AVAILABLE;
import static java.util.Collections.binarySearch;

/**
Expand All @@ -45,27 +46,32 @@ private ServiceDiscovererUtils() {
* @param newActiveAddresses The new list of active addresses.<b>This list must be modifiable</b> as it will be
* sorted with {@link List#sort(Comparator)}.
* @param comparator A comparator for the addresses and to use for binary searches.
* @param reporter A reporter for the numbers of available and unavailable events.
* @param reporter A reporter for the numbers of available and missing events.
* @param <T> The type of address.
* @param missingRecordStatus {@link ServiceDiscovererEvent.Status} to use for created
* {@link ServiceDiscovererEvent} when address present in current list but not in the new one.
* @return A list of {@link ServiceDiscovererEvent}s which represents the changes between
* {@code currentActiveAddresses} and {@code newActiveAddresses}, or {@code null} if there are no changes.
*/
@Nullable
public static <T> List<ServiceDiscovererEvent<T>> calculateDifference(List<? extends T> currentActiveAddresses,
List<? extends T> newActiveAddresses,
Comparator<T> comparator,
@Nullable TwoIntsConsumer reporter) {
public static <T> List<ServiceDiscovererEvent<T>> calculateDifference(
List<? extends T> currentActiveAddresses,
List<? extends T> newActiveAddresses,
Comparator<T> comparator,
@Nullable TwoIntsConsumer reporter,
ServiceDiscovererEvent.Status missingRecordStatus) {
// First sort the newAddresses so we can use binary search.
newActiveAddresses.sort(comparator);

// Calculate additions (in newAddresses, not in activeAddresses).
List<ServiceDiscovererEvent<T>> availableEvents =
relativeComplement(true, currentActiveAddresses, newActiveAddresses, comparator, null);
relativeComplement(currentActiveAddresses, newActiveAddresses, comparator, null, AVAILABLE);
// Store nAvailable now because the List may be updated on the next step.
final int nAvailable = availableEvents == null ? 0 : availableEvents.size();
// Calculate removals (in activeAddresses, not in newAddresses).
List<ServiceDiscovererEvent<T>> allEvents =
relativeComplement(false, newActiveAddresses, currentActiveAddresses, comparator, availableEvents);
relativeComplement(newActiveAddresses, currentActiveAddresses, comparator, availableEvents,
missingRecordStatus);

reportEvents(reporter, allEvents, nAvailable);
return allEvents;
Expand All @@ -89,32 +95,32 @@ private static <T> void reportEvents(@Nullable final TwoIntsConsumer reporter,
* {@code sortedA}).
* <p>
* See <a href="https://en.wikipedia.org/wiki/Venn_diagram#Overview">Set Mathematics</a>.
* @param available Will be used for {@link ServiceDiscovererEvent#isAvailable()} for each
* {@link ServiceDiscovererEvent} in the returned {@link List}.
* @param sortedA A sorted {@link List} of which no elements be present in the return value.
* @param sortedB A sorted {@link List} of which elements in this set that are not in {@code sortedA} will be in the
* return value.
* @param comparator Used for binary searches on {@code sortedA} for each element in {@code sortedB}.
* @param result List to append new results to.
* @param status {@link ServiceDiscovererEvent.Status} to use for created {@link ServiceDiscovererEvent}
* in the {@code result}.
* @param <T> The type of resolved address.
* @return the relative complement of {@code sortedA} and {@code sortedB} (elements in {@code sortedB} and not in
* {@code sortedA}).
*/
@Nullable
private static <T> List<ServiceDiscovererEvent<T>> relativeComplement(
boolean available, List<? extends T> sortedA, List<? extends T> sortedB, Comparator<T> comparator,
@Nullable List<ServiceDiscovererEvent<T>> result) {
List<? extends T> sortedA, List<? extends T> sortedB, Comparator<T> comparator,
@Nullable List<ServiceDiscovererEvent<T>> result, ServiceDiscovererEvent.Status status) {
if (sortedB instanceof RandomAccess) {
for (int i = 0; i < sortedB.size(); ++i) {
final T valueB = sortedB.get(i);
if (binarySearch(sortedA, valueB, comparator) < 0) {
if (result == null) {
result = new ArrayList<>(4);
result.add(new DefaultServiceDiscovererEvent<>(valueB, available));
result.add(new DefaultServiceDiscovererEvent<>(valueB, status));
} else if (comparator.compare(valueB, result.get(result.size() - 1).address()) != 0) {
// make sure we don't include duplicates. the input lists are sorted and we process in order so
// we verify the previous entry is not a duplicate.
result.add(new DefaultServiceDiscovererEvent<>(valueB, available));
result.add(new DefaultServiceDiscovererEvent<>(valueB, status));
}
}
}
Expand All @@ -123,11 +129,11 @@ private static <T> List<ServiceDiscovererEvent<T>> relativeComplement(
if (binarySearch(sortedA, valueB, comparator) < 0) {
if (result == null) {
result = new ArrayList<>(4);
result.add(new DefaultServiceDiscovererEvent<>(valueB, available));
result.add(new DefaultServiceDiscovererEvent<>(valueB, status));
} else if (comparator.compare(valueB, result.get(result.size() - 1).address()) != 0) {
// make sure we don't include duplicates. the input lists are sorted and we process in order so
// we verify the previous entry is not a duplicate.
result.add(new DefaultServiceDiscovererEvent<>(valueB, available));
result.add(new DefaultServiceDiscovererEvent<>(valueB, status));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.servicetalk.client.api;

import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.AVAILABLE;
import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -23,26 +25,44 @@
*/
public final class DefaultServiceDiscovererEvent<T> implements ServiceDiscovererEvent<T> {
private final T address;
private final boolean available;
private final Status status;

/**
* Create a new instance.
* @param address The address returned by {@link #address()}.
* @param available Value returned by {@link #available}.
* @param available Value used to determine {@link #status()}.
* @deprecated Use
* {@link #DefaultServiceDiscovererEvent(Object, io.servicetalk.client.api.ServiceDiscovererEvent.Status)}.
*/
@Deprecated
public DefaultServiceDiscovererEvent(T address, boolean available) {
this.address = requireNonNull(address);
this.available = available;
this.status = available ? AVAILABLE : UNAVAILABLE;
}

/**
* Create a new instance.
* @param address The address returned by {@link #address()}.
* @param status Value returned by {@link #status()}.
*/
public DefaultServiceDiscovererEvent(T address, Status status) {
this.address = requireNonNull(address);
this.status = requireNonNull(status);
}

@Override
public T address() {
return address;
}

@Override
public Status status() {
return status;
}

@Override
public boolean isAvailable() {
return available;
return AVAILABLE.equals(status);
}

@Override
Expand All @@ -53,27 +73,23 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}

final DefaultServiceDiscovererEvent<?> that = (DefaultServiceDiscovererEvent<?>) o;

if (available != that.available) {
return false;
}
return address.equals(that.address);
return status.equals(that.status) && address.equals(that.address);
}

@Override
public int hashCode() {
int result = address.hashCode();
result = 31 * result + (available ? 1 : 0);
result = 31 * result + status.hashCode();
return result;
}

@Override
public String toString() {
return "DefaultServiceDiscovererEvent{" +
"address=" + address +
", available=" + available +
", status=" + status +
", available=" + isAvailable() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
*/
package io.servicetalk.client.api;

import java.util.Locale;

import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.AVAILABLE;
import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;

/**
* Notification from the Service Discovery system that availability for an address has changed.
* @param <ResolvedAddress> the type of address after resolution.
Expand All @@ -26,10 +31,103 @@ public interface ServiceDiscovererEvent<ResolvedAddress> {
*/
ResolvedAddress address();

/**
* {@link Status Status} of the event instructing the {@link ServiceDiscoverer} what actions
* to take upon the associated {@link #address() address}.
* <p>
* Note, the default implementation calls {@link #isAvailable()} to allow frictionless adoption, but once the
* implementing class removes the override for the deprecated method {@link #isAvailable()},
* it will be also necessary to override {@link #status()}.
* @return {@link Status Status} of the associated {@link #address()}.
*/
default Status status() {
return isAvailable() ? AVAILABLE : UNAVAILABLE;
}

/**
* Determine if {@link #address()} is now available or unavailable.
* @return {@code true} if {@link #address()} is now available or false if the {@link #address()} is now
* unavailable.
* @deprecated Implement and use {@link #status()}. This method will be removed.
*/
boolean isAvailable();
@Deprecated
default boolean isAvailable() {
throw new UnsupportedOperationException("Migrate to status() method. This method may be implemented" +
" temporarily until migration to status() is complete.");
}

/**
* Status provided by the {@link ServiceDiscoverer} system that guides the actions of {@link LoadBalancer} upon the
* bound {@link ServiceDiscovererEvent#address()} (via {@link ServiceDiscovererEvent}).
*/
final class Status {

/**
* Signifies the {@link ServiceDiscovererEvent#address()} is available for use in connection establishment.
*/
public static final Status AVAILABLE = new Status("available");

/**
* Signifies the {@link ServiceDiscovererEvent#address()} is not available for use and all currently established
* connections should be closed.
*/
public static final Status UNAVAILABLE = new Status("unavailable");

/**
* Signifies the {@link ServiceDiscovererEvent#address()} is expired and should not be used for establishing
* new connections. It doesn't necessarily mean that the host should not be used in traffic routing over already
* established connections as long as they are kept open by the remote peer. The implementations can have
* different policies in that regard.
*/
public static final Status EXPIRED = new Status("expired");

private final String name;

private Status(final String name) {
if (name.isEmpty()) {
throw new IllegalArgumentException("Status name cannot be empty");
}
this.name = name.toLowerCase(Locale.ENGLISH);
}

/**
* Returns an {@link Status} for the specified name.
* @param name the status name.
* @return {@link Status} representing the status for given name.
*/
public static Status of(final String name) {
switch (name.toLowerCase(Locale.ENGLISH)) {
case "available":
return AVAILABLE;
case "unavailable":
return UNAVAILABLE;
case "expired":
return EXPIRED;
default:
return new Status(name);
}
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ServiceDiscovererEvent.Status)) {
return false;
}
final Status that = (Status) o;
return name.equals(that.name);
}

@Override
public int hashCode() {
return name.hashCode();
}

@Override
public String toString() {
return name;
}
}
}
1 change: 1 addition & 0 deletions servicetalk-dns-discovery-netty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies {
testImplementation project(":servicetalk-concurrent-test-internal")
testImplementation "commons-lang:commons-lang:$commonsLangVersion"
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "org.junit.jupiter:junit-jupiter-params"
testImplementation "org.apache.directory.server:apacheds-protocol-dns:$apacheDirectoryServerVersion"
testImplementation "org.hamcrest:hamcrest:$hamcrestVersion"
testImplementation "org.mockito:mockito-core:$mockitoCoreVersion"
Expand Down
Loading

0 comments on commit ad81b3f

Please sign in to comment.