diff --git a/bom/pom.xml b/bom/pom.xml index b3e532827..ca95c0b90 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -6,7 +6,7 @@ io.smallrye.reactive mutiny-bom - 2.5.6 + 2.5.7-SNAPSHOT pom diff --git a/context-propagation/pom.xml b/context-propagation/pom.xml index ee663803c..55a09c921 100644 --- a/context-propagation/pom.xml +++ b/context-propagation/pom.xml @@ -7,7 +7,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT mutiny-smallrye-context-propagation diff --git a/documentation/pom.xml b/documentation/pom.xml index 256c3ae9a..f67cb3d3c 100644 --- a/documentation/pom.xml +++ b/documentation/pom.xml @@ -5,7 +5,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT mutiny-documentation diff --git a/implementation/pom.xml b/implementation/pom.xml index 928d56cdf..d882c9781 100644 --- a/implementation/pom.xml +++ b/implementation/pom.xml @@ -6,7 +6,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT mutiny diff --git a/implementation/src/main/java/io/smallrye/mutiny/converters/uni/UniToMultiPublisher.java b/implementation/src/main/java/io/smallrye/mutiny/converters/uni/UniToMultiPublisher.java index 7540a0e24..7e38b55c1 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/converters/uni/UniToMultiPublisher.java +++ b/implementation/src/main/java/io/smallrye/mutiny/converters/uni/UniToMultiPublisher.java @@ -58,19 +58,21 @@ public Context context() { @Override public void cancel() { - if (upstream != null) { - upstream.cancel(); + if (STATE_UPDATER.getAndSet(this, State.DONE) != State.DONE) { + if (upstream != null) { + upstream.cancel(); + } } } @Override public void request(long n) { - if (n <= 0L) { - downstream.onError(new IllegalArgumentException("Invalid request")); - return; - } if (STATE_UPDATER.compareAndSet(this, State.INIT, State.UNI_REQUESTED)) { - AbstractUni.subscribe(uni, this); + if (n <= 0L) { + onFailure(new IllegalArgumentException("Invalid request")); + } else { + AbstractUni.subscribe(uni, this); + } } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java b/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java index f5f6f729a..b841c1afd 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java @@ -4,6 +4,7 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -94,4 +95,17 @@ void reproducer_quarkus_21528() { Uni.join().first(asList).withItem(); } + + @Test + void reproducer_1520() { + // From https://github.com/smallrye/smallrye-mutiny/issues/1520 and + // to address https://github.com/quarkusio/quarkus/issues/34613 + AtomicInteger counter = new AtomicInteger(); + Multi.createFrom().iterable(List.of("aa", "bb", "cc")) + .collect().asList() + .onTermination().invoke(counter::incrementAndGet) + .onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(2) + .await().atMost(Duration.ofSeconds(5)); + assertThat(counter).hasValue(1); + } } diff --git a/kotlin/pom.xml b/kotlin/pom.xml index 73be4910f..68017f0d4 100644 --- a/kotlin/pom.xml +++ b/kotlin/pom.xml @@ -6,7 +6,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT mutiny-kotlin diff --git a/math/pom.xml b/math/pom.xml index 1311631e7..7e2289992 100644 --- a/math/pom.xml +++ b/math/pom.xml @@ -7,7 +7,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT SmallRye Mutiny - Math operators diff --git a/pom.xml b/pom.xml index f530bc1f4..52bad4e05 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT pom SmallRye Mutiny - Parent diff --git a/reactive-streams-operators-jakarta/pom.xml b/reactive-streams-operators-jakarta/pom.xml index 5cfd06927..0918afee0 100644 --- a/reactive-streams-operators-jakarta/pom.xml +++ b/reactive-streams-operators-jakarta/pom.xml @@ -6,7 +6,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT mutiny-reactive-streams-operators-jakarta diff --git a/reactive-streams-operators/pom.xml b/reactive-streams-operators/pom.xml index 78270eeeb..9e7dcfc30 100644 --- a/reactive-streams-operators/pom.xml +++ b/reactive-streams-operators/pom.xml @@ -6,7 +6,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT mutiny-reactive-streams-operators diff --git a/reactive-streams-tck-tests/pom.xml b/reactive-streams-tck-tests/pom.xml index bd83ea93d..3172a6a72 100644 --- a/reactive-streams-tck-tests/pom.xml +++ b/reactive-streams-tck-tests/pom.xml @@ -7,7 +7,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT reactive-streams-tck-tests diff --git a/reactor/pom.xml b/reactor/pom.xml index 691daae3c..d81679029 100644 --- a/reactor/pom.xml +++ b/reactor/pom.xml @@ -5,7 +5,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT mutiny-reactor diff --git a/release/pom.xml b/release/pom.xml index 135d854b3..6ce2769a5 100644 --- a/release/pom.xml +++ b/release/pom.xml @@ -5,7 +5,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT smallrye-mutiny-release diff --git a/rxjava3/pom.xml b/rxjava3/pom.xml index b78d42b6b..45c154a64 100644 --- a/rxjava3/pom.xml +++ b/rxjava3/pom.xml @@ -5,7 +5,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT mutiny-rxjava3 diff --git a/test-utils/pom.xml b/test-utils/pom.xml index 4bee07267..12fe712b5 100644 --- a/test-utils/pom.xml +++ b/test-utils/pom.xml @@ -5,7 +5,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT mutiny-test-utils diff --git a/workshop-examples/pom.xml b/workshop-examples/pom.xml index becd26d63..4d9923303 100644 --- a/workshop-examples/pom.xml +++ b/workshop-examples/pom.xml @@ -7,7 +7,7 @@ io.smallrye.reactive mutiny-project - 2.5.6 + 2.5.7-SNAPSHOT SmallRye Mutiny - Workshop examples