From fbf086e6d858d32bd3ffa1bf2c3a46cbfd75e781 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Wed, 3 Oct 2018 09:01:59 +0200 Subject: [PATCH] Arc - basic asynchronous observer methods support --- .../protean/arc/processor/BeanDeployment.java | 18 +- .../jboss/protean/arc/processor/DotNames.java | 2 + .../protean/arc/processor/Injection.java | 2 +- .../arc/processor/InjectionPointInfo.java | 4 +- .../arc/processor/ObserverGenerator.java | 30 ++- .../protean/arc/processor/ObserverInfo.java | 17 +- .../protean/arc/AsyncEventDeliveryStage.java | 247 ++++++++++++++++++ .../java/org/jboss/protean/arc/EventImpl.java | 124 ++++++++- .../async/AsyncObserverExceptionTest.java | 94 +++++++ .../observers/async/AsyncObserverTest.java | 95 +++++++ 10 files changed, 605 insertions(+), 28 deletions(-) create mode 100644 ext/arc/runtime/src/main/java/org/jboss/protean/arc/AsyncEventDeliveryStage.java create mode 100644 ext/arc/tests/src/test/java/org/jboss/protean/arc/test/observers/async/AsyncObserverExceptionTest.java create mode 100644 ext/arc/tests/src/test/java/org/jboss/protean/arc/test/observers/async/AsyncObserverTest.java diff --git a/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/BeanDeployment.java b/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/BeanDeployment.java index 192c0566f2133..f7a9fddfbb112 100644 --- a/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/BeanDeployment.java +++ b/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/BeanDeployment.java @@ -190,7 +190,8 @@ private List findBeans(List beanDefiningAnnotations, List producerMethods = new HashSet<>(); Set disposerMethods = new HashSet<>(); Set producerFields = new HashSet<>(); - Set observerMethods = new HashSet<>(); + Set syncObserverMethods = new HashSet<>(); + Set asyncObserverMethods = new HashSet<>(); for (DotName beanDefiningAnnotation : beanDefiningAnnotations) { for (AnnotationInstance annotation : index.getAnnotations(beanDefiningAnnotation)) { @@ -222,7 +223,10 @@ private List findBeans(List beanDefiningAnnotations, List findBeans(List beanDefiningAnnotations, List annotations.stream().anyMatch(a -> a.name().equals(DotNames.OBSERVES)))); + annotations -> annotations.stream().anyMatch(a -> a.name().equals(DotNames.OBSERVES) || a.name().equals(DotNames.OBSERVES_ASYNC)))); } final AnnotationTarget target; diff --git a/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/InjectionPointInfo.java b/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/InjectionPointInfo.java index 2bde1ef967ac8..a3bbdd3c37287 100644 --- a/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/InjectionPointInfo.java +++ b/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/InjectionPointInfo.java @@ -32,7 +32,7 @@ static List fromMethod(MethodInfo method, BeanDeployment bea return fromMethod(method, beanDeployment, null); } - static List fromMethod(MethodInfo method, BeanDeployment beanDeployment, Predicate> skiPredicate) { + static List fromMethod(MethodInfo method, BeanDeployment beanDeployment, Predicate> skipPredicate) { List injectionPoints = new ArrayList<>(); for (ListIterator iterator = method.parameters().listIterator(); iterator.hasNext();) { Type paramType = iterator.next(); @@ -43,7 +43,7 @@ static List fromMethod(MethodInfo method, BeanDeployment bea paramAnnotations.add(annotation); } } - if (skiPredicate != null && skiPredicate.test(paramAnnotations)) { + if (skipPredicate != null && skipPredicate.test(paramAnnotations)) { // Skip parameter, e.g. @Disposes continue; } diff --git a/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/ObserverGenerator.java b/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/ObserverGenerator.java index 33888889166dd..0e3b0e505c689 100644 --- a/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/ObserverGenerator.java +++ b/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/ObserverGenerator.java @@ -96,16 +96,19 @@ Collection generate(ObserverInfo observer, ReflectionRegistration refl initMaps(observer, injectionPointToProviderField); createProviderFields(observerCreator, observer, injectionPointToProviderField); - createConstructor(classOutput, observerCreator, observer, baseName, injectionPointToProviderField, annotationLiterals); - createGetObservedType(observerCreator, observedType.getFieldDescriptor()); + + implementGetObservedType(observerCreator, observedType.getFieldDescriptor()); if (observedQualifiers != null) { - createGetObservedQualifiers(observerCreator, observedQualifiers.getFieldDescriptor()); + implementGetObservedQualifiers(observerCreator, observedQualifiers.getFieldDescriptor()); } - createGetBeanClass(observerCreator, observer.getDeclaringBean().getTarget().asClass().name()); - createNotify(observer, observerCreator, injectionPointToProviderField, reflectionRegistration); + implementGetBeanClass(observerCreator, observer.getDeclaringBean().getTarget().asClass().name()); + implementNotify(observer, observerCreator, injectionPointToProviderField, reflectionRegistration); if (observer.getPriority() != ObserverMethod.DEFAULT_PRIORITY) { - createGetPriority(observerCreator, observer); + implementGetPriority(observerCreator, observer); + } + if (observer.isAsync()) { + implementIsAsync(observerCreator); } observerCreator.close(); @@ -120,27 +123,32 @@ protected void initMaps(ObserverInfo observer, Map i } } - protected void createGetObservedType(ClassCreator observerCreator, FieldDescriptor observedTypeField) { + protected void implementGetObservedType(ClassCreator observerCreator, FieldDescriptor observedTypeField) { MethodCreator getObservedType = observerCreator.getMethodCreator("getObservedType", Type.class).setModifiers(ACC_PUBLIC); getObservedType.returnValue(getObservedType.readInstanceField(observedTypeField, getObservedType.getThis())); } - protected void createGetObservedQualifiers(ClassCreator observerCreator, FieldDescriptor observedQualifiersField) { + protected void implementGetObservedQualifiers(ClassCreator observerCreator, FieldDescriptor observedQualifiersField) { MethodCreator getObservedQualifiers = observerCreator.getMethodCreator("getObservedQualifiers", Set.class).setModifiers(ACC_PUBLIC); getObservedQualifiers.returnValue(getObservedQualifiers.readInstanceField(observedQualifiersField, getObservedQualifiers.getThis())); } - protected void createGetBeanClass(ClassCreator observerCreator, DotName beanClass) { + protected void implementGetBeanClass(ClassCreator observerCreator, DotName beanClass) { MethodCreator getBeanClass = observerCreator.getMethodCreator("getBeanClass", Class.class).setModifiers(ACC_PUBLIC); getBeanClass.returnValue(getBeanClass.loadClass(beanClass.toString())); } - protected void createGetPriority(ClassCreator observerCreator, ObserverInfo observer) { + protected void implementGetPriority(ClassCreator observerCreator, ObserverInfo observer) { MethodCreator getPriority = observerCreator.getMethodCreator("getPriority", int.class).setModifiers(ACC_PUBLIC); getPriority.returnValue(getPriority.load(observer.getPriority())); } - protected void createNotify(ObserverInfo observer, ClassCreator observerCreator, Map injectionPointToProviderField, + protected void implementIsAsync(ClassCreator observerCreator) { + MethodCreator isAsync = observerCreator.getMethodCreator("isAsync", boolean.class).setModifiers(ACC_PUBLIC); + isAsync.returnValue(isAsync.load(true)); + } + + protected void implementNotify(ObserverInfo observer, ClassCreator observerCreator, Map injectionPointToProviderField, ReflectionRegistration reflectionRegistration) { MethodCreator notify = observerCreator.getMethodCreator("notify", void.class, EventContext.class).setModifiers(ACC_PUBLIC); diff --git a/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/ObserverInfo.java b/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/ObserverInfo.java index 8828cb6204cb1..2808964a7285d 100644 --- a/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/ObserverInfo.java +++ b/ext/arc/processor/src/main/java/org/jboss/protean/arc/processor/ObserverInfo.java @@ -15,6 +15,11 @@ import org.jboss.jandex.MethodParameterInfo; import org.jboss.jandex.Type; +/** + * Represents an observer method. + * + * @author Martin Kouba + */ public class ObserverInfo { private final BeanInfo declaringBean; @@ -29,7 +34,9 @@ public class ObserverInfo { private final int priority; - ObserverInfo(BeanInfo declaringBean, MethodInfo observerMethod, Injection injection) { + private final boolean isAsync; + + ObserverInfo(BeanInfo declaringBean, MethodInfo observerMethod, Injection injection, boolean isAsync) { this.declaringBean = declaringBean; this.observerMethod = observerMethod; this.injection = injection; @@ -41,6 +48,7 @@ public class ObserverInfo { } else { this.priority = ObserverMethod.DEFAULT_PRIORITY; } + this.isAsync = isAsync; } BeanInfo getDeclaringBean() { @@ -63,6 +71,10 @@ Injection getInjection() { return injection; } + boolean isAsync() { + return isAsync; + } + void init() { for (InjectionPointInfo injectionPoint : injection.injectionPoints) { Beans.resolveInjectionPoint(declaringBean.getDeployment(), null, injectionPoint); @@ -90,7 +102,8 @@ int getPriority() { MethodParameterInfo initEventParam(MethodInfo observerMethod) { List eventParams = new ArrayList<>(); for (AnnotationInstance annotation : observerMethod.annotations()) { - if (Kind.METHOD_PARAMETER == annotation.target().kind() && annotation.name().equals(DotNames.OBSERVES)) { + if (Kind.METHOD_PARAMETER == annotation.target().kind() + && (annotation.name().equals(DotNames.OBSERVES) || annotation.name().equals(DotNames.OBSERVES_ASYNC))) { eventParams.add(annotation.target().asMethodParameter()); } } diff --git a/ext/arc/runtime/src/main/java/org/jboss/protean/arc/AsyncEventDeliveryStage.java b/ext/arc/runtime/src/main/java/org/jboss/protean/arc/AsyncEventDeliveryStage.java new file mode 100644 index 0000000000000..d35bd8dd9388c --- /dev/null +++ b/ext/arc/runtime/src/main/java/org/jboss/protean/arc/AsyncEventDeliveryStage.java @@ -0,0 +1,247 @@ +/* + * JBoss, Home of Professional Open Source + * Copyright 2014, Red Hat, Inc., and individual contributors + * by the @authors tag. See the copyright.txt in the distribution for a + * full listing of individual contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jboss.protean.arc; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * + * @param + */ +class AsyncEventDeliveryStage implements CompletionStage { + + static AsyncEventDeliveryStage completed(T event, Executor executor) { + final CompletableFuture delegate = new CompletableFuture<>(); + delegate.complete(event); + return new AsyncEventDeliveryStage<>(delegate, executor); + } + + private final Executor defaultExecutor; + + private final CompletionStage delegate; + + AsyncEventDeliveryStage(Supplier supplier, Executor executor) { + this(CompletableFuture.supplyAsync(supplier, executor), executor); + } + + AsyncEventDeliveryStage(CompletionStage delegate, Executor executor) { + this.delegate = delegate; + this.defaultExecutor = executor; + } + + @Override + public CompletionStage thenApply(Function fn) { + return wrap(delegate.thenApply(fn)); + } + + @Override + public CompletionStage thenApplyAsync(Function fn) { + return wrap(delegate.thenApplyAsync(fn, defaultExecutor)); + } + + @Override + public CompletionStage thenApplyAsync(Function fn, Executor executor) { + return wrap(delegate.thenApplyAsync(fn, executor)); + } + + @Override + public CompletionStage thenAccept(Consumer action) { + return wrap(delegate.thenAccept(action)); + } + + @Override + public CompletionStage thenAcceptAsync(Consumer action) { + return wrap(delegate.thenAcceptAsync(action, defaultExecutor)); + } + + @Override + public CompletionStage thenAcceptAsync(Consumer action, Executor executor) { + return wrap(delegate.thenAcceptAsync(action, executor)); + } + + @Override + public CompletionStage thenRun(Runnable action) { + return wrap(delegate.thenRun(action)); + } + + @Override + public CompletionStage thenRunAsync(Runnable action) { + return wrap(delegate.thenRunAsync(action, defaultExecutor)); + } + + @Override + public CompletionStage thenRunAsync(Runnable action, Executor executor) { + return wrap(delegate.thenRunAsync(action, executor)); + } + + @Override + public CompletionStage thenCombine(CompletionStage other, BiFunction fn) { + return wrap(delegate.thenCombine(other, fn)); + } + + @Override + public CompletionStage thenCombineAsync(CompletionStage other, BiFunction fn) { + return wrap(delegate.thenCombineAsync(other, fn, defaultExecutor)); + } + + @Override + public CompletionStage thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor) { + return wrap(delegate.thenCombineAsync(other, fn, executor)); + } + + @Override + public CompletionStage thenAcceptBoth(CompletionStage other, BiConsumer action) { + return wrap(delegate.thenAcceptBoth(other, action)); + } + + @Override + public CompletionStage thenAcceptBothAsync(CompletionStage other, BiConsumer action) { + return wrap(delegate.thenAcceptBothAsync(other, action, defaultExecutor)); + } + + @Override + public CompletionStage thenAcceptBothAsync(CompletionStage other, BiConsumer action, Executor executor) { + return wrap(delegate.thenAcceptBothAsync(other, action, executor)); + } + + @Override + public CompletionStage runAfterBoth(CompletionStage other, Runnable action) { + return wrap(delegate.runAfterBoth(other, action)); + } + + @Override + public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action) { + return wrap(delegate.runAfterBothAsync(other, action, defaultExecutor)); + } + + @Override + public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { + return wrap(delegate.runAfterBothAsync(other, action, executor)); + } + + @Override + public CompletionStage applyToEither(CompletionStage other, Function fn) { + return wrap(delegate.applyToEither(other, fn)); + } + + @Override + public CompletionStage applyToEitherAsync(CompletionStage other, Function fn) { + return wrap(delegate.applyToEitherAsync(other, fn, defaultExecutor)); + } + + @Override + public CompletionStage applyToEitherAsync(CompletionStage other, Function fn, Executor executor) { + return wrap(delegate.applyToEitherAsync(other, fn, executor)); + } + + @Override + public CompletionStage acceptEither(CompletionStage other, Consumer action) { + return wrap(delegate.acceptEither(other, action)); + } + + @Override + public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action) { + return wrap(delegate.acceptEitherAsync(other, action, defaultExecutor)); + } + + @Override + public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action, Executor executor) { + return wrap(delegate.acceptEitherAsync(other, action, executor)); + } + + @Override + public CompletionStage runAfterEither(CompletionStage other, Runnable action) { + return wrap(delegate.runAfterEither(other, action)); + } + + @Override + public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action) { + return wrap(delegate.runAfterEitherAsync(other, action, defaultExecutor)); + } + + @Override + public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor) { + return wrap(delegate.runAfterEitherAsync(other, action, executor)); + } + + @Override + public CompletionStage thenCompose(Function> fn) { + return wrap(delegate.thenCompose(fn)); + } + + @Override + public CompletionStage thenComposeAsync(Function> fn) { + return wrap(delegate.thenComposeAsync(fn, defaultExecutor)); + } + + @Override + public CompletionStage thenComposeAsync(Function> fn, Executor executor) { + return wrap(delegate.thenComposeAsync(fn, executor)); + } + + @Override + public CompletionStage exceptionally(Function fn) { + return wrap(delegate.exceptionally(fn)); + } + + @Override + public CompletionStage whenComplete(BiConsumer action) { + return wrap(delegate.whenComplete(action)); + } + + @Override + public CompletionStage whenCompleteAsync(BiConsumer action) { + return wrap(delegate.whenCompleteAsync(action, defaultExecutor)); + } + + @Override + public CompletionStage whenCompleteAsync(BiConsumer action, Executor executor) { + return wrap(delegate.whenCompleteAsync(action, executor)); + } + + @Override + public CompletionStage handle(BiFunction fn) { + return wrap(delegate.handle(fn)); + } + + @Override + public CompletionStage handleAsync(BiFunction fn) { + return wrap(delegate.handleAsync(fn, defaultExecutor)); + } + + @Override + public CompletionStage handleAsync(BiFunction fn, Executor executor) { + return wrap(delegate.handleAsync(fn, executor)); + } + + private CompletionStage wrap(CompletionStage completionStage) { + return new AsyncEventDeliveryStage(completionStage, defaultExecutor); + } + + @Override + public CompletableFuture toCompletableFuture() { + return delegate.toCompletableFuture(); + } + +} diff --git a/ext/arc/runtime/src/main/java/org/jboss/protean/arc/EventImpl.java b/ext/arc/runtime/src/main/java/org/jboss/protean/arc/EventImpl.java index 4ff9f6d499d9d..255aacf2d0d91 100644 --- a/ext/arc/runtime/src/main/java/org/jboss/protean/arc/EventImpl.java +++ b/ext/arc/runtime/src/main/java/org/jboss/protean/arc/EventImpl.java @@ -4,14 +4,22 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; import javax.enterprise.event.Event; import javax.enterprise.event.NotificationOptions; +import javax.enterprise.event.ObserverException; import javax.enterprise.inject.Any; import javax.enterprise.inject.spi.EventContext; import javax.enterprise.inject.spi.EventMetadata; @@ -29,11 +37,13 @@ class EventImpl implements Event { private static final int DEFAULT_CACHE_CAPACITY = 4; + private static final NotificationOptions DEFAULT_OPTIONS = NotificationOptions.ofExecutor(ForkJoinPool.commonPool()); + private final HierarchyDiscovery injectionPointTypeHierarchy; private final Set qualifiers; - private final ConcurrentMap, Notifier> notifierCache; + private final ConcurrentMap, Notifier> notifiers; public EventImpl(Type eventType, Set qualifiers) { if (eventType instanceof ParameterizedType) { @@ -44,22 +54,39 @@ public EventImpl(Type eventType, Set qualifiers) { this.injectionPointTypeHierarchy = new HierarchyDiscovery(eventType); this.qualifiers = qualifiers; this.qualifiers.add(Any.Literal.INSTANCE); - this.notifierCache = new ConcurrentHashMap<>(DEFAULT_CACHE_CAPACITY); + this.notifiers = new ConcurrentHashMap<>(DEFAULT_CACHE_CAPACITY); } @Override public void fire(T event) { - notifierCache.computeIfAbsent(event.getClass(), this::createNotifier).notify(event); + notifiers.computeIfAbsent(event.getClass(), this::createNotifier).notify(event, ObserverExceptionHandler.IMMEDIATE_HANDLER, false); } @Override public CompletionStage fireAsync(U event) { - throw new UnsupportedOperationException(); + return fireAsync(event, DEFAULT_OPTIONS); } @Override public CompletionStage fireAsync(U event, NotificationOptions options) { - throw new UnsupportedOperationException(); + Objects.requireNonNull(options); + + @SuppressWarnings("unchecked") + Notifier notifier = (Notifier) notifiers.computeIfAbsent(event.getClass(), this::createNotifier); + Executor executor = options.getExecutor(); + + if (notifier.isEmpty()) { + return AsyncEventDeliveryStage.completed(event, executor); + } + + ObserverExceptionHandler exceptionHandler = new CollectingExceptionHandler(); + CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { + notifier.notify(event, exceptionHandler, true); + handleExceptions(exceptionHandler); + return event; + }, executor); + + return new AsyncEventDeliveryStage<>(completableFuture, executor); } @Override @@ -113,6 +140,22 @@ private Type getEventType(Class runtimeType) { return resolvedType; } + private void handleExceptions(ObserverExceptionHandler handler) { + List handledExceptions = handler.getHandledExceptions(); + if (!handledExceptions.isEmpty()) { + CompletionException exception = null; + if (handledExceptions.size() == 1) { + exception = new CompletionException(handledExceptions.get(0)); + } else { + exception = new CompletionException(null); + } + for (Throwable handledException : handledExceptions) { + exception.addSuppressed(handledException); + } + throw exception; + } + } + static class Notifier { private final List> observerMethods; @@ -124,16 +167,30 @@ static class Notifier { this.eventMetadata = eventMetadata; } - @SuppressWarnings({ "rawtypes", "unchecked" }) void notify(T event) { - if (!observerMethods.isEmpty()) { + notify(event, ObserverExceptionHandler.IMMEDIATE_HANDLER, false); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + void notify(T event, ObserverExceptionHandler exceptionHandler, boolean async) { + if (!isEmpty()) { EventContext eventContext = new EventContextImpl<>(event, eventMetadata); for (ObserverMethod observerMethod : observerMethods) { - observerMethod.notify(eventContext); + if (observerMethod.isAsync() == async) { + try { + observerMethod.notify(eventContext); + } catch (Throwable e) { + exceptionHandler.handle(e); + } + } } } } + boolean isEmpty() { + return observerMethods.isEmpty(); + } + } static class EventContextImpl implements EventContext { @@ -188,4 +245,55 @@ public Type getType() { } + /** + * There are two different strategies of exception handling for observer methods. When an exception is raised by a synchronous or transactional observer for + * a synchronous event, this exception stops the notification chain and the exception is propagated immediately. On the other hand, an exception thrown + * during asynchronous event delivery never is never propagated directly. Instead, all the exceptions for a given asynchronous event are collected and then + * made available together using CompletionException. + * + * @author Jozef Hartinger + * + */ + protected interface ObserverExceptionHandler { + + ObserverExceptionHandler IMMEDIATE_HANDLER = throwable -> { + if (throwable instanceof RuntimeException) { + throw (RuntimeException) throwable; + } + if (throwable instanceof Error) { + throw (Error) throwable; + } + throw new ObserverException(throwable); + }; + + void handle(Throwable throwable); + + default List getHandledExceptions() { + return Collections.emptyList(); + } + } + + static class CollectingExceptionHandler implements ObserverExceptionHandler { + + private List throwables; + + CollectingExceptionHandler() { + this(new LinkedList<>()); + } + + CollectingExceptionHandler(List throwables) { + this.throwables = throwables; + } + + @Override + public void handle(Throwable throwable) { + throwables.add(throwable); + } + + @Override + public List getHandledExceptions() { + return throwables; + } + } + } diff --git a/ext/arc/tests/src/test/java/org/jboss/protean/arc/test/observers/async/AsyncObserverExceptionTest.java b/ext/arc/tests/src/test/java/org/jboss/protean/arc/test/observers/async/AsyncObserverExceptionTest.java new file mode 100644 index 0000000000000..6d0d5e6f271c3 --- /dev/null +++ b/ext/arc/tests/src/test/java/org/jboss/protean/arc/test/observers/async/AsyncObserverExceptionTest.java @@ -0,0 +1,94 @@ +package org.jboss.protean.arc.test.observers.async; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.PostConstruct; +import javax.annotation.Priority; +import javax.enterprise.context.Dependent; +import javax.enterprise.event.Event; +import javax.enterprise.event.ObservesAsync; +import javax.inject.Inject; +import javax.inject.Singleton; + +import org.jboss.protean.arc.Arc; +import org.jboss.protean.arc.ArcContainer; +import org.jboss.protean.arc.test.ArcTestContainer; +import org.junit.Rule; +import org.junit.Test; + +public class AsyncObserverExceptionTest { + + @Rule + public ArcTestContainer container = new ArcTestContainer(StringProducer.class, StringObserver.class); + + @Test + public void testAsyncObservers() throws InterruptedException, ExecutionException, TimeoutException { + ArcContainer container = Arc.container(); + StringProducer producer = container.instance(StringProducer.class).get(); + StringObserver observer = container.instance(StringObserver.class).get(); + + BlockingQueue synchronizer = new LinkedBlockingQueue<>(); + producer.produceAsync("pong").exceptionally(ex -> { + synchronizer.add(ex); + return ex.getMessage(); + }); + + Object exception = synchronizer.poll(10, TimeUnit.SECONDS); + assertNotNull(exception); + assertTrue(exception instanceof RuntimeException); + + List events = observer.getEvents(); + assertEquals(2, events.size()); + assertEquals("async1::pong", events.get(0)); + assertEquals("async2::pong", events.get(1)); + } + + @Singleton + static class StringObserver { + + private List events; + + @PostConstruct + void init() { + events = new CopyOnWriteArrayList<>(); + } + + void observeAsync1(@ObservesAsync @Priority(1) String value) { + events.add("async1::" + value); + throw new RuntimeException("nok"); + } + + void observeAsync2(@ObservesAsync @Priority(2) String value) { + events.add("async2::" + value); + } + + List getEvents() { + return events; + } + + } + + @Dependent + static class StringProducer { + + @Inject + Event event; + + CompletionStage produceAsync(String value) { + return event.fireAsync(value); + } + + } + +} diff --git a/ext/arc/tests/src/test/java/org/jboss/protean/arc/test/observers/async/AsyncObserverTest.java b/ext/arc/tests/src/test/java/org/jboss/protean/arc/test/observers/async/AsyncObserverTest.java new file mode 100644 index 0000000000000..bd6e96b48cc3e --- /dev/null +++ b/ext/arc/tests/src/test/java/org/jboss/protean/arc/test/observers/async/AsyncObserverTest.java @@ -0,0 +1,95 @@ +package org.jboss.protean.arc.test.observers.async; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.annotation.PostConstruct; +import javax.enterprise.context.Dependent; +import javax.enterprise.event.Event; +import javax.enterprise.event.Observes; +import javax.enterprise.event.ObservesAsync; +import javax.inject.Inject; +import javax.inject.Singleton; + +import org.jboss.protean.arc.Arc; +import org.jboss.protean.arc.ArcContainer; +import org.jboss.protean.arc.test.ArcTestContainer; +import org.junit.Rule; +import org.junit.Test; + +public class AsyncObserverTest { + + @Rule + public ArcTestContainer container = new ArcTestContainer(StringProducer.class, StringObserver.class); + + @Test + public void testAsyncObservers() throws InterruptedException, ExecutionException, TimeoutException { + ArcContainer container = Arc.container(); + StringProducer producer = container.instance(StringProducer.class).get(); + StringObserver observer = container.instance(StringObserver.class).get(); + String currentThread = Thread.currentThread().getName(); + + producer.produce("ping"); + List events = observer.getEvents(); + assertEquals(1, events.size()); + assertTrue(events.get(0).startsWith("sync::ping")); + assertTrue(events.get(0).endsWith(currentThread)); + + events.clear(); + + CompletionStage completionStage = producer.produceAsync("pong"); + assertEquals("pong", completionStage.toCompletableFuture().get(10, TimeUnit.SECONDS)); + assertEquals(1, events.size()); + assertTrue(events.get(0).startsWith("async::pong")); + assertFalse(events.get(0).endsWith(currentThread)); + } + + @Singleton + static class StringObserver { + + private List events; + + @PostConstruct + void init() { + events = new CopyOnWriteArrayList<>(); + } + + void observeAsync(@ObservesAsync String value) { + events.add("async::" + value + "::" + Thread.currentThread().getName()); + } + + void observeSync(@Observes String value) { + events.add("sync::" + value + "::" + Thread.currentThread().getName()); + } + + List getEvents() { + return events; + } + + } + + @Dependent + static class StringProducer { + + @Inject + Event event; + + void produce(String value) { + event.fire(value); + } + + CompletionStage produceAsync(String value) { + return event.fireAsync(value); + } + + } + +}