diff --git a/bom/runtime/pom.xml b/bom/runtime/pom.xml
index 863b28a209af9..ecbadb17f6601 100644
--- a/bom/runtime/pom.xml
+++ b/bom/runtime/pom.xml
@@ -43,7 +43,7 @@
1.0.13
1.0.13
1.0.13
- 2.0.4
+ 2.1.0
3.26.1
1.2.1
1.3.5
diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java
index c6e5c8e10e359..60338ef8ed1b6 100644
--- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java
+++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java
@@ -3,9 +3,11 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import java.util.function.Consumer;
import javax.security.auth.spi.LoginModule;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.StickyAssignor;
@@ -42,7 +44,6 @@
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.Capabilities;
-import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
@@ -90,18 +91,20 @@ public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer i : BUILT_INS) {
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, i.getName()));
collectSubclasses(toRegister, indexBuildItem, i);
}
- if (capabilities.isPresent(Capability.JSONB)) {
+ if (capabilities.isCapabilityPresent(Capabilities.JSONB)) {
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, JsonbSerializer.class, JsonbDeserializer.class));
collectSubclasses(toRegister, indexBuildItem, JsonbSerializer.class);
collectSubclasses(toRegister, indexBuildItem, JsonbDeserializer.class);
}
- if (capabilities.isPresent(Capability.JACKSON)) {
+ if (capabilities.isCapabilityPresent(Capabilities.JACKSON)) {
reflectiveClass.produce(
new ReflectiveClassBuildItem(false, false, ObjectMapperSerializer.class, ObjectMapperDeserializer.class));
collectSubclasses(toRegister, indexBuildItem, ObjectMapperSerializer.class);
@@ -157,14 +160,17 @@ private static void collectSubclasses(Set set, CombinedIndexBuildItem i
}
private static void collectClassNames(Set set, Collection classInfos) {
- classInfos.forEach(c -> {
- set.add(c.name());
+ classInfos.forEach(new Consumer() {
+ @Override
+ public void accept(ClassInfo c) {
+ set.add(c.name());
+ }
});
}
@BuildStep
HealthBuildItem addHealthCheck(KafkaBuildTimeConfig buildTimeConfig) {
return new HealthBuildItem("io.quarkus.kafka.client.health.KafkaHealthCheck",
- buildTimeConfig.healthEnabled, "kafka");
+ buildTimeConfig.healthEnabled);
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/BroadcastLiteral.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/BroadcastLiteral.java
new file mode 100644
index 0000000000000..fa948e51ff339
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/BroadcastLiteral.java
@@ -0,0 +1,23 @@
+package io.quarkus.smallrye.reactivemessaging.deployment;
+
+import java.lang.annotation.Annotation;
+
+import io.smallrye.reactive.messaging.annotations.Broadcast;
+
+public class BroadcastLiteral implements Broadcast {
+ private final int subscribers;
+
+ public BroadcastLiteral(int subscribers) {
+ this.subscribers = subscribers;
+ }
+
+ @Override
+ public int value() {
+ return subscribers;
+ }
+
+ @Override
+ public Class extends Annotation> annotationType() {
+ return Broadcast.class;
+ }
+}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java
index 6765bf263de33..a38874becd62f 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java
@@ -1,6 +1,7 @@
package io.quarkus.smallrye.reactivemessaging.deployment;
import io.quarkus.builder.item.MultiBuildItem;
+import io.smallrye.reactive.messaging.extension.EmitterConfiguration;
public final class EmitterBuildItem extends MultiBuildItem {
@@ -12,18 +13,8 @@ public final class EmitterBuildItem extends MultiBuildItem {
* @param bufferSize the buffer size, if overflow is set to {@code BUFFER}
* @return the new {@link EmitterBuildItem}
*/
- static EmitterBuildItem of(String name, String overflow, int bufferSize) {
- return new EmitterBuildItem(name, overflow, bufferSize);
- }
-
- /**
- * Creates a new instance of {@link EmitterBuildItem} using the default overflow strategy.
- *
- * @param name the name of the stream
- * @return the new {@link EmitterBuildItem}
- */
- static EmitterBuildItem of(String name) {
- return new EmitterBuildItem(name, null, -1);
+ static EmitterBuildItem of(String name, String overflow, int bufferSize, boolean hasBroadcast, int awaitSubscribers) {
+ return new EmitterBuildItem(name, overflow, bufferSize, hasBroadcast, awaitSubscribers);
}
/**
@@ -43,22 +34,28 @@ static EmitterBuildItem of(String name) {
*/
private final int bufferSize;
- public EmitterBuildItem(String name, String overflow, int bufferSize) {
+ /**
+ * Whether the emitter uses the {@link io.smallrye.reactive.messaging.annotations.Broadcast} annotation.
+ */
+ private final boolean hasBroadcast;
+
+ /**
+ * If the emitter uses the {@link io.smallrye.reactive.messaging.annotations.Broadcast} annotation, indicates the
+ * number of subscribers to be expected before subscribing upstream.
+ */
+ private final int awaitSubscribers;
+
+ public EmitterBuildItem(String name, String overflow, int bufferSize, boolean hasBroadcast, int awaitSubscribers) {
this.name = name;
this.overflow = overflow;
this.bufferSize = bufferSize;
+ this.hasBroadcast = hasBroadcast;
+ this.awaitSubscribers = hasBroadcast ? awaitSubscribers : -1;
}
- public String getName() {
- return name;
- }
-
- public String getOverflow() {
- return overflow;
- }
-
- public int getBufferSize() {
- return bufferSize;
+ public EmitterConfiguration getEmitterConfig() {
+ return new EmitterConfiguration(name, OnOverflowLiteral.create(overflow, bufferSize),
+ hasBroadcast ? new BroadcastLiteral(awaitSubscribers) : null);
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/OnOverflowLiteral.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/OnOverflowLiteral.java
new file mode 100644
index 0000000000000..d37284391d238
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/OnOverflowLiteral.java
@@ -0,0 +1,35 @@
+package io.quarkus.smallrye.reactivemessaging.deployment;
+
+import java.lang.annotation.Annotation;
+
+import org.eclipse.microprofile.reactive.messaging.OnOverflow;
+
+public class OnOverflowLiteral implements OnOverflow {
+
+ private final Strategy strategy;
+ private final long buffer;
+
+ OnOverflowLiteral(String strategy, long buffer) {
+ this.strategy = strategy == null ? Strategy.BUFFER : Strategy.valueOf(strategy.toUpperCase());
+ this.buffer = buffer;
+ }
+
+ public static OnOverflow create(String strategy, long buffer) {
+ return new OnOverflowLiteral(strategy, buffer);
+ }
+
+ @Override
+ public Strategy value() {
+ return strategy;
+ }
+
+ @Override
+ public long bufferSize() {
+ return buffer;
+ }
+
+ @Override
+ public Class extends Annotation> annotationType() {
+ return OnOverflow.class;
+ }
+}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
index 1fd7a879811cc..646353c071a6b 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
@@ -8,6 +8,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Supplier;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Vetoed;
@@ -68,7 +69,7 @@
import io.smallrye.reactive.messaging.annotations.Blocking;
/**
- *
+ *
*/
public class SmallRyeReactiveMessagingProcessor {
@@ -86,7 +87,8 @@ FeatureBuildItem feature() {
AdditionalBeanBuildItem beans() {
// We add the connector and channel qualifiers to make them part of the index.
return new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class,
- Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class, QuarkusWorkerPoolRegistry.class);
+ Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class,
+ QuarkusWorkerPoolRegistry.class);
}
@BuildStep
@@ -163,6 +165,19 @@ void validateBeanDeployment(
for (InjectionPointInfo injectionPoint : validationPhase.getContext()
.get(BuildExtension.Key.INJECTION_POINTS)) {
+
+ Optional broadcast = annotationStore.getAnnotations(injectionPoint.getTarget())
+ .stream()
+ .filter(ai -> ReactiveMessagingDotNames.BROADCAST.equals(ai.name()))
+ .filter(ai -> {
+ if (ai.target().kind() == AnnotationTarget.Kind.METHOD_PARAMETER && injectionPoint
+ .isParam()) {
+ return ai.target().asMethodParameter().position() == injectionPoint.getPosition();
+ }
+ return true;
+ })
+ .findAny();
+
// New emitter from the spec.
if (injectionPoint.getRequiredType().name().equals(
ReactiveMessagingDotNames.EMITTER)) {
@@ -187,7 +202,7 @@ void validateBeanDeployment(
return true;
})
.findAny();
- createEmitter(emitters, injectionPoint, channelName, overflow);
+ createEmitter(emitters, injectionPoint, channelName, overflow, broadcast);
}
}
@@ -215,7 +230,8 @@ void validateBeanDeployment(
return true;
})
.findAny();
- createEmitter(emitters, injectionPoint, channelName, overflow);
+
+ createEmitter(emitters, injectionPoint, channelName, overflow, broadcast);
}
}
}
@@ -224,18 +240,30 @@ void validateBeanDeployment(
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private void createEmitter(BuildProducer emitters, InjectionPointInfo injectionPoint,
String channelName,
- Optional overflow) {
+ Optional overflow,
+ Optional broadcast) {
LOGGER.debugf("Emitter injection point '%s' detected, channel name: '%s'",
injectionPoint.getTargetInfo(), channelName);
+
+ boolean hasBroadcast = false;
+ int awaitSubscribers = -1;
+ int bufferSize = -1;
+ String strategy = null;
+ if (broadcast.isPresent()) {
+ hasBroadcast = true;
+ AnnotationValue value = broadcast.get().value();
+ awaitSubscribers = value == null ? 0 : value.asInt();
+ }
+
if (overflow.isPresent()) {
AnnotationInstance annotation = overflow.get();
AnnotationValue maybeBufferSize = annotation.value("bufferSize");
- int bufferSize = maybeBufferSize != null ? maybeBufferSize.asInt() : 0;
- emitters.produce(
- EmitterBuildItem.of(channelName, annotation.value().asString(), bufferSize));
- } else {
- emitters.produce(EmitterBuildItem.of(channelName));
+ bufferSize = maybeBufferSize == null ? 0 : maybeBufferSize.asInt();
+ strategy = annotation.value().asString();
}
+
+ emitters.produce(
+ EmitterBuildItem.of(channelName, strategy, bufferSize, hasBroadcast, awaitSubscribers));
}
@BuildStep
@@ -327,16 +355,15 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
for (EmitterBuildItem it : emitterFields) {
Config config = ConfigProvider.getConfig();
int defaultBufferSize = config.getOptionalValue("mp.messaging.emitter.default-buffer-size", Integer.class)
- .orElseGet(() -> config
- .getOptionalValue("smallrye.messaging.emitter.default-buffer-size", Integer.class)
- .orElse(127));
- if (it.getOverflow() != null) {
- recorder.configureEmitter(beanContainer.getValue(), it.getName(), it.getOverflow(),
- it.getBufferSize(),
- defaultBufferSize);
- } else {
- recorder.configureEmitter(beanContainer.getValue(), it.getName(), null, 0, defaultBufferSize);
- }
+ .orElseGet(new Supplier() {
+ @Override
+ public Integer get() {
+ return config
+ .getOptionalValue("smallrye.messaging.emitter.default-buffer-size", Integer.class)
+ .orElse(127);
+ }
+ });
+ recorder.configureEmitter(beanContainer.getValue(), it.getEmitterConfig(), defaultBufferSize);
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/ChannelEmitterWithOverflowAndBroadcast.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/ChannelEmitterWithOverflowAndBroadcast.java
new file mode 100644
index 0000000000000..87b592793c757
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/ChannelEmitterWithOverflowAndBroadcast.java
@@ -0,0 +1,112 @@
+package io.quarkus.smallrye.reactivemessaging;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.OnOverflow;
+
+import io.smallrye.reactive.messaging.annotations.Broadcast;
+
+@ApplicationScoped
+public class ChannelEmitterWithOverflowAndBroadcast {
+
+ @Inject
+ @Channel("sink")
+ @OnOverflow(value = OnOverflow.Strategy.BUFFER)
+ @Broadcast
+ Emitter emitter;
+
+ private Emitter emitterForSink2;
+ private Emitter emitterForSink1;
+
+ @Inject
+ public void setEmitter(@Channel("sink-1") @Broadcast Emitter sink1,
+ @Channel("sink-2") @Broadcast @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 4) Emitter sink2) {
+ this.emitterForSink1 = sink1;
+ this.emitterForSink2 = sink2;
+ }
+
+ private List list = new CopyOnWriteArrayList<>();
+ private List sink1 = new CopyOnWriteArrayList<>();
+ private List sink2 = new CopyOnWriteArrayList<>();
+
+ private List list2 = new CopyOnWriteArrayList<>();
+ private List sink12 = new CopyOnWriteArrayList<>();
+ private List sink22 = new CopyOnWriteArrayList<>();
+
+ public void run() {
+ emitter.send("a");
+ emitter.send("b");
+ emitter.send("c").toCompletableFuture().join();
+ emitter.complete();
+ emitterForSink1.send("a1");
+ emitterForSink1.send("b1").toCompletableFuture().join();
+ emitterForSink1.send("c1");
+ emitterForSink1.complete();
+ emitterForSink2.send("a2").toCompletableFuture().join();
+ emitterForSink2.send("b2");
+ emitterForSink2.send("c2");
+ emitterForSink2.complete();
+ }
+
+ @Incoming("sink")
+ public void consume1(String s) {
+ list.add(s);
+ }
+
+ @Incoming("sink")
+ public void consume2(String s) {
+ list2.add(s);
+ }
+
+ @Incoming("sink-1")
+ public void sink11(String s) {
+ sink1.add(s);
+ }
+
+ @Incoming("sink-1")
+ public void sink12(String s) {
+ sink12.add(s);
+ }
+
+ @Incoming("sink-2")
+ public void sink21(String s) {
+ sink2.add(s);
+ }
+
+ @Incoming("sink-2")
+ public void sink22(String s) {
+ sink22.add(s);
+ }
+
+ public List list() {
+ return list;
+ }
+
+ public List list2() {
+ return list2;
+ }
+
+ public List sink11() {
+ return sink1;
+ }
+
+ public List sink12() {
+ return sink12;
+ }
+
+ public List sink21() {
+ return sink2;
+ }
+
+ public List sink22() {
+ return sink22;
+ }
+
+}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastAndSubscriberExample.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastAndSubscriberExample.java
new file mode 100644
index 0000000000000..f6e268d74c8d7
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastAndSubscriberExample.java
@@ -0,0 +1,51 @@
+package io.quarkus.smallrye.reactivemessaging;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+
+import io.smallrye.reactive.messaging.annotations.Broadcast;
+
+@ApplicationScoped
+public class EmitterWithBroadcastAndSubscriberExample {
+
+ @Inject
+ @Channel("sink")
+ @Broadcast(2)
+ Emitter emitter;
+
+ private List list = new CopyOnWriteArrayList<>();
+ private List list2 = new CopyOnWriteArrayList<>();
+
+ public void run() {
+ emitter.send("a");
+ emitter.send("b");
+ emitter.send("c");
+ emitter.complete();
+ }
+
+ @Incoming("sink")
+ public void consume(String s) {
+ list.add(s);
+ }
+
+ @Incoming("sink")
+ public void consume2(String s) {
+ list2.add(s);
+ }
+
+ public List list() {
+ return list;
+ }
+
+ public List list2() {
+ return list2;
+ }
+
+}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastAndSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastAndSubscriberTest.java
new file mode 100644
index 0000000000000..0d37dc879bf0b
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastAndSubscriberTest.java
@@ -0,0 +1,42 @@
+package io.quarkus.smallrye.reactivemessaging;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+
+import javax.inject.Inject;
+
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.test.QuarkusUnitTest;
+
+public class EmitterWithBroadcastAndSubscriberTest {
+
+ @RegisterExtension
+ static final QuarkusUnitTest config = new QuarkusUnitTest()
+ .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
+ .addClasses(EmitterWithBroadcastAndSubscriberExample.class));
+
+ @Inject
+ EmitterWithBroadcastAndSubscriberExample bean;
+
+ @Test
+ public void testEmitter() {
+ bean.run();
+ List list = bean.list();
+ assertEquals(3, list.size());
+ assertEquals("a", list.get(0));
+ assertEquals("b", list.get(1));
+ assertEquals("c", list.get(2));
+
+ List list2 = bean.list2();
+ assertEquals(3, list2.size());
+ assertEquals("a", list2.get(0));
+ assertEquals("b", list2.get(1));
+ assertEquals("c", list2.get(2));
+ }
+
+}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastExample.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastExample.java
new file mode 100644
index 0000000000000..2400c889b61cc
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastExample.java
@@ -0,0 +1,51 @@
+package io.quarkus.smallrye.reactivemessaging;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+
+import io.smallrye.reactive.messaging.annotations.Broadcast;
+
+@ApplicationScoped
+public class EmitterWithBroadcastExample {
+
+ @Inject
+ @Channel("sink")
+ @Broadcast
+ Emitter emitter;
+
+ private List list = new CopyOnWriteArrayList<>();
+ private List list2 = new CopyOnWriteArrayList<>();
+
+ public void run() {
+ emitter.send("a");
+ emitter.send("b");
+ emitter.send("c");
+ emitter.complete();
+ }
+
+ @Incoming("sink")
+ public void consume(String s) {
+ list.add(s);
+ }
+
+ @Incoming("sink")
+ public void consume2(String s) {
+ list2.add(s);
+ }
+
+ public List list() {
+ return list;
+ }
+
+ public List list2() {
+ return list2;
+ }
+
+}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastTest.java
new file mode 100644
index 0000000000000..12961ac1b37db
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastTest.java
@@ -0,0 +1,42 @@
+package io.quarkus.smallrye.reactivemessaging;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+
+import javax.inject.Inject;
+
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.test.QuarkusUnitTest;
+
+public class EmitterWithBroadcastTest {
+
+ @RegisterExtension
+ static final QuarkusUnitTest config = new QuarkusUnitTest()
+ .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
+ .addClasses(EmitterWithBroadcastExample.class));
+
+ @Inject
+ EmitterWithBroadcastExample bean;
+
+ @Test
+ public void testEmitter() {
+ bean.run();
+ List list = bean.list();
+ assertEquals(3, list.size());
+ assertEquals("a", list.get(0));
+ assertEquals("b", list.get(1));
+ assertEquals("c", list.get(2));
+
+ List list2 = bean.list2();
+ assertEquals(3, list2.size());
+ assertEquals("a", list2.get(0));
+ assertEquals("b", list2.get(1));
+ assertEquals("c", list2.get(2));
+ }
+
+}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithOverflowAndBroadcastTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithOverflowAndBroadcastTest.java
new file mode 100644
index 0000000000000..f5c6e86c30df0
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithOverflowAndBroadcastTest.java
@@ -0,0 +1,66 @@
+package io.quarkus.smallrye.reactivemessaging;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+
+import javax.inject.Inject;
+
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.test.QuarkusUnitTest;
+
+public class EmitterWithOverflowAndBroadcastTest {
+
+ @RegisterExtension
+ static final QuarkusUnitTest config = new QuarkusUnitTest()
+ .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
+ .addClasses(ChannelEmitterWithOverflowAndBroadcast.class));
+
+ @Inject
+ ChannelEmitterWithOverflowAndBroadcast bean;
+
+ @Test
+ public void testEmitter() {
+ bean.run();
+ List list = bean.list();
+ assertEquals(3, list.size());
+ assertEquals("a", list.get(0));
+ assertEquals("b", list.get(1));
+ assertEquals("c", list.get(2));
+
+ List list2 = bean.list2();
+ assertEquals(3, list2.size());
+ assertEquals("a", list2.get(0));
+ assertEquals("b", list2.get(1));
+ assertEquals("c", list2.get(2));
+
+ List sink1 = bean.sink11();
+ assertEquals(3, sink1.size());
+ assertEquals("a1", sink1.get(0));
+ assertEquals("b1", sink1.get(1));
+ assertEquals("c1", sink1.get(2));
+
+ List sink2 = bean.sink12();
+ assertEquals(3, sink2.size());
+ assertEquals("a1", sink2.get(0));
+ assertEquals("b1", sink2.get(1));
+ assertEquals("c1", sink2.get(2));
+
+ List sink3 = bean.sink21();
+ assertEquals(3, sink3.size());
+ assertEquals("a2", sink3.get(0));
+ assertEquals("b2", sink3.get(1));
+ assertEquals("c2", sink3.get(2));
+
+ List sink4 = bean.sink22();
+ assertEquals(3, sink4.size());
+ assertEquals("a2", sink4.get(0));
+ assertEquals("b2", sink4.get(1));
+ assertEquals("c2", sink4.get(2));
+ }
+
+}
diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingRecorder.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingRecorder.java
index ebcdcdf51c468..e590e99dd0658 100644
--- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingRecorder.java
+++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingRecorder.java
@@ -4,6 +4,7 @@
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.runtime.annotations.Recorder;
+import io.smallrye.reactive.messaging.extension.EmitterConfiguration;
import io.smallrye.reactive.messaging.extension.MediatorManager;
/**
@@ -12,10 +13,9 @@
@Recorder
public class SmallRyeReactiveMessagingRecorder {
- public void configureEmitter(BeanContainer container, String name, String strategy, int bufferSize,
- int defaultBufferSize) {
+ public void configureEmitter(BeanContainer container, EmitterConfiguration ec, long defaultBufferSize) {
MediatorManager mediatorManager = container.instance(MediatorManager.class);
- mediatorManager.initializeEmitter(name, strategy, bufferSize, defaultBufferSize);
+ mediatorManager.initializeEmitter(ec, defaultBufferSize);
}
public void registerMediators(List configurations, BeanContainer container) {