Skip to content

Commit

Permalink
Add XDS gRPC support
Browse files Browse the repository at this point in the history
  • Loading branch information
alesj committed Dec 11, 2022
1 parent d4e5ec7 commit 2dedca9
Show file tree
Hide file tree
Showing 8 changed files with 382 additions and 178 deletions.
30 changes: 30 additions & 0 deletions extensions/grpc/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,36 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-xds</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.android</groupId>
<artifactId>annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny</artifactId>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
public class GrpcClientConfiguration {

public static final String DNS = "dns";
public static final String XDS = "xds";

/**
* Use new Vert.x gRPC client support.
Expand All @@ -21,6 +22,12 @@ public class GrpcClientConfiguration {
@ConfigItem(defaultValue = "false")
public boolean useQuarkusGrpcClient;

/**
* Configure XDS usage, if enabled.
*/
@ConfigItem
public Xds xds;

/**
* The gRPC service port.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ public class GrpcServerConfiguration {
@ConfigItem(defaultValue = "true")
public boolean useSeparateServer;

/**
* Configure XDS usage, if enabled.
*/
@ConfigItem
public Xds xds;

/**
* The gRPC Server port.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.quarkus.grpc.runtime.config;

import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;

/**
* XDS config
* * <a href="https://github.com/grpc/grpc-java/tree/master/examples/example-xds">XDS usage</a>
*/
@ConfigGroup
public class Xds {
/**
* Explicitly enable use of XDS.
*/
@ConfigItem(defaultValue = "false")
public boolean enabled;

/**
* Use secure credentials.
*/
@ConfigItem(defaultValue = "false")
public boolean secure;

/**
* Optional explicit target.
*/
@ConfigItem
public Optional<String> target;

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

import java.util.function.Supplier;

public class GrpcHotReplacementInterceptor {
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

public class GrpcHotReplacementInterceptor implements ServerInterceptor {
private static volatile Supplier<Boolean> interceptorAction;

static void register(Supplier<Boolean> onCall) {
Expand All @@ -12,4 +17,11 @@ static void register(Supplier<Boolean> onCall) {
public static boolean fire() {
return interceptorAction.get();
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
fire();
return next.startCall(call, headers);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.quarkus.grpc.runtime.devmode;

import java.util.List;
import java.util.Map;

import io.grpc.Server;
import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;

// TODO
public class XdsServerReloader {
private static volatile Server server;

public static Server getServer() {
return server;
}

public static void init(Server grpcServer) {
server = grpcServer;
}

public static void reset() {
shutdown();
}

public static void reinitialize(List<ServerServiceDefinition> serviceDefinitions,
Map<String, ServerMethodDefinition<?, ?>> methods,
List<ServerInterceptor> sortedInterceptors) {
server = null;
}

public static void shutdown() {
shutdown(server);
server = null;
}

public static void shutdown(Server current) {
if (current != null) {
current.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static io.grpc.netty.NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW;
import static io.quarkus.grpc.runtime.config.GrpcClientConfiguration.DNS;
import static io.quarkus.grpc.runtime.config.GrpcClientConfiguration.XDS;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -29,14 +31,19 @@

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ChannelCredentials;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.xds.XdsChannelCredentials;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.quarkus.arc.Arc;
Expand All @@ -49,6 +56,7 @@
import io.quarkus.grpc.runtime.config.GrpcClientConfiguration;
import io.quarkus.grpc.runtime.config.GrpcServerConfiguration;
import io.quarkus.grpc.runtime.config.SslClientConfig;
import io.quarkus.grpc.runtime.config.Xds;
import io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.util.ClassPathUtils;
Expand All @@ -71,6 +79,7 @@ private Channels() {
// Avoid direct instantiation
}

@SuppressWarnings("rawtypes")
public static Channel createChannel(String name, Set<String> perClientInterceptors) throws Exception {
InstanceHandle<GrpcClientConfigProvider> instance = Arc.container().instance(GrpcClientConfigProvider.class);

Expand All @@ -93,18 +102,21 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
}

boolean vertxGrpc = config.useQuarkusGrpcClient;
Xds xdsConfig = config.xds;

String host = config.host;
int port = config.port;
String nameResolver = config.nameResolver;

boolean stork = Stork.STORK.equalsIgnoreCase(nameResolver);
boolean xds = XDS.equalsIgnoreCase(nameResolver) || (xdsConfig != null && xdsConfig.enabled);

String[] resolverSplit = nameResolver.split(":");
String resolver = xds ? XDS : resolverSplit[0];

// TODO -- does this work for Vert.x gRPC client?
if (!vertxGrpc && GrpcClientConfiguration.DNS.equalsIgnoreCase(resolverSplit[0])) {
host = "/" + host; // dns name resolver needs triple slash at the beginning
if (!vertxGrpc && (DNS.equalsIgnoreCase(resolver) || xds)) {
host = "/" + host; // dns or xds name resolver needs triple slash at the beginning
}

// Client-side interceptors
Expand All @@ -122,10 +134,19 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
}

if (!vertxGrpc) {
String target = String.format("%s://%s:%d", resolverSplit[0], host, port);
String target = String.format("%s://%s:%d", resolver, host, port);
LOGGER.debugf("Target for client '%s': %s", name, target);

boolean xdsSecure = false;
if (xds) {
if (xdsConfig != null) {
target = xdsConfig.target.orElse(target);
xdsSecure = xdsConfig.secure;
}
}

SslContext context = null;
if (!plainText) {
if (!plainText && !xds) {
Path trustStorePath = config.ssl.trustStore.orElse(null);
Path certificatePath = config.ssl.certificate.orElse(null);
Path keyPath = config.ssl.key.orElse(null);
Expand All @@ -152,20 +173,29 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto

String loadBalancingPolicy = stork ? Stork.STORK : config.loadBalancingPolicy;

NettyChannelBuilder builder = NettyChannelBuilder
.forTarget(target)
// clients are intercepted using the IOThreadClientInterceptor interceptor which will decide on which
// thread the messages should be processed.
.directExecutor() // will use I/O thread - must not be blocked.
.offloadExecutor(Infrastructure.getDefaultExecutor())
.defaultLoadBalancingPolicy(loadBalancingPolicy)
.flowControlWindow(config.flowControlWindow.orElse(DEFAULT_FLOW_CONTROL_WINDOW))
.keepAliveWithoutCalls(config.keepAliveWithoutCalls)
.maxHedgedAttempts(config.maxHedgedAttempts)
.maxRetryAttempts(config.maxRetryAttempts)
.maxInboundMetadataSize(config.maxInboundMetadataSize.orElse(DEFAULT_MAX_HEADER_LIST_SIZE))
.maxInboundMessageSize(config.maxInboundMessageSize.orElse(DEFAULT_MAX_MESSAGE_SIZE))
.negotiationType(NegotiationType.valueOf(config.negotiationType.toUpperCase()));
ManagedChannelBuilder builder;
if (xds) {
ChannelCredentials credentials = InsecureChannelCredentials.create();
if (xdsSecure) {
credentials = XdsChannelCredentials.create(credentials);
}
builder = Grpc.newChannelBuilder(target, credentials);
} else {
builder = NettyChannelBuilder
.forTarget(target)
// clients are intercepted using the IOThreadClientInterceptor interceptor which will decide on which
// thread the messages should be processed.
.directExecutor() // will use I/O thread - must not be blocked.
.offloadExecutor(Infrastructure.getDefaultExecutor())
.defaultLoadBalancingPolicy(loadBalancingPolicy)
.flowControlWindow(config.flowControlWindow.orElse(DEFAULT_FLOW_CONTROL_WINDOW))
.keepAliveWithoutCalls(config.keepAliveWithoutCalls)
.maxHedgedAttempts(config.maxHedgedAttempts)
.maxRetryAttempts(config.maxRetryAttempts)
.maxInboundMetadataSize(config.maxInboundMetadataSize.orElse(DEFAULT_MAX_HEADER_LIST_SIZE))
.maxInboundMessageSize(config.maxInboundMessageSize.orElse(DEFAULT_MAX_MESSAGE_SIZE))
.negotiationType(NegotiationType.valueOf(config.negotiationType.toUpperCase()));
}

if (config.retry) {
builder.enableRetry();
Expand Down Expand Up @@ -203,17 +233,18 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
builder.keepAliveTimeout(idleTimeout.get().toMillis(), TimeUnit.MILLISECONDS);
}

if (plainText) {
if (plainText && !xds) {
builder.usePlaintext();
}
if (context != null) {
builder.sslContext(context);
if (context != null && (builder instanceof NettyChannelBuilder)) {
NettyChannelBuilder ncBuilder = (NettyChannelBuilder) builder;
ncBuilder.sslContext(context);
}

interceptorContainer.getSortedPerServiceInterceptors(perClientInterceptors).forEach(builder::intercept);
interceptorContainer.getSortedGlobalInterceptors().forEach(builder::intercept);

LOGGER.info("Creating Netty gRPC channel ...");
LOGGER.info(String.format("Creating %s gRPC channel ...", xds ? "XDS" : "Netty"));

return builder.build();
} else {
Expand Down Expand Up @@ -258,6 +289,7 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
Vertx vertx = Arc.container().instance(Vertx.class).get();
io.vertx.grpc.client.GrpcClient client = io.vertx.grpc.client.GrpcClient.client(vertx, options);
Channel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(port, host));
LOGGER.debugf("Target for client '%s': %s", name, host + ":" + port);

List<ClientInterceptor> interceptors = new ArrayList<>();
interceptors.addAll(interceptorContainer.getSortedPerServiceInterceptors(perClientInterceptors));
Expand Down Expand Up @@ -285,7 +317,7 @@ private static GrpcClientConfiguration testConfig(GrpcServerConfiguration server
config.maxInboundMetadataSize = OptionalInt.empty();
config.maxRetryAttempts = 0;
config.maxTraceEvents = OptionalInt.empty();
config.nameResolver = GrpcClientConfiguration.DNS;
config.nameResolver = DNS;
config.negotiationType = "PLAINTEXT";
config.overrideAuthority = Optional.empty();
config.perRpcBufferLimit = OptionalLong.empty();
Expand Down

0 comments on commit 2dedca9

Please sign in to comment.