Skip to content

Commit

Permalink
Fixes #471
Browse files Browse the repository at this point in the history
Adds support for `@Broadcast` on `Emitter` to broadcast to all subscribers
  • Loading branch information
kenfinnigan committed May 27, 2020
1 parent 5cf72f6 commit c8e907b
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.slf4j.LoggerFactory;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.connectors.WorkerPoolRegistry;
import io.smallrye.reactive.messaging.helpers.BroadcastHelper;

public abstract class AbstractMediator {

Expand Down Expand Up @@ -142,13 +141,7 @@ public PublisherBuilder<? extends Message<?>> decorate(PublisherBuilder<? extend
}

if (configuration.getBroadcast()) {
Multi<? extends Message<?>> publisher = Multi.createFrom().publisher(input.buildRs());
if (configuration.getNumberOfSubscriberBeforeConnecting() != 0) {
return ReactiveStreams.fromPublisher(publisher
.broadcast().toAtLeast(configuration.getNumberOfSubscriberBeforeConnecting()));
} else {
return ReactiveStreams.fromPublisher(publisher.broadcast().toAllSubscribers());
}
return BroadcastHelper.broadcastPublisher(input.buildRs(), configuration.getNumberOfSubscriberBeforeConnecting());
} else {
return input;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.smallrye.reactive.messaging.extension;

import org.eclipse.microprofile.reactive.messaging.OnOverflow;

import io.smallrye.reactive.messaging.annotations.Broadcast;

public class EmitterConfiguration {
private final String name;
private OnOverflow.Strategy overflowBufferStrategy = null;
private long overflowBufferSize = -1;
private Boolean broadcast = Boolean.FALSE;
private int numberOfSubscriberBeforeConnecting = -1;

public EmitterConfiguration(String name, OnOverflow onOverflow, Broadcast broadcast) {
this.name = name;

if (onOverflow != null) {
this.overflowBufferStrategy = onOverflow.value();
this.overflowBufferSize = onOverflow.bufferSize();
}

if (broadcast != null) {
this.broadcast = Boolean.TRUE;
this.numberOfSubscriberBeforeConnecting = broadcast.value();
}
}

public String getName() {
return this.name;
}

public OnOverflow.Strategy getOverflowBufferStrategy() {
return this.overflowBufferStrategy;
}

public long getOverflowBufferSize() {
return this.overflowBufferSize;
}

public Boolean isBroadcast() {
return broadcast;
}

public int getNumberOfSubscriberBeforeConnecting() {
return this.numberOfSubscriberBeforeConnecting;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.reactive.messaging.helpers.BroadcastHelper;

/**
* Implementation of the emitter pattern.
Expand All @@ -28,8 +29,8 @@ public class EmitterImpl<T> implements Emitter<T> {

private AtomicReference<Throwable> synchronousFailure = new AtomicReference<>();

public EmitterImpl(String name, String overFlowStrategy, long bufferSize, long defaultBufferSize) {
this.name = name;
public EmitterImpl(EmitterConfiguration config, long defaultBufferSize) {
this.name = config.getName();
if (defaultBufferSize <= 0) {
throw new IllegalArgumentException("The default buffer size must be strictly positive");
}
Expand All @@ -41,20 +42,27 @@ public EmitterImpl(String name, String overFlowStrategy, long bufferSize, long d
}
};

if (overFlowStrategy == null) {
Multi<Message<? extends T>> tempPublisher;
if (config.getOverflowBufferStrategy() == null) {
Multi<Message<? extends T>> multi = Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER);
publisher = getPublisherUsingBufferStrategy(defaultBufferSize, multi);
tempPublisher = getPublisherUsingBufferStrategy(defaultBufferSize, multi);
} else {
publisher = getPublisherForStrategy(overFlowStrategy, bufferSize, defaultBufferSize, deferred);
tempPublisher = getPublisherForStrategy(config.getOverflowBufferStrategy(), config.getOverflowBufferSize(),
defaultBufferSize, deferred);
}

if (config.isBroadcast()) {
publisher = (Multi<Message<? extends T>>) BroadcastHelper
.broadcastPublisher(tempPublisher, config.getNumberOfSubscriberBeforeConnecting()).buildRs();
} else {
publisher = tempPublisher;
}
}

Multi<Message<? extends T>> getPublisherForStrategy(String overFlowStrategy, long bufferSize,
Multi<Message<? extends T>> getPublisherForStrategy(OnOverflow.Strategy overFlowStrategy, long bufferSize,
long defaultBufferSize,
Consumer<MultiEmitter<? super Message<? extends T>>> deferred) {
OnOverflow.Strategy strategy = OnOverflow.Strategy.valueOf(overFlowStrategy);

switch (strategy) {
switch (overFlowStrategy) {
case BUFFER:
Multi<Message<? extends T>> multi = Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER);
if (bufferSize > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -26,7 +25,6 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
Expand Down Expand Up @@ -386,14 +384,10 @@ private Optional<PublisherBuilder<? extends Message<?>>> getAggregatedSource(

}

public void initializeEmitters(Map<String, OnOverflow> emitters) {
for (Map.Entry<String, OnOverflow> e : emitters.entrySet()) {
public void initializeEmitters(List<EmitterConfiguration> emitters) {
for (EmitterConfiguration config : emitters) {
int bufferSize = getDefaultBufferSize();
if (e.getValue() != null) {
initializeEmitter(e.getKey(), e.getValue().value().name(), e.getValue().bufferSize(), bufferSize);
} else {
initializeEmitter(e.getKey(), null, bufferSize, bufferSize);
}
initializeEmitter(config, bufferSize);
}
}

Expand All @@ -405,10 +399,10 @@ private int getDefaultBufferSize() {
}
}

public void initializeEmitter(String name, String overFlowStrategy, long bufferSize, long defaultBufferSize) {
EmitterImpl<?> emitter = new EmitterImpl<>(name, overFlowStrategy, bufferSize, defaultBufferSize);
public void initializeEmitter(EmitterConfiguration emitterConfiguration, long defaultBufferSize) {
EmitterImpl<?> emitter = new EmitterImpl<>(emitterConfiguration, defaultBufferSize);
Publisher<? extends Message<?>> publisher = emitter.getPublisher();
channelRegistry.register(name, ReactiveStreams.fromPublisher(publisher));
channelRegistry.register(name, emitter);
channelRegistry.register(emitterConfiguration.getName(), ReactiveStreams.fromPublisher(publisher));
channelRegistry.register(emitterConfiguration.getName(), emitter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import io.smallrye.reactive.messaging.annotations.Incomings;
import io.smallrye.reactive.messaging.connectors.WorkerPoolRegistry;

Expand Down Expand Up @@ -78,14 +79,15 @@ void afterDeploymentValidation(@Observes AfterDeploymentValidation done, BeanMan
ChannelRegistry registry = instance.select(ChannelRegistry.class)
.get();

Map<String, OnOverflow> emitters = new HashMap<>();
List<EmitterConfiguration> emitters = new ArrayList<>();
for (InjectionPoint point : emitterInjectionPoints) {
String name = ChannelProducer.getChannelName(point);
OnOverflow onOverflow = point.getAnnotated().getAnnotation(OnOverflow.class);
if (onOverflow == null) {
onOverflow = createOnOverflowForLegacyAnnotation(point);
}
emitters.put(name, onOverflow);
Broadcast broadcast = point.getAnnotated().getAnnotation(Broadcast.class);
emitters.add(new EmitterConfiguration(name, onOverflow, broadcast));
}

WorkerPoolRegistry workerPoolRegistry = instance.select(WorkerPoolRegistry.class).get();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.smallrye.reactive.messaging.helpers;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;

import io.smallrye.mutiny.Multi;

public class BroadcastHelper {

private BroadcastHelper() {
// Avoid direct instantiation.
}

/**
* <p>
* Wraps an existing {@code Publisher} for broadcasting.
* </p>
*
* @param publisher The publisher to be wrapped
* @param numberOfSubscriberBeforeConnecting Number of subscribers that must be present before broadcast occurs.
* A value of 0 means any number of subscribers will trigger the broadcast.
* @return The wrapped {@code Publisher} in a new {@code PublisherBuilder}
*/
public static PublisherBuilder<? extends Message<?>> broadcastPublisher(Publisher<? extends Message<?>> publisher,
int numberOfSubscriberBeforeConnecting) {
Multi<? extends Message<?>> broadcastPublisher = Multi.createFrom().publisher(publisher);
if (numberOfSubscriberBeforeConnecting != 0) {
return ReactiveStreams.fromPublisher(broadcastPublisher
.broadcast().toAtLeast(numberOfSubscriberBeforeConnecting));
} else {
return ReactiveStreams.fromPublisher(broadcastPublisher.broadcast().toAllSubscribers());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.smallrye.reactive.messaging.broadcast;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class BeanEmitterBroadcast {
@Inject
@Broadcast
@Channel("X")
Emitter<String> emitter;

private final List<String> list = new CopyOnWriteArrayList<>();

public Emitter<String> emitter() {
return emitter;
}

public List<String> list() {
return list;
}

@Incoming("X")
public void consume(final String s) {
list.add(s);
}

public void send(String s) {
emitter.send(s);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.smallrye.reactive.messaging.broadcast;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class BeanEmitterConsumer {
private final List<String> list = new CopyOnWriteArrayList<>();

public List<String> list() {
return list;
}

@Incoming("X")
public void consume(final String s) {
list.add(s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,24 @@ public void testBroadcast() {
assertThat(bean.l1()).containsExactly("A", "B", "C", "D").containsExactlyElementsOf(bean.l2());
}

@Test
public void testBroadcastOfEmitter() {
addBeanClass(BeanEmitterBroadcast.class, BeanEmitterConsumer.class);
initialize();

BeanEmitterBroadcast broadcastAndConsumer = get(BeanEmitterBroadcast.class);
BeanEmitterConsumer consumer = get(BeanEmitterConsumer.class);

broadcastAndConsumer.send("a");
broadcastAndConsumer.send("b");
broadcastAndConsumer.send("c");
broadcastAndConsumer.send("d");

assertThat(broadcastAndConsumer.emitter()).isNotNull();

await().until(() -> broadcastAndConsumer.list().size() == 4);
await().until(() -> consumer.list().size() == 4);

assertThat(broadcastAndConsumer.list()).containsExactly("a", "b", "c", "d").containsExactlyElementsOf(consumer.list());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -23,6 +24,7 @@
import io.reactivex.subscribers.TestSubscriber;
import io.smallrye.reactive.messaging.WeldTestBaseWithoutTails;
import io.smallrye.reactive.messaging.annotations.Merge;
import io.smallrye.reactive.messaging.extension.EmitterConfiguration;
import io.smallrye.reactive.messaging.extension.EmitterImpl;

public class EmitterInjectionTest extends WeldTestBaseWithoutTails {
Expand Down Expand Up @@ -581,7 +583,24 @@ public void consume(final String b) {

@Test // Reproduce #511
public void testWeCanHaveSeveralSubscribers() {
EmitterImpl<String> emitter = new EmitterImpl<>("my-channel", "BUFFER", 128, 128);
OnOverflow overflow = new OnOverflow() {
@Override
public Class<? extends Annotation> annotationType() {
return OnOverflow.class;
}

@Override
public Strategy value() {
return OnOverflow.Strategy.BUFFER;
}

@Override
public long bufferSize() {
return 128;
}
};
EmitterConfiguration config = new EmitterConfiguration("my-channel", overflow, null);
EmitterImpl<String> emitter = new EmitterImpl<>(config, 128);
Publisher<Message<? extends String>> publisher = emitter.getPublisher();

TestSubscriber<Message<? extends String>> sub1 = new TestSubscriber<>();
Expand Down

0 comments on commit c8e907b

Please sign in to comment.