diff --git a/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java b/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java index 1560a717b0bac..a54586a1cfeb4 100644 --- a/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java +++ b/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java @@ -8,6 +8,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import javax.ws.rs.ServiceUnavailableException; import javax.ws.rs.container.AsyncResponse; @@ -262,11 +263,7 @@ public boolean resume(Object entity) { return false; done = true; requestContext.activate(requestContextState); - try { - return internalResume(entity, t -> vertxFlush()); - } finally { - requestContext.terminate(); - } + return internalResume(entity, new FlushTask()); } } @@ -279,11 +276,7 @@ public boolean resume(Throwable ex) { return false; done = true; requestContext.activate(requestContextState); - try { - return internalResume(ex, t -> vertxFlush()); - } finally { - requestContext.terminate(); - } + return internalResume(ex, new FlushTask()); } } @@ -299,11 +292,7 @@ public boolean cancel() { done = true; cancelled = true; requestContext.activate(requestContextState); - try { - return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), t -> vertxFlush()); - } finally { - requestContext.terminate(); - } + return internalResume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build(), new FlushTask()); } } @@ -317,14 +306,10 @@ public boolean cancel(int retryAfter) { done = true; cancelled = true; requestContext.activate(requestContextState); - try { - return internalResume( - Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter) - .build(), - t -> vertxFlush()); - } finally { - requestContext.terminate(); - } + return internalResume( + Response.status(Response.Status.SERVICE_UNAVAILABLE).header(HttpHeaders.RETRY_AFTER, retryAfter) + .build(), + new FlushTask()); } } @@ -393,6 +378,17 @@ protected void handleTimeout() { return; resume(new ServiceUnavailableException()); } + + private class FlushTask implements Consumer { + @Override + public void accept(Throwable t) { + try { + requestContext.terminate(); + } finally { + VertxHttpAsyncResponse.this.vertxFlush(); + } + } + } } } } diff --git a/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxRequestHandler.java b/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxRequestHandler.java index d57c8a4563e8d..d676b8ab814c2 100644 --- a/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxRequestHandler.java +++ b/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxRequestHandler.java @@ -117,20 +117,17 @@ private void dispatch(RoutingContext routingContext, InputStream is, VertxOutput boolean suspended = vertxRequest.getAsyncContext().isSuspended(); boolean requestContextActive = requestContext.isActive(); - if (requestContextActive) { - //it is possible that there was an async response, that then finished in the same thread - //the async response will have terminated the request context in this case - currentVertxRequest.initialInvocationComplete(suspended); - } if (!suspended) { try { - vertxResponse.finish(); - } catch (IOException e) { - log.error("Unexpected failure", e); - } finally { if (requestContextActive) { requestContext.terminate(); } + } finally { + try { + vertxResponse.finish(); + } catch (IOException e) { + log.debug("IOException writing JAX-RS response", e); + } } } else { //we need the request context to stick around diff --git a/extensions/smallrye-opentracing/deployment/src/test/java/io/quarkus/smallrye/opentracing/deployment/TracingTest.java b/extensions/smallrye-opentracing/deployment/src/test/java/io/quarkus/smallrye/opentracing/deployment/TracingTest.java index 1fb5563107fb7..9ab1d26d7b9ac 100644 --- a/extensions/smallrye-opentracing/deployment/src/test/java/io/quarkus/smallrye/opentracing/deployment/TracingTest.java +++ b/extensions/smallrye-opentracing/deployment/src/test/java/io/quarkus/smallrye/opentracing/deployment/TracingTest.java @@ -1,9 +1,7 @@ package io.quarkus.smallrye.opentracing.deployment; import java.util.List; -import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.awaitility.Awaitility; import org.jboss.shrinkwrap.api.ShrinkWrap; @@ -48,15 +46,13 @@ public static void afterAll() { } @Test - public void testSingleServerRequest() throws InterruptedException { + public void testSingleServerRequest() { try { RestAssured.defaultParser = Parser.TEXT; RestAssured.when().get("/hello") .then() .statusCode(200); - //inherently racy, tracer is completed after response is sent back to the client - Awaitility.await().atMost(5, TimeUnit.SECONDS) - .until(() -> mockTracer.finishedSpans().size() == 1); + Assertions.assertEquals(1, mockTracer.finishedSpans().size()); Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello", mockTracer.finishedSpans().get(0).operationName()); } finally { @@ -65,15 +61,12 @@ public void testSingleServerRequest() throws InterruptedException { } @Test - public void testCDI() throws InterruptedException { + public void testCDI() { try { RestAssured.defaultParser = Parser.TEXT; RestAssured.when().get("/cdi") .then() .statusCode(200); - //inherently racy, tracer is completed after response is sent back to the client - Awaitility.await().atMost(5, TimeUnit.SECONDS) - .until(() -> mockTracer.finishedSpans().size() == 2); Assertions.assertEquals(2, mockTracer.finishedSpans().size()); Assertions.assertEquals("io.quarkus.smallrye.opentracing.deployment.Service.foo", mockTracer.finishedSpans().get(0).operationName()); @@ -85,24 +78,18 @@ public void testCDI() throws InterruptedException { } @Test - public void testMPRestClient() throws InterruptedException { + public void testMPRestClient() { try { RestAssured.defaultParser = Parser.TEXT; RestAssured.when().get("/restClient") .then() .statusCode(200); - //inherently racy, tracer is completed after response is sent back to the client - Awaitility.await().atMost(5, TimeUnit.SECONDS) - .until(() -> mockTracer.finishedSpans().size() == 3); Assertions.assertEquals(3, mockTracer.finishedSpans().size()); - //these can come in any order, as the 'hello' span is finished after the request is sent back. - //this means the client might have already dealt with the response before the hello span is finished - //in practice this means that the spans might be in any order, as they are ordered by the time - //they are completed rather than the time they are started - Set results = mockTracer.finishedSpans().stream().map(MockSpan::operationName).collect(Collectors.toSet()); - Assertions.assertTrue(results.contains("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello")); - Assertions.assertTrue(results.contains("GET")); - Assertions.assertTrue(results.contains("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.restClient")); + Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.hello", + mockTracer.finishedSpans().get(0).operationName()); + Assertions.assertEquals("GET", mockTracer.finishedSpans().get(1).operationName()); + Assertions.assertEquals("GET:io.quarkus.smallrye.opentracing.deployment.TestResource.restClient", + mockTracer.finishedSpans().get(2).operationName()); } finally { RestAssured.reset(); } diff --git a/extensions/smallrye-opentracing/runtime/src/main/java/io/quarkus/smallrye/opentracing/runtime/QuarkusSmallRyeTracingStandaloneVertxDynamicFeature.java b/extensions/smallrye-opentracing/runtime/src/main/java/io/quarkus/smallrye/opentracing/runtime/QuarkusSmallRyeTracingStandaloneVertxDynamicFeature.java index 2ce0f0b01aa9d..36c586228f560 100644 --- a/extensions/smallrye-opentracing/runtime/src/main/java/io/quarkus/smallrye/opentracing/runtime/QuarkusSmallRyeTracingStandaloneVertxDynamicFeature.java +++ b/extensions/smallrye-opentracing/runtime/src/main/java/io/quarkus/smallrye/opentracing/runtime/QuarkusSmallRyeTracingStandaloneVertxDynamicFeature.java @@ -16,6 +16,7 @@ import io.opentracing.contrib.jaxrs2.internal.SpanWrapper; import io.opentracing.tag.Tags; import io.quarkus.vertx.http.runtime.CurrentVertxRequest; +import io.vertx.core.Handler; import io.vertx.ext.web.RoutingContext; @Provider @@ -39,31 +40,21 @@ CurrentVertxRequest request() { @Override public void filter(ContainerRequestContext requestContext) throws IOException { - request().addRequestDoneListener(new CurrentVertxRequest.Listener() { + RoutingContext routingContext = request().getCurrent(); + routingContext.addHeadersEndHandler(new Handler() { @Override - public void initialInvocationComplete(RoutingContext routingContext, boolean goingAsync) { + public void handle(Void event) { SpanWrapper wrapper = routingContext.get(SpanWrapper.PROPERTY_NAME); if (wrapper != null) { wrapper.getScope().close(); + Tags.HTTP_STATUS.set(wrapper.get(), routingContext.response().getStatusCode()); + if (routingContext.failure() != null) { + addExceptionLogs(wrapper.get(), routingContext.failure()); + } + wrapper.finish(); } } - - @Override - public void responseComplete(RoutingContext routingContext) { - SpanWrapper wrapper = routingContext.get(SpanWrapper.PROPERTY_NAME); - if (wrapper == null) { - return; - } - - Tags.HTTP_STATUS.set(wrapper.get(), routingContext.response().getStatusCode()); - if (routingContext.failure() != null) { - addExceptionLogs(wrapper.get(), routingContext.failure()); - } - wrapper.finish(); - - } }); - } private static void addExceptionLogs(Span span, Throwable throwable) { diff --git a/extensions/undertow/runtime/src/main/java/io/quarkus/undertow/runtime/UndertowDeploymentRecorder.java b/extensions/undertow/runtime/src/main/java/io/quarkus/undertow/runtime/UndertowDeploymentRecorder.java index 1537a5b69ea75..f9e7394440a1b 100644 --- a/extensions/undertow/runtime/src/main/java/io/quarkus/undertow/runtime/UndertowDeploymentRecorder.java +++ b/extensions/undertow/runtime/src/main/java/io/quarkus/undertow/runtime/UndertowDeploymentRecorder.java @@ -460,7 +460,6 @@ public T call(HttpServerExchange exchange, C context) throws Exception { if (exchange == null) { return action.call(exchange, context); } - boolean vertxFirst = false; ManagedContext requestContext = beanContainer.requestContext(); if (requestContext.isActive()) { return action.call(exchange, context); @@ -469,7 +468,6 @@ public T call(HttpServerExchange exchange, C context) throws Exception { .getAttachment(REQUEST_CONTEXT); try { requestContext.activate(existingRequestContext); - vertxFirst = existingRequestContext == null; VertxHttpExchange delegate = (VertxHttpExchange) exchange.getDelegate(); RoutingContext rc = (RoutingContext) delegate.getContext(); @@ -479,9 +477,6 @@ public T call(HttpServerExchange exchange, C context) throws Exception { ServletRequestContext src = exchange .getAttachment(ServletRequestContext.ATTACHMENT_KEY); HttpServletRequestImpl req = src.getOriginalRequest(); - if (vertxFirst) { - currentVertxRequest.initialInvocationComplete(req.isAsyncStarted()); - } if (req.isAsyncStarted()) { exchange.putAttachment(REQUEST_CONTEXT, requestContext.getState()); requestContext.deactivate(); diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/CurrentVertxRequest.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/CurrentVertxRequest.java index e7dd3617c3220..2588f1a13485a 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/CurrentVertxRequest.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/CurrentVertxRequest.java @@ -1,23 +1,14 @@ package io.quarkus.vertx.http.runtime; -import java.util.ArrayList; -import java.util.List; - -import javax.annotation.PreDestroy; import javax.enterprise.context.RequestScoped; import javax.enterprise.inject.Produces; -import org.jboss.logging.Logger; - import io.vertx.ext.web.RoutingContext; @RequestScoped public class CurrentVertxRequest { - private final Logger log = Logger.getLogger(CurrentVertxRequest.class); - public RoutingContext current; - private List doneListeners; @Produces @RequestScoped @@ -30,51 +21,4 @@ public CurrentVertxRequest setCurrent(RoutingContext current) { return this; } - public void addRequestDoneListener(Listener doneListener) { - if (doneListeners == null) { - doneListeners = new ArrayList<>(); - } - doneListeners.add(doneListener); - } - - public void initialInvocationComplete(boolean goingAsync) { - - if (current == null) { - return; - } - if (doneListeners != null) { - for (Listener i : doneListeners) { - try { - i.initialInvocationComplete(current, goingAsync); - } catch (Throwable t) { - log.errorf(t, "Failed to process invocation listener %s", i); - } - } - } - } - - @PreDestroy - void done() { - if (current == null) { - return; - } - if (doneListeners != null) { - for (Listener i : doneListeners) { - try { - i.responseComplete(current); - } catch (Throwable t) { - log.errorf(t, "Failed to process done listener %s", i); - } - } - } - } - - public interface Listener { - - void initialInvocationComplete(RoutingContext routingContext, boolean goingAsync); - - void responseComplete(RoutingContext routingContext); - - } - }