From ad9d45ac491c98f77530dc74a0296f52ea35ebd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Szynkiewicz?= Date: Mon, 16 Aug 2021 22:55:14 +0200 Subject: [PATCH] gRPC: support per service interceptors fixes #19229 --- .../asciidoc/grpc-service-implementation.adoc | 22 ++- .../AdditionalGlobalInterceptorBuildItem.java | 15 ++ .../DelegatingGrpcBeanBuildItem.java | 15 ++ .../quarkus/grpc/deployment/GrpcDotNames.java | 9 + .../grpc/deployment/GrpcServerProcessor.java | 162 +++++++++++++++- .../devmode/DevModeTestInterceptor.java | 4 +- .../GlobalAndServiceInterceptorsTest.java | 178 ++++++++++++++++++ .../interceptors/MyFirstInterceptor.java | 2 + .../interceptors/MySecondInterceptor.java | 2 + .../io/quarkus/grpc/GlobalInterceptor.java | 19 ++ .../io/quarkus/grpc/RegisterInterceptor.java | 22 +++ .../io/quarkus/grpc/RegisterInterceptors.java | 14 ++ .../quarkus/grpc/runtime/GrpcContainer.java | 82 ++++++-- .../grpc/runtime/GrpcServerRecorder.java | 16 +- .../grpc/runtime/InterceptorStorage.java | 33 ++++ .../metrics/GrpcMetricsServerInterceptor.java | 2 + .../GrpcRequestContextGrpcInterceptor.java | 2 + .../RestClientReactiveProcessor.java | 2 +- .../interceptors/HeaderServerInterceptor.java | 4 +- .../examples/hello/HelloWorldService.java | 2 + 20 files changed, 567 insertions(+), 40 deletions(-) create mode 100644 extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/AdditionalGlobalInterceptorBuildItem.java create mode 100644 extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/DelegatingGrpcBeanBuildItem.java create mode 100644 extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/GlobalAndServiceInterceptorsTest.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/GlobalInterceptor.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/RegisterInterceptor.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/RegisterInterceptors.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/InterceptorStorage.java diff --git a/docs/src/main/asciidoc/grpc-service-implementation.adoc b/docs/src/main/asciidoc/grpc-service-implementation.adoc index 29e96478b25e4..52c1dd29e8686 100644 --- a/docs/src/main/asciidoc/grpc-service-implementation.adoc +++ b/docs/src/main/asciidoc/grpc-service-implementation.adoc @@ -243,11 +243,14 @@ quarkus.grpc.server.ssl.client-auth=REQUIRED == Server Interceptors -You can implement a gRPC server interceptor by implementing an `@ApplicationScoped` bean implementing `io.grpc.ServerInterceptor`: +gRPC server interceptors let you perform logic, such as authentication, before your service is invoked. + +You can implement a gRPC server interceptor by creating an `@ApplicationScoped` bean implementing `io.grpc.ServerInterceptor`: [source, java] ---- @ApplicationScoped +// add @GlobalInterceptor for interceptors meant to be invoked for every service public class MyInterceptor implements ServerInterceptor { @Override @@ -260,7 +263,22 @@ public class MyInterceptor implements ServerInterceptor { TIP: Check the https://grpc.github.io/grpc-java/javadoc/io/grpc/ServerInterceptor.html[ServerInterceptor JavaDoc] to properly implement your interceptor. -When you have multiple server interceptors, you can order them by implementing the `javax.enterprise.inject.spi.Prioritized` interface: +To apply an interceptor to all exposed services, annotate it with `@io.quarkus.grpc.GlobalInterceptor`. +To apply an interceptor to a single service, register it on the service with `@io.quarkus.grpc.RegisterInterceptor`: +[source, java] +---- +import io.quarkus.grpc.GrpcService; +import io.quarkus.grpc.RegisterInterceptor; + +@GrpcService +@RegisterInterceptor(MyInterceptor.class) +public class StreamingService implements Streaming { + // ... +} +---- + +When you have multiple server interceptors, you can order them by implementing the `javax.enterprise.inject.spi.Prioritized` interface. Please note that all the global interceptors are invoked before the service-specific +interceptors. [source, java] ---- diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/AdditionalGlobalInterceptorBuildItem.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/AdditionalGlobalInterceptorBuildItem.java new file mode 100644 index 0000000000000..80af3a6273e8d --- /dev/null +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/AdditionalGlobalInterceptorBuildItem.java @@ -0,0 +1,15 @@ +package io.quarkus.grpc.deployment; + +import io.quarkus.builder.item.MultiBuildItem; + +public final class AdditionalGlobalInterceptorBuildItem extends MultiBuildItem { + private final String interceptorClass; + + public AdditionalGlobalInterceptorBuildItem(String interceptorClass) { + this.interceptorClass = interceptorClass; + } + + public String interceptorClass() { + return interceptorClass; + } +} diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/DelegatingGrpcBeanBuildItem.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/DelegatingGrpcBeanBuildItem.java new file mode 100644 index 0000000000000..2ae80b07ba62e --- /dev/null +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/DelegatingGrpcBeanBuildItem.java @@ -0,0 +1,15 @@ +package io.quarkus.grpc.deployment; + +import org.jboss.jandex.ClassInfo; + +import io.quarkus.builder.item.MultiBuildItem; + +final class DelegatingGrpcBeanBuildItem extends MultiBuildItem { + final ClassInfo generatedBean; + final ClassInfo userDefinedBean; + + DelegatingGrpcBeanBuildItem(ClassInfo generatedBean, ClassInfo userDefinedBean) { + this.generatedBean = generatedBean; + this.userDefinedBean = userDefinedBean; + } +} diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java index 444c0af34bd82..d421883b9695b 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java @@ -6,11 +6,15 @@ import io.grpc.BindableService; import io.grpc.Channel; +import io.grpc.ServerInterceptor; import io.grpc.stub.AbstractBlockingStub; import io.grpc.stub.AbstractStub; import io.quarkus.gizmo.MethodDescriptor; +import io.quarkus.grpc.GlobalInterceptor; import io.quarkus.grpc.GrpcClient; import io.quarkus.grpc.GrpcService; +import io.quarkus.grpc.RegisterInterceptor; +import io.quarkus.grpc.RegisterInterceptors; import io.quarkus.grpc.runtime.MutinyBean; import io.quarkus.grpc.runtime.MutinyClient; import io.quarkus.grpc.runtime.MutinyGrpc; @@ -38,6 +42,11 @@ public class GrpcDotNames { public static final DotName MUTINY_BEAN = DotName.createSimple(MutinyBean.class.getName()); public static final DotName MUTINY_SERVICE = DotName.createSimple(MutinyService.class.getName()); + public static final DotName GLOBAL_INTERCEPTOR = DotName.createSimple(GlobalInterceptor.class.getName()); + public static final DotName REGISTER_INTERCEPTOR = DotName.createSimple(RegisterInterceptor.class.getName()); + public static final DotName REGISTER_INTERCEPTORS = DotName.createSimple(RegisterInterceptors.class.getName()); + public static final DotName SERVER_INTERCEPTOR = DotName.createSimple(ServerInterceptor.class.getName()); + static final MethodDescriptor CREATE_CHANNEL_METHOD = MethodDescriptor.ofMethod(Channels.class, "createChannel", Channel.class, String.class); static final MethodDescriptor RETRIEVE_CHANNEL_METHOD = MethodDescriptor.ofMethod(Channels.class, "retrieveChannel", diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java index a87294e34fabb..f835c9b3dce28 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcServerProcessor.java @@ -1,9 +1,14 @@ package io.quarkus.grpc.deployment; import static io.quarkus.deployment.Feature.GRPC_SERVER; +import static io.quarkus.grpc.deployment.GrpcDotNames.GLOBAL_INTERCEPTOR; +import static io.quarkus.grpc.deployment.GrpcDotNames.REGISTER_INTERCEPTOR; +import static io.quarkus.grpc.deployment.GrpcDotNames.REGISTER_INTERCEPTORS; import static java.util.Arrays.asList; import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -14,6 +19,8 @@ import java.util.Set; import java.util.function.Predicate; +import javax.inject.Singleton; + import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.jandex.AnnotationInstance; @@ -21,6 +28,7 @@ import org.jboss.jandex.ClassInfo; import org.jboss.jandex.DotName; import org.jboss.jandex.FieldInfo; +import org.jboss.jandex.IndexView; import org.jboss.jandex.MethodInfo; import org.jboss.jandex.Type; import org.jboss.logging.Logger; @@ -29,7 +37,10 @@ import io.quarkus.arc.deployment.AdditionalBeanBuildItem; import io.quarkus.arc.deployment.AnnotationsTransformerBuildItem; import io.quarkus.arc.deployment.CustomScopeAnnotationsBuildItem; +import io.quarkus.arc.deployment.GeneratedBeanBuildItem; +import io.quarkus.arc.deployment.GeneratedBeanGizmoAdaptor; import io.quarkus.arc.deployment.SyntheticBeansRuntimeInitBuildItem; +import io.quarkus.arc.deployment.UnremovableBeanBuildItem; import io.quarkus.arc.deployment.ValidationPhaseBuildItem; import io.quarkus.arc.processor.AnnotationsTransformer; import io.quarkus.arc.processor.BeanInfo; @@ -50,11 +61,15 @@ import io.quarkus.deployment.builditem.ShutdownContextBuildItem; import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; import io.quarkus.deployment.metrics.MetricsCapabilityBuildItem; +import io.quarkus.gizmo.ClassCreator; +import io.quarkus.gizmo.MethodCreator; +import io.quarkus.gizmo.MethodDescriptor; import io.quarkus.grpc.GrpcService; import io.quarkus.grpc.deployment.devmode.FieldDefinalizingVisitor; import io.quarkus.grpc.protoc.plugin.MutinyGrpcGenerator; import io.quarkus.grpc.runtime.GrpcContainer; import io.quarkus.grpc.runtime.GrpcServerRecorder; +import io.quarkus.grpc.runtime.InterceptorStorage; import io.quarkus.grpc.runtime.config.GrpcConfiguration; import io.quarkus.grpc.runtime.config.GrpcServerBuildTimeConfig; import io.quarkus.grpc.runtime.health.GrpcHealthEndpoint; @@ -71,13 +86,14 @@ public class GrpcServerProcessor { private static final Set BLOCKING_SKIPPED_METHODS = Set.of("bindService", "", "withCompression"); - private static final Logger logger = Logger.getLogger(GrpcServerProcessor.class); + private static final Logger log = Logger.getLogger(GrpcServerProcessor.class); private static final String SSL_PREFIX = "quarkus.grpc.server.ssl."; private static final String CERTIFICATE = SSL_PREFIX + "certificate"; private static final String KEY = SSL_PREFIX + "key"; private static final String KEY_STORE = SSL_PREFIX + "key-store"; private static final String TRUST_STORE = SSL_PREFIX + "trust-store"; + private static final String METRICS_SERVER_INTERCEPTOR = "io.quarkus.grpc.runtime.metrics.GrpcMetricsServerInterceptor"; @BuildStep MinNettyAllocatorMaxOrderBuildItem setMinimalNettyMaxOrderSize() { @@ -86,7 +102,8 @@ MinNettyAllocatorMaxOrderBuildItem setMinimalNettyMaxOrderSize() { @BuildStep void processGeneratedBeans(CombinedIndexBuildItem index, BuildProducer transformers, - BuildProducer bindables) { + BuildProducer bindables, + BuildProducer delegatingBeans) { // generated bean class -> blocking methods Map> generatedBeans = new HashMap<>(); @@ -130,7 +147,7 @@ void processGeneratedBeans(CombinedIndexBuildItem index, BuildProducer blockingMethods = gatherBlockingMethods(userDefinedBean); generatedBeans.put(generatedBean.name(), blockingMethods); @@ -307,8 +325,121 @@ void registerBeans(BuildProducer beans, beans.produce(AdditionalBeanBuildItem.unremovableOf(GrpcRequestContextGrpcInterceptor.class)); features.produce(new FeatureBuildItem(GRPC_SERVER)); } else { - logger.debug("Unable to find beans exposing the `BindableService` interface - not starting the gRPC server"); + log.debug("Unable to find beans exposing the `BindableService` interface - not starting the gRPC server"); + } + } + + @BuildStep + void registerAdditionalInterceptors(BuildProducer additionalInterceptors) { + additionalInterceptors + .produce(new AdditionalGlobalInterceptorBuildItem(GrpcRequestContextGrpcInterceptor.class.getName())); + } + + @BuildStep + void gatherGrpcInterceptors(CombinedIndexBuildItem indexBuildItem, + List additionalGlobalInterceptors, + List delegatingGrpcBeans, + BuildProducer generatedBeans, + BuildProducer unremovableBeans) { + + Map delegateMap = new HashMap<>(); + for (DelegatingGrpcBeanBuildItem delegatingGrpcBean : delegatingGrpcBeans) { + delegateMap.put(delegatingGrpcBean.userDefinedBean.name().toString(), + delegatingGrpcBean.generatedBean.name().toString()); + } + + IndexView index = indexBuildItem.getIndex(); + + GrpcInterceptors interceptors = gatherInterceptors(index); + + // let's gather all the non-abstract, non-global interceptors, from these we'll filter out ones used per-service ones + // the rest, if anything stays, should be logged as problematic + Set superfluousInterceptors = new HashSet<>(interceptors.nonGlobalInterceptors); + + Map> annotationsByClassName = new HashMap<>(); + + for (AnnotationInstance annotation : index.getAnnotations(REGISTER_INTERCEPTOR)) { + String targetClass = annotation.target().asClass().name().toString(); + annotationsByClassName.computeIfAbsent(targetClass, key -> new ArrayList<>()) + .add(annotation); + } + + for (AnnotationInstance annotation : index.getAnnotations(REGISTER_INTERCEPTORS)) { + String targetClass = annotation.target().asClass().name().toString(); + annotationsByClassName.computeIfAbsent(targetClass, key -> new ArrayList<>()) + .addAll(Arrays.asList(annotation.value().asNestedArray())); + } + + String perServiceInterceptorsImpl = InterceptorStorage.class.getName() + "Impl"; + try (ClassCreator classCreator = ClassCreator.builder() + .className(perServiceInterceptorsImpl) + .classOutput(new GeneratedBeanGizmoAdaptor(generatedBeans)) + .superClass(InterceptorStorage.class) + .build()) { + + classCreator.addAnnotation(Singleton.class.getName()); + MethodCreator constructor = classCreator + .getMethodCreator(MethodDescriptor.ofConstructor(perServiceInterceptorsImpl)); + constructor.invokeSpecialMethod(MethodDescriptor.ofConstructor(InterceptorStorage.class), + constructor.getThis()); + + for (Map.Entry> annotationsForClass : annotationsByClassName.entrySet()) { + for (AnnotationInstance value : annotationsForClass.getValue()) { + String className = annotationsForClass.getKey(); + + // if the user bean is invoked by a generated bean + // the interceptors defined on the user bean have to be applied to the generated bean: + className = delegateMap.getOrDefault(className, className); + + String interceptorClassName = value.value().asString(); + superfluousInterceptors.remove(interceptorClassName); + + constructor.invokeVirtualMethod( + MethodDescriptor.ofMethod(InterceptorStorage.class, "addInterceptor", void.class, + String.class, Class.class), + constructor.getThis(), constructor.load(className), constructor.loadClass(interceptorClassName)); + } + } + + for (String globalInterceptor : interceptors.globalInterceptors) { + constructor.invokeVirtualMethod( + MethodDescriptor.ofMethod(InterceptorStorage.class, "addGlobalInterceptor", void.class, Class.class), + constructor.getThis(), constructor.loadClass(globalInterceptor)); + } + + for (AdditionalGlobalInterceptorBuildItem globalInterceptorBuildItem : additionalGlobalInterceptors) { + constructor.invokeVirtualMethod( + MethodDescriptor.ofMethod(InterceptorStorage.class, "addGlobalInterceptor", void.class, Class.class), + constructor.getThis(), constructor.loadClass(globalInterceptorBuildItem.interceptorClass())); + } + + constructor.returnValue(null); + } + + if (!superfluousInterceptors.isEmpty()) { + log.warnf("At least one unused gRPC interceptor found: %s. If there are meant to be used globally, " + + "annotate them with @GlobalInterceptor.", String.join(", ", superfluousInterceptors)); } + + unremovableBeans.produce(UnremovableBeanBuildItem.beanClassNames(perServiceInterceptorsImpl)); + } + + private GrpcInterceptors gatherInterceptors(IndexView index) { + Set globalInterceptors = new HashSet<>(); + Set nonGlobalInterceptors = new HashSet<>(); + + Collection interceptorImplClasses = index.getAllKnownImplementors(GrpcDotNames.SERVER_INTERCEPTOR); + for (ClassInfo interceptorImplClass : interceptorImplClasses) { + if (!Modifier.isAbstract(interceptorImplClass.flags()) + && !Modifier.isInterface(interceptorImplClass.flags())) { + if (interceptorImplClass.classAnnotation(GLOBAL_INTERCEPTOR) == null) { + nonGlobalInterceptors.add(interceptorImplClass.name().toString()); + } else { + globalInterceptors.add(interceptorImplClass.name().toString()); + } + } + } + return new GrpcInterceptors(globalInterceptors, nonGlobalInterceptors); } @BuildStep @@ -379,18 +510,31 @@ ExtensionSslNativeSupportBuildItem extensionSslNativeSupport() { @BuildStep void configureMetrics(GrpcBuildTimeConfig configuration, Optional metricsCapability, - BuildProducer beans) { + BuildProducer beans, + BuildProducer additionalInterceptors) { - // Note that this build steps confgures both the server side and the client side + // Note that this build steps configures both the server side and the client side if (configuration.metricsEnabled && metricsCapability.isPresent()) { if (metricsCapability.get().metricsSupported(MetricsFactory.MICROMETER)) { // Strings are used intentionally - micrometer-core is an optional dependency of the runtime module - beans.produce(new AdditionalBeanBuildItem("io.quarkus.grpc.runtime.metrics.GrpcMetricsServerInterceptor", + beans.produce(new AdditionalBeanBuildItem(METRICS_SERVER_INTERCEPTOR, "io.quarkus.grpc.runtime.metrics.GrpcMetricsClientInterceptor")); + additionalInterceptors + .produce(new AdditionalGlobalInterceptorBuildItem(METRICS_SERVER_INTERCEPTOR)); } else { - logger.warn("Only Micrometer-based metrics system is supported by quarkus-grpc"); + log.warn("Only Micrometer-based metrics system is supported by quarkus-grpc"); } } } + private static class GrpcInterceptors { + final Set globalInterceptors; + final Set nonGlobalInterceptors; + + GrpcInterceptors(Set globalInterceptors, Set nonGlobalInterceptors) { + this.globalInterceptors = globalInterceptors; + this.nonGlobalInterceptors = nonGlobalInterceptors; + } + } + } diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/devmode/DevModeTestInterceptor.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/devmode/DevModeTestInterceptor.java index 306fee7164db1..392e8861c3e61 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/devmode/DevModeTestInterceptor.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/devmode/DevModeTestInterceptor.java @@ -7,8 +7,10 @@ import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; +import io.quarkus.grpc.GlobalInterceptor; @ApplicationScoped +@GlobalInterceptor public class DevModeTestInterceptor implements ServerInterceptor { private volatile String lastStatus = "initial"; @@ -17,7 +19,7 @@ public class DevModeTestInterceptor implements ServerInterceptor { public ServerCall.Listener interceptCall(ServerCall serverCall, Metadata metadata, ServerCallHandler serverCallHandler) { return serverCallHandler - .startCall(new ForwardingServerCall.SimpleForwardingServerCall(serverCall) { + .startCall(new ForwardingServerCall.SimpleForwardingServerCall<>(serverCall) { @Override protected ServerCall delegate() { lastStatus = getStatus(); diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/GlobalAndServiceInterceptorsTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/GlobalAndServiceInterceptorsTest.java new file mode 100644 index 0000000000000..bb446a1ca3e06 --- /dev/null +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/GlobalAndServiceInterceptorsTest.java @@ -0,0 +1,178 @@ +package io.quarkus.grpc.server.interceptors; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; + +import javax.enterprise.context.ApplicationScoped; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.examples.goodbyeworld.Farewell; +import io.grpc.examples.goodbyeworld.FarewellGrpc; +import io.grpc.examples.goodbyeworld.GoodbyeReply; +import io.grpc.examples.goodbyeworld.GoodbyeRequest; +import io.grpc.examples.helloworld.Greeter; +import io.grpc.examples.helloworld.GreeterBean; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.examples.helloworld3.Greeter3; +import io.grpc.examples.helloworld3.Greeter3Grpc; +import io.grpc.examples.helloworld3.HelloReply3; +import io.grpc.examples.helloworld3.HelloRequest3; +import io.grpc.stub.StreamObserver; +import io.quarkus.grpc.GrpcClient; +import io.quarkus.grpc.GrpcService; +import io.quarkus.grpc.RegisterInterceptor; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.mutiny.Uni; + +public class GlobalAndServiceInterceptorsTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest().setArchiveProducer( + () -> ShrinkWrap.create(JavaArchive.class) + .addPackage(GreeterGrpc.class.getPackage()) + .addPackage(Greeter3Grpc.class.getPackage()) + .addPackage(FarewellGrpc.class.getPackage()) + .addClasses(MyFirstInterceptor.class, GreeterBean.class, HelloRequest.class)); + + protected ManagedChannel channel; + + @GrpcClient + Greeter greeter; + + @GrpcClient + Greeter3 greeter3; + + @GrpcClient + Farewell farewell; + + @BeforeEach + void cleanUp() { + config.getLogRecords(); + GlobalInterceptor.invoked = false; + ServiceBInterceptor.invoked = false; + FarewellInterceptor.invoked = false; + } + + @Test + void shouldInvokeGlobalInterceptorAndNotInvokedUnregisteredLocal() { + Uni result = greeter.sayHello(HelloRequest.newBuilder().setName("ServiceA").build()); + + HelloReply helloReply = result.await().atMost(Duration.ofSeconds(5)); + assertThat(helloReply.getMessage()).isEqualTo("Hello, ServiceA"); + + assertThat(GlobalInterceptor.invoked).isTrue(); + assertThat(ServiceBInterceptor.invoked).isFalse(); + assertThat(FarewellInterceptor.invoked).isFalse(); + } + + @Test + void shouldInvokeGlobalInterceptorAndInvokedRegisteredLocal() { + Uni result = greeter3.sayHello(HelloRequest3.newBuilder().setName("ServiceB").build()); + + HelloReply3 helloReply = result.await().atMost(Duration.ofSeconds(5)); + assertThat(helloReply.getMessage()).isEqualTo("Hello3, ServiceB"); + + assertThat(GlobalInterceptor.invoked).isTrue(); + assertThat(ServiceBInterceptor.invoked).isTrue(); + assertThat(FarewellInterceptor.invoked).isFalse(); + } + + @Test + void shouldInvokeGlobalInterceptorAndInvokedRegisteredLocalOnGrpcStub() { + Uni result = farewell.sayGoodbye(GoodbyeRequest.newBuilder().setName("Farewell").build()); + + GoodbyeReply goodbyeReply = result.await().atMost(Duration.ofSeconds(5)); + assertThat(goodbyeReply.getMessage()).isEqualTo("Goodbye, Farewell"); + + assertThat(GlobalInterceptor.invoked).isTrue(); + assertThat(ServiceBInterceptor.invoked).isFalse(); + assertThat(FarewellInterceptor.invoked).isTrue(); + } + + @GrpcService + public static class ServiceA implements Greeter { + @Override + public Uni sayHello(HelloRequest request) { + return Uni.createFrom().item(HelloReply.newBuilder().setMessage("Hello, " + request.getName()).build()); + } + } + + @GrpcService + @RegisterInterceptor(ServiceBInterceptor.class) + public static class ServiceB implements Greeter3 { + @Override + public Uni sayHello(HelloRequest3 request) { + return Uni.createFrom().item(HelloReply3.newBuilder().setMessage("Hello3, " + request.getName()).build()); + } + } + + @io.quarkus.grpc.GlobalInterceptor + @ApplicationScoped + public static class GlobalInterceptor implements ServerInterceptor { + static boolean invoked; + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, + ServerCallHandler next) { + invoked = true; + return next.startCall(call, headers); + } + } + + @GrpcService + @RegisterInterceptor(FarewellInterceptor.class) + public static class FarewellService extends FarewellGrpc.FarewellImplBase { + @Override + public void sayGoodbye(GoodbyeRequest request, StreamObserver responseObserver) { + responseObserver.onNext(GoodbyeReply.newBuilder().setMessage("Goodbye, " + request.getName()).build()); + responseObserver.onCompleted(); + } + } + + @ApplicationScoped + public static class ServiceBInterceptor implements ServerInterceptor { + static boolean invoked; + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, + ServerCallHandler next) { + invoked = true; + return next.startCall(call, headers); + } + } + + @ApplicationScoped + public static class FarewellInterceptor implements ServerInterceptor { + static boolean invoked; + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, + ServerCallHandler next) { + invoked = true; + return next.startCall(call, headers); + } + } + + @ApplicationScoped + public static class UnusedInterceptor implements ServerInterceptor { + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, + ServerCallHandler next) { + throw new IllegalStateException("Interceptor that should not be called was invoked"); + } + } +} diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyFirstInterceptor.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyFirstInterceptor.java index fd71a3a866f93..ea796231910ca 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyFirstInterceptor.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MyFirstInterceptor.java @@ -9,8 +9,10 @@ import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.Status; +import io.quarkus.grpc.GlobalInterceptor; @ApplicationScoped +@GlobalInterceptor public class MyFirstInterceptor implements ServerInterceptor, Prioritized { private volatile long callTime; diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MySecondInterceptor.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MySecondInterceptor.java index 8291e9eace301..eb93fb52f78ac 100644 --- a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MySecondInterceptor.java +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/interceptors/MySecondInterceptor.java @@ -9,8 +9,10 @@ import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.Status; +import io.quarkus.grpc.GlobalInterceptor; @ApplicationScoped +@GlobalInterceptor public class MySecondInterceptor implements ServerInterceptor, Prioritized { private volatile long callTime; diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/GlobalInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/GlobalInterceptor.java new file mode 100644 index 0000000000000..67d893b3a953a --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/GlobalInterceptor.java @@ -0,0 +1,19 @@ +package io.quarkus.grpc; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +/** + * Denotes a ServerInterceptor that should be registered for all gRPC services + * + * @see RegisterInterceptor + */ +@Target({ FIELD, PARAMETER, TYPE }) +@Retention(RUNTIME) +public @interface GlobalInterceptor { +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/RegisterInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/RegisterInterceptor.java new file mode 100644 index 0000000000000..0b24d591a311c --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/RegisterInterceptor.java @@ -0,0 +1,22 @@ +package io.quarkus.grpc; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import io.grpc.ServerInterceptor; + +/** + * Registers a {@link ServerInterceptor} for a particular gRPC service. + * + * @see GlobalInterceptor + */ +@Target({ FIELD, PARAMETER, TYPE }) +@Retention(RUNTIME) +public @interface RegisterInterceptor { + Class value(); +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/RegisterInterceptors.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/RegisterInterceptors.java new file mode 100644 index 0000000000000..33561b1eba1da --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/RegisterInterceptors.java @@ -0,0 +1,14 @@ +package io.quarkus.grpc; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ ElementType.TYPE }) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface RegisterInterceptors { + RegisterInterceptor[] value(); +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcContainer.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcContainer.java index 4c27c5c175599..6f9969e77de84 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcContainer.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcContainer.java @@ -1,53 +1,95 @@ package io.quarkus.grpc.runtime; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.stream.Collectors; +import java.util.Set; import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.inject.Any; import javax.enterprise.inject.Instance; import javax.enterprise.inject.spi.Prioritized; import javax.inject.Inject; import io.grpc.BindableService; import io.grpc.ServerInterceptor; +import io.quarkus.arc.Arc; +import io.quarkus.arc.InstanceHandle; import io.quarkus.grpc.GrpcService; @ApplicationScoped public class GrpcContainer { + private static final Comparator INTERCEPTOR_COMPARATOR = new Comparator<>() { + @Override + public int compare(ServerInterceptor si1, ServerInterceptor si2) { + int p1 = 0; + int p2 = 0; + if (si1 instanceof Prioritized) { + p1 = ((Prioritized) si1).getPriority(); + } + if (si2 instanceof Prioritized) { + p2 = ((Prioritized) si2).getPriority(); + } + if (si1.equals(si2)) { + return 0; + } + return Integer.compare(p1, p2); + } + }; + @Inject @GrpcService Instance services; @Inject - @Any Instance interceptors; - List getSortedInterceptors() { + @Inject + InterceptorStorage perServiceInterceptors; + + List getSortedPerServiceInterceptors(String serviceClassName) { + Set> interceptorClasses = perServiceInterceptors.getInterceptors(serviceClassName); + if (interceptorClasses == null || interceptorClasses.isEmpty()) { + return Collections.emptyList(); + } + + List interceptors = new ArrayList<>(); + + for (Class interceptorClass : interceptorClasses) { + InstanceHandle interceptorInstance = Arc.container().instance(interceptorClass); + ServerInterceptor serverInterceptor = interceptorInstance.get(); + if (serverInterceptor == null) { + throw new IllegalArgumentException("Server interceptor class " + interceptorClass + " is not a CDI bean. " + + "Only CDI beans can be used as gRPC server interceptors. Add one of the scope-defining annotations" + + " (@Singleton, @ApplicationScoped, @RequestScoped) on the interceptor class."); + } + interceptors.add(serverInterceptor); + } + interceptors.sort(INTERCEPTOR_COMPARATOR); + + return interceptors; + } + + List getSortedGlobalInterceptors() { if (interceptors.isUnsatisfied()) { return Collections.emptyList(); } - return interceptors.stream().sorted(new Comparator<>() { // NOSONAR - @Override - public int compare(ServerInterceptor si1, ServerInterceptor si2) { - int p1 = 0; - int p2 = 0; - if (si1 instanceof Prioritized) { - p1 = ((Prioritized) si1).getPriority(); - } - if (si2 instanceof Prioritized) { - p2 = ((Prioritized) si2).getPriority(); - } - if (si1.equals(si2)) { - return 0; - } - return Integer.compare(p1, p2); + Set> globalInterceptors = perServiceInterceptors.getGlobalInterceptors(); + List interceptors = new ArrayList<>(); + for (Class interceptorClass : globalInterceptors) { + InstanceHandle interceptorInstance = Arc.container().instance(interceptorClass); + ServerInterceptor serverInterceptor = interceptorInstance.get(); + if (serverInterceptor == null) { + throw new IllegalArgumentException("Server interceptor class " + interceptorClass + " is not a CDI bean. " + + "Only CDI beans can be used as gRPC server interceptors. Add one of the scope-defining annotations" + + " (@Singleton, @ApplicationScoped, @RequestScoped) on the interceptor class."); } - }).collect(Collectors.toList()); + interceptors.add(serverInterceptor); + } + interceptors.sort(INTERCEPTOR_COMPARATOR); + return interceptors; } public Instance getServices() { diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java index 6d9579b9867ab..b30b00a64314b 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java @@ -286,7 +286,7 @@ private void devModeReload(GrpcContainer grpcContainer, Vertx vertx, GrpcServerC List servicesWithInterceptors = new ArrayList<>(); CompressionInterceptor compressionInterceptor = prepareCompressionInterceptor(configuration); for (GrpcServiceDefinition service : services) { - servicesWithInterceptors.add(serviceWithInterceptors(vertx, compressionInterceptor, service, true)); + servicesWithInterceptors.add(serviceWithInterceptors(vertx, grpcContainer, compressionInterceptor, service, true)); } for (ServerServiceDefinition serviceWithInterceptors : servicesWithInterceptors) { @@ -298,7 +298,7 @@ private void devModeReload(GrpcContainer grpcContainer, Vertx vertx, GrpcServerC initHealthStorage(); - GrpcServerReloader.reinitialize(servicesWithInterceptors, methods, grpcContainer.getSortedInterceptors()); + GrpcServerReloader.reinitialize(servicesWithInterceptors, methods, grpcContainer.getSortedGlobalInterceptors()); shutdown.addShutdownTask( new Runnable() { // NOSONAR @@ -357,7 +357,8 @@ public void handle(HttpServerOptions options) { for (GrpcServiceDefinition service : toBeRegistered) { builder.addService( - serviceWithInterceptors(vertx, compressionInterceptor, service, launchMode == LaunchMode.DEVELOPMENT)); + serviceWithInterceptors(vertx, grpcContainer, compressionInterceptor, service, + launchMode == LaunchMode.DEVELOPMENT)); LOGGER.debugf("Registered gRPC service '%s'", service.definition.getServiceDescriptor().getName()); definitions.add(service.definition); } @@ -367,7 +368,7 @@ public void handle(HttpServerOptions options) { builder.addService(new ReflectionService(definitions)); } - for (ServerInterceptor serverInterceptor : grpcContainer.getSortedInterceptors()) { + for (ServerInterceptor serverInterceptor : grpcContainer.getSortedGlobalInterceptors()) { builder.intercept(serverInterceptor); } @@ -413,12 +414,15 @@ private CompressionInterceptor prepareCompressionInterceptor(GrpcServerConfigura return compressionInterceptor; } - private ServerServiceDefinition serviceWithInterceptors(Vertx vertx, CompressionInterceptor compressionInterceptor, - GrpcServiceDefinition service, boolean devMode) { + private ServerServiceDefinition serviceWithInterceptors(Vertx vertx, GrpcContainer grpcContainer, + CompressionInterceptor compressionInterceptor, GrpcServiceDefinition service, boolean devMode) { List interceptors = new ArrayList<>(); if (compressionInterceptor != null) { interceptors.add(compressionInterceptor); } + + interceptors.addAll(grpcContainer.getSortedPerServiceInterceptors(service.getImplementationClassName())); + // We only register the blocking interceptor if needed by at least one method of the service. if (!blockingMethodsPerService.isEmpty()) { List list = blockingMethodsPerService.get(service.getImplementationClassName()); diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/InterceptorStorage.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/InterceptorStorage.java new file mode 100644 index 0000000000000..0d5974d9d59ea --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/InterceptorStorage.java @@ -0,0 +1,33 @@ +package io.quarkus.grpc.runtime; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import io.grpc.ServerInterceptor; + +public abstract class InterceptorStorage { + private final Map>> perServiceInterceptors = new HashMap<>(); + private final Set> globalInterceptors = new HashSet<>(); + + public Set> getInterceptors(String serviceClassName) { + return perServiceInterceptors.get(serviceClassName); + } + + public Set> getGlobalInterceptors() { + return globalInterceptors; + } + + @SuppressWarnings("unused") // used by generated code + public void addGlobalInterceptor(Class interceptor) { + globalInterceptors.add(interceptor); + } + + @SuppressWarnings("unused") // used by generated code + public void addInterceptor(String serviceClassName, Class interceptor) { + Set> interceptors = perServiceInterceptors.computeIfAbsent(serviceClassName, + c -> new HashSet<>()); + interceptors.add(interceptor); + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/metrics/GrpcMetricsServerInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/metrics/GrpcMetricsServerInterceptor.java index d3ad2212825d6..dab92a9e062a5 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/metrics/GrpcMetricsServerInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/metrics/GrpcMetricsServerInterceptor.java @@ -7,8 +7,10 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.binder.grpc.MetricCollectingServerInterceptor; +import io.quarkus.grpc.GlobalInterceptor; @Singleton +@GlobalInterceptor public class GrpcMetricsServerInterceptor extends MetricCollectingServerInterceptor implements Prioritized { @Inject diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java index bfa7ff4bc250d..ed2a9a6729ef4 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java @@ -13,10 +13,12 @@ import io.quarkus.arc.Arc; import io.quarkus.arc.InjectableContext; import io.quarkus.arc.ManagedContext; +import io.quarkus.grpc.GlobalInterceptor; import io.vertx.core.Context; import io.vertx.core.Vertx; @ApplicationScoped +@GlobalInterceptor public class GrpcRequestContextGrpcInterceptor implements ServerInterceptor, Prioritized { private static final Logger log = Logger.getLogger(GrpcRequestContextGrpcInterceptor.class.getName()); diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/RestClientReactiveProcessor.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/RestClientReactiveProcessor.java index 8e1ecbdb16b86..386f9406ded82 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/RestClientReactiveProcessor.java +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/RestClientReactiveProcessor.java @@ -376,7 +376,7 @@ void addRestClientBeans(Capabilities capabilities, // METHODS: for (MethodInfo method : restMethods) { - // for each method method that corresponds to making a rest call, create a method like: + // for each method that corresponds to making a rest call, create a method like: // public JsonArray get() { // return ((InterfaceClass)this.getDelegate()).get(); // } diff --git a/integration-tests/grpc-interceptors/src/main/java/io/quarkus/grpc/examples/interceptors/HeaderServerInterceptor.java b/integration-tests/grpc-interceptors/src/main/java/io/quarkus/grpc/examples/interceptors/HeaderServerInterceptor.java index c507980faced3..e04db0ea0e080 100644 --- a/integration-tests/grpc-interceptors/src/main/java/io/quarkus/grpc/examples/interceptors/HeaderServerInterceptor.java +++ b/integration-tests/grpc-interceptors/src/main/java/io/quarkus/grpc/examples/interceptors/HeaderServerInterceptor.java @@ -11,11 +11,13 @@ import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; +import io.quarkus.grpc.GlobalInterceptor; /** * A interceptor to handle server header. */ @ApplicationScoped +@GlobalInterceptor public class HeaderServerInterceptor implements ServerInterceptor { private static final Logger logger = Logger.getLogger(HeaderServerInterceptor.class.getName()); @@ -30,7 +32,7 @@ public ServerCall.Listener interceptCall( final Metadata requestHeaders, ServerCallHandler next) { logger.info("header received from client:" + requestHeaders); - return next.startCall(new ForwardingServerCall.SimpleForwardingServerCall(call) { + return next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<>(call) { @Override public void sendHeaders(Metadata responseHeaders) { responseHeaders.put(CUSTOM_HEADER_KEY, "customRespondValue"); diff --git a/integration-tests/grpc-plain-text-mutiny/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldService.java b/integration-tests/grpc-plain-text-mutiny/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldService.java index 6b13fdf54462d..af5c27562db06 100644 --- a/integration-tests/grpc-plain-text-mutiny/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldService.java +++ b/integration-tests/grpc-plain-text-mutiny/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldService.java @@ -6,9 +6,11 @@ import examples.HelloRequest; import examples.MutinyGreeterGrpc; import io.quarkus.grpc.GrpcService; +import io.quarkus.grpc.RegisterInterceptor; import io.smallrye.mutiny.Uni; @GrpcService +@RegisterInterceptor(IncomingInterceptor.class) public class HelloWorldService extends MutinyGreeterGrpc.GreeterImplBase { AtomicInteger counter = new AtomicInteger();