diff --git a/reactor-core/build.gradle b/reactor-core/build.gradle index bdab44535e..ff06e9f31b 100644 --- a/reactor-core/build.gradle +++ b/reactor-core/build.gradle @@ -166,8 +166,7 @@ task japicmp(type: JapicmpTask) { 'reactor.core.publisher.Sinks$EmitFailureHandler#busyLooping(java.time.Duration)', 'reactor.core.publisher.FluxSink#contextView()', 'reactor.core.publisher.MonoSink#contextView()', - 'reactor.core.publisher.SynchronousSink#contextView()', - 'reactor.core.publisher.Sinks#unsafe()' + 'reactor.core.publisher.SynchronousSink#contextView()' ] } diff --git a/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java index 73644a34c0..fbd251e961 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java @@ -29,7 +29,6 @@ import reactor.core.CoreSubscriber; import reactor.core.Disposable; -import reactor.core.Disposables; import reactor.core.Exceptions; import reactor.core.Fuseable; import reactor.core.Scannable; @@ -59,8 +58,7 @@ * @deprecated To be removed in 3.5. Prefer clear cut usage of {@link Sinks} through * variations of {@link Sinks.MulticastSpec#onBackpressureBuffer() Sinks.many().multicast().onBackpressureBuffer()}. * If you really need the subscribe-to-upstream functionality of a {@link org.reactivestreams.Processor}, switch - * to {@link Sinks.ManyWithUpstream} with {@link Sinks#unsafe()} variants of - * {@link Sinks.MulticastUnsafeSpec#onBackpressureBuffer() Sinks.unsafe().many().multicast().onBackpressureBuffer()}. + * to {@link Sinks.ManyWithUpstream} with {@link Sinks#unsafe()} variants of {@link Sinks.RootSpec#manyWithUpstream() Sinks.unsafe().manyWithUpstream()}. *

This processor was blocking in {@link EmitterProcessor#onNext(Object)}. This behaviour can be implemented with the {@link Sinks} API by calling * {@link Sinks.Many#tryEmitNext(Object)} and retrying, e.g.: *

{@code while (sink.tryEmitNext(v).hasFailed()) {
diff --git a/reactor-core/src/main/java/reactor/core/publisher/Sinks.java b/reactor-core/src/main/java/reactor/core/publisher/Sinks.java
index 7eff34ac30..07efd2448b 100644
--- a/reactor-core/src/main/java/reactor/core/publisher/Sinks.java
+++ b/reactor-core/src/main/java/reactor/core/publisher/Sinks.java
@@ -22,7 +22,6 @@
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 
-import reactor.core.CoreSubscriber;
 import reactor.core.Disposable;
 import reactor.core.Exceptions;
 import reactor.core.Scannable;
@@ -95,16 +94,16 @@ public static ManySpec many() {
 	}
 
 	/**
-	 * Return a {@link UnsafeSpec root spec} for more advanced use cases such as building operators.
+	 * Return a {@link RootSpec root spec} for more advanced use cases such as building operators.
 	 * Unsafe {@link Sinks.Many}, {@link Sinks.One} and {@link Sinks.Empty} are not serialized nor thread safe,
 	 * which implies they MUST be externally synchronized so as to respect the Reactive Streams specification.
 	 * This can typically be the case when the sinks are being called from within a Reactive Streams-compliant context,
 	 * like a {@link Subscriber} or an operator. In turn, this allows the sinks to have less overhead, since they
 	 * don't care to detect concurrent access anymore.
 	 *
-	 * @return {@link UnsafeSpec}
+	 * @return {@link RootSpec}
 	 */
-	public static UnsafeSpec unsafe() {
+	public static RootSpec unsafe() {
 		return SinksSpecs.UNSAFE_ROOT_SPEC;
 	}
 
@@ -291,6 +290,7 @@ static EmitFailureHandler busyLooping(Duration duration){
 		boolean onEmitFailure(SignalType signalType, EmitResult emitResult);
 	}
 
+	//implementation note: this should now only be implemented by the Sinks.unsafe() path
 	/**
 	 * Provides a choice of {@link Sinks.One}/{@link Sinks.Empty} factories and
 	 * {@link Sinks.ManySpec further specs} for {@link Sinks.Many}.
@@ -328,41 +328,16 @@ public interface RootSpec {
 		 * @return {@link ManySpec}
 		 */
 		ManySpec many();
-	}
-
-	/**
-	 * Provides a choice of {@link Sinks.One}/{@link Sinks.Empty} factories and
-	 * {@link Sinks.ManySpec further specs} for {@link Sinks.Many}, but without the
-	 * guards against concurrent access. These raw sinks need more advanced understanding
-	 * of the Reactive Streams specification in order to be correctly used.
-	 * 

- * Some flavors of {@link Sinks.Many} are {@link ManyWithUpstream} which additionally - * support being subscribed to an upstream {@link Publisher}, at most once. - * Please note that when this is done, one MUST stop using emit/tryEmit APIs, reserving signal - * creation to be the sole responsibility of the upstream {@link Publisher}. - * The list of such flavors is as follows: - *

- */ - public interface UnsafeSpec extends RootSpec { /** - * {@inheritDoc} - *

- * Some flavors return a {@link ManyWithUpstream}, supporting being subscribed to an upstream {@link Publisher}. + * Help building {@link Sinks.ManyWithUpstream} sinks that can also be {@link ManyWithUpstream#subscribeTo(Publisher) subscribed to} + * an upstream {@link Publisher}. This is an advanced use case, see {@link ManyWithUpstream#subscribeTo(Publisher)}. + * + * @return a {@link ManyWithUpstreamUnsafeSpec} */ - @Override - ManyUnsafeSpec many(); + ManyWithUpstreamUnsafeSpec manyWithUpstream(); } + /** * Provides {@link Sinks.Many} specs for sinks which can emit multiple elements */ @@ -391,31 +366,59 @@ public interface ManySpec { } /** - * Provides multicast : 1 sink, N {@link Subscriber}. + * Instead of {@link Sinks#unsafe() unsafe} flavors of {@link Sinks.Many}, this spec provides {@link ManyWithUpstream} + * implementations. These additionally support being subscribed to an upstream {@link Publisher}, at most once. + * Please note that when this is done, one MUST stop using emit/tryEmit APIs, reserving signal creation to be the + * sole responsibility of the upstream {@link Publisher}. *

- * This {@link MulticastSpec} provides {@link Sinks#unsafe() unsafe} flavors, including some {@link ManyWithUpstream} implementations. + * As the number of such implementations is deliberately kept low, this spec doesn't further distinguish between + * multicast/unicast/replay categories other than in method naming. */ - public interface MulticastUnsafeSpec extends MulticastSpec { - - @Override - ManyWithUpstream onBackpressureBuffer(); - - @Override - ManyWithUpstream onBackpressureBuffer(int bufferSize); - - @Override - ManyWithUpstream onBackpressureBuffer(int bufferSize, boolean autoCancel); - } - - /** - * Provides {@link Sinks.Many} specs for sinks which can emit multiple elements. - *

- * This {@link ManySpec} provides {@link Sinks#unsafe() unsafe} flavors, including some {@link ManyWithUpstream} implementations. - */ - public interface ManyUnsafeSpec extends ManySpec { + public interface ManyWithUpstreamUnsafeSpec { + /** + * A {@link Sinks.ManyWithUpstream} with the following characteristics: + *

+ *

+ * + */ + ManyWithUpstream multicastOnBackpressureBuffer(); - @Override - MulticastUnsafeSpec multicast(); + /** + * A {@link Sinks.ManyWithUpstream} with the following characteristics: + *

+ *

+ * + * + * @param bufferSize the maximum queue size + * @param autoCancel should the sink fully shutdowns (not publishing anymore) when the last subscriber cancels + */ + ManyWithUpstream multicastOnBackpressureBuffer(int bufferSize, boolean autoCancel); } /** diff --git a/reactor-core/src/main/java/reactor/core/publisher/SinksSpecs.java b/reactor-core/src/main/java/reactor/core/publisher/SinksSpecs.java index 5ffa3da3f5..e72e7fec03 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/SinksSpecs.java +++ b/reactor-core/src/main/java/reactor/core/publisher/SinksSpecs.java @@ -30,7 +30,7 @@ final class SinksSpecs { - static final Sinks.UnsafeSpec UNSAFE_ROOT_SPEC = new UnsafeSpecImpl(); + static final Sinks.RootSpec UNSAFE_ROOT_SPEC = new UnsafeSpecImpl(); static final DefaultSinksSpecs DEFAULT_SINKS = new DefaultSinksSpecs(); abstract static class AbstractSerializedSink { @@ -64,7 +64,7 @@ boolean tryAcquire(Thread currentThread) { static final class UnsafeSpecImpl - implements Sinks.UnsafeSpec, Sinks.ManyUnsafeSpec, Sinks.MulticastUnsafeSpec, Sinks.MulticastReplaySpec { + implements Sinks.RootSpec, Sinks.ManySpec, Sinks.ManyWithUpstreamUnsafeSpec, Sinks.MulticastSpec, Sinks.MulticastReplaySpec { final Sinks.UnicastSpec unicastSpec; @@ -83,7 +83,7 @@ public One one() { } @Override - public Sinks.ManyUnsafeSpec many() { + public Sinks.ManySpec many() { return this; } @@ -92,29 +92,34 @@ public Sinks.UnicastSpec unicast() { return this.unicastSpec; } + @Override + public Sinks.MulticastSpec multicast() { + return this; + } + @Override public Sinks.MulticastReplaySpec replay() { return this; } @Override - public Sinks.ManyWithUpstream onBackpressureBuffer() { - return new EmitterProcessor<>(true, Queues.SMALL_BUFFER_SIZE); + public Sinks.ManyWithUpstreamUnsafeSpec manyWithUpstream() { + return this; } @Override - public Sinks.ManyWithUpstream onBackpressureBuffer(int bufferSize) { - return new EmitterProcessor<>(true, bufferSize); + public Sinks.Many onBackpressureBuffer() { + return new EmitterProcessor<>(true, Queues.SMALL_BUFFER_SIZE); } @Override - public Sinks.ManyWithUpstream onBackpressureBuffer(int bufferSize, boolean autoCancel) { - return new EmitterProcessor<>(autoCancel, bufferSize); + public Sinks.Many onBackpressureBuffer(int bufferSize) { + return new EmitterProcessor<>(true, bufferSize); } @Override - public Sinks.MulticastUnsafeSpec multicast() { - return this; + public Sinks.Many onBackpressureBuffer(int bufferSize, boolean autoCancel) { + return new EmitterProcessor<>(autoCancel, bufferSize); } @Override @@ -171,6 +176,16 @@ public Many limit(int historySize, Duration maxAge) { public Many limit(int historySize, Duration maxAge, Scheduler scheduler) { return ReplayProcessor.createSizeAndTimeout(historySize, maxAge, scheduler); } + + @Override + public Sinks.ManyWithUpstream multicastOnBackpressureBuffer() { + return new EmitterProcessor<>(true, Queues.SMALL_BUFFER_SIZE); + } + + @Override + public Sinks.ManyWithUpstream multicastOnBackpressureBuffer(int bufferSize, boolean autoCancel) { + return new EmitterProcessor<>(autoCancel, bufferSize); + } } //Note: RootSpec is now reserved for Sinks.unsafe() diff --git a/reactor-core/src/test/java/reactor/core/publisher/EmitterProcessorTest.java b/reactor-core/src/test/java/reactor/core/publisher/EmitterProcessorTest.java index 5c8a6354bc..bd04e9936b 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/EmitterProcessorTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/EmitterProcessorTest.java @@ -23,12 +23,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; -import org.assertj.core.data.Percentage; import org.awaitility.Awaitility; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -48,7 +46,6 @@ import reactor.test.publisher.TestPublisher; import reactor.test.subscriber.AssertSubscriber; import reactor.test.subscriber.TestSubscriber; -import reactor.test.util.RaceTestUtils; import reactor.util.annotation.Nullable; import reactor.util.concurrent.Queues; import reactor.util.context.Context; @@ -70,8 +67,8 @@ public class EmitterProcessorTest { AutoDisposingExtension afterTest = new AutoDisposingExtension(); @Test - void smokeTestManySubscriber() { - final Sinks.ManyWithUpstream adapter = Sinks.unsafe().many().multicast().onBackpressureBuffer(); + void smokeTestManyWithUpstream() { + final Sinks.ManyWithUpstream adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer(); final TestSubscriber testSubscriber1 = TestSubscriber.create(); final TestSubscriber testSubscriber2 = TestSubscriber.create(); final Flux upstream = Flux.range(1, 10); @@ -102,7 +99,7 @@ void smokeTestManySubscriber() { @Test void smokeTestSubscribeAndDispose() { - final Sinks.ManyWithUpstream adapter = Sinks.unsafe().many().multicast().onBackpressureBuffer(); + final Sinks.ManyWithUpstream adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer(); final TestSubscriber testSubscriber1 = TestSubscriber.create(); final TestSubscriber testSubscriber2 = TestSubscriber.create(); final Flux upstream = Flux.never(); @@ -130,7 +127,7 @@ void smokeTestSubscribeAndDispose() { @Test void subscribeToUpstreamTwiceSkipsSecondSubscription() { - final Sinks.ManyWithUpstream adapter = Sinks.unsafe().many().multicast().onBackpressureBuffer(123); + final Sinks.ManyWithUpstream adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer(123, true); final TestPublisher upstream1 = TestPublisher.create(); final TestPublisher upstream2 = TestPublisher.create();