Skip to content

Commit

Permalink
Terminate request scope before sending JAX-RS response
Browse files Browse the repository at this point in the history
This gives more consistent span ordering for Opentracing
  • Loading branch information
stuartwdouglas committed Nov 19, 2019
1 parent 5dca04d commit 5ab0b96
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import javax.ws.rs.ServiceUnavailableException;
import javax.ws.rs.container.AsyncResponse;
Expand Down Expand Up @@ -262,11 +263,7 @@ public boolean resume(Object entity) {
return false;
done = true;
requestContext.activate(requestContextState);
try {
return internalResume(entity, t -> vertxFlush());
} finally {
requestContext.terminate();
}
return internalResume(entity, new FlushTask());
}
}

Expand All @@ -279,11 +276,7 @@ public boolean resume(Throwable ex) {
return false;
done = true;
requestContext.activate(requestContextState);
try {
return internalResume(ex, t -> vertxFlush());
} finally {
requestContext.terminate();
}
return internalResume(ex, new FlushTask());
}
}

Expand All @@ -299,11 +292,7 @@ public boolean cancel() {
done = true;
cancelled = true;
requestContext.activate(requestContextState);
try {
return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), t -> vertxFlush());
} finally {
requestContext.terminate();
}
return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), new FlushTask());
}
}

Expand All @@ -317,14 +306,10 @@ public boolean cancel(int retryAfter) {
done = true;
cancelled = true;
requestContext.activate(requestContextState);
try {
return internalResume(
Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter)
.build(),
t -> vertxFlush());
} finally {
requestContext.terminate();
}
return internalResume(
Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter)
.build(),
new FlushTask());
}
}

Expand Down Expand Up @@ -393,6 +378,14 @@ protected void handleTimeout() {
return;
resume(new ServiceUnavailableException());
}

private class FlushTask implements Consumer<Throwable> {
@Override
public void accept(Throwable t) {
requestContext.terminate();
VertxHttpAsyncResponse.this.vertxFlush();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,17 @@ private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput

boolean suspended = vertxRequest.getAsyncContext().isSuspended();
boolean requestContextActive = requestContext.isActive();
if (requestContextActive) {
//it is possible that there was an async response, that then finished in the same thread
//the async response will have terminated the request context in this case
currentVertxRequest.initialInvocationComplete(suspended);
}
if (!suspended) {
try {
vertxResponse.finish();
} catch (IOException e) {
log.error("Unexpected failure", e);
} finally {
if (requestContextActive) {
requestContext.terminate();
}
} finally {
try {
vertxResponse.finish();
} catch (IOException e) {
log.debug("IOException writing JAX-RS response", e);
}
}
} else {
//we need the request context to stick around
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package io.quarkus.smallrye.opentracing.deployment;

import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.awaitility.Awaitility;
import org.jboss.shrinkwrap.api.ShrinkWrap;
Expand Down Expand Up @@ -48,15 +46,13 @@ public static void afterAll() {
}

@Test
public void testSingleServerRequest() throws InterruptedException {
public void testSingleServerRequest() {
try {
RestAssured.defaultParser = Parser.TEXT;
RestAssured.when().get("/hello")
.then()
.statusCode(200);
//inherently racy, tracer is completed after response is sent back to the client
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> mockTracer.finishedSpans().size() == 1);
Assertions.assertEquals(1, mockTracer.finishedSpans().size());
Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello",
mockTracer.finishedSpans().get(0).operationName());
} finally {
Expand All @@ -65,15 +61,12 @@ public void testSingleServerRequest() throws InterruptedException {
}

@Test
public void testCDI() throws InterruptedException {
public void testCDI() {
try {
RestAssured.defaultParser = Parser.TEXT;
RestAssured.when().get("/cdi")
.then()
.statusCode(200);
//inherently racy, tracer is completed after response is sent back to the client
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> mockTracer.finishedSpans().size() == 2);
Assertions.assertEquals(2, mockTracer.finishedSpans().size());
Assertions.assertEquals("io.quarkus.smallrye.opentracing.deployment.Service.foo",
mockTracer.finishedSpans().get(0).operationName());
Expand All @@ -85,24 +78,18 @@ public void testCDI() throws InterruptedException {
}

@Test
public void testMPRestClient() throws InterruptedException {
public void testMPRestClient() {
try {
RestAssured.defaultParser = Parser.TEXT;
RestAssured.when().get("/restClient")
.then()
.statusCode(200);
//inherently racy, tracer is completed after response is sent back to the client
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> mockTracer.finishedSpans().size() == 3);
Assertions.assertEquals(3, mockTracer.finishedSpans().size());
//these can come in any order, as the 'hello' span is finished after the request is sent back.
//this means the client might have already dealt with the response before the hello span is finished
//in practice this means that the spans might be in any order, as they are ordered by the time
//they are completed rather than the time they are started
Set<String> results = mockTracer.finishedSpans().stream().map(MockSpan::operationName).collect(Collectors.toSet());
Assertions.assertTrue(results.contains("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello"));
Assertions.assertTrue(results.contains("GET"));
Assertions.assertTrue(results.contains("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.restClient"));
Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello",
mockTracer.finishedSpans().get(0).operationName());
Assertions.assertEquals("GET", mockTracer.finishedSpans().get(1).operationName());
Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.restClient",
mockTracer.finishedSpans().get(2).operationName());
} finally {
RestAssured.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.opentracing.contrib.jaxrs2.internal.SpanWrapper;
import io.opentracing.tag.Tags;
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;

@Provider
Expand All @@ -39,31 +40,22 @@ CurrentVertxRequest request() {

@Override
public void filter(ContainerRequestContext requestContext) throws IOException {
request().addRequestDoneListener(new CurrentVertxRequest.Listener() {
RoutingContext routingContext = request().getCurrent();
routingContext.addHeadersEndHandler(new Handler<Void>() {
@Override
public void initialInvocationComplete(RoutingContext routingContext, boolean goingAsync) {
public void handle(Void event) {

SpanWrapper wrapper = routingContext.get(SpanWrapper.PROPERTY_NAME);
if (wrapper != null) {
wrapper.getScope().close();
}
}

@Override
public void responseComplete(RoutingContext routingContext) {
SpanWrapper wrapper = routingContext.get(SpanWrapper.PROPERTY_NAME);
if (wrapper == null) {
return;
}

Tags.HTTP_STATUS.set(wrapper.get(), routingContext.response().getStatusCode());
if (routingContext.failure() != null) {
addExceptionLogs(wrapper.get(), routingContext.failure());
}
wrapper.finish();

}
});

}

private static void addExceptionLogs(Span span, Throwable throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,6 @@ public T call(HttpServerExchange exchange, C context) throws Exception {
if (exchange == null) {
return action.call(exchange, context);
}
boolean vertxFirst = false;
ManagedContext requestContext = beanContainer.requestContext();
if (requestContext.isActive()) {
return action.call(exchange, context);
Expand All @@ -469,7 +468,6 @@ public T call(HttpServerExchange exchange, C context) throws Exception {
.getAttachment(REQUEST_CONTEXT);
try {
requestContext.activate(existingRequestContext);
vertxFirst = existingRequestContext == null;

VertxHttpExchange delegate = (VertxHttpExchange) exchange.getDelegate();
RoutingContext rc = (RoutingContext) delegate.getContext();
Expand All @@ -479,9 +477,6 @@ public T call(HttpServerExchange exchange, C context) throws Exception {
ServletRequestContext src = exchange
.getAttachment(ServletRequestContext.ATTACHMENT_KEY);
HttpServletRequestImpl req = src.getOriginalRequest();
if (vertxFirst) {
currentVertxRequest.initialInvocationComplete(req.isAsyncStarted());
}
if (req.isAsyncStarted()) {
exchange.putAttachment(REQUEST_CONTEXT, requestContext.getState());
requestContext.deactivate();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
package io.quarkus.vertx.http.runtime;

import java.util.ArrayList;
import java.util.List;

import javax.annotation.PreDestroy;
import javax.enterprise.context.RequestScoped;
import javax.enterprise.inject.Produces;

import org.jboss.logging.Logger;

import io.vertx.ext.web.RoutingContext;

@RequestScoped
public class CurrentVertxRequest {

private final Logger log = Logger.getLogger(CurrentVertxRequest.class);

public RoutingContext current;
private List<Listener> doneListeners;

@Produces
@RequestScoped
Expand All @@ -30,51 +21,4 @@ public CurrentVertxRequest setCurrent(RoutingContext current) {
return this;
}

public void addRequestDoneListener(Listener doneListener) {
if (doneListeners == null) {
doneListeners = new ArrayList<>();
}
doneListeners.add(doneListener);
}

public void initialInvocationComplete(boolean goingAsync) {

if (current == null) {
return;
}
if (doneListeners != null) {
for (Listener i : doneListeners) {
try {
i.initialInvocationComplete(current, goingAsync);
} catch (Throwable t) {
log.errorf(t, "Failed to process invocation listener %s", i);
}
}
}
}

@PreDestroy
void done() {
if (current == null) {
return;
}
if (doneListeners != null) {
for (Listener i : doneListeners) {
try {
i.responseComplete(current);
} catch (Throwable t) {
log.errorf(t, "Failed to process done listener %s", i);
}
}
}
}

public interface Listener {

void initialInvocationComplete(RoutingContext routingContext, boolean goingAsync);

void responseComplete(RoutingContext routingContext);

}

}

0 comments on commit 5ab0b96

Please sign in to comment.