Skip to content

Commit

Permalink
Add the notion of a current vert.x request
Browse files Browse the repository at this point in the history
This allows the Vert.x routing context to be injected
  • Loading branch information
stuartwdouglas committed Nov 18, 2019
1 parent bf9e8f3 commit f1fd426
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -26,7 +25,9 @@
import org.jboss.resteasy.spi.ResteasyAsynchronousContext;
import org.jboss.resteasy.spi.ResteasyAsynchronousResponse;

import io.quarkus.arc.ManagedContext;
import io.vertx.core.Context;
import io.vertx.ext.web.RoutingContext;

/**
* Abstraction for an inbound http request on the server, or a response from a server to a client
Expand All @@ -43,25 +44,31 @@ public final class VertxHttpRequest extends BaseHttpRequest {
private String httpMethod;
private LazyHostSupplier remoteHost;
private InputStream inputStream;
private Map<String, Object> attributes;
private VertxHttpResponse response;
private VertxExecutionContext executionContext;
private final RoutingContext routingContext;
private final Context context;
private final ManagedContext requestContext;
private final ManagedContext.ContextState requestContextState;

public VertxHttpRequest(Context context,
RoutingContext routingContext,
ResteasyHttpHeaders httpHeaders,
ResteasyUriInfo uri,
String httpMethod,
LazyHostSupplier remoteHost,
SynchronousDispatcher dispatcher,
VertxHttpResponse response) {
VertxHttpResponse response, ManagedContext requestContext) {
super(uri);
this.context = context;
this.response = response;
this.httpHeaders = httpHeaders;
this.httpMethod = httpMethod;
this.remoteHost = remoteHost;
this.executionContext = new VertxExecutionContext(this, response, dispatcher);
this.requestContext = requestContext;
this.requestContextState = requestContext.getState();
this.routingContext = routingContext;
}

@Override
Expand All @@ -76,7 +83,7 @@ public void setHttpMethod(String method) {

@Override
public Enumeration<String> getAttributeNames() {
final Map<String, Object> attributes = this.attributes;
final Map<String, Object> attributes = routingContext.data();
if (attributes == null) {
return Collections.emptyEnumeration();
} else {
Expand Down Expand Up @@ -104,22 +111,17 @@ public ResteasyAsynchronousContext getAsyncContext() {

@Override
public Object getAttribute(String attribute) {
return attributes != null ? attributes.get(attribute) : null;
return routingContext.get(attribute);
}

@Override
public void setAttribute(String name, Object value) {
if (attributes == null) {
attributes = new HashMap<String, Object>();
}
attributes.put(name, value);
routingContext.put(name, value);
}

@Override
public void removeAttribute(String name) {
if (attributes != null) {
attributes.remove(name);
}
routingContext.remove(name);
}

@Override
Expand Down Expand Up @@ -241,11 +243,12 @@ public void initialRequestThreadFinished() {
@Override
public void complete() {
synchronized (responseLock) {
if (done)
return;
if (cancelled)
if (done || cancelled) {
return;
}
done = true;
requestContext.activate(requestContextState);
requestContext.terminate();
vertxFlush();
}
}
Expand All @@ -258,7 +261,12 @@ public boolean resume(Object entity) {
if (cancelled)
return false;
done = true;
return internalResume(entity, t -> vertxFlush());
requestContext.activate(requestContextState);
try {
return internalResume(entity, t -> vertxFlush());
} finally {
requestContext.terminate();
}
}
}

Expand All @@ -270,7 +278,12 @@ public boolean resume(Throwable ex) {
if (cancelled)
return false;
done = true;
return internalResume(ex, t -> vertxFlush());
requestContext.activate(requestContextState);
try {
return internalResume(ex, t -> vertxFlush());
} finally {
requestContext.terminate();
}
}
}

Expand All @@ -285,7 +298,12 @@ public boolean cancel() {
}
done = true;
cancelled = true;
return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), t -> vertxFlush());
requestContext.activate(requestContextState);
try {
return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), t -> vertxFlush());
} finally {
requestContext.terminate();
}
}
}

Expand All @@ -298,10 +316,15 @@ public boolean cancel(int retryAfter) {
return false;
done = true;
cancelled = true;
return internalResume(
Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter)
.build(),
t -> vertxFlush());
requestContext.activate(requestContextState);
try {
return internalResume(
Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter)
.build(),
t -> vertxFlush());
} finally {
requestContext.terminate();
}
}
}

Expand All @@ -322,10 +345,15 @@ public boolean cancel(Date retryAfter) {
return false;
done = true;
cancelled = true;
return internalResume(
Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter)
.build(),
t -> vertxFlush());
requestContext.activate(requestContextState);
try {
return internalResume(
Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter)
.build(),
t -> vertxFlush());
} finally {
requestContext.terminate();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.quarkus.arc.ManagedContext;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.security.identity.CurrentIdentityAssociation;
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser;
import io.vertx.core.Context;
import io.vertx.core.Handler;
Expand All @@ -38,6 +39,7 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
protected final BufferAllocator allocator;
protected final BeanContainer beanContainer;
protected final CurrentIdentityAssociation association;
protected final CurrentVertxRequest currentVertxRequest;

public VertxRequestHandler(Vertx vertx,
BeanContainer beanContainer,
Expand All @@ -52,6 +54,7 @@ public VertxRequestHandler(Vertx vertx,
this.allocator = allocator;
Instance<CurrentIdentityAssociation> association = CDI.current().select(CurrentIdentityAssociation.class);
this.association = association.isResolvable() ? association.get() : null;
currentVertxRequest = CDI.current().select(CurrentVertxRequest.class).get();
}

@Override
Expand All @@ -67,26 +70,22 @@ public void handle(RoutingContext request) {
}

vertx.executeBlocking(event -> {
dispatchRequestContext(request, is, new VertxBlockingOutput(request.request()));
dispatch(request, is, new VertxBlockingOutput(request.request()));
}, false, event -> {
if (event.failed()) {
request.fail(event.cause());
}
});
}

private void dispatchRequestContext(RoutingContext request, InputStream is, VertxOutput output) {
private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput output) {
ManagedContext requestContext = beanContainer.requestContext();
requestContext.activate();
QuarkusHttpUser user = (QuarkusHttpUser) request.user();
QuarkusHttpUser user = (QuarkusHttpUser) routingContext.user();
if (user != null && association != null) {
association.setIdentity(user.getSecurityIdentity());
}
try {
dispatch(request, is, output);
} finally {
requestContext.terminate();
}
}

private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput output) {
currentVertxRequest.setCurrent(routingContext);
try {
Context ctx = vertx.getOrCreateContext();
HttpServerRequest request = routingContext.request();
Expand All @@ -99,8 +98,9 @@ private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput
// using a supplier to make the remote Address resolution lazy: often it's not needed and it's not very cheap to create.
LazyHostSupplier hostSupplier = new LazyHostSupplier(request);

VertxHttpRequest vertxRequest = new VertxHttpRequest(ctx, headers, uriInfo, request.rawMethod(), hostSupplier,
dispatcher.getDispatcher(), vertxResponse);
VertxHttpRequest vertxRequest = new VertxHttpRequest(ctx, routingContext, headers, uriInfo, request.rawMethod(),
hostSupplier,
dispatcher.getDispatcher(), vertxResponse, requestContext);
vertxRequest.setInputStream(is);
try {
ResteasyContext.pushContext(SecurityContext.class, new QuarkusResteasySecurityContext(request));
Expand All @@ -115,15 +115,35 @@ private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput
routingContext.fail(ex);
}

if (!vertxRequest.getAsyncContext().isSuspended()) {
boolean suspended = vertxRequest.getAsyncContext().isSuspended();
boolean requestContextActive = requestContext.isActive();
if (requestContextActive) {
//it is possible that there was an async response, that then finished in the same thread
//the async response will have terminated the request context in this case
currentVertxRequest.initialInvocationComplete(suspended);
}
if (!suspended) {
try {
vertxResponse.finish();
} catch (IOException e) {
log.error("Unexpected failure", e);
} finally {
if (requestContextActive) {
requestContext.terminate();
}
}
} else {
//we need the request context to stick around
requestContext.deactivate();
}
} catch (Throwable t) {
routingContext.fail(t);
try {
routingContext.fail(t);
} finally {
if (requestContext.isActive()) {
requestContext.terminate();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.runtime.configuration.MemorySize;
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
import io.quarkus.vertx.http.runtime.HttpConfiguration;
import io.undertow.httpcore.BufferAllocator;
import io.undertow.httpcore.StatusCodes;
Expand Down Expand Up @@ -448,6 +449,7 @@ public ServletExtension setupRequestScope(BeanContainer beanContainer) {
return new ServletExtension() {
@Override
public void handleDeployment(DeploymentInfo deploymentInfo, ServletContext servletContext) {
CurrentVertxRequest currentVertxRequest = CDI.current().select(CurrentVertxRequest.class).get();
deploymentInfo.addThreadSetupAction(new ThreadSetupHandler() {
@Override
public <T, C> ThreadSetupHandler.Action<T, C> create(Action<T, C> action) {
Expand All @@ -458,6 +460,7 @@ public T call(HttpServerExchange exchange, C context) throws Exception {
if (exchange == null) {
return action.call(exchange, context);
}
boolean vertxFirst = false;
ManagedContext requestContext = beanContainer.requestContext();
if (requestContext.isActive()) {
return action.call(exchange, context);
Expand All @@ -466,11 +469,19 @@ public T call(HttpServerExchange exchange, C context) throws Exception {
.getAttachment(REQUEST_CONTEXT);
try {
requestContext.activate(existingRequestContext);
vertxFirst = existingRequestContext == null;

VertxHttpExchange delegate = (VertxHttpExchange) exchange.getDelegate();
RoutingContext rc = (RoutingContext) delegate.getContext();
currentVertxRequest.setCurrent(rc);
return action.call(exchange, context);
} finally {
ServletRequestContext src = exchange
.getAttachment(ServletRequestContext.ATTACHMENT_KEY);
HttpServletRequestImpl req = src.getOriginalRequest();
if (vertxFirst) {
currentVertxRequest.initialInvocationComplete(req.isAsyncStarted());
}
if (req.isAsyncStarted()) {
exchange.putAttachment(REQUEST_CONTEXT, requestContext.getState());
requestContext.deactivate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.vertx.core.deployment.EventLoopCountBuildItem;
import io.quarkus.vertx.core.deployment.InternalWebVertxBuildItem;
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig;
import io.quarkus.vertx.http.runtime.HttpConfiguration;
import io.quarkus.vertx.http.runtime.RouterProducer;
Expand All @@ -51,7 +52,11 @@ FilterBuildItem cors(CORSRecorder recorder, HttpConfiguration configuration) {

@BuildStep
AdditionalBeanBuildItem additionalBeans() {
return AdditionalBeanBuildItem.unremovableOf(RouterProducer.class);
return AdditionalBeanBuildItem.builder()
.setUnremovable()
.addBeanClass(RouterProducer.class)
.addBeanClass(CurrentVertxRequest.class)
.build();
}

/**
Expand Down
Loading

0 comments on commit f1fd426

Please sign in to comment.