diff --git a/bom/runtime/pom.xml b/bom/runtime/pom.xml index 863b28a209af9..ecbadb17f6601 100644 --- a/bom/runtime/pom.xml +++ b/bom/runtime/pom.xml @@ -43,7 +43,7 @@ 1.0.13 1.0.13 1.0.13 - 2.0.4 + 2.1.0 3.26.1 1.2.1 1.3.5 diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java index c6e5c8e10e359..60338ef8ed1b6 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java @@ -3,9 +3,11 @@ import java.util.Collection; import java.util.HashSet; import java.util.Set; +import java.util.function.Consumer; import javax.security.auth.spi.LoginModule; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.clients.consumer.RoundRobinAssignor; import org.apache.kafka.clients.consumer.StickyAssignor; @@ -42,7 +44,6 @@ import io.quarkus.arc.deployment.AdditionalBeanBuildItem; import io.quarkus.deployment.Capabilities; -import io.quarkus.deployment.Capability; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.builditem.CombinedIndexBuildItem; @@ -90,18 +91,20 @@ public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer i : BUILT_INS) { reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, i.getName())); collectSubclasses(toRegister, indexBuildItem, i); } - if (capabilities.isPresent(Capability.JSONB)) { + if (capabilities.isCapabilityPresent(Capabilities.JSONB)) { reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, JsonbSerializer.class, JsonbDeserializer.class)); collectSubclasses(toRegister, indexBuildItem, JsonbSerializer.class); collectSubclasses(toRegister, indexBuildItem, JsonbDeserializer.class); } - if (capabilities.isPresent(Capability.JACKSON)) { + if (capabilities.isCapabilityPresent(Capabilities.JACKSON)) { reflectiveClass.produce( new ReflectiveClassBuildItem(false, false, ObjectMapperSerializer.class, ObjectMapperDeserializer.class)); collectSubclasses(toRegister, indexBuildItem, ObjectMapperSerializer.class); @@ -157,14 +160,17 @@ private static void collectSubclasses(Set set, CombinedIndexBuildItem i } private static void collectClassNames(Set set, Collection classInfos) { - classInfos.forEach(c -> { - set.add(c.name()); + classInfos.forEach(new Consumer() { + @Override + public void accept(ClassInfo c) { + set.add(c.name()); + } }); } @BuildStep HealthBuildItem addHealthCheck(KafkaBuildTimeConfig buildTimeConfig) { return new HealthBuildItem("io.quarkus.kafka.client.health.KafkaHealthCheck", - buildTimeConfig.healthEnabled, "kafka"); + buildTimeConfig.healthEnabled); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/BroadcastLiteral.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/BroadcastLiteral.java new file mode 100644 index 0000000000000..fa948e51ff339 --- /dev/null +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/BroadcastLiteral.java @@ -0,0 +1,23 @@ +package io.quarkus.smallrye.reactivemessaging.deployment; + +import java.lang.annotation.Annotation; + +import io.smallrye.reactive.messaging.annotations.Broadcast; + +public class BroadcastLiteral implements Broadcast { + private final int subscribers; + + public BroadcastLiteral(int subscribers) { + this.subscribers = subscribers; + } + + @Override + public int value() { + return subscribers; + } + + @Override + public Class annotationType() { + return Broadcast.class; + } +} diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java index 6765bf263de33..a38874becd62f 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/EmitterBuildItem.java @@ -1,6 +1,7 @@ package io.quarkus.smallrye.reactivemessaging.deployment; import io.quarkus.builder.item.MultiBuildItem; +import io.smallrye.reactive.messaging.extension.EmitterConfiguration; public final class EmitterBuildItem extends MultiBuildItem { @@ -12,18 +13,8 @@ public final class EmitterBuildItem extends MultiBuildItem { * @param bufferSize the buffer size, if overflow is set to {@code BUFFER} * @return the new {@link EmitterBuildItem} */ - static EmitterBuildItem of(String name, String overflow, int bufferSize) { - return new EmitterBuildItem(name, overflow, bufferSize); - } - - /** - * Creates a new instance of {@link EmitterBuildItem} using the default overflow strategy. - * - * @param name the name of the stream - * @return the new {@link EmitterBuildItem} - */ - static EmitterBuildItem of(String name) { - return new EmitterBuildItem(name, null, -1); + static EmitterBuildItem of(String name, String overflow, int bufferSize, boolean hasBroadcast, int awaitSubscribers) { + return new EmitterBuildItem(name, overflow, bufferSize, hasBroadcast, awaitSubscribers); } /** @@ -43,22 +34,28 @@ static EmitterBuildItem of(String name) { */ private final int bufferSize; - public EmitterBuildItem(String name, String overflow, int bufferSize) { + /** + * Whether the emitter uses the {@link io.smallrye.reactive.messaging.annotations.Broadcast} annotation. + */ + private final boolean hasBroadcast; + + /** + * If the emitter uses the {@link io.smallrye.reactive.messaging.annotations.Broadcast} annotation, indicates the + * number of subscribers to be expected before subscribing upstream. + */ + private final int awaitSubscribers; + + public EmitterBuildItem(String name, String overflow, int bufferSize, boolean hasBroadcast, int awaitSubscribers) { this.name = name; this.overflow = overflow; this.bufferSize = bufferSize; + this.hasBroadcast = hasBroadcast; + this.awaitSubscribers = hasBroadcast ? awaitSubscribers : -1; } - public String getName() { - return name; - } - - public String getOverflow() { - return overflow; - } - - public int getBufferSize() { - return bufferSize; + public EmitterConfiguration getEmitterConfig() { + return new EmitterConfiguration(name, OnOverflowLiteral.create(overflow, bufferSize), + hasBroadcast ? new BroadcastLiteral(awaitSubscribers) : null); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/OnOverflowLiteral.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/OnOverflowLiteral.java new file mode 100644 index 0000000000000..d37284391d238 --- /dev/null +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/OnOverflowLiteral.java @@ -0,0 +1,35 @@ +package io.quarkus.smallrye.reactivemessaging.deployment; + +import java.lang.annotation.Annotation; + +import org.eclipse.microprofile.reactive.messaging.OnOverflow; + +public class OnOverflowLiteral implements OnOverflow { + + private final Strategy strategy; + private final long buffer; + + OnOverflowLiteral(String strategy, long buffer) { + this.strategy = strategy == null ? Strategy.BUFFER : Strategy.valueOf(strategy.toUpperCase()); + this.buffer = buffer; + } + + public static OnOverflow create(String strategy, long buffer) { + return new OnOverflowLiteral(strategy, buffer); + } + + @Override + public Strategy value() { + return strategy; + } + + @Override + public long bufferSize() { + return buffer; + } + + @Override + public Class annotationType() { + return OnOverflow.class; + } +} diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index 1fd7a879811cc..646353c071a6b 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -8,6 +8,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; import javax.enterprise.context.Dependent; import javax.enterprise.inject.Vetoed; @@ -68,7 +69,7 @@ import io.smallrye.reactive.messaging.annotations.Blocking; /** - * + * */ public class SmallRyeReactiveMessagingProcessor { @@ -86,7 +87,8 @@ FeatureBuildItem feature() { AdditionalBeanBuildItem beans() { // We add the connector and channel qualifiers to make them part of the index. return new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class, - Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class, QuarkusWorkerPoolRegistry.class); + Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class, + QuarkusWorkerPoolRegistry.class); } @BuildStep @@ -163,6 +165,19 @@ void validateBeanDeployment( for (InjectionPointInfo injectionPoint : validationPhase.getContext() .get(BuildExtension.Key.INJECTION_POINTS)) { + + Optional broadcast = annotationStore.getAnnotations(injectionPoint.getTarget()) + .stream() + .filter(ai -> ReactiveMessagingDotNames.BROADCAST.equals(ai.name())) + .filter(ai -> { + if (ai.target().kind() == AnnotationTarget.Kind.METHOD_PARAMETER && injectionPoint + .isParam()) { + return ai.target().asMethodParameter().position() == injectionPoint.getPosition(); + } + return true; + }) + .findAny(); + // New emitter from the spec. if (injectionPoint.getRequiredType().name().equals( ReactiveMessagingDotNames.EMITTER)) { @@ -187,7 +202,7 @@ void validateBeanDeployment( return true; }) .findAny(); - createEmitter(emitters, injectionPoint, channelName, overflow); + createEmitter(emitters, injectionPoint, channelName, overflow, broadcast); } } @@ -215,7 +230,8 @@ void validateBeanDeployment( return true; }) .findAny(); - createEmitter(emitters, injectionPoint, channelName, overflow); + + createEmitter(emitters, injectionPoint, channelName, overflow, broadcast); } } } @@ -224,18 +240,30 @@ void validateBeanDeployment( @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private void createEmitter(BuildProducer emitters, InjectionPointInfo injectionPoint, String channelName, - Optional overflow) { + Optional overflow, + Optional broadcast) { LOGGER.debugf("Emitter injection point '%s' detected, channel name: '%s'", injectionPoint.getTargetInfo(), channelName); + + boolean hasBroadcast = false; + int awaitSubscribers = -1; + int bufferSize = -1; + String strategy = null; + if (broadcast.isPresent()) { + hasBroadcast = true; + AnnotationValue value = broadcast.get().value(); + awaitSubscribers = value == null ? 0 : value.asInt(); + } + if (overflow.isPresent()) { AnnotationInstance annotation = overflow.get(); AnnotationValue maybeBufferSize = annotation.value("bufferSize"); - int bufferSize = maybeBufferSize != null ? maybeBufferSize.asInt() : 0; - emitters.produce( - EmitterBuildItem.of(channelName, annotation.value().asString(), bufferSize)); - } else { - emitters.produce(EmitterBuildItem.of(channelName)); + bufferSize = maybeBufferSize == null ? 0 : maybeBufferSize.asInt(); + strategy = annotation.value().asString(); } + + emitters.produce( + EmitterBuildItem.of(channelName, strategy, bufferSize, hasBroadcast, awaitSubscribers)); } @BuildStep @@ -327,16 +355,15 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re for (EmitterBuildItem it : emitterFields) { Config config = ConfigProvider.getConfig(); int defaultBufferSize = config.getOptionalValue("mp.messaging.emitter.default-buffer-size", Integer.class) - .orElseGet(() -> config - .getOptionalValue("smallrye.messaging.emitter.default-buffer-size", Integer.class) - .orElse(127)); - if (it.getOverflow() != null) { - recorder.configureEmitter(beanContainer.getValue(), it.getName(), it.getOverflow(), - it.getBufferSize(), - defaultBufferSize); - } else { - recorder.configureEmitter(beanContainer.getValue(), it.getName(), null, 0, defaultBufferSize); - } + .orElseGet(new Supplier() { + @Override + public Integer get() { + return config + .getOptionalValue("smallrye.messaging.emitter.default-buffer-size", Integer.class) + .orElse(127); + } + }); + recorder.configureEmitter(beanContainer.getValue(), it.getEmitterConfig(), defaultBufferSize); } } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/ChannelEmitterWithOverflowAndBroadcast.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/ChannelEmitterWithOverflowAndBroadcast.java new file mode 100644 index 0000000000000..87b592793c757 --- /dev/null +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/ChannelEmitterWithOverflowAndBroadcast.java @@ -0,0 +1,112 @@ +package io.quarkus.smallrye.reactivemessaging; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.OnOverflow; + +import io.smallrye.reactive.messaging.annotations.Broadcast; + +@ApplicationScoped +public class ChannelEmitterWithOverflowAndBroadcast { + + @Inject + @Channel("sink") + @OnOverflow(value = OnOverflow.Strategy.BUFFER) + @Broadcast + Emitter emitter; + + private Emitter emitterForSink2; + private Emitter emitterForSink1; + + @Inject + public void setEmitter(@Channel("sink-1") @Broadcast Emitter sink1, + @Channel("sink-2") @Broadcast @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 4) Emitter sink2) { + this.emitterForSink1 = sink1; + this.emitterForSink2 = sink2; + } + + private List list = new CopyOnWriteArrayList<>(); + private List sink1 = new CopyOnWriteArrayList<>(); + private List sink2 = new CopyOnWriteArrayList<>(); + + private List list2 = new CopyOnWriteArrayList<>(); + private List sink12 = new CopyOnWriteArrayList<>(); + private List sink22 = new CopyOnWriteArrayList<>(); + + public void run() { + emitter.send("a"); + emitter.send("b"); + emitter.send("c").toCompletableFuture().join(); + emitter.complete(); + emitterForSink1.send("a1"); + emitterForSink1.send("b1").toCompletableFuture().join(); + emitterForSink1.send("c1"); + emitterForSink1.complete(); + emitterForSink2.send("a2").toCompletableFuture().join(); + emitterForSink2.send("b2"); + emitterForSink2.send("c2"); + emitterForSink2.complete(); + } + + @Incoming("sink") + public void consume1(String s) { + list.add(s); + } + + @Incoming("sink") + public void consume2(String s) { + list2.add(s); + } + + @Incoming("sink-1") + public void sink11(String s) { + sink1.add(s); + } + + @Incoming("sink-1") + public void sink12(String s) { + sink12.add(s); + } + + @Incoming("sink-2") + public void sink21(String s) { + sink2.add(s); + } + + @Incoming("sink-2") + public void sink22(String s) { + sink22.add(s); + } + + public List list() { + return list; + } + + public List list2() { + return list2; + } + + public List sink11() { + return sink1; + } + + public List sink12() { + return sink12; + } + + public List sink21() { + return sink2; + } + + public List sink22() { + return sink22; + } + +} diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastAndSubscriberExample.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastAndSubscriberExample.java new file mode 100644 index 0000000000000..f6e268d74c8d7 --- /dev/null +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastAndSubscriberExample.java @@ -0,0 +1,51 @@ +package io.quarkus.smallrye.reactivemessaging; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import io.smallrye.reactive.messaging.annotations.Broadcast; + +@ApplicationScoped +public class EmitterWithBroadcastAndSubscriberExample { + + @Inject + @Channel("sink") + @Broadcast(2) + Emitter emitter; + + private List list = new CopyOnWriteArrayList<>(); + private List list2 = new CopyOnWriteArrayList<>(); + + public void run() { + emitter.send("a"); + emitter.send("b"); + emitter.send("c"); + emitter.complete(); + } + + @Incoming("sink") + public void consume(String s) { + list.add(s); + } + + @Incoming("sink") + public void consume2(String s) { + list2.add(s); + } + + public List list() { + return list; + } + + public List list2() { + return list2; + } + +} diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastAndSubscriberTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastAndSubscriberTest.java new file mode 100644 index 0000000000000..0d37dc879bf0b --- /dev/null +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastAndSubscriberTest.java @@ -0,0 +1,42 @@ +package io.quarkus.smallrye.reactivemessaging; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.List; + +import javax.inject.Inject; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; + +public class EmitterWithBroadcastAndSubscriberTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(EmitterWithBroadcastAndSubscriberExample.class)); + + @Inject + EmitterWithBroadcastAndSubscriberExample bean; + + @Test + public void testEmitter() { + bean.run(); + List list = bean.list(); + assertEquals(3, list.size()); + assertEquals("a", list.get(0)); + assertEquals("b", list.get(1)); + assertEquals("c", list.get(2)); + + List list2 = bean.list2(); + assertEquals(3, list2.size()); + assertEquals("a", list2.get(0)); + assertEquals("b", list2.get(1)); + assertEquals("c", list2.get(2)); + } + +} diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastExample.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastExample.java new file mode 100644 index 0000000000000..2400c889b61cc --- /dev/null +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastExample.java @@ -0,0 +1,51 @@ +package io.quarkus.smallrye.reactivemessaging; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import io.smallrye.reactive.messaging.annotations.Broadcast; + +@ApplicationScoped +public class EmitterWithBroadcastExample { + + @Inject + @Channel("sink") + @Broadcast + Emitter emitter; + + private List list = new CopyOnWriteArrayList<>(); + private List list2 = new CopyOnWriteArrayList<>(); + + public void run() { + emitter.send("a"); + emitter.send("b"); + emitter.send("c"); + emitter.complete(); + } + + @Incoming("sink") + public void consume(String s) { + list.add(s); + } + + @Incoming("sink") + public void consume2(String s) { + list2.add(s); + } + + public List list() { + return list; + } + + public List list2() { + return list2; + } + +} diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastTest.java new file mode 100644 index 0000000000000..12961ac1b37db --- /dev/null +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithBroadcastTest.java @@ -0,0 +1,42 @@ +package io.quarkus.smallrye.reactivemessaging; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.List; + +import javax.inject.Inject; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; + +public class EmitterWithBroadcastTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(EmitterWithBroadcastExample.class)); + + @Inject + EmitterWithBroadcastExample bean; + + @Test + public void testEmitter() { + bean.run(); + List list = bean.list(); + assertEquals(3, list.size()); + assertEquals("a", list.get(0)); + assertEquals("b", list.get(1)); + assertEquals("c", list.get(2)); + + List list2 = bean.list2(); + assertEquals(3, list2.size()); + assertEquals("a", list2.get(0)); + assertEquals("b", list2.get(1)); + assertEquals("c", list2.get(2)); + } + +} diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithOverflowAndBroadcastTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithOverflowAndBroadcastTest.java new file mode 100644 index 0000000000000..f5c6e86c30df0 --- /dev/null +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/EmitterWithOverflowAndBroadcastTest.java @@ -0,0 +1,66 @@ +package io.quarkus.smallrye.reactivemessaging; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.List; + +import javax.inject.Inject; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; + +public class EmitterWithOverflowAndBroadcastTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(ChannelEmitterWithOverflowAndBroadcast.class)); + + @Inject + ChannelEmitterWithOverflowAndBroadcast bean; + + @Test + public void testEmitter() { + bean.run(); + List list = bean.list(); + assertEquals(3, list.size()); + assertEquals("a", list.get(0)); + assertEquals("b", list.get(1)); + assertEquals("c", list.get(2)); + + List list2 = bean.list2(); + assertEquals(3, list2.size()); + assertEquals("a", list2.get(0)); + assertEquals("b", list2.get(1)); + assertEquals("c", list2.get(2)); + + List sink1 = bean.sink11(); + assertEquals(3, sink1.size()); + assertEquals("a1", sink1.get(0)); + assertEquals("b1", sink1.get(1)); + assertEquals("c1", sink1.get(2)); + + List sink2 = bean.sink12(); + assertEquals(3, sink2.size()); + assertEquals("a1", sink2.get(0)); + assertEquals("b1", sink2.get(1)); + assertEquals("c1", sink2.get(2)); + + List sink3 = bean.sink21(); + assertEquals(3, sink3.size()); + assertEquals("a2", sink3.get(0)); + assertEquals("b2", sink3.get(1)); + assertEquals("c2", sink3.get(2)); + + List sink4 = bean.sink22(); + assertEquals(3, sink4.size()); + assertEquals("a2", sink4.get(0)); + assertEquals("b2", sink4.get(1)); + assertEquals("c2", sink4.get(2)); + } + +} diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingRecorder.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingRecorder.java index ebcdcdf51c468..e590e99dd0658 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingRecorder.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingRecorder.java @@ -4,6 +4,7 @@ import io.quarkus.arc.runtime.BeanContainer; import io.quarkus.runtime.annotations.Recorder; +import io.smallrye.reactive.messaging.extension.EmitterConfiguration; import io.smallrye.reactive.messaging.extension.MediatorManager; /** @@ -12,10 +13,9 @@ @Recorder public class SmallRyeReactiveMessagingRecorder { - public void configureEmitter(BeanContainer container, String name, String strategy, int bufferSize, - int defaultBufferSize) { + public void configureEmitter(BeanContainer container, EmitterConfiguration ec, long defaultBufferSize) { MediatorManager mediatorManager = container.instance(MediatorManager.class); - mediatorManager.initializeEmitter(name, strategy, bufferSize, defaultBufferSize); + mediatorManager.initializeEmitter(ec, defaultBufferSize); } public void registerMediators(List configurations, BeanContainer container) {