Skip to content

Commit

Permalink
Merge pull request #23826 from michalszynkiewicz/collect-grpc-stats-f…
Browse files Browse the repository at this point in the history
…or-stork

gRPC - gather call statistics
  • Loading branch information
michalszynkiewicz authored Feb 23, 2022
2 parents 9755810 + 8d0d534 commit 7231534
Show file tree
Hide file tree
Showing 30 changed files with 659 additions and 63 deletions.
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<smallrye-reactive-types-converter.version>2.6.0</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>2.18.1</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>3.14.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>1.0.0</smallrye-stork.version>
<smallrye-stork.version>1.1.0</smallrye-stork.version>
<jakarta.activation.version>1.2.1</jakarta.activation.version>
<jakarta.annotation-api.version>1.3.5</jakarta.annotation-api.version>
<jakarta.el-impl.version>3.0.4</jakarta.el-impl.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import io.quarkus.grpc.runtime.ClientInterceptorStorage;
import io.quarkus.grpc.runtime.GrpcClientInterceptorContainer;
import io.quarkus.grpc.runtime.GrpcClientRecorder;
import io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor;
import io.quarkus.grpc.runtime.supports.Channels;
import io.quarkus.grpc.runtime.supports.GrpcClientConfigProvider;
import io.quarkus.grpc.runtime.supports.IOThreadClientInterceptor;
Expand All @@ -88,6 +89,11 @@ void registerBeans(BuildProducer<AdditionalBeanBuildItem> beans) {
GrpcClientInterceptorContainer.class, IOThreadClientInterceptor.class).build());
}

@BuildStep
void registerStorkInterceptor(BuildProducer<AdditionalBeanBuildItem> beans) {
beans.produce(new AdditionalBeanBuildItem(StorkMeasuringGrpcInterceptor.class));
}

@BuildStep
void discoverInjectedClients(BeanDiscoveryFinishedBuildItem beanDiscovery,
BuildProducer<GrpcClientBuildItem> clients,
Expand Down Expand Up @@ -385,6 +391,8 @@ SyntheticBeanBuildItem clientInterceptorStorage(GrpcClientRecorder recorder, Rec
globalInterceptors.add(recorderContext.classProxy(globalInterceptor));
}

// it's okay if this one is not used:
superfluousInterceptors.remove(StorkMeasuringGrpcInterceptor.class.getName());
if (!superfluousInterceptors.isEmpty()) {
LOGGER.warnf("At least one unused gRPC client interceptor found: %s. If there are meant to be used globally, " +
"annotate them with @GlobalInterceptor.", String.join(", ", superfluousInterceptors));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import io.quarkus.grpc.runtime.GrpcContainer;
import io.quarkus.grpc.runtime.GrpcServerRecorder;
import io.quarkus.grpc.runtime.ServerInterceptorStorage;
import io.quarkus.grpc.runtime.config.GrpcClientBuildTimeConfig;
import io.quarkus.grpc.runtime.config.GrpcConfiguration;
import io.quarkus.grpc.runtime.config.GrpcServerBuildTimeConfig;
import io.quarkus.grpc.runtime.health.GrpcHealthEndpoint;
Expand Down Expand Up @@ -525,8 +526,8 @@ ExtensionSslNativeSupportBuildItem extensionSslNativeSupport() {

@BuildStep
@Record(ExecutionTime.STATIC_INIT)
void setUpStork(GrpcStorkRecorder storkRecorder) {
storkRecorder.init();
void setUpStork(GrpcStorkRecorder storkRecorder, GrpcClientBuildTimeConfig config) {
storkRecorder.init(config.storkProactiveConnections);
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ public void initializeGrpcServer(RuntimeValue<Vertx> vertxSupplier,
}
Vertx vertx = vertxSupplier.getValue();
if (hasNoServices(grpcContainer.getServices()) && LaunchMode.current() != LaunchMode.DEVELOPMENT) {
throw new IllegalStateException(
"Unable to find beans exposing the `BindableService` interface - not starting the gRPC server");
LOGGER.error("Unable to find beans exposing the `BindableService` interface - not starting the gRPC server");
}

this.blockingMethodsPerService = blockingMethodsPerServiceImplementationClass;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.grpc.runtime.config;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(phase = ConfigPhase.BUILD_TIME)
public class GrpcClientBuildTimeConfig {

/**
* If set to true, and a Stork load balancer is used, connections with all available service instances will be
* requested proactively. This means better load balancing at the cost of having multiple active connections.
*/
@ConfigItem(defaultValue = "true")
public boolean storkProactiveConnections;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor.STORK_MEASURE_TIME;
import static io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor.STORK_SERVICE_INSTANCE;

import java.util.Collections;
import java.util.Comparator;
Expand All @@ -23,11 +25,22 @@
import io.grpc.Status;
import io.grpc.internal.JsonUtil;
import io.smallrye.stork.Stork;
import io.smallrye.stork.api.Service;
import io.smallrye.stork.api.ServiceInstance;

public class GrpcLoadBalancerProvider extends LoadBalancerProvider {
private static final Logger log = Logger.getLogger(GrpcLoadBalancerProvider.class);

private final boolean requestConnections;

/**
* @param requestConnections if true, the load balancer will proactively request connections from available channels.
* This leads to better load balancing at the cost of keeping active connections.
*/
public GrpcLoadBalancerProvider(boolean requestConnections) {
this.requestConnections = requestConnections;
}

@Override
public boolean isAvailable() {
return true;
Expand Down Expand Up @@ -102,6 +115,8 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {
log.error("gRPC Sub Channel failed", status == null ? null : status.getCause());
helper.refreshNameResolution();
}
log.debugf("subchannel changed state to %s for %s", stateInfo.getState(),
serviceInstance.getId());
switch (stateInfo.getState()) {
case READY:
activeSubchannels.add(serviceInstance);
Expand All @@ -114,7 +129,6 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {
case IDLE:
case SHUTDOWN:
activeSubchannels.remove(serviceInstance);
log.debugf("subchannel changed state to %s", stateInfo.getState());
if (activeSubchannels.isEmpty()
&& state.compareAndSet(ConnectivityState.READY, stateInfo.getState())) {
helper.updateBalancingState(state.get(), picker);
Expand All @@ -123,6 +137,9 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {
}
}
});
if (requestConnections) {
subchannel.requestConnection();
}
subChannels.put(serviceInstance, subchannel);
}

Expand Down Expand Up @@ -152,31 +169,39 @@ static class StorkLoadBalancerConfig {
static class StorkSubchannelPicker extends LoadBalancer.SubchannelPicker {
private final Map<ServiceInstance, LoadBalancer.Subchannel> subChannels;
private final String serviceName;
private final Set<ServiceInstance> activeServerInstances;
private final Set<ServiceInstance> activeServiceInstances;

StorkSubchannelPicker(Map<ServiceInstance, LoadBalancer.Subchannel> subChannels,
String serviceName, Set<ServiceInstance> activeServerInstances) {
String serviceName, Set<ServiceInstance> activeServiceInstances) {
this.subChannels = subChannels;
this.serviceName = serviceName;
this.activeServerInstances = activeServerInstances;
this.activeServiceInstances = activeServiceInstances;
}

@Override
public LoadBalancer.PickResult pickSubchannel(LoadBalancer.PickSubchannelArgs args) {
ServiceInstance serviceInstance = pickServerInstance();

Boolean measureTime = STORK_MEASURE_TIME.get();
measureTime = measureTime != null && measureTime;
ServiceInstance serviceInstance = pickServerInstance(measureTime);
LoadBalancer.Subchannel subchannel = subChannels.get(serviceInstance);
return LoadBalancer.PickResult.withSubchannel(subchannel);

if (serviceInstance.gatherStatistics() && STORK_SERVICE_INSTANCE.get() != null) {
STORK_SERVICE_INSTANCE.get().set(serviceInstance);
return LoadBalancer.PickResult.withSubchannel(subchannel);
} else {
return LoadBalancer.PickResult.withSubchannel(subchannel);
}
}

private ServiceInstance pickServerInstance() {
io.smallrye.stork.api.LoadBalancer lb = Stork.getInstance().getService(serviceName).getLoadBalancer();
private ServiceInstance pickServerInstance(boolean measureTime) {
Service service = Stork.getInstance().getService(serviceName);

Set<ServiceInstance> toChooseFrom = this.activeServerInstances;
if (activeServerInstances.isEmpty()) {
Set<ServiceInstance> toChooseFrom = this.activeServiceInstances;
if (activeServiceInstances.isEmpty()) {
toChooseFrom = subChannels.keySet();
log.debugf("no active service instances, using all subChannels: %s", toChooseFrom);
}
return lb.selectServiceInstance(toChooseFrom);
return service.selectInstanceAndRecordStart(toChooseFrom, measureTime);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

@Recorder
public class GrpcStorkRecorder {
public void init() {
public void init(boolean proactiveConnections) {
NameResolverRegistry.getDefaultRegistry().register(new GrpcStorkServiceDiscovery());
LoadBalancerRegistry.getDefaultRegistry().register(new GrpcLoadBalancerProvider());
LoadBalancerRegistry.getDefaultRegistry().register(new GrpcLoadBalancerProvider(proactiveConnections));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.quarkus.grpc.runtime.stork;

import java.util.concurrent.atomic.AtomicReference;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.spi.Prioritized;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.smallrye.stork.api.ServiceInstance;

@ApplicationScoped
public class StorkMeasuringGrpcInterceptor implements ClientInterceptor, Prioritized {

public static final Context.Key<AtomicReference<ServiceInstance>> STORK_SERVICE_INSTANCE = Context
.key("stork.service-instance");
public static final Context.Key<Boolean> STORK_MEASURE_TIME = Context.key("stork.measure-time");

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
Channel next) {
return new StorkMeasuringCall<>(next.newCall(method, callOptions), method.getType());
}

@Override
public int getPriority() {
return Integer.MAX_VALUE - 100;
}

private static class StorkMeasuringCall<ReqT, RespT> extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {
ServiceInstance serviceInstance;
final boolean recordTime;

protected StorkMeasuringCall(ClientCall<ReqT, RespT> delegate,
MethodDescriptor.MethodType type) {
super(delegate);
this.recordTime = type == MethodDescriptor.MethodType.UNARY;
}

@Override
public void start(final ClientCall.Listener<RespT> responseListener, final Metadata metadata) {
Context context = Context.current().withValues(STORK_SERVICE_INSTANCE, new AtomicReference<>(),
STORK_MEASURE_TIME, recordTime);
Context oldContext = context.attach();
try {
super.start(new StorkMeasuringCallListener<>(responseListener, this), metadata);
serviceInstance = STORK_SERVICE_INSTANCE.get().get();
} finally {
context.detach(oldContext);
}
}

void recordReply() {
if (serviceInstance != null && recordTime) {
serviceInstance.recordReply();
}
}

void recordEnd(Throwable error) {
if (serviceInstance != null) {
serviceInstance.recordEnd(error);
}
}
}

private static class StorkMeasuringCallListener<RespT>
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {
final StorkMeasuringCall<?, ?> collector;

public StorkMeasuringCallListener(ClientCall.Listener<RespT> responseListener, StorkMeasuringCall<?, ?> collector) {
super(responseListener);
this.collector = collector;
}

@Override
public void onMessage(RespT message) {
collector.recordReply();
super.onMessage(message);
}

@Override
public void onClose(Status status, Metadata trailers) {
Exception error = null;
if (!status.isOk()) {
error = status.asException(trailers);
}
collector.recordEnd(error);
super.onClose(status, trailers);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
Expand Down Expand Up @@ -41,6 +42,7 @@
import io.quarkus.grpc.runtime.config.GrpcClientConfiguration;
import io.quarkus.grpc.runtime.config.GrpcServerConfiguration;
import io.quarkus.grpc.runtime.config.SslClientConfig;
import io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.util.ClassPathUtils;
import io.smallrye.mutiny.infrastructure.Infrastructure;
Expand Down Expand Up @@ -122,8 +124,10 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto

String loadBalancingPolicy = config.loadBalancingPolicy;

boolean stork = false;
if (Stork.STORK.equalsIgnoreCase(nameResolver)) {
loadBalancingPolicy = Stork.STORK;
stork = true;
}

NettyChannelBuilder builder = NettyChannelBuilder
Expand Down Expand Up @@ -187,6 +191,10 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
// Client-side interceptors
GrpcClientInterceptorContainer interceptorContainer = Arc.container()
.instance(GrpcClientInterceptorContainer.class).get();
if (stork) {
perClientInterceptors = new HashSet<>(perClientInterceptors);
perClientInterceptors.add(StorkMeasuringGrpcInterceptor.class.getName());
}
interceptorContainer.getSortedPerServiceInterceptors(perClientInterceptors).forEach(builder::intercept);
interceptorContainer.getSortedGlobalInterceptors().forEach(builder::intercept);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
</dependency>
<dependency>
<groupId>io.smallrye.stork</groupId>
<artifactId>stork-load-balancer-response-time</artifactId>
<artifactId>stork-load-balancer-least-response-time</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down

This file was deleted.

4 changes: 4 additions & 0 deletions extensions/smallrye-stork/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-deployment</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Loading

0 comments on commit 7231534

Please sign in to comment.