Skip to content

Commit

Permalink
Split xDS support into diff module - due to grpc-netty-shaded native …
Browse files Browse the repository at this point in the history
…issue
  • Loading branch information
alesj committed Dec 11, 2022
1 parent 91cb42a commit d0894e9
Show file tree
Hide file tree
Showing 11 changed files with 338 additions and 113 deletions.
5 changes: 5 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1906,6 +1906,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc-xds</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc-api</artifactId>
Expand Down
1 change: 1 addition & 0 deletions extensions/grpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@
<module>stubs</module>
<module>deployment</module>
<module>runtime</module>
<module>xds</module>
</modules>
</project>
30 changes: 0 additions & 30 deletions extensions/grpc/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,36 +79,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-xds</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.android</groupId>
<artifactId>annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -31,32 +30,27 @@

import grpc.health.v1.HealthOuterClass;
import io.grpc.BindableService;
import io.grpc.InsecureServerCredentials;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.xds.XdsServerBuilder;
import io.grpc.xds.XdsServerCredentials;
import io.quarkus.arc.Arc;
import io.quarkus.arc.Subclass;
import io.quarkus.grpc.runtime.config.GrpcConfiguration;
import io.quarkus.grpc.runtime.config.GrpcServerConfiguration;
import io.quarkus.grpc.runtime.config.GrpcServerNettyConfig;
import io.quarkus.grpc.runtime.config.Xds;
import io.quarkus.grpc.runtime.devmode.DevModeInterceptor;
import io.quarkus.grpc.runtime.devmode.GrpcHotReplacementInterceptor;
import io.quarkus.grpc.runtime.devmode.GrpcServerReloader;
import io.quarkus.grpc.runtime.devmode.XdsServerReloader;
import io.quarkus.grpc.runtime.health.GrpcHealthStorage;
import io.quarkus.grpc.runtime.reflection.ReflectionService;
import io.quarkus.grpc.runtime.supports.CompressionInterceptor;
import io.quarkus.grpc.runtime.supports.blocking.BlockingServerInterceptor;
import io.quarkus.grpc.spi.GrpcBuilderProvider;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.QuarkusBindException;
import io.quarkus.runtime.RuntimeValue;
Expand All @@ -68,8 +62,6 @@
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.VertxInternal;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
Expand Down Expand Up @@ -428,34 +420,15 @@ public RuntimeValue<ServerInterceptorStorage> initServerInterceptorStorage(
private Map.Entry<Integer, Server> buildServer(Vertx vertx, GrpcServerConfiguration configuration,
Map<String, List<String>> blockingMethodsPerService, GrpcContainer grpcContainer, LaunchMode launchMode) {

Xds xds = configuration.xds;
boolean isXDS = xds != null && xds.enabled;

int port = launchMode == LaunchMode.TEST ? configuration.testPort : configuration.port;

AtomicBoolean usePlainText = new AtomicBoolean();

GrpcBuilderProvider provider = GrpcBuilderProvider.findServerBuilderProvider(configuration);

ServerBuilder builder;
if (isXDS) {
ServerCredentials credentials = InsecureServerCredentials.create();
if (xds.secure) {
credentials = XdsServerCredentials.create(credentials);
}
builder = XdsServerBuilder.forPort(port, credentials);
// wrap with Vert.x context, so that the context interceptors work
VertxInternal vxi = (VertxInternal) vertx;
Executor delegate = vertx.nettyEventLoopGroup();
EventLoopContext context = vxi.createEventLoopContext();
Executor executor = command -> delegate.execute(() -> {
context.dispatch(command);
});
builder.executor(executor);
// custom XDS interceptors
if (launchMode == LaunchMode.DEVELOPMENT) {
builder.intercept(new DevModeInterceptor(Thread.currentThread().getContextClassLoader()));
// TODO -- see GrpcHotReplacementInterceptor usage -- OK?
builder.intercept(new GrpcHotReplacementInterceptor());
}
if (provider != null) {
builder = provider.createServerBuilder(vertx, configuration, launchMode);
} else {
VertxServerBuilder vsBuilder = VertxServerBuilder.forAddress(vertx, configuration.host, port);
// add Vert.x specific stuff here
Expand Down Expand Up @@ -514,7 +487,7 @@ private Map.Entry<Integer, Server> buildServer(Vertx vertx, GrpcServerConfigurat

LOGGER.debugf("Starting gRPC Server on %s:%d [%s] ...",
configuration.host, port,
isXDS ? "XDS enabled" : "SSL enabled: " + !usePlainText.get());
provider != null ? provider.serverInfo() : "SSL enabled: " + !usePlainText.get());

return new AbstractMap.SimpleEntry<>(port, builder.build());
}
Expand Down Expand Up @@ -658,24 +631,4 @@ public void run(Runnable command) {
}
}
}

private static class DevModeInterceptor implements ServerInterceptor {
private final ClassLoader classLoader;

public DevModeInterceptor(ClassLoader contextClassLoader) {
classLoader = contextClassLoader;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
ServerCallHandler<ReqT, RespT> next) {
ClassLoader originalTccl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
try {
return next.startCall(serverCall, metadata);
} finally {
Thread.currentThread().setContextClassLoader(originalTccl);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.quarkus.grpc.runtime.devmode;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

public class DevModeInterceptor implements ServerInterceptor {
private final ClassLoader classLoader;

public DevModeInterceptor(ClassLoader contextClassLoader) {
classLoader = contextClassLoader;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
ServerCallHandler<ReqT, RespT> next) {
ClassLoader originalTccl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
try {
return next.startCall(serverCall, metadata);
} finally {
Thread.currentThread().setContextClassLoader(originalTccl);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static io.grpc.netty.NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW;
import static io.quarkus.grpc.runtime.config.GrpcClientConfiguration.DNS;
import static io.quarkus.grpc.runtime.config.GrpcClientConfiguration.XDS;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -31,19 +30,15 @@

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ChannelCredentials;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.xds.XdsChannelCredentials;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.quarkus.arc.Arc;
Expand All @@ -56,8 +51,8 @@
import io.quarkus.grpc.runtime.config.GrpcClientConfiguration;
import io.quarkus.grpc.runtime.config.GrpcServerConfiguration;
import io.quarkus.grpc.runtime.config.SslClientConfig;
import io.quarkus.grpc.runtime.config.Xds;
import io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor;
import io.quarkus.grpc.spi.GrpcBuilderProvider;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.util.ClassPathUtils;
import io.smallrye.mutiny.infrastructure.Infrastructure;
Expand Down Expand Up @@ -101,21 +96,23 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
throw new IllegalStateException("gRPC client " + name + " is missing configuration.");
}

GrpcBuilderProvider provider = GrpcBuilderProvider.findChannelBuilderProvider(config);

boolean vertxGrpc = config.useQuarkusGrpcClient;
Xds xdsConfig = config.xds;

String host = config.host;
int port = config.port;
String nameResolver = config.nameResolver;

boolean stork = Stork.STORK.equalsIgnoreCase(nameResolver);
boolean xds = XDS.equalsIgnoreCase(nameResolver) || (xdsConfig != null && xdsConfig.enabled);

String[] resolverSplit = nameResolver.split(":");
String resolver = xds ? XDS : resolverSplit[0];
String resolver = provider != null ? provider.resolver() : resolverSplit[0];

// TODO -- does this work for Vert.x gRPC client?
if (!vertxGrpc && (DNS.equalsIgnoreCase(resolver) || xds)) {
if (provider != null) {
host = provider.adjustHost(host);
} else if (!vertxGrpc && DNS.equalsIgnoreCase(resolver)) {
host = "/" + host; // dns or xds name resolver needs triple slash at the beginning
}

Expand All @@ -137,16 +134,8 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
String target = String.format("%s://%s:%d", resolver, host, port);
LOGGER.debugf("Target for client '%s': %s", name, target);

boolean xdsSecure = false;
if (xds) {
if (xdsConfig != null) {
target = xdsConfig.target.orElse(target);
xdsSecure = xdsConfig.secure;
}
}

SslContext context = null;
if (!plainText && !xds) {
if (!plainText && provider == null) {
Path trustStorePath = config.ssl.trustStore.orElse(null);
Path certificatePath = config.ssl.certificate.orElse(null);
Path keyPath = config.ssl.key.orElse(null);
Expand All @@ -173,13 +162,9 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto

String loadBalancingPolicy = stork ? Stork.STORK : config.loadBalancingPolicy;

ManagedChannelBuilder builder;
if (xds) {
ChannelCredentials credentials = InsecureChannelCredentials.create();
if (xdsSecure) {
credentials = XdsChannelCredentials.create(credentials);
}
builder = Grpc.newChannelBuilder(target, credentials);
ManagedChannelBuilder<?> builder;
if (provider != null) {
builder = provider.createChannelBuilder(config, target);
} else {
builder = NettyChannelBuilder
.forTarget(target)
Expand Down Expand Up @@ -233,7 +218,7 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
builder.keepAliveTimeout(idleTimeout.get().toMillis(), TimeUnit.MILLISECONDS);
}

if (plainText && !xds) {
if (plainText && provider == null) {
builder.usePlaintext();
}
if (context != null && (builder instanceof NettyChannelBuilder)) {
Expand All @@ -244,7 +229,8 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
interceptorContainer.getSortedPerServiceInterceptors(perClientInterceptors).forEach(builder::intercept);
interceptorContainer.getSortedGlobalInterceptors().forEach(builder::intercept);

LOGGER.info(String.format("Creating %s gRPC channel ...", xds ? "XDS" : "Netty"));
LOGGER.info(String.format("Creating %s gRPC channel ...",
provider != null ? provider.channelInfo() : "Netty"));

return builder.build();
} else {
Expand Down
Loading

0 comments on commit d0894e9

Please sign in to comment.