Skip to content

Commit

Permalink
Merge pull request #563 from kenfinnigan/broadcast-emitter
Browse files Browse the repository at this point in the history
Fixes #471
  • Loading branch information
cescoffier authored Jun 1, 2020
2 parents 8152e8b + 40b31b1 commit ef3f64b
Show file tree
Hide file tree
Showing 13 changed files with 238 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package emitter;

import io.smallrye.reactive.messaging.annotations.Broadcast;
import messages.MyMetadata;
import org.eclipse.microprofile.reactive.messaging.*;

Expand Down Expand Up @@ -52,6 +53,26 @@ public void sendAsMessageWithAckAndMetadata(double d) {
}
// end::message-meta[]

// tag::broadcast[]
@Inject
@Broadcast
@Channel("prices") Emitter<Double> emitter;

public void emit(double d) {
emitter.send(d);
}

@Incoming("prices")
public void handle(double d) {
// Handle the new price
}

@Incoming("prices")
public void audit(double d) {
// Audit the price change
}
// end::broadcast[]

static class OverflowExample {
// tag::overflow[]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ This allows waiting for the complete graph to be weaved:
include::example$broadcast/BroadcastWithCountExamples.java[tag=chain]
----

== Use with Emitter

For details on how to use `@Broadcast` with `Emitter` see the xref:emitter/emitter.adoc#emitter-broadcast[documentation].
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ It is not rare to combine in a single application imperative parts (Jax-RS, regu
In these case, it's often required to send _messages_ from the imperative part to the reactive part.
In other words, send messages to channels handled by reactive messaging and how can you retrieve messages.

== @Emitter and @Channel
== Emitter and @Channel

To send _things_ (payload or `Message`) from imperative code to a specific channel you need to use:

Expand Down Expand Up @@ -113,4 +113,16 @@ IMPORTANT: You must have a `@Outgoing("my-channel")` somewhere in your applicati

Be aware that when using a channel, the messages are not acknowledged automatically and, so, you must either handle the acknowledgment beforehand or manage it manually.

[#emitter-broadcast]
== Emitter and @Broadcast

When using an `Emitter`, you can now `@Broadcast` what is emitted to all subscribers.

Here is an example of emitting a price with two methods marked `@Incoming` to receive the broadcast:

[source, java, indent=0]
----
include::example$emitter/EmitterExamples.java[tag=broadcast]
----

For more details see xref:advanced/broadcast.adoc[@Broadcast] documentation.
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.slf4j.LoggerFactory;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.connectors.WorkerPoolRegistry;
import io.smallrye.reactive.messaging.helpers.BroadcastHelper;

public abstract class AbstractMediator {

Expand Down Expand Up @@ -142,13 +141,7 @@ public PublisherBuilder<? extends Message<?>> decorate(PublisherBuilder<? extend
}

if (configuration.getBroadcast()) {
Multi<? extends Message<?>> publisher = Multi.createFrom().publisher(input.buildRs());
if (configuration.getNumberOfSubscriberBeforeConnecting() != 0) {
return ReactiveStreams.fromPublisher(publisher
.broadcast().toAtLeast(configuration.getNumberOfSubscriberBeforeConnecting()));
} else {
return ReactiveStreams.fromPublisher(publisher.broadcast().toAllSubscribers());
}
return BroadcastHelper.broadcastPublisher(input.buildRs(), configuration.getNumberOfSubscriberBeforeConnecting());
} else {
return input;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.smallrye.reactive.messaging.extension;

import org.eclipse.microprofile.reactive.messaging.OnOverflow;

import io.smallrye.reactive.messaging.annotations.Broadcast;

public class EmitterConfiguration {
public final String name;
public final OnOverflow.Strategy overflowBufferStrategy;
public final long overflowBufferSize;
public final Boolean broadcast;
public final int numberOfSubscriberBeforeConnecting;

public EmitterConfiguration(String name, OnOverflow onOverflow, Broadcast broadcast) {
this.name = name;

if (onOverflow != null) {
this.overflowBufferStrategy = onOverflow.value();
this.overflowBufferSize = onOverflow.bufferSize();
} else {
this.overflowBufferStrategy = null;
this.overflowBufferSize = -1;
}

if (broadcast != null) {
this.broadcast = Boolean.TRUE;
this.numberOfSubscriberBeforeConnecting = broadcast.value();
} else {
this.broadcast = Boolean.FALSE;
this.numberOfSubscriberBeforeConnecting = -1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.reactive.messaging.helpers.BroadcastHelper;

/**
* Implementation of the emitter pattern.
Expand All @@ -28,8 +29,9 @@ public class EmitterImpl<T> implements Emitter<T> {

private AtomicReference<Throwable> synchronousFailure = new AtomicReference<>();

public EmitterImpl(String name, String overFlowStrategy, long bufferSize, long defaultBufferSize) {
this.name = name;
@SuppressWarnings("unchecked")
public EmitterImpl(EmitterConfiguration config, long defaultBufferSize) {
this.name = config.name;
if (defaultBufferSize <= 0) {
throw new IllegalArgumentException("The default buffer size must be strictly positive");
}
Expand All @@ -41,20 +43,27 @@ public EmitterImpl(String name, String overFlowStrategy, long bufferSize, long d
}
};

if (overFlowStrategy == null) {
Multi<Message<? extends T>> tempPublisher;
if (config.overflowBufferStrategy == null) {
Multi<Message<? extends T>> multi = Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER);
publisher = getPublisherUsingBufferStrategy(defaultBufferSize, multi);
tempPublisher = getPublisherUsingBufferStrategy(defaultBufferSize, multi);
} else {
publisher = getPublisherForStrategy(overFlowStrategy, bufferSize, defaultBufferSize, deferred);
tempPublisher = getPublisherForStrategy(config.overflowBufferStrategy, config.overflowBufferSize,
defaultBufferSize, deferred);
}

if (config.broadcast) {
publisher = (Multi<Message<? extends T>>) BroadcastHelper
.broadcastPublisher(tempPublisher, config.numberOfSubscriberBeforeConnecting).buildRs();
} else {
publisher = tempPublisher;
}
}

Multi<Message<? extends T>> getPublisherForStrategy(String overFlowStrategy, long bufferSize,
Multi<Message<? extends T>> getPublisherForStrategy(OnOverflow.Strategy overFlowStrategy, long bufferSize,
long defaultBufferSize,
Consumer<MultiEmitter<? super Message<? extends T>>> deferred) {
OnOverflow.Strategy strategy = OnOverflow.Strategy.valueOf(overFlowStrategy);

switch (strategy) {
switch (overFlowStrategy) {
case BUFFER:
Multi<Message<? extends T>> multi = Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER);
if (bufferSize > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -26,7 +25,6 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
Expand Down Expand Up @@ -386,14 +384,10 @@ private Optional<PublisherBuilder<? extends Message<?>>> getAggregatedSource(

}

public void initializeEmitters(Map<String, OnOverflow> emitters) {
for (Map.Entry<String, OnOverflow> e : emitters.entrySet()) {
public void initializeEmitters(List<EmitterConfiguration> emitters) {
for (EmitterConfiguration config : emitters) {
int bufferSize = getDefaultBufferSize();
if (e.getValue() != null) {
initializeEmitter(e.getKey(), e.getValue().value().name(), e.getValue().bufferSize(), bufferSize);
} else {
initializeEmitter(e.getKey(), null, bufferSize, bufferSize);
}
initializeEmitter(config, bufferSize);
}
}

Expand All @@ -405,10 +399,10 @@ private int getDefaultBufferSize() {
}
}

public void initializeEmitter(String name, String overFlowStrategy, long bufferSize, long defaultBufferSize) {
EmitterImpl<?> emitter = new EmitterImpl<>(name, overFlowStrategy, bufferSize, defaultBufferSize);
public void initializeEmitter(EmitterConfiguration emitterConfiguration, long defaultBufferSize) {
EmitterImpl<?> emitter = new EmitterImpl<>(emitterConfiguration, defaultBufferSize);
Publisher<? extends Message<?>> publisher = emitter.getPublisher();
channelRegistry.register(name, ReactiveStreams.fromPublisher(publisher));
channelRegistry.register(name, emitter);
channelRegistry.register(emitterConfiguration.name, ReactiveStreams.fromPublisher(publisher));
channelRegistry.register(emitterConfiguration.name, emitter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import io.smallrye.reactive.messaging.annotations.Incomings;
import io.smallrye.reactive.messaging.connectors.WorkerPoolRegistry;

Expand Down Expand Up @@ -78,14 +79,15 @@ void afterDeploymentValidation(@Observes AfterDeploymentValidation done, BeanMan
ChannelRegistry registry = instance.select(ChannelRegistry.class)
.get();

Map<String, OnOverflow> emitters = new HashMap<>();
List<EmitterConfiguration> emitters = new ArrayList<>();
for (InjectionPoint point : emitterInjectionPoints) {
String name = ChannelProducer.getChannelName(point);
OnOverflow onOverflow = point.getAnnotated().getAnnotation(OnOverflow.class);
if (onOverflow == null) {
onOverflow = createOnOverflowForLegacyAnnotation(point);
}
emitters.put(name, onOverflow);
Broadcast broadcast = point.getAnnotated().getAnnotation(Broadcast.class);
emitters.add(new EmitterConfiguration(name, onOverflow, broadcast));
}

WorkerPoolRegistry workerPoolRegistry = instance.select(WorkerPoolRegistry.class).get();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.smallrye.reactive.messaging.helpers;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;

import io.smallrye.mutiny.Multi;

public class BroadcastHelper {

private BroadcastHelper() {
// Avoid direct instantiation.
}

/**
* <p>
* Wraps an existing {@code Publisher} for broadcasting.
* </p>
*
* @param publisher The publisher to be wrapped
* @param numberOfSubscriberBeforeConnecting Number of subscribers that must be present before broadcast occurs.
* A value of 0 means any number of subscribers will trigger the broadcast.
* @return The wrapped {@code Publisher} in a new {@code PublisherBuilder}
*/
public static PublisherBuilder<? extends Message<?>> broadcastPublisher(Publisher<? extends Message<?>> publisher,
int numberOfSubscriberBeforeConnecting) {
Multi<? extends Message<?>> broadcastPublisher = Multi.createFrom().publisher(publisher);
if (numberOfSubscriberBeforeConnecting != 0) {
return ReactiveStreams.fromPublisher(broadcastPublisher
.broadcast().toAtLeast(numberOfSubscriberBeforeConnecting));
} else {
return ReactiveStreams.fromPublisher(broadcastPublisher.broadcast().toAllSubscribers());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.smallrye.reactive.messaging.broadcast;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class BeanEmitterBroadcast {
@Inject
@Broadcast
@Channel("X")
Emitter<String> emitter;

private final List<String> list = new CopyOnWriteArrayList<>();

public Emitter<String> emitter() {
return emitter;
}

public List<String> list() {
return list;
}

@Incoming("X")
public void consume(final String s) {
list.add(s);
}

public void send(String s) {
emitter.send(s);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.smallrye.reactive.messaging.broadcast;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class BeanEmitterConsumer {
private final List<String> list = new CopyOnWriteArrayList<>();

public List<String> list() {
return list;
}

@Incoming("X")
public void consume(final String s) {
list.add(s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,24 @@ public void testBroadcast() {
assertThat(bean.l1()).containsExactly("A", "B", "C", "D").containsExactlyElementsOf(bean.l2());
}

@Test
public void testBroadcastOfEmitter() {
addBeanClass(BeanEmitterBroadcast.class, BeanEmitterConsumer.class);
initialize();

BeanEmitterBroadcast broadcastAndConsumer = get(BeanEmitterBroadcast.class);
BeanEmitterConsumer consumer = get(BeanEmitterConsumer.class);

broadcastAndConsumer.send("a");
broadcastAndConsumer.send("b");
broadcastAndConsumer.send("c");
broadcastAndConsumer.send("d");

assertThat(broadcastAndConsumer.emitter()).isNotNull();

await().until(() -> broadcastAndConsumer.list().size() == 4);
await().until(() -> consumer.list().size() == 4);

assertThat(broadcastAndConsumer.list()).containsExactly("a", "b", "c", "d").containsExactlyElementsOf(consumer.list());
}
}
Loading

0 comments on commit ef3f64b

Please sign in to comment.