Skip to content

Commit

Permalink
Merge pull request #4746 from mkouba/issue-4701
Browse files Browse the repository at this point in the history
Use the correct default executor to fire async CDI events
  • Loading branch information
gsmet authored Oct 22, 2019
2 parents 4dcc2ee + 5b20e55 commit 401d9d8
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.arc.deployment;

import static io.quarkus.deployment.annotations.ExecutionTime.RUNTIME_INIT;
import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;

import java.util.Collection;
Expand Down Expand Up @@ -46,6 +47,7 @@
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.ApplicationArchivesBuildItem;
import io.quarkus.deployment.builditem.ApplicationClassPredicateBuildItem;
import io.quarkus.deployment.builditem.ExecutorBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
import io.quarkus.deployment.builditem.GeneratedResourceBuildItem;
Expand Down Expand Up @@ -320,6 +322,12 @@ public void registerField(FieldInfo fieldInfo) {

}

@BuildStep
@Record(value = RUNTIME_INIT)
void setupExecutor(ExecutorBuildItem executor, ArcRecorder recorder) {
recorder.initExecutor(executor.getExecutorProxy());
}

private abstract static class AbstractCompositeApplicationClassesPredicate<T> implements Predicate<T> {

private final IndexView applicationClassesIndex;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package io.quarkus.arc.test.observer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.annotation.PostConstruct;
import javax.enterprise.context.Dependent;
import javax.enterprise.context.RequestScoped;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.enterprise.event.ObservesAsync;
import javax.inject.Inject;
import javax.inject.Singleton;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;

public class AsyncObserverTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(StringProducer.class, StringObserver.class,
ThreadNameProvider.class));

@Inject
StringProducer producer;

@Inject
StringObserver observer;

@Test
public void testAsyncObservers() throws InterruptedException, ExecutionException, TimeoutException {
String currentThread = Thread.currentThread().getName();

producer.fire("ping");
List<String> events = observer.getEvents();

assertEquals(1, events.size());
assertTrue(events.get(0).startsWith("sync::ping"));
assertTrue(events.get(0).endsWith(currentThread));

events.clear();

CompletionStage<String> completionStage = producer.fireAsync("pong");
assertEquals("pong", completionStage.toCompletableFuture().get(10, TimeUnit.SECONDS));
assertEquals(1, events.size());
assertTrue(events.get(0).startsWith("async::pong"));
assertFalse(events.get(0).endsWith(currentThread));
}

@Singleton
static class StringObserver {

private List<String> events;

@Inject
ThreadNameProvider threadNameProvider;

@PostConstruct
void init() {
events = new CopyOnWriteArrayList<>();
}

void observeAsync(@ObservesAsync String value) {
events.add("async::" + value + "::" + threadNameProvider.get());
}

void observeSync(@Observes String value) {
events.add("sync::" + value + "::" + Thread.currentThread().getName());
}

List<String> getEvents() {
return events;
}

}

@Dependent
static class StringProducer {

@Inject
Event<String> event;

void fire(String value) {
event.fire(value);
}

CompletionStage<String> fireAsync(String value) {
return event.fireAsync(value);
}

}

@RequestScoped
static class ThreadNameProvider {

String get() {
return Thread.currentThread().getName();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import org.jboss.logging.Logger;
Expand Down Expand Up @@ -40,6 +41,10 @@ public void run() {
return container;
}

public void initExecutor(ExecutorService executor) {
Arc.setExecutor(executor);
}

public void initSupplierBeans(Map<String, Supplier<Object>> beans) {
supplierMap = beans;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.arc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -10,18 +11,22 @@
public final class Arc {

private static final AtomicReference<ArcContainerImpl> INSTANCE = new AtomicReference<>();

private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);

public static ArcContainer initialize() {
if (INITIALIZED.compareAndSet(false, true)) {
ArcContainerImpl container = new ArcContainerImpl();
INSTANCE.set(container);
container.init();
return container;
}
return container();
}

public static void setExecutor(ExecutorService executor) {
INSTANCE.get().setExecutor(executor);
}

/**
*
* @return the container instance
Expand All @@ -33,8 +38,9 @@ public static ArcContainer container() {
public static void shutdown() {
if (INSTANCE.get() != null) {
synchronized (INSTANCE) {
if (INSTANCE.get() != null) {
INSTANCE.get().shutdown();
ArcContainerImpl container = INSTANCE.get();
if (container != null) {
container.shutdown();
INSTANCE.set(null);
INITIALIZED.set(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import javax.enterprise.context.ContextNotActiveException;
import javax.enterprise.inject.spi.BeanManager;
Expand Down Expand Up @@ -117,4 +118,9 @@ public interface ArcContainer {
* @return the bean manager
*/
BeanManager beanManager();

/**
* @return the default executor service
*/
ExecutorService getExecutorService();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
Expand Down Expand Up @@ -69,6 +71,8 @@ class ArcContainerImpl implements ArcContainer {

private final List<ResourceReferenceProvider> resourceProviders;

private volatile ExecutorService executorService;

public ArcContainerImpl() {
id = "" + ID_GENERATOR.incrementAndGet();
running = new AtomicBoolean(true);
Expand Down Expand Up @@ -251,6 +255,16 @@ public BeanManager beanManager() {
return BeanManagerImpl.INSTANCE.get();
}

@Override
public ExecutorService getExecutorService() {
ExecutorService executor = executorService;
return executor != null ? executor : ForkJoinPool.commonPool();
}

void setExecutor(ExecutorService executor) {
this.executorService = executor;
}

@Override
public String toString() {
return "ArcContainerImpl [id=" + id + ", running=" + running + ", beans=" + beans.size() + ", observers="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import javax.enterprise.context.RequestScoped;
import javax.enterprise.event.Event;
Expand All @@ -38,17 +37,11 @@
class EventImpl<T> implements Event<T> {

private static final int DEFAULT_CACHE_CAPACITY = 4;

private static final Executor DEFAULT_EXECUTOR = ForkJoinPool.commonPool();

private static final NotificationOptions DEFAULT_OPTIONS = NotificationOptions.ofExecutor(DEFAULT_EXECUTOR);
private static final NotificationOptions EMPTY_OPTIONS = NotificationOptions.builder().build();

private final HierarchyDiscovery injectionPointTypeHierarchy;

private final Type eventType;

private final Set<Annotation> qualifiers;

private final ConcurrentMap<Class<?>, Notifier<? super T>> notifiers;

private transient volatile Notifier<? super T> lastNotifier;
Expand All @@ -71,7 +64,7 @@ public void fire(T event) {

@Override
public <U extends T> CompletionStage<U> fireAsync(U event) {
return fireAsync(event, DEFAULT_OPTIONS);
return fireAsync(event, EMPTY_OPTIONS);
}

@Override
Expand All @@ -83,7 +76,7 @@ public <U extends T> CompletionStage<U> fireAsync(U event, NotificationOptions o

Executor executor = options.getExecutor();
if (executor == null) {
executor = DEFAULT_EXECUTOR;
executor = Arc.container().getExecutorService();
}

if (notifier.isEmpty()) {
Expand Down

0 comments on commit 401d9d8

Please sign in to comment.