From c8e907b2e6988ec2f80f32891391404ab5df1d96 Mon Sep 17 00:00:00 2001 From: Ken Finnigan Date: Wed, 27 May 2020 16:22:26 -0400 Subject: [PATCH] Fixes #471 Adds support for `@Broadcast` on `Emitter` to broadcast to all subscribers --- .../reactive/messaging/AbstractMediator.java | 11 +---- .../extension/EmitterConfiguration.java | 47 +++++++++++++++++++ .../messaging/extension/EmitterImpl.java | 26 ++++++---- .../messaging/extension/MediatorManager.java | 20 +++----- .../extension/ReactiveMessagingExtension.java | 6 ++- .../messaging/helpers/BroadcastHelper.java | 36 ++++++++++++++ .../broadcast/BeanEmitterBroadcast.java | 40 ++++++++++++++++ .../broadcast/BeanEmitterConsumer.java | 22 +++++++++ .../messaging/broadcast/BroadcastTest.java | 20 ++++++++ .../inject/EmitterInjectionTest.java | 21 ++++++++- 10 files changed, 215 insertions(+), 34 deletions(-) create mode 100644 smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/EmitterConfiguration.java create mode 100644 smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/helpers/BroadcastHelper.java create mode 100644 smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/broadcast/BeanEmitterBroadcast.java create mode 100644 smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/broadcast/BeanEmitterConsumer.java diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/AbstractMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/AbstractMediator.java index a910d1db60..51eb77f86a 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/AbstractMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/AbstractMediator.java @@ -10,13 +10,12 @@ import org.eclipse.microprofile.reactive.messaging.Acknowledgment; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; -import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; import org.slf4j.LoggerFactory; -import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.connectors.WorkerPoolRegistry; +import io.smallrye.reactive.messaging.helpers.BroadcastHelper; public abstract class AbstractMediator { @@ -142,13 +141,7 @@ public PublisherBuilder> decorate(PublisherBuilder> publisher = Multi.createFrom().publisher(input.buildRs()); - if (configuration.getNumberOfSubscriberBeforeConnecting() != 0) { - return ReactiveStreams.fromPublisher(publisher - .broadcast().toAtLeast(configuration.getNumberOfSubscriberBeforeConnecting())); - } else { - return ReactiveStreams.fromPublisher(publisher.broadcast().toAllSubscribers()); - } + return BroadcastHelper.broadcastPublisher(input.buildRs(), configuration.getNumberOfSubscriberBeforeConnecting()); } else { return input; } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/EmitterConfiguration.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/EmitterConfiguration.java new file mode 100644 index 0000000000..a0e9ca07bc --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/EmitterConfiguration.java @@ -0,0 +1,47 @@ +package io.smallrye.reactive.messaging.extension; + +import org.eclipse.microprofile.reactive.messaging.OnOverflow; + +import io.smallrye.reactive.messaging.annotations.Broadcast; + +public class EmitterConfiguration { + private final String name; + private OnOverflow.Strategy overflowBufferStrategy = null; + private long overflowBufferSize = -1; + private Boolean broadcast = Boolean.FALSE; + private int numberOfSubscriberBeforeConnecting = -1; + + public EmitterConfiguration(String name, OnOverflow onOverflow, Broadcast broadcast) { + this.name = name; + + if (onOverflow != null) { + this.overflowBufferStrategy = onOverflow.value(); + this.overflowBufferSize = onOverflow.bufferSize(); + } + + if (broadcast != null) { + this.broadcast = Boolean.TRUE; + this.numberOfSubscriberBeforeConnecting = broadcast.value(); + } + } + + public String getName() { + return this.name; + } + + public OnOverflow.Strategy getOverflowBufferStrategy() { + return this.overflowBufferStrategy; + } + + public long getOverflowBufferSize() { + return this.overflowBufferSize; + } + + public Boolean isBroadcast() { + return broadcast; + } + + public int getNumberOfSubscriberBeforeConnecting() { + return this.numberOfSubscriberBeforeConnecting; + } +} diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/EmitterImpl.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/EmitterImpl.java index c50d97e0e2..b24114701b 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/EmitterImpl.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/EmitterImpl.java @@ -13,6 +13,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.subscription.BackPressureStrategy; import io.smallrye.mutiny.subscription.MultiEmitter; +import io.smallrye.reactive.messaging.helpers.BroadcastHelper; /** * Implementation of the emitter pattern. @@ -28,8 +29,8 @@ public class EmitterImpl implements Emitter { private AtomicReference synchronousFailure = new AtomicReference<>(); - public EmitterImpl(String name, String overFlowStrategy, long bufferSize, long defaultBufferSize) { - this.name = name; + public EmitterImpl(EmitterConfiguration config, long defaultBufferSize) { + this.name = config.getName(); if (defaultBufferSize <= 0) { throw new IllegalArgumentException("The default buffer size must be strictly positive"); } @@ -41,20 +42,27 @@ public EmitterImpl(String name, String overFlowStrategy, long bufferSize, long d } }; - if (overFlowStrategy == null) { + Multi> tempPublisher; + if (config.getOverflowBufferStrategy() == null) { Multi> multi = Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER); - publisher = getPublisherUsingBufferStrategy(defaultBufferSize, multi); + tempPublisher = getPublisherUsingBufferStrategy(defaultBufferSize, multi); } else { - publisher = getPublisherForStrategy(overFlowStrategy, bufferSize, defaultBufferSize, deferred); + tempPublisher = getPublisherForStrategy(config.getOverflowBufferStrategy(), config.getOverflowBufferSize(), + defaultBufferSize, deferred); + } + + if (config.isBroadcast()) { + publisher = (Multi>) BroadcastHelper + .broadcastPublisher(tempPublisher, config.getNumberOfSubscriberBeforeConnecting()).buildRs(); + } else { + publisher = tempPublisher; } } - Multi> getPublisherForStrategy(String overFlowStrategy, long bufferSize, + Multi> getPublisherForStrategy(OnOverflow.Strategy overFlowStrategy, long bufferSize, long defaultBufferSize, Consumer>> deferred) { - OnOverflow.Strategy strategy = OnOverflow.Strategy.valueOf(overFlowStrategy); - - switch (strategy) { + switch (overFlowStrategy) { case BUFFER: Multi> multi = Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER); if (bufferSize > 0) { diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/MediatorManager.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/MediatorManager.java index 610964b3fe..5ac672c8f6 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/MediatorManager.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/MediatorManager.java @@ -6,7 +6,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; @@ -26,7 +25,6 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.messaging.OnOverflow; import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; @@ -386,14 +384,10 @@ private Optional>> getAggregatedSource( } - public void initializeEmitters(Map emitters) { - for (Map.Entry e : emitters.entrySet()) { + public void initializeEmitters(List emitters) { + for (EmitterConfiguration config : emitters) { int bufferSize = getDefaultBufferSize(); - if (e.getValue() != null) { - initializeEmitter(e.getKey(), e.getValue().value().name(), e.getValue().bufferSize(), bufferSize); - } else { - initializeEmitter(e.getKey(), null, bufferSize, bufferSize); - } + initializeEmitter(config, bufferSize); } } @@ -405,10 +399,10 @@ private int getDefaultBufferSize() { } } - public void initializeEmitter(String name, String overFlowStrategy, long bufferSize, long defaultBufferSize) { - EmitterImpl emitter = new EmitterImpl<>(name, overFlowStrategy, bufferSize, defaultBufferSize); + public void initializeEmitter(EmitterConfiguration emitterConfiguration, long defaultBufferSize) { + EmitterImpl emitter = new EmitterImpl<>(emitterConfiguration, defaultBufferSize); Publisher> publisher = emitter.getPublisher(); - channelRegistry.register(name, ReactiveStreams.fromPublisher(publisher)); - channelRegistry.register(name, emitter); + channelRegistry.register(emitterConfiguration.getName(), ReactiveStreams.fromPublisher(publisher)); + channelRegistry.register(emitterConfiguration.getName(), emitter); } } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/ReactiveMessagingExtension.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/ReactiveMessagingExtension.java index 233fb0ed65..6bb0d9deae 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/ReactiveMessagingExtension.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/extension/ReactiveMessagingExtension.java @@ -15,6 +15,7 @@ import io.smallrye.reactive.messaging.ChannelRegistry; import io.smallrye.reactive.messaging.annotations.Blocking; +import io.smallrye.reactive.messaging.annotations.Broadcast; import io.smallrye.reactive.messaging.annotations.Incomings; import io.smallrye.reactive.messaging.connectors.WorkerPoolRegistry; @@ -78,14 +79,15 @@ void afterDeploymentValidation(@Observes AfterDeploymentValidation done, BeanMan ChannelRegistry registry = instance.select(ChannelRegistry.class) .get(); - Map emitters = new HashMap<>(); + List emitters = new ArrayList<>(); for (InjectionPoint point : emitterInjectionPoints) { String name = ChannelProducer.getChannelName(point); OnOverflow onOverflow = point.getAnnotated().getAnnotation(OnOverflow.class); if (onOverflow == null) { onOverflow = createOnOverflowForLegacyAnnotation(point); } - emitters.put(name, onOverflow); + Broadcast broadcast = point.getAnnotated().getAnnotation(Broadcast.class); + emitters.add(new EmitterConfiguration(name, onOverflow, broadcast)); } WorkerPoolRegistry workerPoolRegistry = instance.select(WorkerPoolRegistry.class).get(); diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/helpers/BroadcastHelper.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/helpers/BroadcastHelper.java new file mode 100644 index 0000000000..d81083cc21 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/helpers/BroadcastHelper.java @@ -0,0 +1,36 @@ +package io.smallrye.reactive.messaging.helpers; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; +import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; +import org.reactivestreams.Publisher; + +import io.smallrye.mutiny.Multi; + +public class BroadcastHelper { + + private BroadcastHelper() { + // Avoid direct instantiation. + } + + /** + *

+ * Wraps an existing {@code Publisher} for broadcasting. + *

+ * + * @param publisher The publisher to be wrapped + * @param numberOfSubscriberBeforeConnecting Number of subscribers that must be present before broadcast occurs. + * A value of 0 means any number of subscribers will trigger the broadcast. + * @return The wrapped {@code Publisher} in a new {@code PublisherBuilder} + */ + public static PublisherBuilder> broadcastPublisher(Publisher> publisher, + int numberOfSubscriberBeforeConnecting) { + Multi> broadcastPublisher = Multi.createFrom().publisher(publisher); + if (numberOfSubscriberBeforeConnecting != 0) { + return ReactiveStreams.fromPublisher(broadcastPublisher + .broadcast().toAtLeast(numberOfSubscriberBeforeConnecting)); + } else { + return ReactiveStreams.fromPublisher(broadcastPublisher.broadcast().toAllSubscribers()); + } + } +} diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/broadcast/BeanEmitterBroadcast.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/broadcast/BeanEmitterBroadcast.java new file mode 100644 index 0000000000..104c65e8b0 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/broadcast/BeanEmitterBroadcast.java @@ -0,0 +1,40 @@ +package io.smallrye.reactive.messaging.broadcast; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import io.smallrye.reactive.messaging.annotations.Broadcast; + +@ApplicationScoped +public class BeanEmitterBroadcast { + @Inject + @Broadcast + @Channel("X") + Emitter emitter; + + private final List list = new CopyOnWriteArrayList<>(); + + public Emitter emitter() { + return emitter; + } + + public List list() { + return list; + } + + @Incoming("X") + public void consume(final String s) { + list.add(s); + } + + public void send(String s) { + emitter.send(s); + } +} diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/broadcast/BeanEmitterConsumer.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/broadcast/BeanEmitterConsumer.java new file mode 100644 index 0000000000..11a58054d0 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/broadcast/BeanEmitterConsumer.java @@ -0,0 +1,22 @@ +package io.smallrye.reactive.messaging.broadcast; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; + +@ApplicationScoped +public class BeanEmitterConsumer { + private final List list = new CopyOnWriteArrayList<>(); + + public List list() { + return list; + } + + @Incoming("X") + public void consume(final String s) { + list.add(s); + } +} diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/broadcast/BroadcastTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/broadcast/BroadcastTest.java index 9405f43601..3483576016 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/broadcast/BroadcastTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/broadcast/BroadcastTest.java @@ -22,4 +22,24 @@ public void testBroadcast() { assertThat(bean.l1()).containsExactly("A", "B", "C", "D").containsExactlyElementsOf(bean.l2()); } + @Test + public void testBroadcastOfEmitter() { + addBeanClass(BeanEmitterBroadcast.class, BeanEmitterConsumer.class); + initialize(); + + BeanEmitterBroadcast broadcastAndConsumer = get(BeanEmitterBroadcast.class); + BeanEmitterConsumer consumer = get(BeanEmitterConsumer.class); + + broadcastAndConsumer.send("a"); + broadcastAndConsumer.send("b"); + broadcastAndConsumer.send("c"); + broadcastAndConsumer.send("d"); + + assertThat(broadcastAndConsumer.emitter()).isNotNull(); + + await().until(() -> broadcastAndConsumer.list().size() == 4); + await().until(() -> consumer.list().size() == 4); + + assertThat(broadcastAndConsumer.list()).containsExactly("a", "b", "c", "d").containsExactlyElementsOf(consumer.list()); + } } diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/inject/EmitterInjectionTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/inject/EmitterInjectionTest.java index 41797da763..cb3a349036 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/inject/EmitterInjectionTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/inject/EmitterInjectionTest.java @@ -3,6 +3,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.lang.annotation.Annotation; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -23,6 +24,7 @@ import io.reactivex.subscribers.TestSubscriber; import io.smallrye.reactive.messaging.WeldTestBaseWithoutTails; import io.smallrye.reactive.messaging.annotations.Merge; +import io.smallrye.reactive.messaging.extension.EmitterConfiguration; import io.smallrye.reactive.messaging.extension.EmitterImpl; public class EmitterInjectionTest extends WeldTestBaseWithoutTails { @@ -581,7 +583,24 @@ public void consume(final String b) { @Test // Reproduce #511 public void testWeCanHaveSeveralSubscribers() { - EmitterImpl emitter = new EmitterImpl<>("my-channel", "BUFFER", 128, 128); + OnOverflow overflow = new OnOverflow() { + @Override + public Class annotationType() { + return OnOverflow.class; + } + + @Override + public Strategy value() { + return OnOverflow.Strategy.BUFFER; + } + + @Override + public long bufferSize() { + return 128; + } + }; + EmitterConfiguration config = new EmitterConfiguration("my-channel", overflow, null); + EmitterImpl emitter = new EmitterImpl<>(config, 128); Publisher> publisher = emitter.getPublisher(); TestSubscriber> sub1 = new TestSubscriber<>();