From 0f32c84c61d0edd54223c69c7c5cdd8d37c0b313 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 8 Sep 2022 03:59:14 +0300 Subject: [PATCH] Run Reactive Messaging wiring without request context. ActivateRequestContextInterceptor : Destroy request scope property when reactive method returns on different thread --- .../SmallRyeReactiveMessagingLifecycle.java | 15 ++ .../ActivateRequestContextInterceptor.java | 29 ++- ...ActivateRequestContextInterceptorTest.java | 238 ++++++++++++++---- 3 files changed, 220 insertions(+), 62 deletions(-) diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java index b42cd10fda0bb..8ad0e75d8dc19 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java @@ -10,6 +10,9 @@ import javax.inject.Inject; import javax.interceptor.Interceptor; +import io.quarkus.arc.Arc; +import io.quarkus.arc.InjectableContext; +import io.quarkus.arc.ManagedContext; import io.quarkus.runtime.StartupEvent; import io.smallrye.reactive.messaging.EmitterConfiguration; import io.smallrye.reactive.messaging.providers.extension.ChannelConfiguration; @@ -37,6 +40,14 @@ void onStaticInit(@Observes @Initialized(ApplicationScoped.class) Object event, } void onApplicationStart(@Observes @Priority(Interceptor.Priority.LIBRARY_BEFORE) StartupEvent event) { + // We do not want a request scope during the wiring, or it will be propagated and never terminated. + ManagedContext requestContext = Arc.container().requestContext(); + boolean isRequestScopeActive = requestContext.isActive(); + InjectableContext.ContextState state = null; + if (isRequestScopeActive) { + state = requestContext.getState(); + requestContext.deactivate(); + } try { mediatorManager.start(); } catch (Exception e) { @@ -44,6 +55,10 @@ void onApplicationStart(@Observes @Priority(Interceptor.Priority.LIBRARY_BEFORE) throw e; } throw new DeploymentException(e); + } finally { + if (state != null) { + requestContext.activate(state); + } } } diff --git a/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/impl/ActivateRequestContextInterceptor.java b/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/impl/ActivateRequestContextInterceptor.java index 284de44f3542c..44bb09bbe0095 100644 --- a/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/impl/ActivateRequestContextInterceptor.java +++ b/independent-projects/arc/runtime/src/main/java/io/quarkus/arc/impl/ActivateRequestContextInterceptor.java @@ -1,6 +1,7 @@ package io.quarkus.arc.impl; import io.quarkus.arc.Arc; +import io.quarkus.arc.InjectableContext; import io.quarkus.arc.ManagedContext; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -38,14 +39,16 @@ private CompletionStage invokeStage(InvocationContext ctx) { } return activate(requestContext) - .thenCompose(v -> proceedWithStage(ctx)) - .whenComplete((r, t) -> requestContext.terminate()); + .thenCompose(state -> proceedWithStage(ctx).whenComplete((r, t) -> { + requestContext.destroy(state); + requestContext.deactivate(); + })); } - private static CompletionStage activate(ManagedContext requestContext) { + private static CompletionStage activate(ManagedContext requestContext) { try { requestContext.activate(); - return CompletableFuture.completedStage(requestContext); + return CompletableFuture.completedStage(requestContext.getState()); } catch (Throwable t) { return CompletableFuture.failedStage(t); } @@ -68,8 +71,13 @@ private Multi invokeMulti(InvocationContext ctx) { return Multi.createFrom().deferred(() -> { requestContext.activate(); - return proceedWithMulti(ctx); - }).onTermination().invoke(requestContext::terminate); + InjectableContext.ContextState state = requestContext.getState(); + return proceedWithMulti(ctx) + .onTermination().invoke(() -> { + requestContext.destroy(state); + requestContext.deactivate(); + }); + }); }); } @@ -90,8 +98,13 @@ private Uni invokeUni(InvocationContext ctx) { return Uni.createFrom().deferred(() -> { requestContext.activate(); - return proceedWithUni(ctx); - }).eventually(requestContext::terminate); + InjectableContext.ContextState state = requestContext.getState(); + return proceedWithUni(ctx) + .eventually(() -> { + requestContext.destroy(state); + requestContext.deactivate(); + }); + }); }); } diff --git a/independent-projects/arc/tests/src/test/java/io/quarkus/arc/test/contexts/request/propagation/ActivateRequestContextInterceptorTest.java b/independent-projects/arc/tests/src/test/java/io/quarkus/arc/test/contexts/request/propagation/ActivateRequestContextInterceptorTest.java index bfb2e696b905a..29766a92dbf72 100644 --- a/independent-projects/arc/tests/src/test/java/io/quarkus/arc/test/contexts/request/propagation/ActivateRequestContextInterceptorTest.java +++ b/independent-projects/arc/tests/src/test/java/io/quarkus/arc/test/contexts/request/propagation/ActivateRequestContextInterceptorTest.java @@ -13,6 +13,9 @@ import io.smallrye.mutiny.Uni; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.context.RequestScoped; import javax.enterprise.context.control.ActivateRequestContext; @@ -23,6 +26,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -32,74 +36,143 @@ public class ActivateRequestContextInterceptorTest { @RegisterExtension - public ArcTestContainer container = new ArcTestContainer(FakeSessionProducer.class, SessionClient.class); + public ArcTestContainer container = new ArcTestContainer(FakeSessionProducer.class, SessionClient.class, + ExecutorProducer.class, SessionClientCompletingOnDifferentThread.class); - InstanceHandle clientHandler; + @Nested + class CompletingActivateRequestContext { - @BeforeEach - public void reset() { - FakeSession.state = INIT; - clientHandler = Arc.container().instance(SessionClient.class); - } + InstanceHandle clientHandler; - @AfterEach - public void terminate() { - clientHandler.close(); - clientHandler.destroy(); - } + @BeforeEach + void reset() { + FakeSession.state = INIT; + clientHandler = Arc.container().instance(SessionClient.class); + } - @Test - public void testUni() throws Exception { - Assertions.assertEquals(INIT, FakeSession.getState()); - FakeSession.State result = clientHandler.get().openUniSession().await().indefinitely(); + @AfterEach + void terminate() { + clientHandler.close(); + clientHandler.destroy(); + } - // Closed in the dispose method - Assertions.assertEquals(CLOSED, FakeSession.getState()); - Assertions.assertEquals(OPENED, result); - } + @Test + public void testUni() throws Exception { + Assertions.assertEquals(INIT, FakeSession.getState()); + FakeSession.State result = clientHandler.get().openUniSession().await().indefinitely(); - @Test - public void testMulti() throws Exception { - Assertions.assertEquals(INIT, FakeSession.getState()); - FakeSession.State result = clientHandler.get().openMultiSession() - .toUni() - .await().indefinitely(); + // Closed in the dispose method + Assertions.assertEquals(CLOSED, FakeSession.getState()); + Assertions.assertEquals(OPENED, result); + } - // Closed in the dispose method - Assertions.assertEquals(CLOSED, FakeSession.getState()); - Assertions.assertEquals(OPENED, result); - } + @Test + public void testMulti() throws Exception { + Assertions.assertEquals(INIT, FakeSession.getState()); + FakeSession.State result = clientHandler.get().openMultiSession() + .toUni() + .await().indefinitely(); - @Test - public void testFuture() throws Exception { - Assertions.assertEquals(INIT, FakeSession.getState()); - FakeSession.State result = clientHandler.get() - .openFutureSession().toCompletableFuture().join(); + // Closed in the dispose method + Assertions.assertEquals(CLOSED, FakeSession.getState()); + Assertions.assertEquals(OPENED, result); + } - // Closed in the dispose method - Assertions.assertEquals(CLOSED, FakeSession.getState()); - Assertions.assertEquals(OPENED, result); - } + @Test + public void testFuture() throws Exception { + Assertions.assertEquals(INIT, FakeSession.getState()); + FakeSession.State result = clientHandler.get() + .openFutureSession().toCompletableFuture().join(); - @Test - public void testStage() throws Exception { - Assertions.assertEquals(INIT, FakeSession.getState()); - FakeSession.State result = clientHandler.get().openStageSession() - .toCompletableFuture().join(); + // Closed in the dispose method + Assertions.assertEquals(CLOSED, FakeSession.getState()); + Assertions.assertEquals(OPENED, result); + } + + @Test + public void testStage() throws Exception { + Assertions.assertEquals(INIT, FakeSession.getState()); + FakeSession.State result = clientHandler.get().openStageSession() + .toCompletableFuture().join(); + + // Closed in the dispose method + Assertions.assertEquals(CLOSED, FakeSession.getState()); + Assertions.assertEquals(OPENED, result); + } + + @Test + public void testNonReactive() throws Exception { + Assertions.assertEquals(INIT, FakeSession.getState()); + FakeSession.State result = clientHandler.get().openSession(); + + // Closed in the dispose method + Assertions.assertEquals(CLOSED, FakeSession.getState()); + Assertions.assertEquals(OPENED, result); + } - // Closed in the dispose method - Assertions.assertEquals(CLOSED, FakeSession.getState()); - Assertions.assertEquals(OPENED, result); } - @Test - public void testNonReactive() throws Exception { - Assertions.assertEquals(INIT, FakeSession.getState()); - FakeSession.State result = clientHandler.get().openSession(); + @Nested + class AsyncCompletingActivateRequestContext { + + InstanceHandle asyncClientHandler; + + @BeforeEach + void reset() { + FakeSession.state = INIT; + asyncClientHandler = Arc.container().instance(SessionClientCompletingOnDifferentThread.class); + } + + @AfterEach + void terminate() { + asyncClientHandler.close(); + asyncClientHandler.destroy(); + } + + @Test + public void testUni() throws Exception { + Assertions.assertEquals(INIT, FakeSession.getState()); + FakeSession.State result = asyncClientHandler.get().openUniSession().await().indefinitely(); + + // Closed in the dispose method + Assertions.assertEquals(CLOSED, FakeSession.getState()); + Assertions.assertEquals(OPENED, result); + } + + @Test + public void testMulti() throws Exception { + Assertions.assertEquals(INIT, FakeSession.getState()); + FakeSession.State result = asyncClientHandler.get().openMultiSession() + .toUni() + .await().indefinitely(); + + // Closed in the dispose method + Assertions.assertEquals(CLOSED, FakeSession.getState()); + Assertions.assertEquals(OPENED, result); + } + + @Test + public void testFuture() throws Exception { + Assertions.assertEquals(INIT, FakeSession.getState()); + FakeSession.State result = asyncClientHandler.get() + .openFutureSession().toCompletableFuture().join(); + + // Closed in the dispose method + Assertions.assertEquals(CLOSED, FakeSession.getState()); + Assertions.assertEquals(OPENED, result); + } + + @Test + public void testStage() throws Exception { + Assertions.assertEquals(INIT, FakeSession.getState()); + FakeSession.State result = asyncClientHandler.get().openStageSession() + .toCompletableFuture().join(); + + // Closed in the dispose method + Assertions.assertEquals(CLOSED, FakeSession.getState()); + Assertions.assertEquals(OPENED, result); + } - // Closed in the dispose method - Assertions.assertEquals(CLOSED, FakeSession.getState()); - Assertions.assertEquals(OPENED, result); } @ApplicationScoped @@ -139,6 +212,48 @@ public State openSession() { } } + @ApplicationScoped + static class SessionClientCompletingOnDifferentThread { + @Inject + FakeSession session; + + @Inject + Executor executor; + + @ActivateRequestContext + public Uni openUniSession() { + return Uni.createFrom() + .item(() -> session) + .map(FakeSession::open) + .emitOn(executor); + } + + @ActivateRequestContext + public Multi openMultiSession() { + return Multi.createFrom() + .item(() -> session) + .map(FakeSession::open) + .emitOn(executor); + } + + @ActivateRequestContext + public CompletionStage openStageSession() { + return CompletableFuture.completedStage(session.open()) + .thenApplyAsync(s -> s, executor); + } + + @ActivateRequestContext + public CompletableFuture openFutureSession() { + return CompletableFuture.completedFuture(session.open()) + .thenApplyAsync(s -> s, executor); + } + + @ActivateRequestContext + public State openSession() { + return session.open(); + } + } + static class FakeSession { enum State { INIT, @@ -177,4 +292,19 @@ void disposeSession(@Disposes FakeSession session) { } } + @Singleton + static class ExecutorProducer { + + @Produces + @ApplicationScoped + ExecutorService produceExecutor() { + return Executors.newSingleThreadExecutor(); + } + + void disposeSession(@Disposes ExecutorService executor) { + executor.shutdown(); + } + + } + }