Skip to content

Commit

Permalink
LoadBalancer and ConnectionFactory: add a context of the caller (#…
Browse files Browse the repository at this point in the history
…2168)

Motivation:

`LoadBalancer` and `ConnectionFactory` API is disconnected from  the
caller, lacks context who/why selects/creates a connection, and has no
API to propagate additional information from the caller (request).

Modifications:

- `LoadBalancer`: add new `selectConnection(Predicate, ContextMap)`
method, deprecate `selectConnection(Predicate)`;
- `ConnectionFactory`: add new
`newConnection(Address, ContextMap, TransportObserver)` method,
deprecate `newConnection(Address, TransportObserver)`;
- Migrate all LB/CF implementations and tests to use new API;
- Add `DeprecatedToNewConnectionFactoryFilter` after each user-defined
CF to make sure old/new filters work in the same pipeline;

Result:

Information can be propagated from the caller to the
`LoadBalancer` and `ConnectionFactory`.
  • Loading branch information
idelpivnitskiy authored Apr 13, 2022
1 parent ac4dddb commit 4141e33
Show file tree
Hide file tree
Showing 32 changed files with 456 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory;
import io.servicetalk.transport.api.TransportObserver;

Expand Down Expand Up @@ -98,6 +99,7 @@ private ConnFactory() {

@Override
public Single<LoadBalancedConnection> newConnection(final InetSocketAddress inetSocketAddress,
@Nullable final ContextMap context,
@Nullable final TransportObserver observer) {
return succeeded(new LoadBalancedConnection() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
*/
package io.servicetalk.client.api;

import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.transport.api.TransportObserver;

import javax.annotation.Nullable;

import static io.servicetalk.client.api.DeprecatedToNewConnectionFactoryFilter.CONNECTION_FACTORY_CONTEXT_MAP_KEY;

/**
* A factory for creating new connections.
*
Expand All @@ -38,6 +42,32 @@ public interface ConnectionFactory<ResolvedAddress, C extends ListenableAsyncClo
* @param address to connect.
* @param observer {@link TransportObserver} for the newly created connection.
* @return {@link Single} that emits the created connection.
* @deprecated Use {@link #newConnection(Object, ContextMap, TransportObserver)}.
*/
@Deprecated // FIXME: 0.43 - remove deprecated method
default Single<C> newConnection(ResolvedAddress address, @Nullable TransportObserver observer) {
throw new UnsupportedOperationException("ConnectionFactory#newConnection(ResolvedAddress, TransportObserver) " +
"is not supported by " + getClass());
}

/**
* Creates and asynchronously returns a connection.
*
* @param address to connect.
* @param context {@link ContextMap context} of the caller (e.g. request context) or {@code null} if no context
* provided. {@code null} context may also mean that a connection is created outside the normal request processing
* (e.g. health-checking).
* @param observer {@link TransportObserver} for the newly created connection or {@code null} if no observer
* provided.
* @return {@link Single} that emits the created connection.
*/
Single<C> newConnection(ResolvedAddress address, @Nullable TransportObserver observer);
default Single<C> newConnection(ResolvedAddress address, @Nullable ContextMap context,
@Nullable TransportObserver observer) { // FIXME: 0.43 - remove default impl
return Single.defer(() -> {
if (context != null) {
AsyncContext.put(CONNECTION_FACTORY_CONTEXT_MAP_KEY, context);
}
return newConnection(address, observer).shareContextOnSubscribe();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ static <RA, C extends ListenableAsyncCloseable> ConnectionFactoryFilter<RA, C> i
*/
default ConnectionFactoryFilter<ResolvedAddress, C> append(ConnectionFactoryFilter<ResolvedAddress, C> before) {
requireNonNull(before);
return withStrategy(service -> create(before.create(service)),
return withStrategy(service -> create(before.create(
new DeprecatedToNewConnectionFactoryFilter<ResolvedAddress, C>().create(service))),
this.requiredOffloads().merge(before.requiredOffloads()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
*/
package io.servicetalk.client.api;

import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.transport.api.TransportObserver;

import javax.annotation.Nullable;

import static io.servicetalk.client.api.DeprecatedToNewConnectionFactoryFilter.CONNECTION_FACTORY_CONTEXT_MAP_KEY;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -43,11 +46,23 @@ public DelegatingConnectionFactory(final ConnectionFactory<ResolvedAddress, C> d
this.delegate = requireNonNull(delegate);
}

@Deprecated
@Override
public Single<C> newConnection(final ResolvedAddress resolvedAddress, @Nullable final TransportObserver observer) {
return delegate.newConnection(resolvedAddress, observer);
}

@Override
public Single<C> newConnection(final ResolvedAddress resolvedAddress, @Nullable final ContextMap context,
@Nullable final TransportObserver observer) {
return Single.defer(() -> {
if (context != null) {
AsyncContext.put(CONNECTION_FACTORY_CONTEXT_MAP_KEY, context);
}
return newConnection(resolvedAddress, observer).shareContextOnSubscribe();
});
}

@Override
public Completable onClose() {
return delegate.onClose();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.client.api;

import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.TransportObserver;

import javax.annotation.Nullable;

import static io.servicetalk.context.api.ContextMap.Key.newKey;

@Deprecated // FIXME: 0.43 - remove as no longer required
final class DeprecatedToNewConnectionFactoryFilter<ResolvedAddress, C extends ListenableAsyncCloseable>
implements ConnectionFactoryFilter<ResolvedAddress, C> {

/**
* Key that propagates {@link ContextMap} argument between new and deprecated
* {@code ConnectionFactory#newConnection(...)} methods.
*/
public static final ContextMap.Key<ContextMap> CONNECTION_FACTORY_CONTEXT_MAP_KEY =
newKey("CONNECTION_FACTORY_CONTEXT_MAP_KEY", ContextMap.class);

@Override
public ConnectionFactory<ResolvedAddress, C> create(final ConnectionFactory<ResolvedAddress, C> original) {
return new DelegatingConnectionFactory<ResolvedAddress, C>(original) {
@Override
public Single<C> newConnection(final ResolvedAddress address, @Nullable final TransportObserver observer) {
return Single.defer(() -> {
final ContextMap context = AsyncContext.get(CONNECTION_FACTORY_CONTEXT_MAP_KEY);
return delegate().newConnection(address, context, observer).shareContextOnSubscribe();
});
}

@Override
public Single<C> newConnection(final ResolvedAddress resolvedAddress,
@Nullable final ContextMap context,
@Nullable final TransportObserver observer) {
return delegate().newConnection(resolvedAddress, context, observer);
}
};
}

@Override
public ExecutionStrategy requiredOffloads() {
return ExecutionStrategy.offloadNone();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SubscribableSingle;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.TransportObserver;

Expand Down Expand Up @@ -149,12 +150,13 @@ private LimitingFilter(final ConnectionFactory<ResolvedAddress, C> original,

@Override
public Single<C> newConnection(final ResolvedAddress resolvedAddress,
@Nullable final ContextMap context,
@Nullable final TransportObserver observer) {
return new SubscribableSingle<C>() {
@Override
protected void handleSubscribe(final Subscriber<? super C> subscriber) {
if (limiter.isConnectAllowed(resolvedAddress)) {
toSource(delegate().newConnection(resolvedAddress, observer))
toSource(delegate().newConnection(resolvedAddress, context, observer))
.subscribe(new CountingSubscriber<>(subscriber, limiter, resolvedAddress));
} else {
deliverErrorFromSource(subscriber, limiter.newConnectionRefusedException(resolvedAddress));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;

import java.net.SocketAddress;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;

/**
* Given multiple {@link SocketAddress}es select the most desired {@link SocketAddress} to use. This is typically used
Expand All @@ -42,8 +44,31 @@ public interface LoadBalancer<C extends LoadBalancedConnection> extends Listenab
* {@link Single#failed(Throwable) failed Single} with {@link NoAvailableHostException} can be returned if no
* connection can be selected at this time or with {@link ConnectionRejectedException} if a newly created connection
* was rejected by the {@code selector} or this load balancer.
* @deprecated Use {@link #selectConnection(Predicate, ContextMap)}.
*/
Single<C> selectConnection(Predicate<C> selector);
@Deprecated
default Single<C> selectConnection(Predicate<C> selector) { // FIXME: 0.43 - remove deprecated method
throw new UnsupportedOperationException("LoadBalancer#selectConnection(Predicate) is not supported by " +
getClass());
}

/**
* Select the most appropriate connection for a request. Returned connection may be used concurrently for other
* requests.
*
* @param selector A {@link Function} that evaluates a connection for selection. This selector should return
* {@code null} if the connection <strong>MUST</strong> not be selected. This selector is guaranteed to be called
* for any connection that is returned from this method.
* @param context A {@link ContextMap context} of the caller (e.g. request context) or {@code null} if no context
* provided.
* @return a {@link Single} that completes with the most appropriate connection to use. A
* {@link Single#failed(Throwable) failed Single} with {@link NoAvailableHostException} can be returned if no
* connection can be selected at this time or with {@link ConnectionRejectedException} if a newly created connection
* was rejected by the {@code selector} or this load balancer.
*/
default Single<C> selectConnection(Predicate<C> selector, @Nullable ContextMap context) {
return selectConnection(selector); // FIXME: 0.43 - remove default impl
}

/**
* A {@link Publisher} of events provided by this {@link LoadBalancer}. This maybe used to broadcast internal state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@
*/
package io.servicetalk.client.api;

import io.servicetalk.context.api.ContextMap;

import java.util.function.Predicate;

/**
* A hint from {@link LoadBalancer#eventStream()} that the internal state of the {@link LoadBalancer} is ready such
* {@link LoadBalancer#selectConnection(Predicate)} is not likely to fail. Note that the return status of
* {@link LoadBalancer#selectConnection(Predicate)} may depend upon many factors including but not limited to:
* {@link LoadBalancer#selectConnection(Predicate, ContextMap)} is not likely to fail. Note that the return status of
* {@link LoadBalancer#selectConnection(Predicate, ContextMap)} may depend upon many factors including but not limited
* to:
* <ul>
* <li>Instantaneous demand vs the amount of resources (e.g. connections) on hand</li>
* <li>If the {@link LoadBalancer} favors queuing requests or "fail fast" behavior</li>
* <li>The dynamic nature of host availability may result in no hosts being available</li>
* </ul>
* This is meant to emphasize that {@link #isReady()} returning {@code true} doesn't necessarily mean
* {@link LoadBalancer#selectConnection(Predicate)} will always return successfully.
* {@link LoadBalancer#selectConnection(Predicate, ContextMap)} will always return successfully.
*/
public interface LoadBalancerReadyEvent {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.TransportObserver;

Expand Down Expand Up @@ -65,14 +66,15 @@ public ConnectionFactory<ResolvedAddress, C> create(final ConnectionFactory<Reso
return new DelegatingConnectionFactory<ResolvedAddress, C>(original) {
@Override
public Single<C> newConnection(final ResolvedAddress resolvedAddress,
@Nullable final ContextMap context,
@Nullable final TransportObserver originalObserver) {
final TransportObserver newObserver;
try {
newObserver = observerFactory.apply(resolvedAddress);
} catch (Throwable t) {
return failed(t);
}
return delegate().newConnection(resolvedAddress, originalObserver == null ? newObserver :
return delegate().newConnection(resolvedAddress, context, originalObserver == null ? newObserver :
newObserver == null ? originalObserver : combine(originalObserver, newObserver));
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.transport.api.TransportObserver;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -61,9 +62,10 @@ class FactoryOrder implements ConnectionFactory<InetSocketAddress, ListenableAsy

@Override
public Single<ListenableAsyncCloseable> newConnection(final InetSocketAddress unused,
@Nullable final ContextMap context,
@Nullable final TransportObserver observer) {
connectOrder.add(order);
return original.newConnection(unused, observer);
return original.newConnection(unused, context, observer);
}

@Override
Expand Down Expand Up @@ -101,6 +103,7 @@ public ConnectionFactory<InetSocketAddress, ListenableAsyncCloseable> create(
new ConnectionFactory<InetSocketAddress, ListenableAsyncCloseable>() {
@Override
public Single<ListenableAsyncCloseable> newConnection(final InetSocketAddress unused,
@Nullable final ContextMap context,
@Nullable final TransportObserver observer) {
return Single.succeeded(DUMMY_CLOSABLE);
}
Expand All @@ -118,8 +121,8 @@ public Completable closeAsync() {

ConnectionFactory<InetSocketAddress, ListenableAsyncCloseable> factory = combined.create(root);

ListenableAsyncCloseable connection = factory.newConnection(mock(InetSocketAddress.class),
null).toFuture().get();
ListenableAsyncCloseable connection = factory.newConnection(mock(InetSocketAddress.class), null, null)
.toFuture().get();

assertThat(connection, is(sameInstance(DUMMY_CLOSABLE)));
assertThat(createOrder, is(hasSize(2)));
Expand Down
Loading

0 comments on commit 4141e33

Please sign in to comment.