From 56e7d9cb89a5bba3fb978b079febcff89f66aa11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Szynkiewicz?= Date: Fri, 18 Jun 2021 11:54:02 +0200 Subject: [PATCH] gRPC: fix request context propagation - post review fixes --- .../GrpcRequestContextGrpcInterceptor.java | 139 +++++++++--------- 1 file changed, 72 insertions(+), 67 deletions(-) diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java index b453e31be5435..7841ae58a2899 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/context/GrpcRequestContextGrpcInterceptor.java @@ -2,13 +2,11 @@ import org.jboss.logging.Logger; -import io.grpc.ForwardingServerCall; import io.grpc.ForwardingServerCallListener; import io.grpc.Metadata; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; -import io.grpc.Status; import io.quarkus.arc.Arc; import io.quarkus.arc.InjectableContext; import io.quarkus.arc.ManagedContext; @@ -38,90 +36,97 @@ public ServerCall.Listener interceptCall(ServerCall delegate = next - .startCall(new ForwardingServerCall.SimpleForwardingServerCall(call) { - - @Override - public void close(Status status, Metadata trailers) { - super.close(status, trailers); - if (state != null) { - capturedVertxContext.runOnContext(event -> { - reqContext.deactivate(); - reqContext.destroy(state); - }); - } - } - }, headers); - // a gRPC service can return a StreamObserver and instead of doing the work // directly in the method body, do stuff that requires a request context in StreamObserver's methods // let's propagate the request context to these methods: - return new ForwardingServerCallListener.SimpleForwardingServerCallListener(delegate) { - - @Override - public void onMessage(ReqT message) { - activateContext(); - try { - super.onMessage(message); - } finally { - deactivateContext(); + try { + return new ForwardingServerCallListener.SimpleForwardingServerCallListener( + next.startCall(call, headers)) { + + @Override + public void onMessage(ReqT message) { + boolean activated = activateContext(); + try { + super.onMessage(message); + } finally { + if (activated) { + deactivateContext(); + } + } } - } - @Override - public void onReady() { - activateContext(); - try { - super.onReady(); - } finally { - deactivateContext(); + @Override + public void onReady() { + boolean activated = activateContext(); + try { + super.onReady(); + } finally { + if (activated) { + deactivateContext(); + } + } } - } - @Override - public void onHalfClose() { - activateContext(); - try { - super.onHalfClose(); - } finally { - deactivateContext(); + @Override + public void onHalfClose() { + boolean activated = activateContext(); + try { + super.onHalfClose(); + } finally { + if (activated) { + deactivateContext(); + } + } } - } - @Override - public void onCancel() { - activateContext(); - try { - super.onHalfClose(); - } finally { - deactivateContext(); + @Override + public void onCancel() { + boolean activated = activateContext(); + try { + super.onCancel(); + } finally { + if (activated) { + deactivateContext(); + } + if (state != null) { + reqContext.destroy(state); + } + } } - } - @Override - public void onComplete() { - activateContext(); - try { - super.onComplete(); - } finally { - deactivateContext(); + @Override + public void onComplete() { + boolean activated = activateContext(); + try { + super.onComplete(); + } finally { + if (activated) { + deactivateContext(); + } + if (state != null) { + reqContext.destroy(state); + } + } } - } - private void deactivateContext() { - if (state != null) { + private void deactivateContext() { reqContext.deactivate(); } - } - private void activateContext() { - if (state != null && !reqContext.isActive()) { - reqContext.activate(state); + private boolean activateContext() { + if (state != null && !reqContext.isActive()) { + reqContext.activate(state); + return true; + } + return false; } - } - }; + }; + } finally { + reqContext.deactivate(); + } } else { log.warn("Unable to activate the request scope - interceptor not called on the Vert.x event loop"); return next.startCall(call, headers);