diff --git a/integration-tests/reactive-messaging-amqp/src/main/java/io/quarkus/it/amqp/PeopleProducer.java b/integration-tests/reactive-messaging-amqp/src/main/java/io/quarkus/it/amqp/PeopleProducer.java index 6a9b4f65fff5c..719d42d32aa27 100644 --- a/integration-tests/reactive-messaging-amqp/src/main/java/io/quarkus/it/amqp/PeopleProducer.java +++ b/integration-tests/reactive-messaging-amqp/src/main/java/io/quarkus/it/amqp/PeopleProducer.java @@ -1,10 +1,15 @@ package io.quarkus.it.amqp; +import java.time.Duration; +import java.util.function.Consumer; + import javax.enterprise.context.ApplicationScoped; import org.eclipse.microprofile.reactive.messaging.Outgoing; import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.MultiEmitter; import io.vertx.core.json.Json; @ApplicationScoped @@ -12,13 +17,27 @@ public class PeopleProducer { @Outgoing("people-out") public Multi generatePeople() { - return Multi.createFrom().items( - new Person("bob"), - new Person("alice"), - new Person("tom"), - new Person("jerry"), - new Person("anna"), - new Person("ken")) - .map(Json::encode); + //TODO: this can be replaced with Multi.onItem().delayIt when it exists + //TODO: this delay should not even be necessary, the queue is created on + //subscriber connect, so we delay to make sure it is connected + //we should be able to just define the queue in broker.xml, but that does not + //work atm, see https://github.com/smallrye/smallrye-reactive-messaging/issues/555 + return Multi.createFrom().emitter(new Consumer>() { + @Override + public void accept(MultiEmitter multiEmitter) { + Uni.createFrom().item("dummy").onItem().delayIt().by(Duration.ofSeconds(2)) + .subscribe().with(new Consumer() { + @Override + public void accept(String s) { + multiEmitter.emit(new Person("bob")); + multiEmitter.emit(new Person("alice")); + multiEmitter.emit(new Person("tom")); + multiEmitter.emit(new Person("jerry")); + multiEmitter.emit(new Person("anna")); + multiEmitter.emit(new Person("ken")); + } + }); + } + }).map(Json::encode); } }