Skip to content

Commit

Permalink
Allow injecting load balancer instead of creating with reflection (#1641
Browse files Browse the repository at this point in the history
)
  • Loading branch information
jguerra authored Sep 14, 2023
1 parent 3691b1f commit 7ff79a3
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class DefaultClientChannelManager implements ClientChannelManager {

public static final String METRIC_PREFIX = "connectionpool";

private final Resolver <? extends DiscoveryResult> dynamicServerResolver;
private final Resolver<DiscoveryResult> dynamicServerResolver;
private final ConnectionPoolConfig connPoolConfig;
private final IClientConfig clientConfig;
private final Registry spectatorRegistry;
Expand Down Expand Up @@ -100,40 +100,12 @@ public class DefaultClientChannelManager implements ClientChannelManager {

public DefaultClientChannelManager(
OriginName originName, IClientConfig clientConfig, Registry spectatorRegistry) {
this.originName = Objects.requireNonNull(originName, "originName");
this.dynamicServerResolver = new DynamicServerResolver(clientConfig, new ServerPoolListener());

String metricId = originName.getMetricId();

this.clientConfig = clientConfig;
this.spectatorRegistry = spectatorRegistry;
this.perServerPools = new ConcurrentHashMap<>(200);

this.connPoolConfig = new ConnectionPoolConfigImpl(originName, this.clientConfig);

this.createNewConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create", metricId);
this.createConnSucceededCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create_success", metricId);
this.createConnFailedCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create_fail", metricId);

this.closeConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_close", metricId);
this.closeAbovePoolHighWaterMarkCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeAbovePoolHighWaterMark", metricId);
this.closeExpiredConnLifetimeCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeExpiredConnLifetime", metricId);
this.requestConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_request", metricId);
this.reuseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_reuse", metricId);
this.releaseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_release", metricId);
this.alreadyClosedCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_alreadyClosed", metricId);
this.connTakenFromPoolIsNotOpen = SpectatorUtils.newCounter(METRIC_PREFIX + "_fromPoolIsClosed", metricId);
this.maxConnsPerHostExceededCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_maxConnsPerHostExceeded", metricId);
this.closeWrtBusyConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeWrtBusyConnCounter", metricId);
this.connEstablishTimer = PercentileTimer.get(spectatorRegistry, spectatorRegistry.createId(METRIC_PREFIX + "_createTiming", "id", metricId));
this.connsInPool = SpectatorUtils.newGauge(METRIC_PREFIX + "_inPool", metricId, new AtomicInteger());
this.connsInUse = SpectatorUtils.newGauge(METRIC_PREFIX + "_inUse", metricId, new AtomicInteger());
this(originName, clientConfig, new DynamicServerResolver(clientConfig), spectatorRegistry);
}

@VisibleForTesting
public DefaultClientChannelManager(
OriginName originName, IClientConfig clientConfig,
Resolver<? extends DiscoveryResult> resolver, Registry spectatorRegistry) {
Resolver<DiscoveryResult> resolver, Registry spectatorRegistry) {
this.originName = Objects.requireNonNull(originName, "originName");
this.dynamicServerResolver = resolver;

Expand All @@ -151,7 +123,7 @@ public DefaultClientChannelManager(

this.closeConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_close", metricId);
this.closeAbovePoolHighWaterMarkCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeAbovePoolHighWaterMark", metricId);
this.closeExpiredConnLifetimeCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "__closeExpiredConnLifetime", metricId);
this.closeExpiredConnLifetimeCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeExpiredConnLifetime", metricId);
this.requestConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_request", metricId);
this.reuseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_reuse", metricId);
this.releaseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_release", metricId);
Expand All @@ -167,6 +139,7 @@ public DefaultClientChannelManager(
@Override
public void init()
{
dynamicServerResolver.setListener(new ServerPoolListener());
// Load channel initializer and conn factory.
// We don't do this within the constructor because some subclass may not be initialized until post-construct.
this.channelInitializer = createChannelInitializer(clientConfig, connPoolConfig, spectatorRegistry);
Expand Down Expand Up @@ -413,7 +386,6 @@ protected IConnectionPool createConnectionPool(
}

final class ServerPoolListener implements ResolverListener<DiscoveryResult> {

@Override
public void onChange(List<DiscoveryResult> removedSet) {
if (!removedSet.isEmpty()) {
Expand All @@ -427,7 +399,6 @@ public void onChange(List<DiscoveryResult> removedSet) {
}
}
}

}

@Override
Expand Down Expand Up @@ -477,4 +448,5 @@ static SocketAddress pickAddressInternal(ResolverResult chosenServer, @Nullable
protected SocketAddress pickAddress(DiscoveryResult chosenServer) {
return pickAddressInternal(chosenServer, connPoolConfig.getOriginName());
}

}
2 changes: 1 addition & 1 deletion zuul-discovery/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ dependencies {
implementation libraries.guava
implementation libraries.slf4j

implementation "com.netflix.ribbon:ribbon-loadbalancer:${versions_ribbon}"
api "com.netflix.ribbon:ribbon-loadbalancer:${versions_ribbon}"
implementation "com.netflix.ribbon:ribbon-core:${versions_ribbon}"
implementation "com.netflix.ribbon:ribbon-eureka:${versions_ribbon}"
implementation "com.netflix.ribbon:ribbon-archaius:${versions_ribbon}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import com.netflix.zuul.resolver.ResolverListener;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Argha C
Expand All @@ -40,15 +43,37 @@
*/
public class DynamicServerResolver implements Resolver<DiscoveryResult> {

private static final Logger LOG = LoggerFactory.getLogger(DynamicServerResolver.class);

private final DynamicServerListLoadBalancer<?> loadBalancer;
ResolverListener<DiscoveryResult> listener;
private ResolverListener<DiscoveryResult> listener;

@Deprecated
public DynamicServerResolver(IClientConfig clientConfig, ResolverListener<DiscoveryResult> listener) {
this.loadBalancer = createLoadBalancer(clientConfig);
this.loadBalancer.addServerListChangeListener(this::onUpdate);
this.listener = listener;
}

public DynamicServerResolver(IClientConfig clientConfig) {
this(createLoadBalancer(clientConfig));
}

public DynamicServerResolver(DynamicServerListLoadBalancer<?> loadBalancer) {
this.loadBalancer = Objects.requireNonNull(loadBalancer);
}

@Override
public void setListener(ResolverListener<DiscoveryResult> listener) {
if(this.listener != null) {
LOG.warn("Ignoring call to setListener, because a listener was already set");
return;
}

this.listener = Objects.requireNonNull(listener);
this.loadBalancer.addServerListChangeListener(this::onUpdate);
}

@Override
public DiscoveryResult resolve(@Nullable Object key) {
final Server server = loadBalancer.chooseServer(key);
Expand All @@ -65,7 +90,7 @@ public void shutdown() {
loadBalancer.shutdown();
}

private DynamicServerListLoadBalancer<?> createLoadBalancer(IClientConfig clientConfig) {
private static DynamicServerListLoadBalancer<?> createLoadBalancer(IClientConfig clientConfig) {
//TODO(argha-c): Revisit this style of LB initialization post modularization. Ideally the LB should be pluggable.

// Use a hard coded string for the LB default name to avoid a dependency on Ribbon classes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ public interface Resolver<T> {
* hook to perform activities on shutdown
*/
void shutdown();

default void setListener(ResolverListener<T> listener) { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public List<DiscoveryResult> updatedList() {
}

final CustomListener listener = new CustomListener();
final DynamicServerResolver resolver = new DynamicServerResolver(new DefaultClientConfigImpl(), listener);
final DynamicServerResolver resolver = new DynamicServerResolver(new DefaultClientConfigImpl());
resolver.setListener(listener);

final InstanceInfo first = Builder.newBuilder()
.setAppName("zuul-discovery-1")
Expand All @@ -73,11 +74,7 @@ public List<DiscoveryResult> updatedList() {

@Test
void properSentinelValueWhenServersUnavailable() {
final DynamicServerResolver resolver = new DynamicServerResolver(new DefaultClientConfigImpl(), new ResolverListener<DiscoveryResult>() {
@Override
public void onChange(List<DiscoveryResult> removedSet) {
}
});
final DynamicServerResolver resolver = new DynamicServerResolver(new DefaultClientConfigImpl());

final DiscoveryResult nonExistentServer = resolver.resolve(null);

Expand Down

0 comments on commit 7ff79a3

Please sign in to comment.