diff --git a/bom/pom.xml b/bom/pom.xml
index b3e532827..ca95c0b90 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -6,7 +6,7 @@
io.smallrye.reactive
mutiny-bom
- 2.5.6
+ 2.5.7-SNAPSHOT
pom
diff --git a/context-propagation/pom.xml b/context-propagation/pom.xml
index ee663803c..55a09c921 100644
--- a/context-propagation/pom.xml
+++ b/context-propagation/pom.xml
@@ -7,7 +7,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
mutiny-smallrye-context-propagation
diff --git a/documentation/pom.xml b/documentation/pom.xml
index 256c3ae9a..f67cb3d3c 100644
--- a/documentation/pom.xml
+++ b/documentation/pom.xml
@@ -5,7 +5,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
mutiny-documentation
diff --git a/implementation/pom.xml b/implementation/pom.xml
index 928d56cdf..d882c9781 100644
--- a/implementation/pom.xml
+++ b/implementation/pom.xml
@@ -6,7 +6,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
mutiny
diff --git a/implementation/src/main/java/io/smallrye/mutiny/converters/uni/UniToMultiPublisher.java b/implementation/src/main/java/io/smallrye/mutiny/converters/uni/UniToMultiPublisher.java
index 7540a0e24..7e38b55c1 100644
--- a/implementation/src/main/java/io/smallrye/mutiny/converters/uni/UniToMultiPublisher.java
+++ b/implementation/src/main/java/io/smallrye/mutiny/converters/uni/UniToMultiPublisher.java
@@ -58,19 +58,21 @@ public Context context() {
@Override
public void cancel() {
- if (upstream != null) {
- upstream.cancel();
+ if (STATE_UPDATER.getAndSet(this, State.DONE) != State.DONE) {
+ if (upstream != null) {
+ upstream.cancel();
+ }
}
}
@Override
public void request(long n) {
- if (n <= 0L) {
- downstream.onError(new IllegalArgumentException("Invalid request"));
- return;
- }
if (STATE_UPDATER.compareAndSet(this, State.INIT, State.UNI_REQUESTED)) {
- AbstractUni.subscribe(uni, this);
+ if (n <= 0L) {
+ onFailure(new IllegalArgumentException("Invalid request"));
+ } else {
+ AbstractUni.subscribe(uni, this);
+ }
}
}
diff --git a/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java b/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java
index f5f6f729a..b841c1afd 100644
--- a/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java
+++ b/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java
@@ -4,6 +4,7 @@
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -94,4 +95,17 @@ void reproducer_quarkus_21528() {
Uni.join().first(asList).withItem();
}
+
+ @Test
+ void reproducer_1520() {
+ // From https://github.com/smallrye/smallrye-mutiny/issues/1520 and
+ // to address https://github.com/quarkusio/quarkus/issues/34613
+ AtomicInteger counter = new AtomicInteger();
+ Multi.createFrom().iterable(List.of("aa", "bb", "cc"))
+ .collect().asList()
+ .onTermination().invoke(counter::incrementAndGet)
+ .onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(2)
+ .await().atMost(Duration.ofSeconds(5));
+ assertThat(counter).hasValue(1);
+ }
}
diff --git a/kotlin/pom.xml b/kotlin/pom.xml
index 73be4910f..68017f0d4 100644
--- a/kotlin/pom.xml
+++ b/kotlin/pom.xml
@@ -6,7 +6,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
mutiny-kotlin
diff --git a/math/pom.xml b/math/pom.xml
index 1311631e7..7e2289992 100644
--- a/math/pom.xml
+++ b/math/pom.xml
@@ -7,7 +7,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
SmallRye Mutiny - Math operators
diff --git a/pom.xml b/pom.xml
index f530bc1f4..52bad4e05 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
pom
SmallRye Mutiny - Parent
diff --git a/reactive-streams-operators-jakarta/pom.xml b/reactive-streams-operators-jakarta/pom.xml
index 5cfd06927..0918afee0 100644
--- a/reactive-streams-operators-jakarta/pom.xml
+++ b/reactive-streams-operators-jakarta/pom.xml
@@ -6,7 +6,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
mutiny-reactive-streams-operators-jakarta
diff --git a/reactive-streams-operators/pom.xml b/reactive-streams-operators/pom.xml
index 78270eeeb..9e7dcfc30 100644
--- a/reactive-streams-operators/pom.xml
+++ b/reactive-streams-operators/pom.xml
@@ -6,7 +6,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
mutiny-reactive-streams-operators
diff --git a/reactive-streams-tck-tests/pom.xml b/reactive-streams-tck-tests/pom.xml
index bd83ea93d..3172a6a72 100644
--- a/reactive-streams-tck-tests/pom.xml
+++ b/reactive-streams-tck-tests/pom.xml
@@ -7,7 +7,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
reactive-streams-tck-tests
diff --git a/reactor/pom.xml b/reactor/pom.xml
index 691daae3c..d81679029 100644
--- a/reactor/pom.xml
+++ b/reactor/pom.xml
@@ -5,7 +5,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
mutiny-reactor
diff --git a/release/pom.xml b/release/pom.xml
index 135d854b3..6ce2769a5 100644
--- a/release/pom.xml
+++ b/release/pom.xml
@@ -5,7 +5,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
smallrye-mutiny-release
diff --git a/rxjava3/pom.xml b/rxjava3/pom.xml
index b78d42b6b..45c154a64 100644
--- a/rxjava3/pom.xml
+++ b/rxjava3/pom.xml
@@ -5,7 +5,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
mutiny-rxjava3
diff --git a/test-utils/pom.xml b/test-utils/pom.xml
index 4bee07267..12fe712b5 100644
--- a/test-utils/pom.xml
+++ b/test-utils/pom.xml
@@ -5,7 +5,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
mutiny-test-utils
diff --git a/workshop-examples/pom.xml b/workshop-examples/pom.xml
index becd26d63..4d9923303 100644
--- a/workshop-examples/pom.xml
+++ b/workshop-examples/pom.xml
@@ -7,7 +7,7 @@
io.smallrye.reactive
mutiny-project
- 2.5.6
+ 2.5.7-SNAPSHOT
SmallRye Mutiny - Workshop examples