Skip to content

Commit

Permalink
RoundRobinLoadBalancer: further logging enhancements (#1870)
Browse files Browse the repository at this point in the history
Motivation:

There are still a few gaps in RRLB logging.

Modifications:

- Add a counter for `targetResource` to distinguish different clients
for the same target host;
- Enhance `RoundRobinLoadBalancer#toString()` to include all hosts;
- Use consistent "Load balancer for" pattern for all log events;
- Avoid double logging when health check passes (it also logs that it
was cancelled);
- Log the full `host` state when health check is cancelled to let us
know its new state;
- Log connection id if we close a new connection when health check
finishes;
- Log if the health check terminated with an unexpected error;
- Fix order of log arguments when we observe connect failure;

Result:

Better visibility into RRLB execution.
  • Loading branch information
idelpivnitskiy authored Oct 8, 2021
1 parent 8c007d7 commit f9558ef
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_INTERVAL;
import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.EAGER_CONNECTION_SHUTDOWN_ENABLED;
import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.FACTORY_COUNT;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -166,8 +167,8 @@ public RoundRobinLoadBalancer(final Publisher<? extends ServiceDiscovererEvent<R
final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory,
final boolean eagerConnectionShutdown,
@Nullable final HealthCheckConfig healthCheckConfig) {
this("unknown", eventPublisher.map(Collections::singletonList), connectionFactory,
eagerConnectionShutdown, healthCheckConfig);
this("unknown#" + FACTORY_COUNT.incrementAndGet(), eventPublisher.map(Collections::singletonList),
connectionFactory, eagerConnectionShutdown, healthCheckConfig);
}

/**
Expand Down Expand Up @@ -213,8 +214,8 @@ public void onSubscribe(final Subscription s) {
@Override
public void onNext(final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
LOGGER.debug("Load balancer {}: Received new ServiceDiscoverer event {}.",
RoundRobinLoadBalancer.this, event);
LOGGER.debug("Load balancer for {}: received new ServiceDiscoverer event {}.",
targetResource, event);

@SuppressWarnings("unchecked")
final List<Host<ResolvedAddress, C>> usedAddresses =
Expand Down Expand Up @@ -248,8 +249,8 @@ public void onNext(final Collection<? extends ServiceDiscovererEvent<ResolvedAdd
}
});

LOGGER.debug("Load balancer {}: Now using {} addresses: {}.",
RoundRobinLoadBalancer.this, usedAddresses.size(), usedAddresses);
LOGGER.debug("Load balancer for {}: now using {} addresses: {}.",
targetResource, usedAddresses.size(), usedAddresses);

if (event.isAvailable()) {
if (usedAddresses.size() == 1) {
Expand Down Expand Up @@ -340,16 +341,16 @@ public void onError(final Throwable t) {
List<Host<ResolvedAddress, C>> hosts = usedHosts;
eventStreamProcessor.onError(t);
LOGGER.error(
"Load balancer {}. Service discoverer {} emitted an error. Last seen addresses (size {}) {}",
RoundRobinLoadBalancer.this, eventPublisher, hosts.size(), hosts, t);
"Load balancer for {}: service discoverer {} emitted an error. Last seen addresses (size {}): {}",
targetResource, eventPublisher, hosts.size(), hosts, t);
}

@Override
public void onComplete() {
List<Host<ResolvedAddress, C>> hosts = usedHosts;
eventStreamProcessor.onComplete();
LOGGER.error("Load balancer {}. Service discoverer {} completed. Last seen addresses (size {}) {}",
RoundRobinLoadBalancer.this, eventPublisher, hosts.size(), hosts);
LOGGER.error("Load balancer for {}: service discoverer {} completed. Last seen addresses (size {}): {}",
targetResource, eventPublisher, hosts.size(), hosts);
}
});
asyncCloseable = toAsyncCloseable(graceful -> {
Expand Down Expand Up @@ -395,6 +396,7 @@ public Publisher<Object> eventStream() {
/**
 * Renders this load balancer for log output: the target resource it serves followed by
 * the current contents of {@code usedHosts}.
 */
public String toString() {
    final StringBuilder description = new StringBuilder("RoundRobinLoadBalancer{");
    description.append("targetResource='").append(targetResource).append('\'');
    description.append(", usedHosts=").append(usedHosts);
    return description.append('}').toString();
}

Expand Down Expand Up @@ -440,9 +442,8 @@ private Single<C> selectConnection0(Predicate<C> selector) {
}
}
if (pickedHost == null) {
return failed(StacklessNoAvailableHostException.newInstance(
"Failed to pick an active host for " + targetResource
+ ". Either all are busy or all are expired: " + usedHosts,
return failed(StacklessNoAvailableHostException.newInstance("Failed to pick an active host for " +
targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts,
RoundRobinLoadBalancer.class, "selectConnection0(...)"));
}
// No connection was selected: create a new one.
Expand Down Expand Up @@ -522,8 +523,8 @@ public <T extends C> LoadBalancer<T> newLoadBalancer(
final String targetResource,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new RoundRobinLoadBalancer<>(targetResource, eventPublisher, connectionFactory,
EAGER_CONNECTION_SHUTDOWN_ENABLED,
return new RoundRobinLoadBalancer<>(requireNonNull(targetResource) + '#' + FACTORY_COUNT.incrementAndGet(),
eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED,
new HealthCheckConfig(SharedExecutor.getInstance(),
DEFAULT_HEALTH_CHECK_INTERVAL, DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD));
}
Expand Down Expand Up @@ -592,7 +593,7 @@ void markClosed() {
final ConnState oldState = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE);
final Object[] toRemove = oldState.connections;
cancelIfHealthCheck(oldState.state);
LOGGER.debug("Load balancer for {}: Closing {} connection(s) gracefully to closed address: {}.",
LOGGER.debug("Load balancer for {}: closing {} connection(s) gracefully to the closed address: {}.",
targetResource, toRemove.length, address);
for (Object conn : toRemove) {
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -621,7 +622,7 @@ void markExpired() {
}
}

void markHealthy() {
void markHealthy(final HealthCheck<Addr, C> successfulHealthCheckState) {
// Marking healthy is generally called from a successful health check, after a connection was added.
// However, it is possible that in the meantime, the host entered an EXPIRED state, then ACTIVE, then failed
// to open connections and entered the UNHEALTHY state before the original thread continues execution here.
Expand All @@ -635,7 +636,9 @@ void markHealthy() {
}
return previous;
}).state;
cancelIfHealthCheck(oldState);
if (oldState != successfulHealthCheckState) {
cancelIfHealthCheck(oldState);
}
}

void markUnhealthy(final Throwable cause, final ConnectionFactory<Addr, ? extends C> connectionFactory) {
Expand All @@ -655,8 +658,8 @@ void markUnhealthy(final Throwable cause, final ConnectionFactory<Addr, ? extend
if (connStateUpdater.compareAndSet(this, previous,
new ConnState(previous.connections, nextState))) {
LOGGER.debug("Load balancer for {}: failed to open a new connection to the host on address {}" +
" {} times ({} consecutive failures will trigger health check).",
targetResource, nextState.failedConnections, address,
" {} time(s) ({} consecutive failures will trigger health check).",
targetResource, address, nextState.failedConnections,
healthCheckConfig.failedThreshold, cause);
break;
}
Expand All @@ -668,7 +671,7 @@ void markUnhealthy(final Throwable cause, final ConnectionFactory<Addr, ? extend
final ConnState nextState = new ConnState(previous.connections, healthCheck);
if (connStateUpdater.compareAndSet(this, previous, nextState)) {
LOGGER.debug("Load balancer for {}: failed to open a new connection to the host on address {}" +
" {} times. Threshold reached, triggering health check for this host.",
" {} time(s). Threshold reached, triggering health check for this host.",
targetResource, address, healthCheckConfig.failedThreshold, cause);
healthCheck.schedule();
break;
Expand Down Expand Up @@ -793,8 +796,7 @@ private void cancelIfHealthCheck(Object o) {
if (HealthCheck.class.equals(o.getClass())) {
@SuppressWarnings("unchecked")
HealthCheck<Addr, C> healthCheck = (HealthCheck<Addr, C>) o;
LOGGER.debug("Load balancer for {}: Health check for address {} cancelled.",
targetResource, healthCheck.host.address);
LOGGER.debug("Load balancer for {}: health check cancelled for {}.", targetResource, healthCheck.host);
healthCheck.cancel();
}
}
Expand Down Expand Up @@ -856,27 +858,30 @@ public void schedule() {
cause -> cause == RESCHEDULE_SIGNAL,
host.healthCheckConfig.healthCheckInterval,
host.healthCheckConfig.executor)))
.whenOnError(t -> LOGGER.error("Load balancer for {}: health check terminated with " +
"an unexpected error for {}.", host.targetResource, host, t))
.subscribe());
}

public Completable reconnect() {
return connectionFactory.newConnection(host.address, null)
.onErrorMap(cause -> {
LOGGER.debug("Load balancer for {}: Health check failed for address {}.",
host.targetResource, host.address, cause);
LOGGER.debug("Load balancer for {}: health check failed for {}.",
host.targetResource, host, cause);
return RESCHEDULE_SIGNAL;
})
.flatMapCompletable(newCnx -> {
if (host.addConnection(newCnx)) {
LOGGER.debug("Load balancer for {}: Health check passed for address {}.",
host.targetResource, host.address);
host.markHealthy();
host.markHealthy(this);
LOGGER.debug("Load balancer for {}: health check passed for {}.",
host.targetResource, host);
return completed();
} else {
LOGGER.debug("Load balancer for {}: Health check finished for address {}." +
" Host rejected connection.", host.targetResource, host.address);
LOGGER.debug("Load balancer for {}: health check finished for {}, but the host " +
"rejected a new connection {}. Closing it now.",
host.targetResource, host, newCnx);
return newCnx.closeAsync();
}
return completed();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -63,6 +64,7 @@
public final class RoundRobinLoadBalancerFactory<ResolvedAddress, C extends LoadBalancedConnection>
implements LoadBalancerFactory<ResolvedAddress, C> {

static final AtomicInteger FACTORY_COUNT = new AtomicInteger();
static final boolean EAGER_CONNECTION_SHUTDOWN_ENABLED = true;
static final Duration DEFAULT_HEALTH_CHECK_INTERVAL = Duration.ofSeconds(1);
static final int DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD = 5; // higher than default for AutoRetryStrategy
Expand Down Expand Up @@ -91,8 +93,8 @@ public <T extends C> LoadBalancer<T> newLoadBalancer(
final String targetResource,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new RoundRobinLoadBalancer<>(
targetResource, eventPublisher, connectionFactory, eagerConnectionShutdown, healthCheckConfig);
return new RoundRobinLoadBalancer<>(requireNonNull(targetResource) + '#' + FACTORY_COUNT.incrementAndGet(),
eventPublisher, connectionFactory, eagerConnectionShutdown, healthCheckConfig);
}

/**
Expand Down

0 comments on commit f9558ef

Please sign in to comment.