diff --git a/reactor-core/build.gradle b/reactor-core/build.gradle
index bdab44535e..ff06e9f31b 100644
--- a/reactor-core/build.gradle
+++ b/reactor-core/build.gradle
@@ -166,8 +166,7 @@ task japicmp(type: JapicmpTask) {
'reactor.core.publisher.Sinks$EmitFailureHandler#busyLooping(java.time.Duration)',
'reactor.core.publisher.FluxSink#contextView()',
'reactor.core.publisher.MonoSink#contextView()',
- 'reactor.core.publisher.SynchronousSink#contextView()',
- 'reactor.core.publisher.Sinks#unsafe()'
+ 'reactor.core.publisher.SynchronousSink#contextView()'
]
}
diff --git a/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java
index 73644a34c0..fbd251e961 100644
--- a/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java
+++ b/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java
@@ -29,7 +29,6 @@
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
-import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Scannable;
@@ -59,8 +58,7 @@
* @deprecated To be removed in 3.5. Prefer clear cut usage of {@link Sinks} through
* variations of {@link Sinks.MulticastSpec#onBackpressureBuffer() Sinks.many().multicast().onBackpressureBuffer()}.
* If you really need the subscribe-to-upstream functionality of a {@link org.reactivestreams.Processor}, switch
- * to {@link Sinks.ManyWithUpstream} with {@link Sinks#unsafe()} variants of
- * {@link Sinks.MulticastUnsafeSpec#onBackpressureBuffer() Sinks.unsafe().many().multicast().onBackpressureBuffer()}.
+ * to {@link Sinks.ManyWithUpstream} with {@link Sinks#unsafe()} variants of {@link Sinks.RootSpec#manyWithUpstream() Sinks.unsafe().manyWithUpstream()}.
*
This processor was blocking in {@link EmitterProcessor#onNext(Object)}. This behaviour can be implemented with the {@link Sinks} API by calling
* {@link Sinks.Many#tryEmitNext(Object)} and retrying, e.g.:
* {@code while (sink.tryEmitNext(v).hasFailed()) {
diff --git a/reactor-core/src/main/java/reactor/core/publisher/Sinks.java b/reactor-core/src/main/java/reactor/core/publisher/Sinks.java
index 7eff34ac30..07efd2448b 100644
--- a/reactor-core/src/main/java/reactor/core/publisher/Sinks.java
+++ b/reactor-core/src/main/java/reactor/core/publisher/Sinks.java
@@ -22,7 +22,6 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
-import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
@@ -95,16 +94,16 @@ public static ManySpec many() {
}
/**
- * Return a {@link UnsafeSpec root spec} for more advanced use cases such as building operators.
+ * Return a {@link RootSpec root spec} for more advanced use cases such as building operators.
* Unsafe {@link Sinks.Many}, {@link Sinks.One} and {@link Sinks.Empty} are not serialized nor thread safe,
* which implies they MUST be externally synchronized so as to respect the Reactive Streams specification.
* This can typically be the case when the sinks are being called from within a Reactive Streams-compliant context,
* like a {@link Subscriber} or an operator. In turn, this allows the sinks to have less overhead, since they
* don't care to detect concurrent access anymore.
*
- * @return {@link UnsafeSpec}
+ * @return {@link RootSpec}
*/
- public static UnsafeSpec unsafe() {
+ public static RootSpec unsafe() {
return SinksSpecs.UNSAFE_ROOT_SPEC;
}
@@ -291,6 +290,7 @@ static EmitFailureHandler busyLooping(Duration duration){
boolean onEmitFailure(SignalType signalType, EmitResult emitResult);
}
+ //implementation note: this should now only be implemented by the Sinks.unsafe() path
/**
* Provides a choice of {@link Sinks.One}/{@link Sinks.Empty} factories and
* {@link Sinks.ManySpec further specs} for {@link Sinks.Many}.
@@ -328,41 +328,16 @@ public interface RootSpec {
* @return {@link ManySpec}
*/
ManySpec many();
- }
-
- /**
- * Provides a choice of {@link Sinks.One}/{@link Sinks.Empty} factories and
- * {@link Sinks.ManySpec further specs} for {@link Sinks.Many}, but without the
- * guards against concurrent access. These raw sinks need more advanced understanding
- * of the Reactive Streams specification in order to be correctly used.
- *
- * Some flavors of {@link Sinks.Many} are {@link ManyWithUpstream} which additionally
- * support being subscribed to an upstream {@link Publisher}, at most once.
- * Please note that when this is done, one MUST stop using emit/tryEmit APIs, reserving signal
- * creation to be the sole responsibility of the upstream {@link Publisher}.
- * The list of such flavors is as follows:
- *
- * -
- * {@link #many()}.{@link ManyUnsafeSpec#multicast() multicast()}.{@link MulticastUnsafeSpec#onBackpressureBuffer() onBackpressureBuffer()}
- *
- * -
- * {@link #many()}.{@link ManyUnsafeSpec#multicast() multicast()}.{@link MulticastUnsafeSpec#onBackpressureBuffer(int) onBackpressureBuffer(int)}
- *
- * -
- * {@link #many()}.{@link ManyUnsafeSpec#multicast() multicast()}.{@link MulticastUnsafeSpec#onBackpressureBuffer(int, boolean) onBackpressureBuffer(int, boolean)}
- *
- *
- */
- public interface UnsafeSpec extends RootSpec {
/**
- * {@inheritDoc}
- *
- * Some flavors return a {@link ManyWithUpstream}, supporting being subscribed to an upstream {@link Publisher}.
+ * Help building {@link Sinks.ManyWithUpstream} sinks that can also be {@link ManyWithUpstream#subscribeTo(Publisher) subscribed to}
+ * an upstream {@link Publisher}. This is an advanced use case, see {@link ManyWithUpstream#subscribeTo(Publisher)}.
+ *
+ * @return a {@link ManyWithUpstreamUnsafeSpec}
*/
- @Override
- ManyUnsafeSpec many();
+ ManyWithUpstreamUnsafeSpec manyWithUpstream();
}
+
/**
* Provides {@link Sinks.Many} specs for sinks which can emit multiple elements
*/
@@ -391,31 +366,59 @@ public interface ManySpec {
}
/**
- * Provides multicast : 1 sink, N {@link Subscriber}.
+ * Instead of {@link Sinks#unsafe() unsafe} flavors of {@link Sinks.Many}, this spec provides {@link ManyWithUpstream}
+ * implementations. These additionally support being subscribed to an upstream {@link Publisher}, at most once.
+ * Please note that when this is done, one MUST stop using emit/tryEmit APIs, reserving signal creation to be the
+ * sole responsibility of the upstream {@link Publisher}.
*
- * This {@link MulticastSpec} provides {@link Sinks#unsafe() unsafe} flavors, including some {@link ManyWithUpstream} implementations.
+ * As the number of such implementations is deliberately kept low, this spec doesn't further distinguish between
+ * multicast/unicast/replay categories other than in method naming.
*/
- public interface MulticastUnsafeSpec extends MulticastSpec {
-
- @Override
- ManyWithUpstream onBackpressureBuffer();
-
- @Override
- ManyWithUpstream onBackpressureBuffer(int bufferSize);
-
- @Override
- ManyWithUpstream onBackpressureBuffer(int bufferSize, boolean autoCancel);
- }
-
- /**
- * Provides {@link Sinks.Many} specs for sinks which can emit multiple elements.
- *
- * This {@link ManySpec} provides {@link Sinks#unsafe() unsafe} flavors, including some {@link ManyWithUpstream} implementations.
- */
- public interface ManyUnsafeSpec extends ManySpec {
+ public interface ManyWithUpstreamUnsafeSpec {
+ /**
+ * A {@link Sinks.ManyWithUpstream} with the following characteristics:
+ *
+ * - Multicast
+ * - Without {@link Subscriber}: warm up. Remembers up to {@link Queues#SMALL_BUFFER_SIZE}
+ * elements pushed via {@link Many#tryEmitNext(Object)} before the first {@link Subscriber} is registered.
+ * - Backpressure : this sink honors downstream demand by conforming to the lowest demand in case
+ * of multiple subscribers.
If the difference between multiple subscribers is greater than {@link Queues#SMALL_BUFFER_SIZE}:
+ * - {@link Many#tryEmitNext(Object) tryEmitNext} will return {@link EmitResult#FAIL_OVERFLOW}
+ * - {@link Many#emitNext(Object, Sinks.EmitFailureHandler) emitNext} will terminate the sink by {@link Many#emitError(Throwable, Sinks.EmitFailureHandler) emitting}
+ * an {@link Exceptions#failWithOverflow() overflow error}.
+ *
+ * - Replaying: No replay of values seen by earlier subscribers. Only forwards to a {@link Subscriber}
+ * the elements that have been pushed to the sink AFTER this subscriber was subscribed, or elements
+ * that have been buffered due to backpressure/warm up.
+ *
+ *
+ *
+ */
+ ManyWithUpstream multicastOnBackpressureBuffer();
- @Override
- MulticastUnsafeSpec multicast();
+ /**
+ * A {@link Sinks.ManyWithUpstream} with the following characteristics:
+ *
+ * - Multicast
+ * - Without {@link Subscriber}: warm up. Remembers up to {@code bufferSize}
+ * elements pushed via {@link Many#tryEmitNext(Object)} before the first {@link Subscriber} is registered.
+ * - Backpressure : this sink honors downstream demand by conforming to the lowest demand in case
+ * of multiple subscribers.
If the difference between multiple subscribers is too high compared to {@code bufferSize}:
+ * - {@link Many#tryEmitNext(Object) tryEmitNext} will return {@link EmitResult#FAIL_OVERFLOW}
+ * - {@link Many#emitNext(Object, Sinks.EmitFailureHandler) emitNext} will terminate the sink by {@link Many#emitError(Throwable, Sinks.EmitFailureHandler) emitting}
+ * an {@link Exceptions#failWithOverflow() overflow error}.
+ *
+ * - Replaying: No replay of values seen by earlier subscribers. Only forwards to a {@link Subscriber}
+ * the elements that have been pushed to the sink AFTER this subscriber was subscribed, or elements
+ * that have been buffered due to backpressure/warm up.
+ *
+ *
+ *
+ *
+ * @param bufferSize the maximum queue size
+ * @param autoCancel should the sink fully shutdowns (not publishing anymore) when the last subscriber cancels
+ */
+ ManyWithUpstream multicastOnBackpressureBuffer(int bufferSize, boolean autoCancel);
}
/**
diff --git a/reactor-core/src/main/java/reactor/core/publisher/SinksSpecs.java b/reactor-core/src/main/java/reactor/core/publisher/SinksSpecs.java
index 5ffa3da3f5..e72e7fec03 100644
--- a/reactor-core/src/main/java/reactor/core/publisher/SinksSpecs.java
+++ b/reactor-core/src/main/java/reactor/core/publisher/SinksSpecs.java
@@ -30,7 +30,7 @@
final class SinksSpecs {
- static final Sinks.UnsafeSpec UNSAFE_ROOT_SPEC = new UnsafeSpecImpl();
+ static final Sinks.RootSpec UNSAFE_ROOT_SPEC = new UnsafeSpecImpl();
static final DefaultSinksSpecs DEFAULT_SINKS = new DefaultSinksSpecs();
abstract static class AbstractSerializedSink {
@@ -64,7 +64,7 @@ boolean tryAcquire(Thread currentThread) {
static final class UnsafeSpecImpl
- implements Sinks.UnsafeSpec, Sinks.ManyUnsafeSpec, Sinks.MulticastUnsafeSpec, Sinks.MulticastReplaySpec {
+ implements Sinks.RootSpec, Sinks.ManySpec, Sinks.ManyWithUpstreamUnsafeSpec, Sinks.MulticastSpec, Sinks.MulticastReplaySpec {
final Sinks.UnicastSpec unicastSpec;
@@ -83,7 +83,7 @@ public One one() {
}
@Override
- public Sinks.ManyUnsafeSpec many() {
+ public Sinks.ManySpec many() {
return this;
}
@@ -92,29 +92,34 @@ public Sinks.UnicastSpec unicast() {
return this.unicastSpec;
}
+ @Override
+ public Sinks.MulticastSpec multicast() {
+ return this;
+ }
+
@Override
public Sinks.MulticastReplaySpec replay() {
return this;
}
@Override
- public Sinks.ManyWithUpstream onBackpressureBuffer() {
- return new EmitterProcessor<>(true, Queues.SMALL_BUFFER_SIZE);
+ public Sinks.ManyWithUpstreamUnsafeSpec manyWithUpstream() {
+ return this;
}
@Override
- public Sinks.ManyWithUpstream onBackpressureBuffer(int bufferSize) {
- return new EmitterProcessor<>(true, bufferSize);
+ public Sinks.Many onBackpressureBuffer() {
+ return new EmitterProcessor<>(true, Queues.SMALL_BUFFER_SIZE);
}
@Override
- public Sinks.ManyWithUpstream onBackpressureBuffer(int bufferSize, boolean autoCancel) {
- return new EmitterProcessor<>(autoCancel, bufferSize);
+ public Sinks.Many onBackpressureBuffer(int bufferSize) {
+ return new EmitterProcessor<>(true, bufferSize);
}
@Override
- public Sinks.MulticastUnsafeSpec multicast() {
- return this;
+ public Sinks.Many onBackpressureBuffer(int bufferSize, boolean autoCancel) {
+ return new EmitterProcessor<>(autoCancel, bufferSize);
}
@Override
@@ -171,6 +176,16 @@ public Many limit(int historySize, Duration maxAge) {
public Many limit(int historySize, Duration maxAge, Scheduler scheduler) {
return ReplayProcessor.createSizeAndTimeout(historySize, maxAge, scheduler);
}
+
+ @Override
+ public Sinks.ManyWithUpstream multicastOnBackpressureBuffer() {
+ return new EmitterProcessor<>(true, Queues.SMALL_BUFFER_SIZE);
+ }
+
+ @Override
+ public Sinks.ManyWithUpstream multicastOnBackpressureBuffer(int bufferSize, boolean autoCancel) {
+ return new EmitterProcessor<>(autoCancel, bufferSize);
+ }
}
//Note: RootSpec is now reserved for Sinks.unsafe()
diff --git a/reactor-core/src/test/java/reactor/core/publisher/EmitterProcessorTest.java b/reactor-core/src/test/java/reactor/core/publisher/EmitterProcessorTest.java
index 5c8a6354bc..bd04e9936b 100644
--- a/reactor-core/src/test/java/reactor/core/publisher/EmitterProcessorTest.java
+++ b/reactor-core/src/test/java/reactor/core/publisher/EmitterProcessorTest.java
@@ -23,12 +23,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
-import org.assertj.core.data.Percentage;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -48,7 +46,6 @@
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.subscriber.TestSubscriber;
-import reactor.test.util.RaceTestUtils;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
@@ -70,8 +67,8 @@ public class EmitterProcessorTest {
AutoDisposingExtension afterTest = new AutoDisposingExtension();
@Test
- void smokeTestManySubscriber() {
- final Sinks.ManyWithUpstream adapter = Sinks.unsafe().many().multicast().onBackpressureBuffer();
+ void smokeTestManyWithUpstream() {
+ final Sinks.ManyWithUpstream adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer();
final TestSubscriber testSubscriber1 = TestSubscriber.create();
final TestSubscriber testSubscriber2 = TestSubscriber.create();
final Flux upstream = Flux.range(1, 10);
@@ -102,7 +99,7 @@ void smokeTestManySubscriber() {
@Test
void smokeTestSubscribeAndDispose() {
- final Sinks.ManyWithUpstream adapter = Sinks.unsafe().many().multicast().onBackpressureBuffer();
+ final Sinks.ManyWithUpstream adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer();
final TestSubscriber testSubscriber1 = TestSubscriber.create();
final TestSubscriber testSubscriber2 = TestSubscriber.create();
final Flux upstream = Flux.never();
@@ -130,7 +127,7 @@ void smokeTestSubscribeAndDispose() {
@Test
void subscribeToUpstreamTwiceSkipsSecondSubscription() {
- final Sinks.ManyWithUpstream adapter = Sinks.unsafe().many().multicast().onBackpressureBuffer(123);
+ final Sinks.ManyWithUpstream adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer(123, true);
final TestPublisher upstream1 = TestPublisher.create();
final TestPublisher upstream2 = TestPublisher.create();