Skip to content

Commit

Permalink
Merge pull request #27802 from ozangunalp/fix_context_propagation_rea…
Browse files Browse the repository at this point in the history
…ctive_messaging_request_context

Fix for Context Propagation on Reactive Messaging
  • Loading branch information
Sanne authored Sep 9, 2022
2 parents 382511f + 0f32c84 commit 7ba722a
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import javax.inject.Inject;
import javax.interceptor.Interceptor;

import io.quarkus.arc.Arc;
import io.quarkus.arc.InjectableContext;
import io.quarkus.arc.ManagedContext;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.providers.extension.ChannelConfiguration;
Expand Down Expand Up @@ -37,13 +40,25 @@ void onStaticInit(@Observes @Initialized(ApplicationScoped.class) Object event,
}

void onApplicationStart(@Observes @Priority(Interceptor.Priority.LIBRARY_BEFORE) StartupEvent event) {
// We do not want a request scope during the wiring, or it will be propagated and never terminated.
ManagedContext requestContext = Arc.container().requestContext();
boolean isRequestScopeActive = requestContext.isActive();
InjectableContext.ContextState state = null;
if (isRequestScopeActive) {
state = requestContext.getState();
requestContext.deactivate();
}
try {
mediatorManager.start();
} catch (Exception e) {
if (e instanceof DeploymentException || e instanceof DefinitionException) {
throw e;
}
throw new DeploymentException(e);
} finally {
if (state != null) {
requestContext.activate(state);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.arc.impl;

import io.quarkus.arc.Arc;
import io.quarkus.arc.InjectableContext;
import io.quarkus.arc.ManagedContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -38,14 +39,16 @@ private CompletionStage<?> invokeStage(InvocationContext ctx) {
}

return activate(requestContext)
.thenCompose(v -> proceedWithStage(ctx))
.whenComplete((r, t) -> requestContext.terminate());
.thenCompose(state -> proceedWithStage(ctx).whenComplete((r, t) -> {
requestContext.destroy(state);
requestContext.deactivate();
}));
}

private static CompletionStage<ManagedContext> activate(ManagedContext requestContext) {
private static CompletionStage<InjectableContext.ContextState> activate(ManagedContext requestContext) {
try {
requestContext.activate();
return CompletableFuture.completedStage(requestContext);
return CompletableFuture.completedStage(requestContext.getState());
} catch (Throwable t) {
return CompletableFuture.failedStage(t);
}
Expand All @@ -68,8 +71,13 @@ private Multi<?> invokeMulti(InvocationContext ctx) {

return Multi.createFrom().deferred(() -> {
requestContext.activate();
return proceedWithMulti(ctx);
}).onTermination().invoke(requestContext::terminate);
InjectableContext.ContextState state = requestContext.getState();
return proceedWithMulti(ctx)
.onTermination().invoke(() -> {
requestContext.destroy(state);
requestContext.deactivate();
});
});
});
}

Expand All @@ -90,8 +98,13 @@ private Uni<?> invokeUni(InvocationContext ctx) {

return Uni.createFrom().deferred(() -> {
requestContext.activate();
return proceedWithUni(ctx);
}).eventually(requestContext::terminate);
InjectableContext.ContextState state = requestContext.getState();
return proceedWithUni(ctx)
.eventually(() -> {
requestContext.destroy(state);
requestContext.deactivate();
});
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import io.smallrye.mutiny.Uni;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.RequestScoped;
import javax.enterprise.context.control.ActivateRequestContext;
Expand All @@ -23,6 +26,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand All @@ -32,74 +36,143 @@
public class ActivateRequestContextInterceptorTest {

@RegisterExtension
public ArcTestContainer container = new ArcTestContainer(FakeSessionProducer.class, SessionClient.class);
public ArcTestContainer container = new ArcTestContainer(FakeSessionProducer.class, SessionClient.class,
ExecutorProducer.class, SessionClientCompletingOnDifferentThread.class);

InstanceHandle<SessionClient> clientHandler;
@Nested
class CompletingActivateRequestContext {

@BeforeEach
public void reset() {
FakeSession.state = INIT;
clientHandler = Arc.container().instance(SessionClient.class);
}
InstanceHandle<SessionClient> clientHandler;

@AfterEach
public void terminate() {
clientHandler.close();
clientHandler.destroy();
}
@BeforeEach
void reset() {
FakeSession.state = INIT;
clientHandler = Arc.container().instance(SessionClient.class);
}

@Test
public void testUni() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = clientHandler.get().openUniSession().await().indefinitely();
@AfterEach
void terminate() {
clientHandler.close();
clientHandler.destroy();
}

// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}
@Test
public void testUni() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = clientHandler.get().openUniSession().await().indefinitely();

@Test
public void testMulti() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = clientHandler.get().openMultiSession()
.toUni()
.await().indefinitely();
// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}

// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}
@Test
public void testMulti() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = clientHandler.get().openMultiSession()
.toUni()
.await().indefinitely();

@Test
public void testFuture() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = clientHandler.get()
.openFutureSession().toCompletableFuture().join();
// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}

// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}
@Test
public void testFuture() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = clientHandler.get()
.openFutureSession().toCompletableFuture().join();

@Test
public void testStage() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = clientHandler.get().openStageSession()
.toCompletableFuture().join();
// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}

@Test
public void testStage() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = clientHandler.get().openStageSession()
.toCompletableFuture().join();

// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}

@Test
public void testNonReactive() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = clientHandler.get().openSession();

// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}

// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}

@Test
public void testNonReactive() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = clientHandler.get().openSession();
@Nested
class AsyncCompletingActivateRequestContext {

InstanceHandle<SessionClientCompletingOnDifferentThread> asyncClientHandler;

@BeforeEach
void reset() {
FakeSession.state = INIT;
asyncClientHandler = Arc.container().instance(SessionClientCompletingOnDifferentThread.class);
}

@AfterEach
void terminate() {
asyncClientHandler.close();
asyncClientHandler.destroy();
}

@Test
public void testUni() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = asyncClientHandler.get().openUniSession().await().indefinitely();

// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}

@Test
public void testMulti() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = asyncClientHandler.get().openMultiSession()
.toUni()
.await().indefinitely();

// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}

@Test
public void testFuture() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = asyncClientHandler.get()
.openFutureSession().toCompletableFuture().join();

// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}

@Test
public void testStage() throws Exception {
Assertions.assertEquals(INIT, FakeSession.getState());
FakeSession.State result = asyncClientHandler.get().openStageSession()
.toCompletableFuture().join();

// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}

// Closed in the dispose method
Assertions.assertEquals(CLOSED, FakeSession.getState());
Assertions.assertEquals(OPENED, result);
}

@ApplicationScoped
Expand Down Expand Up @@ -139,6 +212,48 @@ public State openSession() {
}
}

@ApplicationScoped
static class SessionClientCompletingOnDifferentThread {
@Inject
FakeSession session;

@Inject
Executor executor;

@ActivateRequestContext
public Uni<State> openUniSession() {
return Uni.createFrom()
.item(() -> session)
.map(FakeSession::open)
.emitOn(executor);
}

@ActivateRequestContext
public Multi<State> openMultiSession() {
return Multi.createFrom()
.item(() -> session)
.map(FakeSession::open)
.emitOn(executor);
}

@ActivateRequestContext
public CompletionStage<State> openStageSession() {
return CompletableFuture.completedStage(session.open())
.thenApplyAsync(s -> s, executor);
}

@ActivateRequestContext
public CompletableFuture<State> openFutureSession() {
return CompletableFuture.completedFuture(session.open())
.thenApplyAsync(s -> s, executor);
}

@ActivateRequestContext
public State openSession() {
return session.open();
}
}

static class FakeSession {
enum State {
INIT,
Expand Down Expand Up @@ -177,4 +292,19 @@ void disposeSession(@Disposes FakeSession session) {
}
}

@Singleton
static class ExecutorProducer {

@Produces
@ApplicationScoped
ExecutorService produceExecutor() {
return Executors.newSingleThreadExecutor();
}

void disposeSession(@Disposes ExecutorService executor) {
executor.shutdown();
}

}

}

0 comments on commit 7ba722a

Please sign in to comment.