From a7e0c42e88e842d02982f050d645ef5d66cb7333 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Mon, 18 Nov 2019 15:10:40 +1100 Subject: [PATCH] Add the notion of a current vert.x request This allows the Vert.x routing context to be injected --- .../runtime/standalone/VertxHttpRequest.java | 80 +++++++++++++------ .../standalone/VertxRequestHandler.java | 50 ++++++++---- .../runtime/UndertowDeploymentRecorder.java | 11 +++ .../http/deployment/VertxHttpProcessor.java | 7 +- .../http/runtime/CurrentVertxRequest.java | 80 +++++++++++++++++++ 5 files changed, 186 insertions(+), 42 deletions(-) create mode 100644 extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/CurrentVertxRequest.java diff --git a/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java b/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java index eee6889ec0381b..1560a717b0bac0 100644 --- a/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java +++ b/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java @@ -5,7 +5,6 @@ import java.util.Collections; import java.util.Date; import java.util.Enumeration; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -26,7 +25,9 @@ import org.jboss.resteasy.spi.ResteasyAsynchronousContext; import org.jboss.resteasy.spi.ResteasyAsynchronousResponse; +import io.quarkus.arc.ManagedContext; import io.vertx.core.Context; +import io.vertx.ext.web.RoutingContext; /** * Abstraction for an inbound http request on the server, or a response from a server to a client @@ -43,18 +44,21 @@ public final class VertxHttpRequest extends BaseHttpRequest { private String httpMethod; private LazyHostSupplier remoteHost; private InputStream inputStream; - private Map attributes; private VertxHttpResponse response; private VertxExecutionContext executionContext; + private final RoutingContext routingContext; private final Context context; + private final ManagedContext requestContext; + private final ManagedContext.ContextState requestContextState; public VertxHttpRequest(Context context, + RoutingContext routingContext, ResteasyHttpHeaders httpHeaders, ResteasyUriInfo uri, String httpMethod, LazyHostSupplier remoteHost, SynchronousDispatcher dispatcher, - VertxHttpResponse response) { + VertxHttpResponse response, ManagedContext requestContext) { super(uri); this.context = context; this.response = response; @@ -62,6 +66,9 @@ public VertxHttpRequest(Context context, this.httpMethod = httpMethod; this.remoteHost = remoteHost; this.executionContext = new VertxExecutionContext(this, response, dispatcher); + this.requestContext = requestContext; + this.requestContextState = requestContext.getState(); + this.routingContext = routingContext; } @Override @@ -76,7 +83,7 @@ public void setHttpMethod(String method) { @Override public Enumeration getAttributeNames() { - final Map attributes = this.attributes; + final Map attributes = routingContext.data(); if (attributes == null) { return Collections.emptyEnumeration(); } else { @@ -104,22 +111,17 @@ public ResteasyAsynchronousContext getAsyncContext() { @Override public Object getAttribute(String attribute) { - return attributes != null ? attributes.get(attribute) : null; + return routingContext.get(attribute); } @Override public void setAttribute(String name, Object value) { - if (attributes == null) { - attributes = new HashMap(); - } - attributes.put(name, value); + routingContext.put(name, value); } @Override public void removeAttribute(String name) { - if (attributes != null) { - attributes.remove(name); - } + routingContext.remove(name); } @Override @@ -241,11 +243,12 @@ public void initialRequestThreadFinished() { @Override public void complete() { synchronized (responseLock) { - if (done) - return; - if (cancelled) + if (done || cancelled) { return; + } done = true; + requestContext.activate(requestContextState); + requestContext.terminate(); vertxFlush(); } } @@ -258,7 +261,12 @@ public boolean resume(Object entity) { if (cancelled) return false; done = true; - return internalResume(entity, t -> vertxFlush()); + requestContext.activate(requestContextState); + try { + return internalResume(entity, t -> vertxFlush()); + } finally { + requestContext.terminate(); + } } } @@ -270,7 +278,12 @@ public boolean resume(Throwable ex) { if (cancelled) return false; done = true; - return internalResume(ex, t -> vertxFlush()); + requestContext.activate(requestContextState); + try { + return internalResume(ex, t -> vertxFlush()); + } finally { + requestContext.terminate(); + } } } @@ -285,7 +298,12 @@ public boolean cancel() { } done = true; cancelled = true; - return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), t -> vertxFlush()); + requestContext.activate(requestContextState); + try { + return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), t -> vertxFlush()); + } finally { + requestContext.terminate(); + } } } @@ -298,10 +316,15 @@ public boolean cancel(int retryAfter) { return false; done = true; cancelled = true; - return internalResume( - Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter) - .build(), - t -> vertxFlush()); + requestContext.activate(requestContextState); + try { + return internalResume( + Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter) + .build(), + t -> vertxFlush()); + } finally { + requestContext.terminate(); + } } } @@ -322,10 +345,15 @@ public boolean cancel(Date retryAfter) { return false; done = true; cancelled = true; - return internalResume( - Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter) - .build(), - t -> vertxFlush()); + requestContext.activate(requestContextState); + try { + return internalResume( + Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter) + .build(), + t -> vertxFlush()); + } finally { + requestContext.terminate(); + } } } diff --git a/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxRequestHandler.java b/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxRequestHandler.java index 154150ba90a771..d57c8a4563e8de 100644 --- a/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxRequestHandler.java +++ b/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxRequestHandler.java @@ -18,6 +18,7 @@ import io.quarkus.arc.ManagedContext; import io.quarkus.arc.runtime.BeanContainer; import io.quarkus.security.identity.CurrentIdentityAssociation; +import io.quarkus.vertx.http.runtime.CurrentVertxRequest; import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser; import io.vertx.core.Context; import io.vertx.core.Handler; @@ -38,6 +39,7 @@ public class VertxRequestHandler implements Handler { protected final BufferAllocator allocator; protected final BeanContainer beanContainer; protected final CurrentIdentityAssociation association; + protected final CurrentVertxRequest currentVertxRequest; public VertxRequestHandler(Vertx vertx, BeanContainer beanContainer, @@ -52,6 +54,7 @@ public VertxRequestHandler(Vertx vertx, this.allocator = allocator; Instance association = CDI.current().select(CurrentIdentityAssociation.class); this.association = association.isResolvable() ? association.get() : null; + currentVertxRequest = CDI.current().select(CurrentVertxRequest.class).get(); } @Override @@ -67,26 +70,22 @@ public void handle(RoutingContext request) { } vertx.executeBlocking(event -> { - dispatchRequestContext(request, is, new VertxBlockingOutput(request.request())); + dispatch(request, is, new VertxBlockingOutput(request.request())); }, false, event -> { + if (event.failed()) { + request.fail(event.cause()); + } }); } - private void dispatchRequestContext(RoutingContext request, InputStream is, VertxOutput output) { + private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput output) { ManagedContext requestContext = beanContainer.requestContext(); requestContext.activate(); - QuarkusHttpUser user = (QuarkusHttpUser) request.user(); + QuarkusHttpUser user = (QuarkusHttpUser) routingContext.user(); if (user != null && association != null) { association.setIdentity(user.getSecurityIdentity()); } - try { - dispatch(request, is, output); - } finally { - requestContext.terminate(); - } - } - - private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput output) { + currentVertxRequest.setCurrent(routingContext); try { Context ctx = vertx.getOrCreateContext(); HttpServerRequest request = routingContext.request(); @@ -99,8 +98,9 @@ private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput // using a supplier to make the remote Address resolution lazy: often it's not needed and it's not very cheap to create. LazyHostSupplier hostSupplier = new LazyHostSupplier(request); - VertxHttpRequest vertxRequest = new VertxHttpRequest(ctx, headers, uriInfo, request.rawMethod(), hostSupplier, - dispatcher.getDispatcher(), vertxResponse); + VertxHttpRequest vertxRequest = new VertxHttpRequest(ctx, routingContext, headers, uriInfo, request.rawMethod(), + hostSupplier, + dispatcher.getDispatcher(), vertxResponse, requestContext); vertxRequest.setInputStream(is); try { ResteasyContext.pushContext(SecurityContext.class, new QuarkusResteasySecurityContext(request)); @@ -115,15 +115,35 @@ private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput routingContext.fail(ex); } - if (!vertxRequest.getAsyncContext().isSuspended()) { + boolean suspended = vertxRequest.getAsyncContext().isSuspended(); + boolean requestContextActive = requestContext.isActive(); + if (requestContextActive) { + //it is possible that there was an async response, that then finished in the same thread + //the async response will have terminated the request context in this case + currentVertxRequest.initialInvocationComplete(suspended); + } + if (!suspended) { try { vertxResponse.finish(); } catch (IOException e) { log.error("Unexpected failure", e); + } finally { + if (requestContextActive) { + requestContext.terminate(); + } } + } else { + //we need the request context to stick around + requestContext.deactivate(); } } catch (Throwable t) { - routingContext.fail(t); + try { + routingContext.fail(t); + } finally { + if (requestContext.isActive()) { + requestContext.terminate(); + } + } } } } diff --git a/extensions/undertow/runtime/src/main/java/io/quarkus/undertow/runtime/UndertowDeploymentRecorder.java b/extensions/undertow/runtime/src/main/java/io/quarkus/undertow/runtime/UndertowDeploymentRecorder.java index 594b8b11831e24..1537a5b69ea75e 100644 --- a/extensions/undertow/runtime/src/main/java/io/quarkus/undertow/runtime/UndertowDeploymentRecorder.java +++ b/extensions/undertow/runtime/src/main/java/io/quarkus/undertow/runtime/UndertowDeploymentRecorder.java @@ -37,6 +37,7 @@ import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; import io.quarkus.runtime.configuration.MemorySize; +import io.quarkus.vertx.http.runtime.CurrentVertxRequest; import io.quarkus.vertx.http.runtime.HttpConfiguration; import io.undertow.httpcore.BufferAllocator; import io.undertow.httpcore.StatusCodes; @@ -448,6 +449,7 @@ public ServletExtension setupRequestScope(BeanContainer beanContainer) { return new ServletExtension() { @Override public void handleDeployment(DeploymentInfo deploymentInfo, ServletContext servletContext) { + CurrentVertxRequest currentVertxRequest = CDI.current().select(CurrentVertxRequest.class).get(); deploymentInfo.addThreadSetupAction(new ThreadSetupHandler() { @Override public ThreadSetupHandler.Action create(Action action) { @@ -458,6 +460,7 @@ public T call(HttpServerExchange exchange, C context) throws Exception { if (exchange == null) { return action.call(exchange, context); } + boolean vertxFirst = false; ManagedContext requestContext = beanContainer.requestContext(); if (requestContext.isActive()) { return action.call(exchange, context); @@ -466,11 +469,19 @@ public T call(HttpServerExchange exchange, C context) throws Exception { .getAttachment(REQUEST_CONTEXT); try { requestContext.activate(existingRequestContext); + vertxFirst = existingRequestContext == null; + + VertxHttpExchange delegate = (VertxHttpExchange) exchange.getDelegate(); + RoutingContext rc = (RoutingContext) delegate.getContext(); + currentVertxRequest.setCurrent(rc); return action.call(exchange, context); } finally { ServletRequestContext src = exchange .getAttachment(ServletRequestContext.ATTACHMENT_KEY); HttpServletRequestImpl req = src.getOriginalRequest(); + if (vertxFirst) { + currentVertxRequest.initialInvocationComplete(req.isAsyncStarted()); + } if (req.isAsyncStarted()) { exchange.putAttachment(REQUEST_CONTEXT, requestContext.getState()); requestContext.deactivate(); diff --git a/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/VertxHttpProcessor.java b/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/VertxHttpProcessor.java index 3171a927ed9da6..10529a8c85d637 100644 --- a/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/VertxHttpProcessor.java +++ b/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/VertxHttpProcessor.java @@ -27,6 +27,7 @@ import io.quarkus.runtime.RuntimeValue; import io.quarkus.vertx.core.deployment.EventLoopCountBuildItem; import io.quarkus.vertx.core.deployment.InternalWebVertxBuildItem; +import io.quarkus.vertx.http.runtime.CurrentVertxRequest; import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig; import io.quarkus.vertx.http.runtime.HttpConfiguration; import io.quarkus.vertx.http.runtime.RouterProducer; @@ -51,7 +52,11 @@ FilterBuildItem cors(CORSRecorder recorder, HttpConfiguration configuration) { @BuildStep AdditionalBeanBuildItem additionalBeans() { - return AdditionalBeanBuildItem.unremovableOf(RouterProducer.class); + return AdditionalBeanBuildItem.builder() + .setUnremovable() + .addBeanClass(RouterProducer.class) + .addBeanClass(CurrentVertxRequest.class) + .build(); } /** diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/CurrentVertxRequest.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/CurrentVertxRequest.java new file mode 100644 index 00000000000000..e7dd3617c3220d --- /dev/null +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/CurrentVertxRequest.java @@ -0,0 +1,80 @@ +package io.quarkus.vertx.http.runtime; + +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.PreDestroy; +import javax.enterprise.context.RequestScoped; +import javax.enterprise.inject.Produces; + +import org.jboss.logging.Logger; + +import io.vertx.ext.web.RoutingContext; + +@RequestScoped +public class CurrentVertxRequest { + + private final Logger log = Logger.getLogger(CurrentVertxRequest.class); + + public RoutingContext current; + private List doneListeners; + + @Produces + @RequestScoped + public RoutingContext getCurrent() { + return current; + } + + public CurrentVertxRequest setCurrent(RoutingContext current) { + this.current = current; + return this; + } + + public void addRequestDoneListener(Listener doneListener) { + if (doneListeners == null) { + doneListeners = new ArrayList<>(); + } + doneListeners.add(doneListener); + } + + public void initialInvocationComplete(boolean goingAsync) { + + if (current == null) { + return; + } + if (doneListeners != null) { + for (Listener i : doneListeners) { + try { + i.initialInvocationComplete(current, goingAsync); + } catch (Throwable t) { + log.errorf(t, "Failed to process invocation listener %s", i); + } + } + } + } + + @PreDestroy + void done() { + if (current == null) { + return; + } + if (doneListeners != null) { + for (Listener i : doneListeners) { + try { + i.responseComplete(current); + } catch (Throwable t) { + log.errorf(t, "Failed to process done listener %s", i); + } + } + } + } + + public interface Listener { + + void initialInvocationComplete(RoutingContext routingContext, boolean goingAsync); + + void responseComplete(RoutingContext routingContext); + + } + +}