diff --git a/.github/workflows/ci-istio.yml b/.github/workflows/ci-istio.yml new file mode 100644 index 00000000000000..2da3c22c39970a --- /dev/null +++ b/.github/workflows/ci-istio.yml @@ -0,0 +1,97 @@ +name: Quarkus CI - Istio + +on: + workflow_dispatch: + schedule: + # 2am every weekday + saturday + - cron: '0 2 * * 1-6' + +env: + MAVEN_ARGS: -B -e + +jobs: + cache: + name: Build and save artifacts + runs-on: ubuntu-latest + if: "github.repository == 'quarkusio/quarkus' || github.event_name == 'workflow_dispatch'" + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + distribution: 'temurin' + java-version: '17' + - name: Install artifacts + run: ./mvnw ${MAVEN_ARGS} -DskipTests -DskipITs -Dinvoker.skip clean install -pl :quarkus-integration-test-istio-invoker -am + - name: Tar Maven repository + shell: bash + run: tar -I 'pigz -9' -cf maven-repo.tgz -C ~ .m2/repository + - name: Persist Maven repository + uses: actions/upload-artifact@v3 + with: + name: maven-repo + path: maven-repo.tgz + retention-days: 1 + + kubernetes: + name: Istio + Kubernetes Integration Tests + needs: cache + runs-on: ubuntu-latest + if: "github.repository == 'quarkusio/quarkus' || github.event_name == 'workflow_dispatch'" + strategy: + fail-fast: false + matrix: + kubernetes: [v1.20.1] + steps: + - name: Checkout + uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + distribution: 'temurin' + java-version: '17' + - name: Download Maven repository + uses: actions/download-artifact@v3 + with: + name: maven-repo + path: . + - name: Extract Maven repository + shell: bash + run: tar -xzf maven-repo.tgz -C ~ + - name: Set up Minikube-Kubernetes + uses: manusa/actions-setup-minikube@v2.7.1 + with: + minikube version: v1.16.0 + kubernetes version: ${{ matrix.kubernetes }} + github token: ${{ secrets.GITHUB_TOKEN }} + start args: '--addons=metrics-server --force' + - name: Quay login + uses: docker/login-action@v2 + with: + registry: quay.io + username: ${{ secrets.QUAY_QUARKUSCI_USERNAME }} + password: ${{ secrets.QUAY_QUARKUSCI_PASSWORD }} + - name: Get kubeconfig + id: kubeconfig + run: a="$(cat ~/.kube/config)"; a="${a//'%'/'%25'}"; a="${a//$'\n'/'%0A'}"; a="${a//$'\r'/'%0D'}"; echo "::set-output name=config::$a" + - name: Install Istio + uses: huang195/actions-install-istio@v1.0.0 + with: + kubeconfig: "${{steps.kubeconfig.outputs.config}}" + istio version: '1.15.2' + - name: Run Istio Invoker Tests + run: | + export QUARKUS_CONTAINER_IMAGE_GROUP=quarkustesting + export QUARKUS_CONTAINER_IMAGE_TAG=${{ github.sha }} + export QUARKUS_CONTAINER_IMAGE_REGISTRY=quay.io + ./mvnw ${MAVEN_ARGS} clean install -pl :quarkus-integration-test-istio-invoker -De2e-tests -Dkubernetes-e2e-tests + - name: Report status + if: "always() && github.repository == 'quarkusio/quarkus'" + shell: bash + run: | + curl -Ls https://sh.jbang.dev | bash -s - app setup + ~/.jbang/bin/jbang .github/NativeBuildReport.java \ + issueNumber=29536 \ + runId=${{ github.run_id }} \ + status=${{ job.status }} \ + token=${{ secrets.GITHUB_API_TOKEN }} \ + issueRepo=${{ github.repository }} \ + thisRepo=${{ github.repository }} diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 5a43773497c89d..c57f07252da2ab 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -1909,6 +1909,11 @@ + + io.quarkus + quarkus-grpc-xds + ${project.version} + io.quarkus quarkus-grpc-api diff --git a/extensions/grpc/pom.xml b/extensions/grpc/pom.xml index 27cb1447a3e968..1d41c8feb45663 100644 --- a/extensions/grpc/pom.xml +++ b/extensions/grpc/pom.xml @@ -20,5 +20,6 @@ stubs deployment runtime + xds \ No newline at end of file diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java index 307945967420dc..6b597b4b7b2e92 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java @@ -22,9 +22,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; import java.util.regex.Pattern; import javax.enterprise.inject.Instance; @@ -33,9 +30,8 @@ import grpc.health.v1.HealthOuterClass; import io.grpc.BindableService; -import io.grpc.Metadata; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; +import io.grpc.Server; +import io.grpc.ServerBuilder; import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; import io.grpc.ServerMethodDefinition; @@ -45,26 +41,25 @@ import io.quarkus.grpc.runtime.config.GrpcConfiguration; import io.quarkus.grpc.runtime.config.GrpcServerConfiguration; import io.quarkus.grpc.runtime.config.GrpcServerNettyConfig; +import io.quarkus.grpc.runtime.devmode.DevModeInterceptor; import io.quarkus.grpc.runtime.devmode.GrpcHotReplacementInterceptor; import io.quarkus.grpc.runtime.devmode.GrpcServerReloader; import io.quarkus.grpc.runtime.health.GrpcHealthStorage; import io.quarkus.grpc.runtime.reflection.ReflectionService; import io.quarkus.grpc.runtime.supports.CompressionInterceptor; import io.quarkus.grpc.runtime.supports.blocking.BlockingServerInterceptor; +import io.quarkus.grpc.spi.GrpcBuilderProvider; import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.QuarkusBindException; import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; -import io.quarkus.runtime.configuration.ProfileManager; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.DeploymentOptions; import io.vertx.core.Handler; import io.vertx.core.Promise; -import io.vertx.core.Verticle; import io.vertx.core.Vertx; -import io.vertx.core.http.HttpServerOptions; import io.vertx.ext.web.Route; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; @@ -103,6 +98,7 @@ public void initializeGrpcServer(RuntimeValue vertxSupplier, } GrpcServerConfiguration configuration = cfg.server; + GrpcBuilderProvider provider = GrpcBuilderProvider.findServerBuilderProvider(configuration); if (configuration.useSeparateServer) { LOGGER.warn( @@ -112,13 +108,14 @@ public void initializeGrpcServer(RuntimeValue vertxSupplier, if (launchMode == LaunchMode.DEVELOPMENT) { // start single server, not in a verticle, regardless of the configuration.instances // for reason unknown to me, verticles occasionally get undeployed on dev mode reload - if (GrpcServerReloader.getServer() == null) { - devModeStart(grpcContainer, vertx, configuration, blockingMethodsPerService, shutdown, launchMode); + if (GrpcServerReloader.getServer() != null || (provider != null && provider.serverAlreadyExists())) { + devModeReload(grpcContainer, vertx, configuration, provider, blockingMethodsPerService, shutdown); } else { - devModeReload(grpcContainer, vertx, configuration, blockingMethodsPerService, shutdown); + devModeStart(grpcContainer, vertx, configuration, provider, blockingMethodsPerService, shutdown, + launchMode); } } else { - prodStart(grpcContainer, vertx, configuration, blockingMethodsPerService, launchMode); + prodStart(grpcContainer, vertx, configuration, provider, blockingMethodsPerService, launchMode); } } else { buildGrpcServer(vertx, configuration, routerSupplier, shutdown, blockingMethodsPerService, grpcContainer, @@ -126,6 +123,7 @@ public void initializeGrpcServer(RuntimeValue vertxSupplier, } } + // TODO -- handle XDS private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration, RuntimeValue routerSupplier, ShutdownContext shutdown, Map> blockingMethodsPerService, GrpcContainer grpcContainer, LaunchMode launchMode) { @@ -134,7 +132,9 @@ private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration, List globalInterceptors = grpcContainer.getSortedGlobalInterceptors(); if (launchMode == LaunchMode.DEVELOPMENT) { - globalInterceptors.add(0, new DevModeInterceptor(Thread.currentThread().getContextClassLoader())); // add as first + // add as last, so they will run first + globalInterceptors.add(new DevModeInterceptor(Thread.currentThread().getContextClassLoader())); + globalInterceptors.add(new GrpcHotReplacementInterceptor()); } List toBeRegistered = collectServiceDefinitions(grpcContainer.getServices()); @@ -153,8 +153,7 @@ private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration, definitions.add(service.definition); } - boolean reflectionServiceEnabled = configuration.enableReflectionService - || ProfileManager.getLaunchMode() == LaunchMode.DEVELOPMENT; + boolean reflectionServiceEnabled = configuration.enableReflectionService || launchMode == LaunchMode.DEVELOPMENT; if (reflectionServiceEnabled) { LOGGER.info("Registering gRPC reflection service"); @@ -164,6 +163,8 @@ private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration, bridge.bind(server); } + initHealthStorage(); + LOGGER.info("Starting new Vert.x gRPC server ..."); Route route = routerSupplier.getValue().route().handler(ctx -> { if (!isGrpc(ctx)) { @@ -182,27 +183,19 @@ private static boolean isGrpc(RoutingContext rc) { } private void prodStart(GrpcContainer grpcContainer, Vertx vertx, GrpcServerConfiguration configuration, - Map> blockingMethodsPerService, LaunchMode launchMode) { + GrpcBuilderProvider provider, Map> blockingMethodsPerService, LaunchMode launchMode) { CompletableFuture startResult = new CompletableFuture<>(); vertx.deployVerticle( - new Supplier() { - @Override - public Verticle get() { - return new GrpcServerVerticle(configuration, grpcContainer, launchMode, blockingMethodsPerService); - } - }, + () -> new GrpcServerVerticle(configuration, grpcContainer, launchMode, blockingMethodsPerService), new DeploymentOptions().setInstances(configuration.instances), - new Handler>() { - @Override - public void handle(AsyncResult result) { - if (result.failed()) { - startResult.completeExceptionally(result.cause()); - } else { - GrpcServerRecorder.this.postStartup(configuration, launchMode == LaunchMode.TEST); + result -> { + if (result.failed()) { + startResult.completeExceptionally(result.cause()); + } else { + GrpcServerRecorder.this.postStartup(configuration, provider, launchMode == LaunchMode.TEST); - startResult.complete(null); - } + startResult.complete(null); } }); @@ -218,10 +211,12 @@ public void handle(AsyncResult result) { } } - private void postStartup(GrpcServerConfiguration configuration, boolean test) { + private void postStartup(GrpcServerConfiguration configuration, GrpcBuilderProvider provider, boolean test) { initHealthStorage(); - LOGGER.infof("gRPC Server started on %s:%d [SSL enabled: %s]", - configuration.host, test ? configuration.testPort : configuration.port, !configuration.plainText); + LOGGER.infof("gRPC Server started on %s:%d [%s]", + configuration.host, + test ? configuration.testPort : configuration.port, + provider != null ? provider.serverInfo() : "SSL enabled: " + !configuration.plainText); } private void initHealthStorage() { @@ -235,52 +230,58 @@ private void initHealthStorage() { } private void devModeStart(GrpcContainer grpcContainer, Vertx vertx, GrpcServerConfiguration configuration, - Map> blockingMethodsPerService, ShutdownContext shutdown, LaunchMode launchMode) { - CompletableFuture future = new CompletableFuture<>(); - - devModeWrapper = new DevModeWrapper(Thread.currentThread().getContextClassLoader()); + GrpcBuilderProvider provider, Map> blockingMethodsPerService, ShutdownContext shutdown, + LaunchMode launchMode) { - Map.Entry portToServer = buildServer(vertx, configuration, + Map.Entry portToServer = buildServer(vertx, configuration, blockingMethodsPerService, grpcContainer, launchMode); - VertxServer vertxServer = portToServer.getValue() - .start(new Handler<>() { // NOSONAR - @Override - public void handle(AsyncResult ar) { - if (ar.failed()) { - Throwable effectiveCause = getEffectiveThrowable(ar, portToServer); - if (effectiveCause instanceof QuarkusBindException) { - LOGGER.error("Unable to start the gRPC server"); - } else { - LOGGER.error("Unable to start the gRPC server", effectiveCause); - } - future.completeExceptionally(effectiveCause); - } else { - postStartup(configuration, false); - future.complete(true); - grpcVerticleCount.incrementAndGet(); - } - } - }); - try { - future.get(1, TimeUnit.MINUTES); - } catch (TimeoutException e) { - LOGGER.error("Failed to start grpc server in time", e); - } catch (ExecutionException e) { - throw new RuntimeException("grpc server start failed", e); - } catch (InterruptedException e) { - LOGGER.warn("Waiting for grpc server start interrupted", e); - Thread.currentThread().interrupt(); - } + Server server = portToServer.getValue(); + if (provider == null) { + CompletableFuture future = new CompletableFuture<>(); + + devModeWrapper = new DevModeWrapper(Thread.currentThread().getContextClassLoader()); - GrpcServerReloader.init(vertxServer); - shutdown.addShutdownTask( - new Runnable() { // NOSONAR - @Override - public void run() { - GrpcServerReloader.reset(); + VertxServer vertxServer = (VertxServer) server; + vertxServer.start(ar -> { + if (ar.failed()) { + Throwable effectiveCause = getEffectiveThrowable(ar, portToServer); + if (effectiveCause instanceof QuarkusBindException) { + LOGGER.error("Unable to start the gRPC server"); + } else { + LOGGER.error("Unable to start the gRPC server", effectiveCause); } - }); + future.completeExceptionally(effectiveCause); + } else { + postStartup(configuration, provider, false); + future.complete(true); + grpcVerticleCount.incrementAndGet(); + } + }); + + try { + future.get(1, TimeUnit.MINUTES); + } catch (TimeoutException e) { + LOGGER.error("Failed to start grpc server in time", e); + } catch (ExecutionException e) { + throw new RuntimeException("grpc server start failed", e); + } catch (InterruptedException e) { + LOGGER.warn("Waiting for grpc server start interrupted", e); + Thread.currentThread().interrupt(); + } + + GrpcServerReloader.init(vertxServer); + shutdown.addShutdownTask(GrpcServerReloader::reset); + } else { + try { + provider.startServer(server); + } catch (Exception e) { + LOGGER.error("Unable to start the gRPC server", e); + throw new IllegalStateException(e); + } + postStartup(configuration, provider, false); + provider.postStartup(server, shutdown); + } } private void applyNettySettings(GrpcServerConfiguration configuration, VertxServerBuilder builder) { @@ -291,23 +292,14 @@ private void applyNettySettings(GrpcServerConfiguration configuration, VertxServ } } - private void applyTransportSecurityConfig(GrpcServerConfiguration configuration, VertxServerBuilder builder) { + @SuppressWarnings("rawtypes") + private void applyTransportSecurityConfig(GrpcServerConfiguration configuration, ServerBuilder builder) { if (configuration.transportSecurity != null) { File cert = configuration.transportSecurity.certificate - .map(new Function() { // NOSONAR - @Override - public File apply(String pathname) { - return new File(pathname); - } - }) + .map(File::new) .orElse(null); File key = configuration.transportSecurity.key - .map(new Function() { // NOSONAR - @Override - public File apply(String pathname) { - return new File(pathname); - } - }) + .map(File::new) .orElse(null); if (cert != null || key != null) { builder.useTransportSecurity(cert, key); @@ -334,7 +326,7 @@ private static List collectServiceDefinitions(Instance ar, Map.Entry portToServer) { + private Throwable getEffectiveThrowable(AsyncResult ar, Map.Entry portToServer) { Throwable effectiveCause = ar.cause(); while (effectiveCause.getCause() != null) { effectiveCause = effectiveCause.getCause(); @@ -365,7 +357,7 @@ public String getImplementationClassName() { } private void devModeReload(GrpcContainer grpcContainer, Vertx vertx, GrpcServerConfiguration configuration, - Map> blockingMethodsPerService, ShutdownContext shutdown) { + GrpcBuilderProvider provider, Map> blockingMethodsPerService, ShutdownContext shutdown) { List services = collectServiceDefinitions(grpcContainer.getServices()); List definitions = new ArrayList<>(); @@ -392,19 +384,20 @@ private void devModeReload(GrpcContainer grpcContainer, Vertx vertx, GrpcServerC methods.put(method.getMethodDescriptor().getFullMethodName(), method); } } - devModeWrapper = new DevModeWrapper(Thread.currentThread().getContextClassLoader()); initHealthStorage(); - GrpcServerReloader.reinitialize(servicesWithInterceptors, methods, grpcContainer.getSortedGlobalInterceptors()); + List globalInterceptors = grpcContainer.getSortedGlobalInterceptors(); - shutdown.addShutdownTask( - new Runnable() { // NOSONAR - @Override - public void run() { - GrpcServerReloader.reset(); - } - }); + if (provider != null) { + globalInterceptors.add(new DevModeInterceptor(Thread.currentThread().getContextClassLoader())); + globalInterceptors.add(new GrpcHotReplacementInterceptor()); + provider.devModeReload(servicesWithInterceptors, methods, globalInterceptors, shutdown); + } else { + devModeWrapper = new DevModeWrapper(Thread.currentThread().getContextClassLoader()); + GrpcServerReloader.reinitialize(servicesWithInterceptors, methods, globalInterceptors); + shutdown.addShutdownTask(GrpcServerReloader::reset); + } } public static int getVerticleCount() { @@ -417,22 +410,38 @@ public RuntimeValue initServerInterceptorStorage( return new RuntimeValue<>(new ServerInterceptorStorage(perServiceInterceptors, globalInterceptors)); } - private Map.Entry buildServer(Vertx vertx, GrpcServerConfiguration configuration, + @SuppressWarnings("rawtypes") + private Map.Entry buildServer(Vertx vertx, GrpcServerConfiguration configuration, Map> blockingMethodsPerService, GrpcContainer grpcContainer, LaunchMode launchMode) { + int port = launchMode == LaunchMode.TEST ? configuration.testPort : configuration.port; - VertxServerBuilder builder = VertxServerBuilder.forAddress(vertx, configuration.host, port); AtomicBoolean usePlainText = new AtomicBoolean(); - builder.useSsl(new Handler() { // NOSONAR - @Override - public void handle(HttpServerOptions options) { + + GrpcBuilderProvider provider = GrpcBuilderProvider.findServerBuilderProvider(configuration); + + ServerBuilder builder; + if (provider != null) { + builder = provider.createServerBuilder(vertx, configuration, launchMode); + } else { + VertxServerBuilder vsBuilder = VertxServerBuilder.forAddress(vertx, configuration.host, port); + // add Vert.x specific stuff here + vsBuilder.useSsl(options -> { try { usePlainText.set(applySslOptions(configuration, options)); } catch (IOException e) { throw new UncheckedIOException(e); } + }); + applyNettySettings(configuration, vsBuilder); + if (launchMode == LaunchMode.DEVELOPMENT) { + vsBuilder.commandDecorator(command -> vertx.executeBlocking( + event -> event.complete(GrpcHotReplacementInterceptor.fire()), + false, + (Handler>) result -> devModeWrapper.run(command))); } - }); + builder = vsBuilder; + } if (configuration.maxInboundMessageSize.isPresent()) { builder.maxInboundMessageSize(configuration.maxInboundMessageSize.getAsInt()); @@ -443,16 +452,11 @@ public void handle(HttpServerOptions options) { } Optional handshakeTimeout = configuration.handshakeTimeout; - if (handshakeTimeout.isPresent()) { - builder.handshakeTimeout(handshakeTimeout.get().toMillis(), TimeUnit.MILLISECONDS); - } + handshakeTimeout.ifPresent(duration -> builder.handshakeTimeout(duration.toMillis(), TimeUnit.MILLISECONDS)); applyTransportSecurityConfig(configuration, builder); - applyNettySettings(configuration, builder); - - boolean reflectionServiceEnabled = configuration.enableReflectionService - || ProfileManager.getLaunchMode() == LaunchMode.DEVELOPMENT; + boolean reflectionServiceEnabled = configuration.enableReflectionService || launchMode == LaunchMode.DEVELOPMENT; List toBeRegistered = collectServiceDefinitions(grpcContainer.getServices()); List definitions = new ArrayList<>(); @@ -475,30 +479,9 @@ public void handle(HttpServerOptions options) { builder.intercept(serverInterceptor); } - if (launchMode == LaunchMode.DEVELOPMENT) { - builder.commandDecorator(new Consumer() { - @Override - public void accept(Runnable command) { - vertx.executeBlocking(new Handler>() { - @Override - public void handle(Promise event) { - event.complete(GrpcHotReplacementInterceptor.fire()); - } - }, - false, - new Handler>() { - @Override - public void handle(AsyncResult result) { - devModeWrapper.run(command); - } - }); - } - }); - } - - LOGGER.debugf("Starting gRPC Server on %s:%d [SSL enabled: %s]...", + LOGGER.debugf("Starting gRPC Server on %s:%d [%s] ...", configuration.host, port, - !usePlainText.get()); + provider != null ? provider.serverInfo() : "SSL enabled: " + !usePlainText.get()); return new AbstractMap.SimpleEntry<>(port, builder.build()); } @@ -543,7 +526,7 @@ private class GrpcServerVerticle extends AbstractVerticle { private final LaunchMode launchMode; private final Map> blockingMethodsPerService; - private VertxServer grpcServer; + private Server grpcServer; GrpcServerVerticle(GrpcServerConfiguration configuration, GrpcContainer grpcContainer, LaunchMode launchMode, Map> blockingMethodsPerService) { @@ -560,46 +543,72 @@ public void start(Promise startPromise) { "Unable to find bean exposing the `BindableService` interface - not starting the gRPC server"); return; } - Map.Entry portToServer = buildServer(getVertx(), configuration, + Map.Entry portToServer = buildServer(getVertx(), configuration, blockingMethodsPerService, grpcContainer, launchMode); - grpcServer = portToServer.getValue() - .start(new Handler<>() { // NOSONAR - @Override - public void handle(AsyncResult ar) { - if (ar.failed()) { - Throwable effectiveCause = getEffectiveThrowable(ar, portToServer); - if (effectiveCause instanceof QuarkusBindException) { - LOGGER.error("Unable to start the gRPC server"); - } else { - LOGGER.error("Unable to start the gRPC server", effectiveCause); - } - startPromise.fail(effectiveCause); - } else { - startPromise.complete(); - grpcVerticleCount.incrementAndGet(); - } + + grpcServer = portToServer.getValue(); + if (grpcServer instanceof VertxServer) { + VertxServer server = (VertxServer) grpcServer; + server.start(ar -> { + if (ar.failed()) { + Throwable effectiveCause = getEffectiveThrowable(ar, portToServer); + if (effectiveCause instanceof QuarkusBindException) { + LOGGER.error("Unable to start the gRPC server"); + } else { + LOGGER.error("Unable to start the gRPC server", effectiveCause); } - }); + startPromise.fail(effectiveCause); + } else { + startPromise.complete(); + grpcVerticleCount.incrementAndGet(); + } + }); + } else { + // XDS server blocks on initialStartFuture + vertx.executeBlocking((Handler>) event -> { + try { + grpcServer.start(); + startPromise.complete(); + } catch (Exception e) { + LOGGER.error("Unable to start gRPC server", e); + startPromise.fail(e); + } + }); + } } @Override public void stop(Promise stopPromise) { - grpcServer.shutdown(new Handler>() { // NOSONAR - @Override - public void handle(AsyncResult ar) { + if (grpcServer instanceof VertxServer) { + VertxServer server = (VertxServer) grpcServer; + server.shutdown(ar -> { if (ar.failed()) { - LOGGER.errorf(ar.cause(), "Unable to stop the gRPC server gracefully"); + Throwable cause = ar.cause(); + LOGGER.errorf(cause, "Unable to stop the gRPC server gracefully"); + stopPromise.fail(cause); } else { LOGGER.debug("gRPC Server stopped"); stopPromise.complete(); grpcVerticleCount.decrementAndGet(); } + }); + } else { + try { + grpcServer.shutdownNow().awaitTermination(10, TimeUnit.SECONDS); + stopPromise.complete(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + stopPromise.fail(e); + throw new IllegalStateException(e); + } catch (Exception e) { + LOGGER.errorf(e, "Unable to stop the gRPC server gracefully"); + stopPromise.fail(e); } - }); + } } } - private class DevModeWrapper { + private static class DevModeWrapper { private final ClassLoader classLoader; public DevModeWrapper(ClassLoader contextClassLoader) { @@ -616,24 +625,4 @@ public void run(Runnable command) { } } } - - private class DevModeInterceptor implements ServerInterceptor { - private final ClassLoader classLoader; - - public DevModeInterceptor(ClassLoader contextClassLoader) { - classLoader = contextClassLoader; - } - - @Override - public ServerCall.Listener interceptCall(ServerCall serverCall, Metadata metadata, - ServerCallHandler next) { - ClassLoader originalTccl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(classLoader); - try { - return next.startCall(serverCall, metadata); - } finally { - Thread.currentThread().setContextClassLoader(originalTccl); - } - } - } } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/ClientXds.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/ClientXds.java new file mode 100644 index 00000000000000..d6feecdd2b06e6 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/ClientXds.java @@ -0,0 +1,19 @@ +package io.quarkus.grpc.runtime.config; + +import java.util.Optional; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.quarkus.runtime.annotations.ConfigItem; + +/** + * Client XDS config + * * XDS usage + */ +@ConfigGroup +public class ClientXds extends Xds { + /** + * Optional explicit target. + */ + @ConfigItem + public Optional target; +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java index f20c2a815e19cf..087a563ef8b804 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcClientConfiguration.java @@ -13,6 +13,7 @@ public class GrpcClientConfiguration { public static final String DNS = "dns"; + public static final String XDS = "xds"; /** * Use new Vert.x gRPC client support. @@ -21,6 +22,12 @@ public class GrpcClientConfiguration { @ConfigItem(defaultValue = "false") public boolean useQuarkusGrpcClient; + /** + * Configure XDS usage, if enabled. + */ + @ConfigItem + public ClientXds xds; + /** * The gRPC service port. */ diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcServerConfiguration.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcServerConfiguration.java index 2389fbcfdb06e9..ae569dbd5f26d3 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcServerConfiguration.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcServerConfiguration.java @@ -19,6 +19,12 @@ public class GrpcServerConfiguration { @ConfigItem(defaultValue = "true") public boolean useSeparateServer; + /** + * Configure XDS usage, if enabled. + */ + @ConfigItem + public Xds xds; + /** * The gRPC Server port. */ diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/Xds.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/Xds.java new file mode 100644 index 00000000000000..a5e62f7256a01a --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/Xds.java @@ -0,0 +1,23 @@ +package io.quarkus.grpc.runtime.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.quarkus.runtime.annotations.ConfigItem; + +/** + * XDS config + * * XDS usage + */ +@ConfigGroup +public class Xds { + /** + * Explicitly enable use of XDS. + */ + @ConfigItem(defaultValue = "false") + public boolean enabled; + + /** + * Use secure credentials. + */ + @ConfigItem(defaultValue = "false") + public boolean secure; +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/DevModeInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/DevModeInterceptor.java new file mode 100644 index 00000000000000..9ecf69d9967a4a --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/DevModeInterceptor.java @@ -0,0 +1,26 @@ +package io.quarkus.grpc.runtime.devmode; + +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; + +public class DevModeInterceptor implements ServerInterceptor { + private final ClassLoader classLoader; + + public DevModeInterceptor(ClassLoader contextClassLoader) { + classLoader = contextClassLoader; + } + + @Override + public ServerCall.Listener interceptCall(ServerCall serverCall, Metadata metadata, + ServerCallHandler next) { + ClassLoader originalTccl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(classLoader); + try { + return next.startCall(serverCall, metadata); + } finally { + Thread.currentThread().setContextClassLoader(originalTccl); + } + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcHotReplacementInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcHotReplacementInterceptor.java index 4dc416a9b982d6..a07246f45daa1f 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcHotReplacementInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devmode/GrpcHotReplacementInterceptor.java @@ -2,7 +2,12 @@ import java.util.function.Supplier; -public class GrpcHotReplacementInterceptor { +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; + +public class GrpcHotReplacementInterceptor implements ServerInterceptor { private static volatile Supplier interceptorAction; static void register(Supplier onCall) { @@ -12,4 +17,11 @@ static void register(Supplier onCall) { public static boolean fire() { return interceptorAction.get(); } + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, + ServerCallHandler next) { + fire(); + return next.startCall(call, headers); + } } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java index 5cd0146cbe7b6f..947e0a1ef1f412 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java @@ -3,6 +3,7 @@ import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static io.grpc.netty.NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW; +import static io.quarkus.grpc.runtime.config.GrpcClientConfiguration.DNS; import java.io.IOException; import java.io.InputStream; @@ -33,6 +34,7 @@ import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.grpc.MethodDescriptor; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NegotiationType; @@ -50,6 +52,7 @@ import io.quarkus.grpc.runtime.config.GrpcServerConfiguration; import io.quarkus.grpc.runtime.config.SslClientConfig; import io.quarkus.grpc.runtime.stork.StorkMeasuringGrpcInterceptor; +import io.quarkus.grpc.spi.GrpcBuilderProvider; import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.util.ClassPathUtils; import io.smallrye.mutiny.infrastructure.Infrastructure; @@ -71,6 +74,7 @@ private Channels() { // Avoid direct instantiation } + @SuppressWarnings("rawtypes") public static Channel createChannel(String name, Set perClientInterceptors) throws Exception { InstanceHandle instance = Arc.container().instance(GrpcClientConfigProvider.class); @@ -92,6 +96,8 @@ public static Channel createChannel(String name, Set perClientIntercepto throw new IllegalStateException("gRPC client " + name + " is missing configuration."); } + GrpcBuilderProvider provider = GrpcBuilderProvider.findChannelBuilderProvider(config); + boolean vertxGrpc = config.useQuarkusGrpcClient; String host = config.host; @@ -101,10 +107,13 @@ public static Channel createChannel(String name, Set perClientIntercepto boolean stork = Stork.STORK.equalsIgnoreCase(nameResolver); String[] resolverSplit = nameResolver.split(":"); + String resolver = provider != null ? provider.resolver() : resolverSplit[0]; // TODO -- does this work for Vert.x gRPC client? - if (!vertxGrpc && GrpcClientConfiguration.DNS.equalsIgnoreCase(resolverSplit[0])) { - host = "/" + host; // dns name resolver needs triple slash at the beginning + if (provider != null) { + host = provider.adjustHost(host); + } else if (!vertxGrpc && DNS.equalsIgnoreCase(resolver)) { + host = "/" + host; // dns or xds name resolver needs triple slash at the beginning } // Client-side interceptors @@ -122,10 +131,11 @@ public static Channel createChannel(String name, Set perClientIntercepto } if (!vertxGrpc) { - String target = String.format("%s://%s:%d", resolverSplit[0], host, port); + String target = String.format("%s://%s:%d", resolver, host, port); + LOGGER.debugf("Target for client '%s': %s", name, target); SslContext context = null; - if (!plainText) { + if (!plainText && provider == null) { Path trustStorePath = config.ssl.trustStore.orElse(null); Path certificatePath = config.ssl.certificate.orElse(null); Path keyPath = config.ssl.key.orElse(null); @@ -152,20 +162,25 @@ public static Channel createChannel(String name, Set perClientIntercepto String loadBalancingPolicy = stork ? Stork.STORK : config.loadBalancingPolicy; - NettyChannelBuilder builder = NettyChannelBuilder - .forTarget(target) - // clients are intercepted using the IOThreadClientInterceptor interceptor which will decide on which - // thread the messages should be processed. - .directExecutor() // will use I/O thread - must not be blocked. - .offloadExecutor(Infrastructure.getDefaultExecutor()) - .defaultLoadBalancingPolicy(loadBalancingPolicy) - .flowControlWindow(config.flowControlWindow.orElse(DEFAULT_FLOW_CONTROL_WINDOW)) - .keepAliveWithoutCalls(config.keepAliveWithoutCalls) - .maxHedgedAttempts(config.maxHedgedAttempts) - .maxRetryAttempts(config.maxRetryAttempts) - .maxInboundMetadataSize(config.maxInboundMetadataSize.orElse(DEFAULT_MAX_HEADER_LIST_SIZE)) - .maxInboundMessageSize(config.maxInboundMessageSize.orElse(DEFAULT_MAX_MESSAGE_SIZE)) - .negotiationType(NegotiationType.valueOf(config.negotiationType.toUpperCase())); + ManagedChannelBuilder builder; + if (provider != null) { + builder = provider.createChannelBuilder(config, target); + } else { + builder = NettyChannelBuilder + .forTarget(target) + // clients are intercepted using the IOThreadClientInterceptor interceptor which will decide on which + // thread the messages should be processed. + .directExecutor() // will use I/O thread - must not be blocked. + .offloadExecutor(Infrastructure.getDefaultExecutor()) + .defaultLoadBalancingPolicy(loadBalancingPolicy) + .flowControlWindow(config.flowControlWindow.orElse(DEFAULT_FLOW_CONTROL_WINDOW)) + .keepAliveWithoutCalls(config.keepAliveWithoutCalls) + .maxHedgedAttempts(config.maxHedgedAttempts) + .maxRetryAttempts(config.maxRetryAttempts) + .maxInboundMetadataSize(config.maxInboundMetadataSize.orElse(DEFAULT_MAX_HEADER_LIST_SIZE)) + .maxInboundMessageSize(config.maxInboundMessageSize.orElse(DEFAULT_MAX_MESSAGE_SIZE)) + .negotiationType(NegotiationType.valueOf(config.negotiationType.toUpperCase())); + } if (config.retry) { builder.enableRetry(); @@ -203,17 +218,19 @@ public static Channel createChannel(String name, Set perClientIntercepto builder.keepAliveTimeout(idleTimeout.get().toMillis(), TimeUnit.MILLISECONDS); } - if (plainText) { + if (plainText && provider == null) { builder.usePlaintext(); } - if (context != null) { - builder.sslContext(context); + if (context != null && (builder instanceof NettyChannelBuilder)) { + NettyChannelBuilder ncBuilder = (NettyChannelBuilder) builder; + ncBuilder.sslContext(context); } interceptorContainer.getSortedPerServiceInterceptors(perClientInterceptors).forEach(builder::intercept); interceptorContainer.getSortedGlobalInterceptors().forEach(builder::intercept); - LOGGER.info("Creating Netty gRPC channel ..."); + LOGGER.info(String.format("Creating %s gRPC channel ...", + provider != null ? provider.channelInfo() : "Netty")); return builder.build(); } else { @@ -258,6 +275,7 @@ public static Channel createChannel(String name, Set perClientIntercepto Vertx vertx = Arc.container().instance(Vertx.class).get(); io.vertx.grpc.client.GrpcClient client = io.vertx.grpc.client.GrpcClient.client(vertx, options); Channel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(port, host)); + LOGGER.debugf("Target for client '%s': %s", name, host + ":" + port); List interceptors = new ArrayList<>(); interceptors.addAll(interceptorContainer.getSortedPerServiceInterceptors(perClientInterceptors)); @@ -285,7 +303,7 @@ private static GrpcClientConfiguration testConfig(GrpcServerConfiguration server config.maxInboundMetadataSize = OptionalInt.empty(); config.maxRetryAttempts = 0; config.maxTraceEvents = OptionalInt.empty(); - config.nameResolver = GrpcClientConfiguration.DNS; + config.nameResolver = DNS; config.negotiationType = "PLAINTEXT"; config.overrideAuthority = Optional.empty(); config.perRpcBufferLimit = OptionalLong.empty(); diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/spi/GrpcBuilderProvider.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/spi/GrpcBuilderProvider.java new file mode 100644 index 00000000000000..ff5f9f8405f030 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/spi/GrpcBuilderProvider.java @@ -0,0 +1,176 @@ +package io.quarkus.grpc.spi; + +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.ServerInterceptor; +import io.grpc.ServerMethodDefinition; +import io.grpc.ServerServiceDefinition; +import io.quarkus.grpc.runtime.config.GrpcClientConfiguration; +import io.quarkus.grpc.runtime.config.GrpcServerConfiguration; +import io.quarkus.runtime.LaunchMode; +import io.quarkus.runtime.ShutdownContext; +import io.vertx.core.Vertx; + +/** + * Allow for additional types of gRPC server and channels to be used / built. + * + * This is an experimental SPI, subject to change. + */ +public interface GrpcBuilderProvider> { + + Logger log = LoggerFactory.getLogger(GrpcBuilderProvider.class); + + /** + * Find gRPC server builder provider. + * + * @param configuration the gRPC server configuration + * @return provider instance or null if none is provided + */ + @SuppressWarnings("rawtypes") + static GrpcBuilderProvider findServerBuilderProvider(GrpcServerConfiguration configuration) { + GrpcBuilderProvider provider = null; + ServiceLoader providers = ServiceLoader.load(GrpcBuilderProvider.class); + for (GrpcBuilderProvider p : providers) { + if (p.providesServer(configuration)) { + if (provider != null) { + throw new IllegalArgumentException("Too many GrpcBuilderProviders enabled: " + providers); + } + log.info("Found server GrpcBuilderProvider: {}", p); + provider = p; + } + } + return provider; + } + + /** + * Find gRPC client builder provider. + * + * @param configuration the gRPC client configuration + * @return provider instance or null if none is provided + */ + @SuppressWarnings("rawtypes") + static GrpcBuilderProvider findChannelBuilderProvider(GrpcClientConfiguration configuration) { + GrpcBuilderProvider provider = null; + ServiceLoader providers = ServiceLoader.load(GrpcBuilderProvider.class); + for (GrpcBuilderProvider p : providers) { + if (p.providesChannel(configuration)) { + if (provider != null) { + throw new IllegalArgumentException("Too many GrpcBuilderProviders enabled: " + providers); + } + log.info("Found channel GrpcBuilderProvider: {}", p); + provider = p; + } + } + return provider; + } + + /** + * Does this builder provider provide a new gRPC server instance. + * + * @param configuration the gRPC server configuration + * @return true if yes, false if no + */ + boolean providesServer(GrpcServerConfiguration configuration); + + /** + * Create initial server builder. + * + * @param vertx the Vertx instance + * @param configuration the gRPC server configuration + * @param launchMode current launch mode + * @return new ServerBuilder instance + */ + ServerBuilder createServerBuilder(Vertx vertx, GrpcServerConfiguration configuration, LaunchMode launchMode); + + /** + * Start gRPC server. + * + * @param server the server instance to start + * @throws Exception for any exception while starting the server + */ + void startServer(Server server) throws Exception; + + /** + * Post startup. + * + * @param server the started server + * @param shutdown the shutdown hook + */ + void postStartup(Server server, ShutdownContext shutdown); + + /** + * Handle dev mode reload. + * + * @param servicesWithInterceptors the services + * @param methods the methods + * @param globalInterceptors the global interceptors + * @param shutdown the shutdown hook + */ + void devModeReload(List servicesWithInterceptors, + Map> methods, + List globalInterceptors, ShutdownContext shutdown); + + /** + * Does a server instance already exist. + * + * @return true if a server instance already exists, false otherwise + */ + boolean serverAlreadyExists(); + + /** + * Provide server info. + * + * @return simple server info + */ + String serverInfo(); + + /** + * Does this builder provider provide a new gRPC channel instance. + * + * @param configuration the gRPC client configuration + * @return true if yes, false if no + */ + boolean providesChannel(GrpcClientConfiguration configuration); + + /** + * Get resolver. + * + * @return the resolver + */ + String resolver(); + + /** + * Adjust host, if needed. + * By default, no adjustment is made. + * + * @param host the host + * @return adjusted host, if needed + */ + default String adjustHost(String host) { + return host; + } + + /** + * Create initial channel builder. + * + * @param configuration the gRPC client configuration + * @param target the channel target + * @return new ChannelBuilder + */ + ManagedChannelBuilder createChannelBuilder(GrpcClientConfiguration configuration, String target); + + /** + * Provide channel info. + * + * @return simple channel info + */ + String channelInfo(); +} diff --git a/extensions/grpc/xds/pom.xml b/extensions/grpc/xds/pom.xml new file mode 100644 index 00000000000000..ae52996721126f --- /dev/null +++ b/extensions/grpc/xds/pom.xml @@ -0,0 +1,51 @@ + + + 4.0.0 + + quarkus-grpc-parent + io.quarkus + 999-SNAPSHOT + + + quarkus-grpc-xds + Quarkus - gRPC - xDS + gRPC xDS support + + + io.quarkus + quarkus-grpc + + + io.grpc + grpc-xds + + + commons-logging + commons-logging + + + com.google.auto.value + auto-value-annotations + + + com.google.android + annotations + + + com.google.code.findbugs + jsr305 + + + org.codehaus.mojo + animal-sniffer-annotations + + + org.checkerframework + checker-qual + + + + + diff --git a/extensions/grpc/xds/src/main/java/io/quarkus/grpc/xds/XdsGrpcServerBuilderProvider.java b/extensions/grpc/xds/src/main/java/io/quarkus/grpc/xds/XdsGrpcServerBuilderProvider.java new file mode 100644 index 00000000000000..7e5717a4065542 --- /dev/null +++ b/extensions/grpc/xds/src/main/java/io/quarkus/grpc/xds/XdsGrpcServerBuilderProvider.java @@ -0,0 +1,132 @@ +package io.quarkus.grpc.xds; + +import static io.quarkus.grpc.runtime.config.GrpcClientConfiguration.XDS; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + +import io.grpc.ChannelCredentials; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.InsecureServerCredentials; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.ServerCredentials; +import io.grpc.ServerInterceptor; +import io.grpc.ServerMethodDefinition; +import io.grpc.ServerServiceDefinition; +import io.grpc.xds.XdsChannelCredentials; +import io.grpc.xds.XdsServerBuilder; +import io.grpc.xds.XdsServerCredentials; +import io.quarkus.grpc.runtime.config.ClientXds; +import io.quarkus.grpc.runtime.config.GrpcClientConfiguration; +import io.quarkus.grpc.runtime.config.GrpcServerConfiguration; +import io.quarkus.grpc.runtime.config.Xds; +import io.quarkus.grpc.runtime.devmode.DevModeInterceptor; +import io.quarkus.grpc.runtime.devmode.GrpcHotReplacementInterceptor; +import io.quarkus.grpc.spi.GrpcBuilderProvider; +import io.quarkus.grpc.xds.devmode.XdsServerReloader; +import io.quarkus.runtime.LaunchMode; +import io.quarkus.runtime.ShutdownContext; +import io.vertx.core.Vertx; +import io.vertx.core.impl.EventLoopContext; +import io.vertx.core.impl.VertxInternal; + +public class XdsGrpcServerBuilderProvider implements GrpcBuilderProvider { + @Override + public boolean providesServer(GrpcServerConfiguration configuration) { + Xds xds = configuration.xds; + return xds != null && xds.enabled; + } + + @Override + public ServerBuilder createServerBuilder(Vertx vertx, GrpcServerConfiguration configuration, + LaunchMode launchMode) { + Xds xds = configuration.xds; + int port = launchMode == LaunchMode.TEST ? configuration.testPort : configuration.port; + ServerCredentials credentials = InsecureServerCredentials.create(); + if (xds.secure) { + credentials = XdsServerCredentials.create(credentials); + } + ServerBuilder builder = XdsServerBuilder.forPort(port, credentials); + // wrap with Vert.x context, so that the context interceptors work + VertxInternal vxi = (VertxInternal) vertx; + Executor delegate = vertx.nettyEventLoopGroup(); + EventLoopContext context = vxi.createEventLoopContext(); + Executor executor = command -> delegate.execute(() -> context.dispatch(command)); + builder.executor(executor); + // custom XDS interceptors + if (launchMode == LaunchMode.DEVELOPMENT) { + builder.intercept(new DevModeInterceptor(Thread.currentThread().getContextClassLoader())); + builder.intercept(new GrpcHotReplacementInterceptor()); + } + return builder; + } + + @Override + public void startServer(Server server) throws Exception { + server.start(); + } + + @Override + public void postStartup(Server server, ShutdownContext shutdown) { + XdsServerReloader.init(server); + shutdown.addShutdownTask(XdsServerReloader::reset); + } + + @Override + public void devModeReload(List servicesWithInterceptors, + Map> methods, List globalInterceptors, + ShutdownContext shutdown) { + XdsServerReloader.reinitialize(servicesWithInterceptors, methods, globalInterceptors); + shutdown.addShutdownTask(XdsServerReloader::reset); + } + + @Override + public boolean serverAlreadyExists() { + return XdsServerReloader.getServer() != null; + } + + @Override + public String serverInfo() { + return "xDS enabled"; + } + + @Override + public boolean providesChannel(GrpcClientConfiguration configuration) { + Xds xds = configuration.xds; + if (xds != null) { + return xds.enabled || XDS.equalsIgnoreCase(configuration.nameResolver); + } else { + return false; + } + } + + @Override + public String resolver() { + return XDS; + } + + @Override + public String adjustHost(String host) { + return "/" + host; + } + + @Override + public ManagedChannelBuilder createChannelBuilder(GrpcClientConfiguration configuration, String target) { + ClientXds xds = configuration.xds; + ChannelCredentials credentials = InsecureChannelCredentials.create(); + if (xds.secure) { + credentials = XdsChannelCredentials.create(credentials); + } + target = xds.target.orElse(target); // use xds's target, if explicitly set + return Grpc.newChannelBuilder(target, credentials); + } + + @Override + public String channelInfo() { + return "xDS"; + } +} diff --git a/extensions/grpc/xds/src/main/java/io/quarkus/grpc/xds/devmode/XdsServerReloader.java b/extensions/grpc/xds/src/main/java/io/quarkus/grpc/xds/devmode/XdsServerReloader.java new file mode 100644 index 00000000000000..2d095f77e88d3d --- /dev/null +++ b/extensions/grpc/xds/src/main/java/io/quarkus/grpc/xds/devmode/XdsServerReloader.java @@ -0,0 +1,43 @@ +package io.quarkus.grpc.xds.devmode; + +import java.util.List; +import java.util.Map; + +import io.grpc.Server; +import io.grpc.ServerInterceptor; +import io.grpc.ServerMethodDefinition; +import io.grpc.ServerServiceDefinition; + +// TODO +public class XdsServerReloader { + private static volatile Server server; + + public static Server getServer() { + return server; + } + + public static void init(Server grpcServer) { + server = grpcServer; + } + + public static void reset() { + shutdown(); + } + + public static void reinitialize(List serviceDefinitions, + Map> methods, + List sortedInterceptors) { + server = null; + } + + public static void shutdown() { + shutdown(server); + server = null; + } + + public static void shutdown(Server current) { + if (current != null) { + current.shutdownNow(); + } + } +} diff --git a/extensions/grpc/xds/src/main/resources/META-INF/services/io.quarkus.grpc.spi.GrpcBuilderProvider b/extensions/grpc/xds/src/main/resources/META-INF/services/io.quarkus.grpc.spi.GrpcBuilderProvider new file mode 100644 index 00000000000000..d54d69a8d823e9 --- /dev/null +++ b/extensions/grpc/xds/src/main/resources/META-INF/services/io.quarkus.grpc.spi.GrpcBuilderProvider @@ -0,0 +1 @@ +io.quarkus.grpc.xds.XdsGrpcServerBuilderProvider \ No newline at end of file diff --git a/integration-tests/istio/disable-native-profile b/integration-tests/istio/disable-native-profile new file mode 100644 index 00000000000000..011a7cc4571d5e --- /dev/null +++ b/integration-tests/istio/disable-native-profile @@ -0,0 +1 @@ +This file disables the native profile in the parent pom.xml of this module. \ No newline at end of file diff --git a/integration-tests/istio/maven-invoker-way/pom.xml b/integration-tests/istio/maven-invoker-way/pom.xml new file mode 100644 index 00000000000000..a449d28ccf71d9 --- /dev/null +++ b/integration-tests/istio/maven-invoker-way/pom.xml @@ -0,0 +1,345 @@ + + + 4.0.0 + + + io.quarkus + quarkus-integration-test-istio-parent + 999-SNAPSHOT + + + quarkus-integration-test-istio-invoker + Quarkus - Integration Tests - Istio - Invoker + Istio integration tests that need to use the maven invoker because they test various dependency related scenarios + + + ${skipTests} + + + + + + io.quarkus + quarkus-container-image-docker + test + + + io.quarkus + quarkus-container-image-jib + test + + + + io.quarkus + quarkus-test-maven + test + + + io.quarkus + quarkus-kubernetes + test + + + io.quarkus + quarkus-openshift + test + + + io.quarkus + quarkus-minikube + test + + + io.fabric8 + kubernetes-httpclient-okhttp + test + + + io.dekorate + kubernetes-annotations + noapt + + + io.dekorate + openshift-annotations + noapt + + + io.dekorate + knative-annotations + noapt + + + io.quarkus + quarkus-resteasy + ${project.version} + test + + + * + * + + + + + + + io.quarkus + quarkus-maven-plugin + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-kubernetes-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-openshift-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-minikube-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-container-image-docker-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-container-image-jib-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-junit5 + ${project.version} + pom + test + + + * + * + + + + + + + + src/it + true + + + + + maven-invoker-plugin + + + integration-tests + + install + run + verify + + + + + ${project.build.directory}/it + true + verify + true + true + invoker.properties + + + + maven-failsafe-plugin + + + + integration-test + verify + + + + + + + + + + disable-tests + + + !e2e-tests + + + + true + true + + + + kubernetes-e2e-tests + + + kubernetes-e2e-tests + + + + true + + + + + maven-invoker-plugin + + + xds-*/pom.xml + + ${skipTests} + + + -Dinvalid-on-purpose + success + + + + + + + + openshift-e2e-tests + + + openshift-e2e-tests + + + + true + + + + + maven-invoker-plugin + + + xds-*/pom.xml + + ${skipTests} + + + -Dinvalid-on-purpose + success + + + + + + + + knative-e2e-tests + + + knative-e2e-tests + + + + true + + + + + maven-invoker-plugin + + + xds-*/pom.xml + + ${skipTests} + + + -Dinvalid-on-purpose + success + + + + + + + + + + windows + + + + true + + + + diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/invoker.properties b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/invoker.properties new file mode 100644 index 00000000000000..ecfda713b3e099 --- /dev/null +++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/invoker.properties @@ -0,0 +1,5 @@ +# invoker.goals=clean package -Dquarkus.container.build=true -Dquarkus.package.type=native +# ensure that an attempt to deploy is made, but that the attempt fails (as we don't want to deploy this test application to a cluster that the runner of test may have configured) +invoker.goals=clean package -Dquarkus.kubernetes.deploy=true ${kubernetes-client-master-url} +# expect a failure since there is no Kubernetes cluster to deploy to +invoker.buildResult = ${build-result} diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/pom.xml b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/pom.xml new file mode 100644 index 00000000000000..844e6b1e012003 --- /dev/null +++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/pom.xml @@ -0,0 +1,124 @@ + + + 4.0.0 + org.acme + xds-grpc + 0.1-SNAPSHOT + + UTF-8 + 3.0.0-M7 + 11 + UTF-8 + 11 + + + + + io.quarkus + quarkus-bom + @project.version@ + pom + import + + + + + + io.quarkus + quarkus-grpc-xds + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-minikube + + + io.quarkus + quarkus-container-image-jib + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + + + + io.quarkus + quarkus-maven-plugin + @project.version@ + + + + generate-code + build + + + + + + maven-surefire-plugin + ${surefire-plugin.version} + + + org.jboss.logmanager.LogManager + ${maven.home} + + + + + + + + native + + + native + + + + native + + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${native.surefire.skip} + + + + maven-failsafe-plugin + ${surefire-plugin.version} + + + + integration-test + verify + + + + ${project.build.directory}/${project.build.finalName}-runner + org.jboss.logmanager.LogManager + ${maven.home} + + + + + + + + + + diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/java/org/acme/HelloEndpoint.java b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/java/org/acme/HelloEndpoint.java new file mode 100644 index 00000000000000..32d57f630205b2 --- /dev/null +++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/java/org/acme/HelloEndpoint.java @@ -0,0 +1,37 @@ +package org.acme; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import examples.GreeterGrpc; +import examples.HelloReply; +import examples.HelloRequest; +import io.grpc.StatusRuntimeException; +import io.quarkus.grpc.GrpcClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Path("/hello") +public class HelloEndpoint { + private static final Logger log = LoggerFactory.getLogger(HelloEndpoint.class); + + @GrpcClient + GreeterGrpc.GreeterBlockingStub stub; + + @GET + @Produces(MediaType.TEXT_PLAIN) + public String hello() { + HelloRequest request = HelloRequest.newBuilder().setName("XDS gRPC").build(); + HelloReply response; + try { + response = stub.sayHello(request); + } catch (StatusRuntimeException e) { + String msg = "RPC failed: " + e.getStatus(); + log.warn(msg); + return msg; + } + return response.getMessage(); + } +} \ No newline at end of file diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/java/org/acme/HelloService.java b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/java/org/acme/HelloService.java new file mode 100644 index 00000000000000..4f86355a826ffa --- /dev/null +++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/java/org/acme/HelloService.java @@ -0,0 +1,16 @@ +package org.acme; + +import examples.GreeterGrpc; +import examples.HelloReply; +import examples.HelloRequest; +import io.grpc.stub.StreamObserver; +import io.quarkus.grpc.GrpcService; + +@GrpcService +public class HelloService extends GreeterGrpc.GreeterImplBase { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } +} diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/proto/helloworld.proto b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/proto/helloworld.proto new file mode 100644 index 00000000000000..5e400c9d4549ca --- /dev/null +++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/proto/helloworld.proto @@ -0,0 +1,53 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "examples"; +option java_outer_classname = "HelloWorldProto"; +option objc_class_prefix = "HLW"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/resources/application.properties b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/resources/application.properties new file mode 100644 index 00000000000000..0230b4c1ae18dc --- /dev/null +++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/src/main/resources/application.properties @@ -0,0 +1,18 @@ +# Configuration file +# key = value +quarkus.container-image.registry=docker.io +quarkus.kubernetes-client.trust-certs=true + +# XDS +quarkus.grpc.server.xds.enabled=true +quarkus.grpc.server.port=30051 + +quarkus.grpc.clients.stub.name-resolver=xds +quarkus.grpc.clients.stub.host=xds-grpc +quarkus.grpc.clients.stub.port=30051 + +# K8s +quarkus.kubernetes.deployment-target=minikube +quarkus.kubernetes.ports.grpc.container-port=30051 +quarkus.kubernetes.annotations."inject.istio.io/templates"=grpc-agent +quarkus.kubernetes.annotations."proxy.istio.io/config"={"holdApplicationUntilProxyStarts": true} diff --git a/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/verify.groovy b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/verify.groovy new file mode 100644 index 00000000000000..a350ee928eb6c1 --- /dev/null +++ b/integration-tests/istio/maven-invoker-way/src/it/xds-grpc/verify.groovy @@ -0,0 +1,63 @@ +import io.dekorate.utils.Serialization +import io.fabric8.kubernetes.api.model.KubernetesList +import io.fabric8.kubernetes.api.model.apps.Deployment +import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.KubernetesClientBuilder +import io.fabric8.kubernetes.client.LocalPortForward +import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory + +//Check that file exits +String base = basedir +File kubernetesYml = new File(base, "target/kubernetes/minikube.yml") +assert kubernetesYml.exists() + +// Workaround CNFE at shutdown +this.class.getClassLoader().loadClass("io.fabric8.kubernetes.client.KubernetesClientException\$RequestMetadata") + +kubernetesYml.withInputStream { stream -> + //Check that its parse-able + KubernetesList list = Serialization.unmarshalAsList(stream) + assert list != null + + Deployment deployment = list.items.find { r -> r.kind == "Deployment" } + + //Check that ti contains a Deployment named after the project + assert deployment != null + assert deployment.metadata.name == "xds-grpc" + + try (KubernetesClient client = new KubernetesClientBuilder() + .withHttpClientFactory(new OkHttpClientFactory()) + .build()) { + try (LocalPortForward p = client.services().withName("xds-grpc").portForward(8080)) { + URL url = new URL(String.format("http://localhost:%s/hello", p.localPort)); + + int tries = 10; + String response = null; + while (response == null && tries > 0) { + try { + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + int responseCode = connection.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_OK) { + try (BufferedReader input = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { + String line; + StringBuffer buffer = new StringBuffer(); + while ((line = input.readLine()) != null) { + buffer.append(line); + } + response = buffer.toString(); + } + break; + } + } catch (Exception e) { + System.err.print("Error: ") + System.err.println(e.getMessage()); + } + Thread.sleep(6_000); // 6sec + tries--; + } + assert tries > 0 + assert response != null && response == "Hello XDS gRPC" + } + } +} \ No newline at end of file diff --git a/integration-tests/istio/pom.xml b/integration-tests/istio/pom.xml new file mode 100644 index 00000000000000..dfae51b7a32418 --- /dev/null +++ b/integration-tests/istio/pom.xml @@ -0,0 +1,19 @@ + + + 4.0.0 + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + + quarkus-integration-test-istio-parent + Quarkus - Integration Tests - Istio - Parent + pom + + + maven-invoker-way + + + diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 559d728b5ef371..500c970a53a1f5 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -341,6 +341,7 @@ grpc-stork-response-time google-cloud-functions-http google-cloud-functions + istio