Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Terminate request scope before sending JAX-RS response #5573

Merged
merged 1 commit into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import javax.ws.rs.ServiceUnavailableException;
import javax.ws.rs.container.AsyncResponse;
Expand Down Expand Up @@ -262,11 +263,7 @@ public boolean resume(Object entity) {
return false;
done = true;
requestContext.activate(requestContextState);
try {
return internalResume(entity, t -> vertxFlush());
} finally {
requestContext.terminate();
}
return internalResume(entity, new FlushTask());
}
}

Expand All @@ -279,11 +276,7 @@ public boolean resume(Throwable ex) {
return false;
done = true;
requestContext.activate(requestContextState);
try {
return internalResume(ex, t -> vertxFlush());
} finally {
requestContext.terminate();
}
return internalResume(ex, new FlushTask());
}
}

Expand All @@ -299,11 +292,7 @@ public boolean cancel() {
done = true;
cancelled = true;
requestContext.activate(requestContextState);
try {
return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), t -> vertxFlush());
} finally {
requestContext.terminate();
}
return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), new FlushTask());
}
}

Expand All @@ -317,14 +306,10 @@ public boolean cancel(int retryAfter) {
done = true;
cancelled = true;
requestContext.activate(requestContextState);
try {
return internalResume(
Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter)
.build(),
t -> vertxFlush());
} finally {
requestContext.terminate();
}
return internalResume(
Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter)
.build(),
new FlushTask());
}
}

Expand Down Expand Up @@ -393,6 +378,17 @@ protected void handleTimeout() {
return;
resume(new ServiceUnavailableException());
}

private class FlushTask implements Consumer<Throwable> {
@Override
public void accept(Throwable t) {
try {
requestContext.terminate();
} finally {
VertxHttpAsyncResponse.this.vertxFlush();
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,17 @@ private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput

boolean suspended = vertxRequest.getAsyncContext().isSuspended();
boolean requestContextActive = requestContext.isActive();
if (requestContextActive) {
//it is possible that there was an async response, that then finished in the same thread
//the async response will have terminated the request context in this case
currentVertxRequest.initialInvocationComplete(suspended);
}
if (!suspended) {
try {
vertxResponse.finish();
} catch (IOException e) {
log.error("Unexpected failure", e);
} finally {
if (requestContextActive) {
requestContext.terminate();
}
} finally {
try {
vertxResponse.finish();
} catch (IOException e) {
log.debug("IOException writing JAX-RS response", e);
}
}
} else {
//we need the request context to stick around
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package io.quarkus.smallrye.opentracing.deployment;

import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.awaitility.Awaitility;
import org.jboss.shrinkwrap.api.ShrinkWrap;
Expand Down Expand Up @@ -48,15 +46,13 @@ public static void afterAll() {
}

@Test
public void testSingleServerRequest() throws InterruptedException {
public void testSingleServerRequest() {
try {
RestAssured.defaultParser = Parser.TEXT;
RestAssured.when().get("/hello")
.then()
.statusCode(200);
//inherently racy, tracer is completed after response is sent back to the client
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> mockTracer.finishedSpans().size() == 1);
Assertions.assertEquals(1, mockTracer.finishedSpans().size());
Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello",
mockTracer.finishedSpans().get(0).operationName());
} finally {
Expand All @@ -65,15 +61,12 @@ public void testSingleServerRequest() throws InterruptedException {
}

@Test
public void testCDI() throws InterruptedException {
public void testCDI() {
try {
RestAssured.defaultParser = Parser.TEXT;
RestAssured.when().get("/cdi")
.then()
.statusCode(200);
//inherently racy, tracer is completed after response is sent back to the client
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> mockTracer.finishedSpans().size() == 2);
Assertions.assertEquals(2, mockTracer.finishedSpans().size());
Assertions.assertEquals("io.quarkus.smallrye.opentracing.deployment.Service.foo",
mockTracer.finishedSpans().get(0).operationName());
Expand All @@ -85,24 +78,18 @@ public void testCDI() throws InterruptedException {
}

@Test
public void testMPRestClient() throws InterruptedException {
public void testMPRestClient() {
try {
RestAssured.defaultParser = Parser.TEXT;
RestAssured.when().get("/restClient")
.then()
.statusCode(200);
//inherently racy, tracer is completed after response is sent back to the client
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> mockTracer.finishedSpans().size() == 3);
Assertions.assertEquals(3, mockTracer.finishedSpans().size());
//these can come in any order, as the 'hello' span is finished after the request is sent back.
//this means the client might have already dealt with the response before the hello span is finished
//in practice this means that the spans might be in any order, as they are ordered by the time
//they are completed rather than the time they are started
Set<String> results = mockTracer.finishedSpans().stream().map(MockSpan::operationName).collect(Collectors.toSet());
Assertions.assertTrue(results.contains("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello"));
Assertions.assertTrue(results.contains("GET"));
Assertions.assertTrue(results.contains("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.restClient"));
Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello",
mockTracer.finishedSpans().get(0).operationName());
Assertions.assertEquals("GET", mockTracer.finishedSpans().get(1).operationName());
Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.restClient",
mockTracer.finishedSpans().get(2).operationName());
} finally {
RestAssured.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.opentracing.contrib.jaxrs2.internal.SpanWrapper;
import io.opentracing.tag.Tags;
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;

@Provider
Expand All @@ -39,31 +40,21 @@ CurrentVertxRequest request() {

@Override
public void filter(ContainerRequestContext requestContext) throws IOException {
request().addRequestDoneListener(new CurrentVertxRequest.Listener() {
RoutingContext routingContext = request().getCurrent();
routingContext.addHeadersEndHandler(new Handler<Void>() {
@Override
public void initialInvocationComplete(RoutingContext routingContext, boolean goingAsync) {
public void handle(Void event) {
SpanWrapper wrapper = routingContext.get(SpanWrapper.PROPERTY_NAME);
if (wrapper != null) {
wrapper.getScope().close();
Tags.HTTP_STATUS.set(wrapper.get(), routingContext.response().getStatusCode());
if (routingContext.failure() != null) {
addExceptionLogs(wrapper.get(), routingContext.failure());
}
wrapper.finish();
}
}

@Override
public void responseComplete(RoutingContext routingContext) {
SpanWrapper wrapper = routingContext.get(SpanWrapper.PROPERTY_NAME);
if (wrapper == null) {
return;
}

Tags.HTTP_STATUS.set(wrapper.get(), routingContext.response().getStatusCode());
if (routingContext.failure() != null) {
addExceptionLogs(wrapper.get(), routingContext.failure());
}
wrapper.finish();

}
});

}

private static void addExceptionLogs(Span span, Throwable throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,6 @@ public T call(HttpServerExchange exchange, C context) throws Exception {
if (exchange == null) {
return action.call(exchange, context);
}
boolean vertxFirst = false;
ManagedContext requestContext = beanContainer.requestContext();
if (requestContext.isActive()) {
return action.call(exchange, context);
Expand All @@ -469,7 +468,6 @@ public T call(HttpServerExchange exchange, C context) throws Exception {
.getAttachment(REQUEST_CONTEXT);
try {
requestContext.activate(existingRequestContext);
vertxFirst = existingRequestContext == null;

VertxHttpExchange delegate = (VertxHttpExchange) exchange.getDelegate();
RoutingContext rc = (RoutingContext) delegate.getContext();
Expand All @@ -479,9 +477,6 @@ public T call(HttpServerExchange exchange, C context) throws Exception {
ServletRequestContext src = exchange
.getAttachment(ServletRequestContext.ATTACHMENT_KEY);
HttpServletRequestImpl req = src.getOriginalRequest();
if (vertxFirst) {
currentVertxRequest.initialInvocationComplete(req.isAsyncStarted());
}
if (req.isAsyncStarted()) {
exchange.putAttachment(REQUEST_CONTEXT, requestContext.getState());
requestContext.deactivate();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
package io.quarkus.vertx.http.runtime;

import java.util.ArrayList;
import java.util.List;

import javax.annotation.PreDestroy;
import javax.enterprise.context.RequestScoped;
import javax.enterprise.inject.Produces;

import org.jboss.logging.Logger;

import io.vertx.ext.web.RoutingContext;

@RequestScoped
public class CurrentVertxRequest {

private final Logger log = Logger.getLogger(CurrentVertxRequest.class);

public RoutingContext current;
private List<Listener> doneListeners;

@Produces
@RequestScoped
Expand All @@ -30,51 +21,4 @@ public CurrentVertxRequest setCurrent(RoutingContext current) {
return this;
}

public void addRequestDoneListener(Listener doneListener) {
if (doneListeners == null) {
doneListeners = new ArrayList<>();
}
doneListeners.add(doneListener);
}

public void initialInvocationComplete(boolean goingAsync) {

if (current == null) {
return;
}
if (doneListeners != null) {
for (Listener i : doneListeners) {
try {
i.initialInvocationComplete(current, goingAsync);
} catch (Throwable t) {
log.errorf(t, "Failed to process invocation listener %s", i);
}
}
}
}

@PreDestroy
void done() {
if (current == null) {
return;
}
if (doneListeners != null) {
for (Listener i : doneListeners) {
try {
i.responseComplete(current);
} catch (Throwable t) {
log.errorf(t, "Failed to process done listener %s", i);
}
}
}
}

public interface Listener {

void initialInvocationComplete(RoutingContext routingContext, boolean goingAsync);

void responseComplete(RoutingContext routingContext);

}

}