diff --git a/.github/workflows/ci-istio.yml b/.github/workflows/ci-istio.yml
new file mode 100644
index 00000000000000..2da3c22c39970a
--- /dev/null
+++ b/.github/workflows/ci-istio.yml
@@ -0,0 +1,97 @@
+name: Quarkus CI - Istio
+
+on:
+ workflow_dispatch:
+ schedule:
+ # 2am every weekday + saturday
+ - cron: '0 2 * * 1-6'
+
+env:
+ MAVEN_ARGS: -B -e
+
+jobs:
+ cache:
+ name: Build and save artifacts
+ runs-on: ubuntu-latest
+ if: "github.repository == 'quarkusio/quarkus' || github.event_name == 'workflow_dispatch'"
+ steps:
+ - uses: actions/checkout@v3
+ - uses: actions/setup-java@v3
+ with:
+ distribution: 'temurin'
+ java-version: '17'
+ - name: Install artifacts
+ run: ./mvnw ${MAVEN_ARGS} -DskipTests -DskipITs -Dinvoker.skip clean install -pl :quarkus-integration-test-istio-invoker -am
+ - name: Tar Maven repository
+ shell: bash
+ run: tar -I 'pigz -9' -cf maven-repo.tgz -C ~ .m2/repository
+ - name: Persist Maven repository
+ uses: actions/upload-artifact@v3
+ with:
+ name: maven-repo
+ path: maven-repo.tgz
+ retention-days: 1
+
+ kubernetes:
+ name: Istio + Kubernetes Integration Tests
+ needs: cache
+ runs-on: ubuntu-latest
+ if: "github.repository == 'quarkusio/quarkus' || github.event_name == 'workflow_dispatch'"
+ strategy:
+ fail-fast: false
+ matrix:
+ kubernetes: [v1.20.1]
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+ - uses: actions/setup-java@v3
+ with:
+ distribution: 'temurin'
+ java-version: '17'
+ - name: Download Maven repository
+ uses: actions/download-artifact@v3
+ with:
+ name: maven-repo
+ path: .
+ - name: Extract Maven repository
+ shell: bash
+ run: tar -xzf maven-repo.tgz -C ~
+ - name: Set up Minikube-Kubernetes
+ uses: manusa/actions-setup-minikube@v2.7.1
+ with:
+ minikube version: v1.16.0
+ kubernetes version: ${{ matrix.kubernetes }}
+ github token: ${{ secrets.GITHUB_TOKEN }}
+ start args: '--addons=metrics-server --force'
+ - name: Quay login
+ uses: docker/login-action@v2
+ with:
+ registry: quay.io
+ username: ${{ secrets.QUAY_QUARKUSCI_USERNAME }}
+ password: ${{ secrets.QUAY_QUARKUSCI_PASSWORD }}
+ - name: Get kubeconfig
+ id: kubeconfig
+ run: a="$(cat ~/.kube/config)"; a="${a//'%'/'%25'}"; a="${a//$'\n'/'%0A'}"; a="${a//$'\r'/'%0D'}"; echo "::set-output name=config::$a"
+ - name: Install Istio
+ uses: huang195/actions-install-istio@v1.0.0
+ with:
+ kubeconfig: "${{steps.kubeconfig.outputs.config}}"
+ istio version: '1.15.2'
+ - name: Run Istio Invoker Tests
+ run: |
+ export QUARKUS_CONTAINER_IMAGE_GROUP=quarkustesting
+ export QUARKUS_CONTAINER_IMAGE_TAG=${{ github.sha }}
+ export QUARKUS_CONTAINER_IMAGE_REGISTRY=quay.io
+ ./mvnw ${MAVEN_ARGS} clean install -pl :quarkus-integration-test-istio-invoker -De2e-tests -Dkubernetes-e2e-tests
+ - name: Report status
+ if: "always() && github.repository == 'quarkusio/quarkus'"
+ shell: bash
+ run: |
+ curl -Ls https://sh.jbang.dev | bash -s - app setup
+ ~/.jbang/bin/jbang .github/NativeBuildReport.java \
+ issueNumber=29536 \
+ runId=${{ github.run_id }} \
+ status=${{ job.status }} \
+ token=${{ secrets.GITHUB_API_TOKEN }} \
+ issueRepo=${{ github.repository }} \
+ thisRepo=${{ github.repository }}
diff --git a/bom/application/pom.xml b/bom/application/pom.xml
index 6ab6bd824e53c3..a7d1ec62228f02 100644
--- a/bom/application/pom.xml
+++ b/bom/application/pom.xml
@@ -1909,6 +1909,11 @@
+
+ io.quarkus
+ quarkus-grpc-xds
+ ${project.version}
+
io.quarkus
quarkus-grpc-api
diff --git a/extensions/grpc/pom.xml b/extensions/grpc/pom.xml
index 27cb1447a3e968..1d41c8feb45663 100644
--- a/extensions/grpc/pom.xml
+++ b/extensions/grpc/pom.xml
@@ -20,5 +20,6 @@
stubs
deployment
runtime
+ xds
\ No newline at end of file
diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java
index 307945967420dc..9291396794922d 100644
--- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java
+++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java
@@ -22,9 +22,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Supplier;
import java.util.regex.Pattern;
import javax.enterprise.inject.Instance;
@@ -33,9 +30,8 @@
import grpc.health.v1.HealthOuterClass;
import io.grpc.BindableService;
-import io.grpc.Metadata;
-import io.grpc.ServerCall;
-import io.grpc.ServerCallHandler;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerMethodDefinition;
@@ -45,26 +41,27 @@
import io.quarkus.grpc.runtime.config.GrpcConfiguration;
import io.quarkus.grpc.runtime.config.GrpcServerConfiguration;
import io.quarkus.grpc.runtime.config.GrpcServerNettyConfig;
+import io.quarkus.grpc.runtime.config.Xds;
+import io.quarkus.grpc.runtime.devmode.DevModeInterceptor;
import io.quarkus.grpc.runtime.devmode.GrpcHotReplacementInterceptor;
import io.quarkus.grpc.runtime.devmode.GrpcServerReloader;
+import io.quarkus.grpc.runtime.devmode.XdsServerReloader;
import io.quarkus.grpc.runtime.health.GrpcHealthStorage;
import io.quarkus.grpc.runtime.reflection.ReflectionService;
import io.quarkus.grpc.runtime.supports.CompressionInterceptor;
import io.quarkus.grpc.runtime.supports.blocking.BlockingServerInterceptor;
+import io.quarkus.grpc.spi.GrpcBuilderProvider;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.QuarkusBindException;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
-import io.quarkus.runtime.configuration.ProfileManager;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
-import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
-import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
@@ -88,6 +85,11 @@ public static List getServices() {
return services;
}
+ private static boolean isXds(GrpcServerConfiguration configuration) {
+ Xds xds = configuration.xds;
+ return xds != null && xds.enabled;
+ }
+
public void initializeGrpcServer(RuntimeValue vertxSupplier,
RuntimeValue routerSupplier,
GrpcConfiguration cfg,
@@ -112,10 +114,10 @@ public void initializeGrpcServer(RuntimeValue vertxSupplier,
if (launchMode == LaunchMode.DEVELOPMENT) {
// start single server, not in a verticle, regardless of the configuration.instances
// for reason unknown to me, verticles occasionally get undeployed on dev mode reload
- if (GrpcServerReloader.getServer() == null) {
- devModeStart(grpcContainer, vertx, configuration, blockingMethodsPerService, shutdown, launchMode);
- } else {
+ if (GrpcServerReloader.getServer() != null || XdsServerReloader.getServer() != null) {
devModeReload(grpcContainer, vertx, configuration, blockingMethodsPerService, shutdown);
+ } else {
+ devModeStart(grpcContainer, vertx, configuration, blockingMethodsPerService, shutdown, launchMode);
}
} else {
prodStart(grpcContainer, vertx, configuration, blockingMethodsPerService, launchMode);
@@ -126,6 +128,7 @@ public void initializeGrpcServer(RuntimeValue vertxSupplier,
}
}
+ // TODO -- handle XDS
private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration, RuntimeValue routerSupplier,
ShutdownContext shutdown, Map> blockingMethodsPerService,
GrpcContainer grpcContainer, LaunchMode launchMode) {
@@ -134,7 +137,9 @@ private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration,
List globalInterceptors = grpcContainer.getSortedGlobalInterceptors();
if (launchMode == LaunchMode.DEVELOPMENT) {
- globalInterceptors.add(0, new DevModeInterceptor(Thread.currentThread().getContextClassLoader())); // add as first
+ // add as last, so they will run first
+ globalInterceptors.add(new DevModeInterceptor(Thread.currentThread().getContextClassLoader()));
+ globalInterceptors.add(new GrpcHotReplacementInterceptor());
}
List toBeRegistered = collectServiceDefinitions(grpcContainer.getServices());
@@ -153,8 +158,7 @@ private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration,
definitions.add(service.definition);
}
- boolean reflectionServiceEnabled = configuration.enableReflectionService
- || ProfileManager.getLaunchMode() == LaunchMode.DEVELOPMENT;
+ boolean reflectionServiceEnabled = configuration.enableReflectionService || launchMode == LaunchMode.DEVELOPMENT;
if (reflectionServiceEnabled) {
LOGGER.info("Registering gRPC reflection service");
@@ -164,6 +168,8 @@ private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration,
bridge.bind(server);
}
+ initHealthStorage();
+
LOGGER.info("Starting new Vert.x gRPC server ...");
Route route = routerSupplier.getValue().route().handler(ctx -> {
if (!isGrpc(ctx)) {
@@ -186,23 +192,15 @@ private void prodStart(GrpcContainer grpcContainer, Vertx vertx, GrpcServerConfi
CompletableFuture startResult = new CompletableFuture<>();
vertx.deployVerticle(
- new Supplier() {
- @Override
- public Verticle get() {
- return new GrpcServerVerticle(configuration, grpcContainer, launchMode, blockingMethodsPerService);
- }
- },
+ () -> new GrpcServerVerticle(configuration, grpcContainer, launchMode, blockingMethodsPerService),
new DeploymentOptions().setInstances(configuration.instances),
- new Handler>() {
- @Override
- public void handle(AsyncResult result) {
- if (result.failed()) {
- startResult.completeExceptionally(result.cause());
- } else {
- GrpcServerRecorder.this.postStartup(configuration, launchMode == LaunchMode.TEST);
+ result -> {
+ if (result.failed()) {
+ startResult.completeExceptionally(result.cause());
+ } else {
+ GrpcServerRecorder.this.postStartup(configuration, launchMode == LaunchMode.TEST);
- startResult.complete(null);
- }
+ startResult.complete(null);
}
});
@@ -220,8 +218,10 @@ public void handle(AsyncResult result) {
private void postStartup(GrpcServerConfiguration configuration, boolean test) {
initHealthStorage();
- LOGGER.infof("gRPC Server started on %s:%d [SSL enabled: %s]",
- configuration.host, test ? configuration.testPort : configuration.port, !configuration.plainText);
+ LOGGER.infof("gRPC Server started on %s:%d [%s]",
+ configuration.host,
+ test ? configuration.testPort : configuration.port,
+ isXds(configuration) ? "XDS enabled" : "SSL enabled: " + !configuration.plainText);
}
private void initHealthStorage() {
@@ -236,51 +236,57 @@ private void initHealthStorage() {
private void devModeStart(GrpcContainer grpcContainer, Vertx vertx, GrpcServerConfiguration configuration,
Map> blockingMethodsPerService, ShutdownContext shutdown, LaunchMode launchMode) {
- CompletableFuture future = new CompletableFuture<>();
- devModeWrapper = new DevModeWrapper(Thread.currentThread().getContextClassLoader());
-
- Map.Entry portToServer = buildServer(vertx, configuration,
+ Map.Entry portToServer = buildServer(vertx, configuration,
blockingMethodsPerService, grpcContainer, launchMode);
- VertxServer vertxServer = portToServer.getValue()
- .start(new Handler<>() { // NOSONAR
- @Override
- public void handle(AsyncResult ar) {
- if (ar.failed()) {
- Throwable effectiveCause = getEffectiveThrowable(ar, portToServer);
- if (effectiveCause instanceof QuarkusBindException) {
- LOGGER.error("Unable to start the gRPC server");
- } else {
- LOGGER.error("Unable to start the gRPC server", effectiveCause);
- }
- future.completeExceptionally(effectiveCause);
- } else {
- postStartup(configuration, false);
- future.complete(true);
- grpcVerticleCount.incrementAndGet();
- }
- }
- });
- try {
- future.get(1, TimeUnit.MINUTES);
- } catch (TimeoutException e) {
- LOGGER.error("Failed to start grpc server in time", e);
- } catch (ExecutionException e) {
- throw new RuntimeException("grpc server start failed", e);
- } catch (InterruptedException e) {
- LOGGER.warn("Waiting for grpc server start interrupted", e);
- Thread.currentThread().interrupt();
- }
+ Server server = portToServer.getValue();
+ if (server instanceof VertxServer) {
+ CompletableFuture future = new CompletableFuture<>();
- GrpcServerReloader.init(vertxServer);
- shutdown.addShutdownTask(
- new Runnable() { // NOSONAR
- @Override
- public void run() {
- GrpcServerReloader.reset();
+ devModeWrapper = new DevModeWrapper(Thread.currentThread().getContextClassLoader());
+
+ VertxServer vertxServer = (VertxServer) server;
+ vertxServer.start(ar -> {
+ if (ar.failed()) {
+ Throwable effectiveCause = getEffectiveThrowable(ar, portToServer);
+ if (effectiveCause instanceof QuarkusBindException) {
+ LOGGER.error("Unable to start the gRPC server");
+ } else {
+ LOGGER.error("Unable to start the gRPC server", effectiveCause);
}
- });
+ future.completeExceptionally(effectiveCause);
+ } else {
+ postStartup(configuration, false);
+ future.complete(true);
+ grpcVerticleCount.incrementAndGet();
+ }
+ });
+
+ try {
+ future.get(1, TimeUnit.MINUTES);
+ } catch (TimeoutException e) {
+ LOGGER.error("Failed to start grpc server in time", e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("grpc server start failed", e);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Waiting for grpc server start interrupted", e);
+ Thread.currentThread().interrupt();
+ }
+
+ GrpcServerReloader.init(vertxServer);
+ shutdown.addShutdownTask(GrpcServerReloader::reset);
+ } else if (isXds(configuration)) {
+ try {
+ server.start();
+ } catch (Exception e) {
+ LOGGER.error("Unable to start the gRPC server", e);
+ throw new IllegalStateException(e);
+ }
+ postStartup(configuration, false);
+ XdsServerReloader.init(server);
+ shutdown.addShutdownTask(XdsServerReloader::reset);
+ }
}
private void applyNettySettings(GrpcServerConfiguration configuration, VertxServerBuilder builder) {
@@ -291,23 +297,14 @@ private void applyNettySettings(GrpcServerConfiguration configuration, VertxServ
}
}
- private void applyTransportSecurityConfig(GrpcServerConfiguration configuration, VertxServerBuilder builder) {
+ @SuppressWarnings("rawtypes")
+ private void applyTransportSecurityConfig(GrpcServerConfiguration configuration, ServerBuilder builder) {
if (configuration.transportSecurity != null) {
File cert = configuration.transportSecurity.certificate
- .map(new Function() { // NOSONAR
- @Override
- public File apply(String pathname) {
- return new File(pathname);
- }
- })
+ .map(File::new)
.orElse(null);
File key = configuration.transportSecurity.key
- .map(new Function() { // NOSONAR
- @Override
- public File apply(String pathname) {
- return new File(pathname);
- }
- })
+ .map(File::new)
.orElse(null);
if (cert != null || key != null) {
builder.useTransportSecurity(cert, key);
@@ -334,7 +331,7 @@ private static List collectServiceDefinitions(Instance ar, Map.Entry portToServer) {
+ private Throwable getEffectiveThrowable(AsyncResult ar, Map.Entry portToServer) {
Throwable effectiveCause = ar.cause();
while (effectiveCause.getCause() != null) {
effectiveCause = effectiveCause.getCause();
@@ -392,19 +389,21 @@ private void devModeReload(GrpcContainer grpcContainer, Vertx vertx, GrpcServerC
methods.put(method.getMethodDescriptor().getFullMethodName(), method);
}
}
- devModeWrapper = new DevModeWrapper(Thread.currentThread().getContextClassLoader());
initHealthStorage();
- GrpcServerReloader.reinitialize(servicesWithInterceptors, methods, grpcContainer.getSortedGlobalInterceptors());
+ List globalInterceptors = grpcContainer.getSortedGlobalInterceptors();
- shutdown.addShutdownTask(
- new Runnable() { // NOSONAR
- @Override
- public void run() {
- GrpcServerReloader.reset();
- }
- });
+ if (isXds(configuration)) {
+ globalInterceptors.add(new DevModeInterceptor(Thread.currentThread().getContextClassLoader()));
+ globalInterceptors.add(new GrpcHotReplacementInterceptor());
+ XdsServerReloader.reinitialize(servicesWithInterceptors, methods, globalInterceptors);
+ shutdown.addShutdownTask(XdsServerReloader::reset);
+ } else {
+ devModeWrapper = new DevModeWrapper(Thread.currentThread().getContextClassLoader());
+ GrpcServerReloader.reinitialize(servicesWithInterceptors, methods, globalInterceptors);
+ shutdown.addShutdownTask(GrpcServerReloader::reset);
+ }
}
public static int getVerticleCount() {
@@ -417,22 +416,38 @@ public RuntimeValue initServerInterceptorStorage(
return new RuntimeValue<>(new ServerInterceptorStorage(perServiceInterceptors, globalInterceptors));
}
- private Map.Entry buildServer(Vertx vertx, GrpcServerConfiguration configuration,
+ @SuppressWarnings("rawtypes")
+ private Map.Entry buildServer(Vertx vertx, GrpcServerConfiguration configuration,
Map> blockingMethodsPerService, GrpcContainer grpcContainer, LaunchMode launchMode) {
+
int port = launchMode == LaunchMode.TEST ? configuration.testPort : configuration.port;
- VertxServerBuilder builder = VertxServerBuilder.forAddress(vertx, configuration.host, port);
AtomicBoolean usePlainText = new AtomicBoolean();
- builder.useSsl(new Handler() { // NOSONAR
- @Override
- public void handle(HttpServerOptions options) {
+
+ GrpcBuilderProvider provider = GrpcBuilderProvider.findServerBuilderProvider(configuration);
+
+ ServerBuilder builder;
+ if (provider != null) {
+ builder = provider.createServerBuilder(vertx, configuration, launchMode);
+ } else {
+ VertxServerBuilder vsBuilder = VertxServerBuilder.forAddress(vertx, configuration.host, port);
+ // add Vert.x specific stuff here
+ vsBuilder.useSsl(options -> {
try {
usePlainText.set(applySslOptions(configuration, options));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
+ });
+ applyNettySettings(configuration, vsBuilder);
+ if (launchMode == LaunchMode.DEVELOPMENT) {
+ vsBuilder.commandDecorator(command -> vertx.executeBlocking(
+ event -> event.complete(GrpcHotReplacementInterceptor.fire()),
+ false,
+ (Handler>) result -> devModeWrapper.run(command)));
}
- });
+ builder = vsBuilder;
+ }
if (configuration.maxInboundMessageSize.isPresent()) {
builder.maxInboundMessageSize(configuration.maxInboundMessageSize.getAsInt());
@@ -443,16 +458,11 @@ public void handle(HttpServerOptions options) {
}
Optional handshakeTimeout = configuration.handshakeTimeout;
- if (handshakeTimeout.isPresent()) {
- builder.handshakeTimeout(handshakeTimeout.get().toMillis(), TimeUnit.MILLISECONDS);
- }
+ handshakeTimeout.ifPresent(duration -> builder.handshakeTimeout(duration.toMillis(), TimeUnit.MILLISECONDS));
applyTransportSecurityConfig(configuration, builder);
- applyNettySettings(configuration, builder);
-
- boolean reflectionServiceEnabled = configuration.enableReflectionService
- || ProfileManager.getLaunchMode() == LaunchMode.DEVELOPMENT;
+ boolean reflectionServiceEnabled = configuration.enableReflectionService || launchMode == LaunchMode.DEVELOPMENT;
List toBeRegistered = collectServiceDefinitions(grpcContainer.getServices());
List definitions = new ArrayList<>();
@@ -475,30 +485,9 @@ public void handle(HttpServerOptions options) {
builder.intercept(serverInterceptor);
}
- if (launchMode == LaunchMode.DEVELOPMENT) {
- builder.commandDecorator(new Consumer() {
- @Override
- public void accept(Runnable command) {
- vertx.executeBlocking(new Handler>() {
- @Override
- public void handle(Promise event) {
- event.complete(GrpcHotReplacementInterceptor.fire());
- }
- },
- false,
- new Handler>() {
- @Override
- public void handle(AsyncResult result) {
- devModeWrapper.run(command);
- }
- });
- }
- });
- }
-
- LOGGER.debugf("Starting gRPC Server on %s:%d [SSL enabled: %s]...",
+ LOGGER.debugf("Starting gRPC Server on %s:%d [%s] ...",
configuration.host, port,
- !usePlainText.get());
+ provider != null ? provider.serverInfo() : "SSL enabled: " + !usePlainText.get());
return new AbstractMap.SimpleEntry<>(port, builder.build());
}
@@ -543,7 +532,7 @@ private class GrpcServerVerticle extends AbstractVerticle {
private final LaunchMode launchMode;
private final Map> blockingMethodsPerService;
- private VertxServer grpcServer;
+ private Server grpcServer;
GrpcServerVerticle(GrpcServerConfiguration configuration, GrpcContainer grpcContainer,
LaunchMode launchMode, Map> blockingMethodsPerService) {
@@ -560,46 +549,72 @@ public void start(Promise startPromise) {
"Unable to find bean exposing the `BindableService` interface - not starting the gRPC server");
return;
}
- Map.Entry portToServer = buildServer(getVertx(), configuration,
+ Map.Entry portToServer = buildServer(getVertx(), configuration,
blockingMethodsPerService, grpcContainer, launchMode);
- grpcServer = portToServer.getValue()
- .start(new Handler<>() { // NOSONAR
- @Override
- public void handle(AsyncResult ar) {
- if (ar.failed()) {
- Throwable effectiveCause = getEffectiveThrowable(ar, portToServer);
- if (effectiveCause instanceof QuarkusBindException) {
- LOGGER.error("Unable to start the gRPC server");
- } else {
- LOGGER.error("Unable to start the gRPC server", effectiveCause);
- }
- startPromise.fail(effectiveCause);
- } else {
- startPromise.complete();
- grpcVerticleCount.incrementAndGet();
- }
+
+ grpcServer = portToServer.getValue();
+ if (grpcServer instanceof VertxServer) {
+ VertxServer server = (VertxServer) grpcServer;
+ server.start(ar -> {
+ if (ar.failed()) {
+ Throwable effectiveCause = getEffectiveThrowable(ar, portToServer);
+ if (effectiveCause instanceof QuarkusBindException) {
+ LOGGER.error("Unable to start the gRPC server");
+ } else {
+ LOGGER.error("Unable to start the gRPC server", effectiveCause);
}
- });
+ startPromise.fail(effectiveCause);
+ } else {
+ startPromise.complete();
+ grpcVerticleCount.incrementAndGet();
+ }
+ });
+ } else {
+ // XDS server blocks on initialStartFuture
+ vertx.executeBlocking((Handler>) event -> {
+ try {
+ grpcServer.start();
+ startPromise.complete();
+ } catch (Exception e) {
+ LOGGER.error("Unable to start gRPC server", e);
+ startPromise.fail(e);
+ }
+ });
+ }
}
@Override
public void stop(Promise stopPromise) {
- grpcServer.shutdown(new Handler>() { // NOSONAR
- @Override
- public void handle(AsyncResult ar) {
+ if (grpcServer instanceof VertxServer) {
+ VertxServer server = (VertxServer) grpcServer;
+ server.shutdown(ar -> {
if (ar.failed()) {
- LOGGER.errorf(ar.cause(), "Unable to stop the gRPC server gracefully");
+ Throwable cause = ar.cause();
+ LOGGER.errorf(cause, "Unable to stop the gRPC server gracefully");
+ stopPromise.fail(cause);
} else {
LOGGER.debug("gRPC Server stopped");
stopPromise.complete();
grpcVerticleCount.decrementAndGet();
}
+ });
+ } else {
+ try {
+ grpcServer.shutdownNow().awaitTermination(10, TimeUnit.SECONDS);
+ stopPromise.complete();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ stopPromise.fail(e);
+ throw new IllegalStateException(e);
+ } catch (Exception e) {
+ LOGGER.errorf(e, "Unable to stop the gRPC server gracefully");
+ stopPromise.fail(e);
}
- });
+ }
}
}
- private class DevModeWrapper {
+ private static class DevModeWrapper {
private final ClassLoader classLoader;
public DevModeWrapper(ClassLoader contextClassLoader) {
@@ -616,24 +631,4 @@ public void run(Runnable command) {
}
}
}
-
- private class DevModeInterceptor implements ServerInterceptor {
- private final ClassLoader classLoader;
-
- public DevModeInterceptor(ClassLoader contextClassLoader) {
- classLoader = contextClassLoader;
- }
-
- @Override
- public ServerCall.Listener interceptCall(ServerCall serverCall, Metadata metadata,
- ServerCallHandler next) {
- ClassLoader originalTccl = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(classLoader);
- try {
- return next.startCall(serverCall, metadata);
- } finally {
- Thread.currentThread().setContextClassLoader(originalTccl);
- }
- }
- }
}
diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java
index f20c2a815e19cf..3f29f429fdaaa3 100644
--- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java
+++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java
@@ -13,6 +13,7 @@
public class GrpcClientConfiguration {
public static final String DNS = "dns";
+ public static final String XDS = "xds";
/**
* Use new Vert.x gRPC client support.
@@ -21,6 +22,12 @@ public class GrpcClientConfiguration {
@ConfigItem(defaultValue = "false")
public boolean useQuarkusGrpcClient;
+ /**
+ * Configure XDS usage, if enabled.
+ */
+ @ConfigItem
+ public Xds xds;
+
/**
* The gRPC service port.
*/
diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcServerConfiguration.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcServerConfiguration.java
index 2389fbcfdb06e9..ae569dbd5f26d3 100644
--- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcServerConfiguration.java
+++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcServerConfiguration.java
@@ -19,6 +19,12 @@ public class GrpcServerConfiguration {
@ConfigItem(defaultValue = "true")
public boolean useSeparateServer;
+ /**
+ * Configure XDS usage, if enabled.
+ */
+ @ConfigItem
+ public Xds xds;
+
/**
* The gRPC Server port.
*/
diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/Xds.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/Xds.java
new file mode 100644
index 00000000000000..4916421698837b
--- /dev/null
+++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/Xds.java
@@ -0,0 +1,32 @@
+package io.quarkus.grpc.runtime.config;
+
+import java.util.Optional;
+
+import io.quarkus.runtime.annotations.ConfigGroup;
+import io.quarkus.runtime.annotations.ConfigItem;
+
+/**
+ * XDS config
+ * * XDS usage
+ */
+@ConfigGroup
+public class Xds {
+ /**
+ * Explicitly enable use of XDS.
+ */
+ @ConfigItem(defaultValue = "false")
+ public boolean enabled;
+
+ /**
+ * Use secure credentials.
+ */
+ @ConfigItem(defaultValue = "false")
+ public boolean secure;
+
+ /**
+ * Optional explicit target.
+ */
+ @ConfigItem
+ public Optional target;
+
+}
diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/DevModeInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/DevModeInterceptor.java
new file mode 100644
index 00000000000000..9ecf69d9967a4a
--- /dev/null
+++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/DevModeInterceptor.java
@@ -0,0 +1,26 @@
+package io.quarkus.grpc.runtime.devmode;
+
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+
+public class DevModeInterceptor implements ServerInterceptor {
+ private final ClassLoader classLoader;
+
+ public DevModeInterceptor(ClassLoader contextClassLoader) {
+ classLoader = contextClassLoader;
+ }
+
+ @Override
+ public ServerCall.Listener interceptCall(ServerCall serverCall, Metadata metadata,
+ ServerCallHandler next) {
+ ClassLoader originalTccl = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(classLoader);
+ try {
+ return next.startCall(serverCall, metadata);
+ } finally {
+ Thread.currentThread().setContextClassLoader(originalTccl);
+ }
+ }
+}
diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcHotReplacementInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcHotReplacementInterceptor.java
index 4dc416a9b982d6..a07246f45daa1f 100644
--- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcHotReplacementInterceptor.java
+++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcHotReplacementInterceptor.java
@@ -2,7 +2,12 @@
import java.util.function.Supplier;
-public class GrpcHotReplacementInterceptor {
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+
+public class GrpcHotReplacementInterceptor implements ServerInterceptor {
private static volatile Supplier interceptorAction;
static void register(Supplier onCall) {
@@ -12,4 +17,11 @@ static void register(Supplier onCall) {
public static boolean fire() {
return interceptorAction.get();
}
+
+ @Override
+ public ServerCall.Listener interceptCall(ServerCall call, Metadata headers,
+ ServerCallHandler next) {
+ fire();
+ return next.startCall(call, headers);
+ }
}
diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/XdsServerReloader.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/XdsServerReloader.java
new file mode 100644
index 00000000000000..eccfc85b2ff794
--- /dev/null
+++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/XdsServerReloader.java
@@ -0,0 +1,43 @@
+package io.quarkus.grpc.runtime.devmode;
+
+import java.util.List;
+import java.util.Map;
+
+import io.grpc.Server;
+import io.grpc.ServerInterceptor;
+import io.grpc.ServerMethodDefinition;
+import io.grpc.ServerServiceDefinition;
+
+// TODO
+public class XdsServerReloader {
+ private static volatile Server server;
+
+ public static Server getServer() {
+ return server;
+ }
+
+ public static void init(Server grpcServer) {
+ server = grpcServer;
+ }
+
+ public static void reset() {
+ shutdown();
+ }
+
+ public static void reinitialize(List serviceDefinitions,
+ Map> methods,
+ List sortedInterceptors) {
+ server = null;
+ }
+
+ public static void shutdown() {
+ shutdown(server);
+ server = null;
+ }
+
+ public static void shutdown(Server current) {
+ if (current != null) {
+ current.shutdownNow();
+ }
+ }
+}
diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java
index 5cd0146cbe7b6f..947e0a1ef1f412 100644
--- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java
+++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java
@@ -3,6 +3,7 @@
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static io.grpc.netty.NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW;
+import static io.quarkus.grpc.runtime.config.GrpcClientConfiguration.DNS;
import java.io.IOException;
import java.io.InputStream;
@@ -33,6 +34,7 @@
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
@@ -50,6 +52,7 @@
import io.quarkus.grpc.runtime.config.GrpcServerConfiguration;
import io.quarkus.grpc.runtime.config.SslClientConfig;
import io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor;
+import io.quarkus.grpc.spi.GrpcBuilderProvider;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.util.ClassPathUtils;
import io.smallrye.mutiny.infrastructure.Infrastructure;
@@ -71,6 +74,7 @@ private Channels() {
// Avoid direct instantiation
}
+ @SuppressWarnings("rawtypes")
public static Channel createChannel(String name, Set perClientInterceptors) throws Exception {
InstanceHandle instance = Arc.container().instance(GrpcClientConfigProvider.class);
@@ -92,6 +96,8 @@ public static Channel createChannel(String name, Set perClientIntercepto
throw new IllegalStateException("gRPC client " + name + " is missing configuration.");
}
+ GrpcBuilderProvider provider = GrpcBuilderProvider.findChannelBuilderProvider(config);
+
boolean vertxGrpc = config.useQuarkusGrpcClient;
String host = config.host;
@@ -101,10 +107,13 @@ public static Channel createChannel(String name, Set perClientIntercepto
boolean stork = Stork.STORK.equalsIgnoreCase(nameResolver);
String[] resolverSplit = nameResolver.split(":");
+ String resolver = provider != null ? provider.resolver() : resolverSplit[0];
// TODO -- does this work for Vert.x gRPC client?
- if (!vertxGrpc && GrpcClientConfiguration.DNS.equalsIgnoreCase(resolverSplit[0])) {
- host = "/" + host; // dns name resolver needs triple slash at the beginning
+ if (provider != null) {
+ host = provider.adjustHost(host);
+ } else if (!vertxGrpc && DNS.equalsIgnoreCase(resolver)) {
+ host = "/" + host; // dns or xds name resolver needs triple slash at the beginning
}
// Client-side interceptors
@@ -122,10 +131,11 @@ public static Channel createChannel(String name, Set perClientIntercepto
}
if (!vertxGrpc) {
- String target = String.format("%s://%s:%d", resolverSplit[0], host, port);
+ String target = String.format("%s://%s:%d", resolver, host, port);
+ LOGGER.debugf("Target for client '%s': %s", name, target);
SslContext context = null;
- if (!plainText) {
+ if (!plainText && provider == null) {
Path trustStorePath = config.ssl.trustStore.orElse(null);
Path certificatePath = config.ssl.certificate.orElse(null);
Path keyPath = config.ssl.key.orElse(null);
@@ -152,20 +162,25 @@ public static Channel createChannel(String name, Set perClientIntercepto
String loadBalancingPolicy = stork ? Stork.STORK : config.loadBalancingPolicy;
- NettyChannelBuilder builder = NettyChannelBuilder
- .forTarget(target)
- // clients are intercepted using the IOThreadClientInterceptor interceptor which will decide on which
- // thread the messages should be processed.
- .directExecutor() // will use I/O thread - must not be blocked.
- .offloadExecutor(Infrastructure.getDefaultExecutor())
- .defaultLoadBalancingPolicy(loadBalancingPolicy)
- .flowControlWindow(config.flowControlWindow.orElse(DEFAULT_FLOW_CONTROL_WINDOW))
- .keepAliveWithoutCalls(config.keepAliveWithoutCalls)
- .maxHedgedAttempts(config.maxHedgedAttempts)
- .maxRetryAttempts(config.maxRetryAttempts)
- .maxInboundMetadataSize(config.maxInboundMetadataSize.orElse(DEFAULT_MAX_HEADER_LIST_SIZE))
- .maxInboundMessageSize(config.maxInboundMessageSize.orElse(DEFAULT_MAX_MESSAGE_SIZE))
- .negotiationType(NegotiationType.valueOf(config.negotiationType.toUpperCase()));
+ ManagedChannelBuilder> builder;
+ if (provider != null) {
+ builder = provider.createChannelBuilder(config, target);
+ } else {
+ builder = NettyChannelBuilder
+ .forTarget(target)
+ // clients are intercepted using the IOThreadClientInterceptor interceptor which will decide on which
+ // thread the messages should be processed.
+ .directExecutor() // will use I/O thread - must not be blocked.
+ .offloadExecutor(Infrastructure.getDefaultExecutor())
+ .defaultLoadBalancingPolicy(loadBalancingPolicy)
+ .flowControlWindow(config.flowControlWindow.orElse(DEFAULT_FLOW_CONTROL_WINDOW))
+ .keepAliveWithoutCalls(config.keepAliveWithoutCalls)
+ .maxHedgedAttempts(config.maxHedgedAttempts)
+ .maxRetryAttempts(config.maxRetryAttempts)
+ .maxInboundMetadataSize(config.maxInboundMetadataSize.orElse(DEFAULT_MAX_HEADER_LIST_SIZE))
+ .maxInboundMessageSize(config.maxInboundMessageSize.orElse(DEFAULT_MAX_MESSAGE_SIZE))
+ .negotiationType(NegotiationType.valueOf(config.negotiationType.toUpperCase()));
+ }
if (config.retry) {
builder.enableRetry();
@@ -203,17 +218,19 @@ public static Channel createChannel(String name, Set perClientIntercepto
builder.keepAliveTimeout(idleTimeout.get().toMillis(), TimeUnit.MILLISECONDS);
}
- if (plainText) {
+ if (plainText && provider == null) {
builder.usePlaintext();
}
- if (context != null) {
- builder.sslContext(context);
+ if (context != null && (builder instanceof NettyChannelBuilder)) {
+ NettyChannelBuilder ncBuilder = (NettyChannelBuilder) builder;
+ ncBuilder.sslContext(context);
}
interceptorContainer.getSortedPerServiceInterceptors(perClientInterceptors).forEach(builder::intercept);
interceptorContainer.getSortedGlobalInterceptors().forEach(builder::intercept);
- LOGGER.info("Creating Netty gRPC channel ...");
+ LOGGER.info(String.format("Creating %s gRPC channel ...",
+ provider != null ? provider.channelInfo() : "Netty"));
return builder.build();
} else {
@@ -258,6 +275,7 @@ public static Channel createChannel(String name, Set perClientIntercepto
Vertx vertx = Arc.container().instance(Vertx.class).get();
io.vertx.grpc.client.GrpcClient client = io.vertx.grpc.client.GrpcClient.client(vertx, options);
Channel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(port, host));
+ LOGGER.debugf("Target for client '%s': %s", name, host + ":" + port);
List interceptors = new ArrayList<>();
interceptors.addAll(interceptorContainer.getSortedPerServiceInterceptors(perClientInterceptors));
@@ -285,7 +303,7 @@ private static GrpcClientConfiguration testConfig(GrpcServerConfiguration server
config.maxInboundMetadataSize = OptionalInt.empty();
config.maxRetryAttempts = 0;
config.maxTraceEvents = OptionalInt.empty();
- config.nameResolver = GrpcClientConfiguration.DNS;
+ config.nameResolver = DNS;
config.negotiationType = "PLAINTEXT";
config.overrideAuthority = Optional.empty();
config.perRpcBufferLimit = OptionalLong.empty();
diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/spi/GrpcBuilderProvider.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/spi/GrpcBuilderProvider.java
new file mode 100644
index 00000000000000..08418262e9a6a1
--- /dev/null
+++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/spi/GrpcBuilderProvider.java
@@ -0,0 +1,134 @@
+package io.quarkus.grpc.spi;
+
+import java.util.ServiceLoader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.ServerBuilder;
+import io.quarkus.grpc.runtime.config.GrpcClientConfiguration;
+import io.quarkus.grpc.runtime.config.GrpcServerConfiguration;
+import io.quarkus.runtime.LaunchMode;
+import io.vertx.core.Vertx;
+
+/**
+ * Allow for additional types of gRPC server and channels to be used / built.
+ *
+ * This is an experimental SPI, subject to change.
+ */
+public interface GrpcBuilderProvider> {
+
+ Logger log = LoggerFactory.getLogger(GrpcBuilderProvider.class);
+
+ /**
+ * Find gRPC server builder provider.
+ *
+ * @param configuration the gRPC server configuration
+ * @return provider instance or null if none is provided
+ */
+ @SuppressWarnings("rawtypes")
+ static GrpcBuilderProvider findServerBuilderProvider(GrpcServerConfiguration configuration) {
+ GrpcBuilderProvider provider = null;
+ ServiceLoader providers = ServiceLoader.load(GrpcBuilderProvider.class);
+ for (GrpcBuilderProvider p : providers) {
+ if (p.providesServer(configuration)) {
+ if (provider != null) {
+ throw new IllegalArgumentException("Too many GrpcBuilderProviders enabled: " + providers);
+ }
+ log.info("Found server GrpcBuilderProvider: {}", p);
+ provider = p;
+ }
+ }
+ return provider;
+ }
+
+ /**
+ * Find gRPC client builder provider.
+ *
+ * @param configuration the gRPC client configuration
+ * @return provider instance or null if none is provided
+ */
+ @SuppressWarnings("rawtypes")
+ static GrpcBuilderProvider findChannelBuilderProvider(GrpcClientConfiguration configuration) {
+ GrpcBuilderProvider provider = null;
+ ServiceLoader providers = ServiceLoader.load(GrpcBuilderProvider.class);
+ for (GrpcBuilderProvider p : providers) {
+ if (p.providesChannel(configuration)) {
+ if (provider != null) {
+ throw new IllegalArgumentException("Too many GrpcBuilderProviders enabled: " + providers);
+ }
+ log.info("Found channel GrpcBuilderProvider: {}", p);
+ provider = p;
+ }
+ }
+ return provider;
+ }
+
+ /**
+ * Does this builder provider provide a new gRPC server instance.
+ *
+ * @param configuration the gRPC server configuration
+ * @return true if yes, false if no
+ */
+ boolean providesServer(GrpcServerConfiguration configuration);
+
+ /**
+ * Create initial server builder.
+ *
+ * @param vertx the Vertx instance
+ * @param configuration the gRPC server configuration
+ * @param launchMode current launch mode
+ * @return new ServerBuilder instance
+ */
+ ServerBuilder createServerBuilder(Vertx vertx, GrpcServerConfiguration configuration, LaunchMode launchMode);
+
+ /**
+ * Provide server info.
+ *
+ * @return simple server info
+ */
+ String serverInfo();
+
+ /**
+ * Does this builder provider provide a new gRPC channel instance.
+ *
+ * @param configuration the gRPC client configuration
+ * @return true if yes, false if no
+ */
+ boolean providesChannel(GrpcClientConfiguration configuration);
+
+ /**
+ * Get resolver.
+ *
+ * @return the resolver
+ */
+ String resolver();
+
+ /**
+ * Adjust host, if needed.
+ * By default, no adjustment is made.
+ *
+ * @param host the host
+ * @return adjusted host, if needed
+ */
+ default String adjustHost(String host) {
+ return host;
+ }
+
+ /**
+ * Create initial channel builder.
+ *
+ * @param configuration the gRPC client configuration
+ * @param target the channel target
+ * @return new ChannelBuilder
+ */
+ ManagedChannelBuilder> createChannelBuilder(GrpcClientConfiguration configuration, String target);
+
+ /**
+ * Provide channel info.
+ *
+ * @return simple channel info
+ */
+ String channelInfo();
+}
diff --git a/extensions/grpc/xds/pom.xml b/extensions/grpc/xds/pom.xml
new file mode 100644
index 00000000000000..ae52996721126f
--- /dev/null
+++ b/extensions/grpc/xds/pom.xml
@@ -0,0 +1,51 @@
+
+
+ 4.0.0
+
+ quarkus-grpc-parent
+ io.quarkus
+ 999-SNAPSHOT
+
+
+ quarkus-grpc-xds
+ Quarkus - gRPC - xDS
+ gRPC xDS support
+
+
+ io.quarkus
+ quarkus-grpc
+
+
+ io.grpc
+ grpc-xds
+
+
+ commons-logging
+ commons-logging
+
+
+ com.google.auto.value
+ auto-value-annotations
+
+
+ com.google.android
+ annotations
+
+
+ com.google.code.findbugs
+ jsr305
+
+
+ org.codehaus.mojo
+ animal-sniffer-annotations
+
+
+ org.checkerframework
+ checker-qual
+
+
+
+
+
diff --git a/extensions/grpc/xds/src/main/java/io/quarkus/grpc/xds/XdsGrpcServerBuilderProvider.java b/extensions/grpc/xds/src/main/java/io/quarkus/grpc/xds/XdsGrpcServerBuilderProvider.java
new file mode 100644
index 00000000000000..81c463c857a48d
--- /dev/null
+++ b/extensions/grpc/xds/src/main/java/io/quarkus/grpc/xds/XdsGrpcServerBuilderProvider.java
@@ -0,0 +1,98 @@
+package io.quarkus.grpc.xds;
+
+import static io.quarkus.grpc.runtime.config.GrpcClientConfiguration.XDS;
+
+import java.util.concurrent.Executor;
+
+import io.grpc.ChannelCredentials;
+import io.grpc.Grpc;
+import io.grpc.InsecureChannelCredentials;
+import io.grpc.InsecureServerCredentials;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.ServerBuilder;
+import io.grpc.ServerCredentials;
+import io.grpc.xds.XdsChannelCredentials;
+import io.grpc.xds.XdsServerBuilder;
+import io.grpc.xds.XdsServerCredentials;
+import io.quarkus.grpc.runtime.config.GrpcClientConfiguration;
+import io.quarkus.grpc.runtime.config.GrpcServerConfiguration;
+import io.quarkus.grpc.runtime.config.Xds;
+import io.quarkus.grpc.runtime.devmode.DevModeInterceptor;
+import io.quarkus.grpc.runtime.devmode.GrpcHotReplacementInterceptor;
+import io.quarkus.grpc.spi.GrpcBuilderProvider;
+import io.quarkus.runtime.LaunchMode;
+import io.vertx.core.Vertx;
+import io.vertx.core.impl.EventLoopContext;
+import io.vertx.core.impl.VertxInternal;
+
+public class XdsGrpcServerBuilderProvider implements GrpcBuilderProvider {
+ @Override
+ public boolean providesServer(GrpcServerConfiguration configuration) {
+ Xds xds = configuration.xds;
+ return xds != null && xds.enabled;
+ }
+
+ @Override
+ public ServerBuilder createServerBuilder(Vertx vertx, GrpcServerConfiguration configuration,
+ LaunchMode launchMode) {
+ Xds xds = configuration.xds;
+ int port = launchMode == LaunchMode.TEST ? configuration.testPort : configuration.port;
+ ServerCredentials credentials = InsecureServerCredentials.create();
+ if (xds.secure) {
+ credentials = XdsServerCredentials.create(credentials);
+ }
+ ServerBuilder builder = XdsServerBuilder.forPort(port, credentials);
+ // wrap with Vert.x context, so that the context interceptors work
+ VertxInternal vxi = (VertxInternal) vertx;
+ Executor delegate = vertx.nettyEventLoopGroup();
+ EventLoopContext context = vxi.createEventLoopContext();
+ Executor executor = command -> delegate.execute(() -> context.dispatch(command));
+ builder.executor(executor);
+ // custom XDS interceptors
+ if (launchMode == LaunchMode.DEVELOPMENT) {
+ builder.intercept(new DevModeInterceptor(Thread.currentThread().getContextClassLoader()));
+ builder.intercept(new GrpcHotReplacementInterceptor());
+ }
+ return builder;
+ }
+
+ @Override
+ public String serverInfo() {
+ return "xDS enabled";
+ }
+
+ @Override
+ public boolean providesChannel(GrpcClientConfiguration configuration) {
+ Xds xds = configuration.xds;
+ if (xds != null) {
+ return xds.enabled || XDS.equalsIgnoreCase(configuration.nameResolver);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public String resolver() {
+ return XDS;
+ }
+
+ @Override
+ public String adjustHost(String host) {
+ return "/" + host;
+ }
+
+ @Override
+ public ManagedChannelBuilder> createChannelBuilder(GrpcClientConfiguration configuration, String target) {
+ Xds xds = configuration.xds;
+ ChannelCredentials credentials = InsecureChannelCredentials.create();
+ if (xds.secure) {
+ credentials = XdsChannelCredentials.create(credentials);
+ }
+ return Grpc.newChannelBuilder(target, credentials);
+ }
+
+ @Override
+ public String channelInfo() {
+ return "xDS";
+ }
+}
diff --git a/extensions/grpc/xds/src/main/resources/META-INF/services/io.quarkus.grpc.spi.GrpcBuilderProvider b/extensions/grpc/xds/src/main/resources/META-INF/services/io.quarkus.grpc.spi.GrpcBuilderProvider
new file mode 100644
index 00000000000000..d54d69a8d823e9
--- /dev/null
+++ b/extensions/grpc/xds/src/main/resources/META-INF/services/io.quarkus.grpc.spi.GrpcBuilderProvider
@@ -0,0 +1 @@
+io.quarkus.grpc.xds.XdsGrpcServerBuilderProvider
\ No newline at end of file
diff --git a/integration-tests/istio/disable-native-profile b/integration-tests/istio/disable-native-profile
new file mode 100644
index 00000000000000..011a7cc4571d5e
--- /dev/null
+++ b/integration-tests/istio/disable-native-profile
@@ -0,0 +1 @@
+This file disables the native profile in the parent pom.xml of this module.
\ No newline at end of file
diff --git a/integration-tests/istio/maven-invoker-way/pom.xml b/integration-tests/istio/maven-invoker-way/pom.xml
new file mode 100644
index 00000000000000..07b11a550bf7bf
--- /dev/null
+++ b/integration-tests/istio/maven-invoker-way/pom.xml
@@ -0,0 +1,346 @@
+
+
+ 4.0.0
+
+
+ io.quarkus
+ quarkus-integration-test-istio-parent
+ 999-SNAPSHOT
+
+
+ quarkus-integration-test-istio-invoker
+ Quarkus - Integration Tests - Istio - Invoker
+ Istio integration tests that need to use the maven invoker because they test various dependency related scenarios
+
+
+ ${skipTests}
+
+
+
+
+
+ io.quarkus
+ quarkus-container-image-docker
+ test
+
+
+ io.quarkus
+ quarkus-container-image-jib
+ test
+
+
+
+ io.quarkus
+ quarkus-test-maven
+ test
+
+
+ io.quarkus
+ quarkus-kubernetes
+ test
+
+
+ io.quarkus
+ quarkus-openshift
+ test
+
+
+ io.quarkus
+ quarkus-minikube
+ test
+
+
+ io.fabric8
+ kubernetes-httpclient-okhttp
+ test
+
+
+ io.dekorate
+ kubernetes-annotations
+ noapt
+
+
+ io.dekorate
+ openshift-annotations
+ noapt
+
+
+ io.dekorate
+ knative-annotations
+ noapt
+
+
+ io.quarkus
+ quarkus-resteasy
+ ${project.version}
+ test
+
+
+ *
+ *
+
+
+
+
+
+
+ io.quarkus
+ quarkus-maven-plugin
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-kubernetes-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-openshift-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-minikube-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-container-image-docker-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-container-image-jib-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-resteasy-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-junit5
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+
+
+
+ src/it
+ true
+
+
+
+
+ maven-invoker-plugin
+
+
+ integration-tests
+
+ install
+ run
+ verify
+
+
+
+
+ ${project.build.directory}/it
+ true
+ verify
+ true
+ ${skipTests}
+ true
+ invoker.properties
+
+
+
+ maven-failsafe-plugin
+
+
+
+ integration-test
+ verify
+
+
+
+
+
+
+
+
+
+ disable-tests
+
+
+ !e2e-tests
+
+
+
+ true
+ true
+
+
+
+ kubernetes-e2e-tests
+
+
+ kubernetes-e2e-tests
+
+
+
+ true
+
+
+
+
+ maven-invoker-plugin
+
+
+ xds-*/pom.xml
+
+ ${skipTests}
+
+
+ -Dinvalid-on-purpose
+ success
+
+
+
+
+
+
+
+ openshift-e2e-tests
+
+
+ openshift-e2e-tests
+
+
+
+ true
+
+
+
+
+ maven-invoker-plugin
+
+
+ xds-*/pom.xml
+
+ ${skipTests}
+
+
+ -Dinvalid-on-purpose
+ success
+
+
+
+
+
+
+
+ knative-e2e-tests
+
+
+ knative-e2e-tests
+
+
+
+ true
+
+
+
+
+ maven-invoker-plugin
+
+
+ xds-*/pom.xml
+
+ ${skipTests}
+
+
+ -Dinvalid-on-purpose
+ success
+
+
+
+
+
+
+
+
+
+ windows
+
+
+
+ true
+
+
+
+
diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/invoker.properties b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/invoker.properties
new file mode 100644
index 00000000000000..ecfda713b3e099
--- /dev/null
+++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/invoker.properties
@@ -0,0 +1,5 @@
+# invoker.goals=clean package -Dquarkus.container.build=true -Dquarkus.package.type=native
+# ensure that an attempt to deploy is made, but that the attempt fails (as we don't want to deploy this test application to a cluster that the runner of test may have configured)
+invoker.goals=clean package -Dquarkus.kubernetes.deploy=true ${kubernetes-client-master-url}
+# expect a failure since there is no Kubernetes cluster to deploy to
+invoker.buildResult = ${build-result}
diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/pom.xml b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/pom.xml
new file mode 100644
index 00000000000000..844e6b1e012003
--- /dev/null
+++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/pom.xml
@@ -0,0 +1,124 @@
+
+
+ 4.0.0
+ org.acme
+ xds-grpc
+ 0.1-SNAPSHOT
+
+ UTF-8
+ 3.0.0-M7
+ 11
+ UTF-8
+ 11
+
+
+
+
+ io.quarkus
+ quarkus-bom
+ @project.version@
+ pom
+ import
+
+
+
+
+
+ io.quarkus
+ quarkus-grpc-xds
+
+
+ io.quarkus
+ quarkus-resteasy
+
+
+ io.quarkus
+ quarkus-minikube
+
+
+ io.quarkus
+ quarkus-container-image-jib
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+
+
+
+ io.quarkus
+ quarkus-maven-plugin
+ @project.version@
+
+
+
+ generate-code
+ build
+
+
+
+
+
+ maven-surefire-plugin
+ ${surefire-plugin.version}
+
+
+ org.jboss.logmanager.LogManager
+ ${maven.home}
+
+
+
+
+
+
+
+ native
+
+
+ native
+
+
+
+ native
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ ${native.surefire.skip}
+
+
+
+ maven-failsafe-plugin
+ ${surefire-plugin.version}
+
+
+
+ integration-test
+ verify
+
+
+
+ ${project.build.directory}/${project.build.finalName}-runner
+ org.jboss.logmanager.LogManager
+ ${maven.home}
+
+
+
+
+
+
+
+
+
+
diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/java/org/acme/HelloEndpoint.java b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/java/org/acme/HelloEndpoint.java
new file mode 100644
index 00000000000000..32d57f630205b2
--- /dev/null
+++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/java/org/acme/HelloEndpoint.java
@@ -0,0 +1,37 @@
+package org.acme;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import examples.GreeterGrpc;
+import examples.HelloReply;
+import examples.HelloRequest;
+import io.grpc.StatusRuntimeException;
+import io.quarkus.grpc.GrpcClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Path("/hello")
+public class HelloEndpoint {
+ private static final Logger log = LoggerFactory.getLogger(HelloEndpoint.class);
+
+ @GrpcClient
+ GreeterGrpc.GreeterBlockingStub stub;
+
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public String hello() {
+ HelloRequest request = HelloRequest.newBuilder().setName("XDS gRPC").build();
+ HelloReply response;
+ try {
+ response = stub.sayHello(request);
+ } catch (StatusRuntimeException e) {
+ String msg = "RPC failed: " + e.getStatus();
+ log.warn(msg);
+ return msg;
+ }
+ return response.getMessage();
+ }
+}
\ No newline at end of file
diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/java/org/acme/HelloService.java b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/java/org/acme/HelloService.java
new file mode 100644
index 00000000000000..4f86355a826ffa
--- /dev/null
+++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/java/org/acme/HelloService.java
@@ -0,0 +1,16 @@
+package org.acme;
+
+import examples.GreeterGrpc;
+import examples.HelloReply;
+import examples.HelloRequest;
+import io.grpc.stub.StreamObserver;
+import io.quarkus.grpc.GrpcService;
+
+@GrpcService
+public class HelloService extends GreeterGrpc.GreeterImplBase {
+ @Override
+ public void sayHello(HelloRequest request, StreamObserver responseObserver) {
+ responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
+ responseObserver.onCompleted();
+ }
+}
diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/proto/helloworld.proto b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/proto/helloworld.proto
new file mode 100644
index 00000000000000..5e400c9d4549ca
--- /dev/null
+++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/proto/helloworld.proto
@@ -0,0 +1,53 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "examples";
+option java_outer_classname = "HelloWorldProto";
+option objc_class_prefix = "HLW";
+
+package helloworld;
+
+// The greeting service definition.
+service Greeter {
+ // Sends a greeting
+ rpc SayHello (HelloRequest) returns (HelloReply) {}
+}
+
+// The request message containing the user's name.
+message HelloRequest {
+ string name = 1;
+}
+
+// The response message containing the greetings
+message HelloReply {
+ string message = 1;
+}
diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/resources/application.properties b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/resources/application.properties
new file mode 100644
index 00000000000000..0230b4c1ae18dc
--- /dev/null
+++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/resources/application.properties
@@ -0,0 +1,18 @@
+# Configuration file
+# key = value
+quarkus.container-image.registry=docker.io
+quarkus.kubernetes-client.trust-certs=true
+
+# XDS
+quarkus.grpc.server.xds.enabled=true
+quarkus.grpc.server.port=30051
+
+quarkus.grpc.clients.stub.name-resolver=xds
+quarkus.grpc.clients.stub.host=xds-grpc
+quarkus.grpc.clients.stub.port=30051
+
+# K8s
+quarkus.kubernetes.deployment-target=minikube
+quarkus.kubernetes.ports.grpc.container-port=30051
+quarkus.kubernetes.annotations."inject.istio.io/templates"=grpc-agent
+quarkus.kubernetes.annotations."proxy.istio.io/config"={"holdApplicationUntilProxyStarts": true}
diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/verify.groovy b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/verify.groovy
new file mode 100644
index 00000000000000..a350ee928eb6c1
--- /dev/null
+++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/verify.groovy
@@ -0,0 +1,63 @@
+import io.dekorate.utils.Serialization
+import io.fabric8.kubernetes.api.model.KubernetesList
+import io.fabric8.kubernetes.api.model.apps.Deployment
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.KubernetesClientBuilder
+import io.fabric8.kubernetes.client.LocalPortForward
+import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
+
+//Check that file exits
+String base = basedir
+File kubernetesYml = new File(base, "target/kubernetes/minikube.yml")
+assert kubernetesYml.exists()
+
+// Workaround CNFE at shutdown
+this.class.getClassLoader().loadClass("io.fabric8.kubernetes.client.KubernetesClientException\$RequestMetadata")
+
+kubernetesYml.withInputStream { stream ->
+ //Check that its parse-able
+ KubernetesList list = Serialization.unmarshalAsList(stream)
+ assert list != null
+
+ Deployment deployment = list.items.find { r -> r.kind == "Deployment" }
+
+ //Check that ti contains a Deployment named after the project
+ assert deployment != null
+ assert deployment.metadata.name == "xds-grpc"
+
+ try (KubernetesClient client = new KubernetesClientBuilder()
+ .withHttpClientFactory(new OkHttpClientFactory())
+ .build()) {
+ try (LocalPortForward p = client.services().withName("xds-grpc").portForward(8080)) {
+ URL url = new URL(String.format("http://localhost:%s/hello", p.localPort));
+
+ int tries = 10;
+ String response = null;
+ while (response == null && tries > 0) {
+ try {
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("GET");
+ int responseCode = connection.getResponseCode();
+ if (responseCode == HttpURLConnection.HTTP_OK) {
+ try (BufferedReader input = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
+ String line;
+ StringBuffer buffer = new StringBuffer();
+ while ((line = input.readLine()) != null) {
+ buffer.append(line);
+ }
+ response = buffer.toString();
+ }
+ break;
+ }
+ } catch (Exception e) {
+ System.err.print("Error: ")
+ System.err.println(e.getMessage());
+ }
+ Thread.sleep(6_000); // 6sec
+ tries--;
+ }
+ assert tries > 0
+ assert response != null && response == "Hello XDS gRPC"
+ }
+ }
+}
\ No newline at end of file
diff --git a/integration-tests/istio/pom.xml b/integration-tests/istio/pom.xml
new file mode 100644
index 00000000000000..dfae51b7a32418
--- /dev/null
+++ b/integration-tests/istio/pom.xml
@@ -0,0 +1,19 @@
+
+
+ 4.0.0
+
+ quarkus-integration-tests-parent
+ io.quarkus
+ 999-SNAPSHOT
+
+
+ quarkus-integration-test-istio-parent
+ Quarkus - Integration Tests - Istio - Parent
+ pom
+
+
+ maven-invoker-way
+
+
+
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 559d728b5ef371..500c970a53a1f5 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -341,6 +341,7 @@
grpc-stork-response-time
google-cloud-functions-http
google-cloud-functions
+ istio