From f832ea0d705099cb5c7012459a0933403b948f77 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Sat, 26 Feb 2022 21:01:53 -0800 Subject: [PATCH] Add more tests for Publisher.multicast (#2111) Motivation: Publisher.multicast had recent changes to request more if the Subscription with minimum demand has been cancelled. We currently don't have a test for long overflow protection and respecing queueLimit for max outstanding demand. --- .../api/MulticastPublisherTest.java | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/MulticastPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/MulticastPublisherTest.java index 0bd1354c3d..340691db22 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/MulticastPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/MulticastPublisherTest.java @@ -424,7 +424,7 @@ void threeSubscribersOneLateQueueData(boolean onError) throws InterruptedExcepti @Test void cancelMinSubscriberRequestsMore() throws InterruptedException { - Publisher publisher = source.multicast(1, false); + Publisher publisher = source.multicast(1); toSource(publisher).subscribe(subscriber1); Subscription localSubscription1 = subscriber1.awaitSubscription(); toSource(publisher).subscribe(subscriber2); @@ -435,6 +435,32 @@ void cancelMinSubscriberRequestsMore() throws InterruptedException { subscription.awaitRequestN(1); } + @Test + void cancelMinSubscriberRespectsQueueLimit() throws InterruptedException { + final int queueLimit = 64; + Publisher publisher = source.multicast(2, queueLimit); + toSource(publisher).subscribe(subscriber1); + Subscription localSubscription1 = subscriber1.awaitSubscription(); + localSubscription1.request(10); + assertThat(subscription.requested(), is(0L)); + + toSource(publisher).subscribe(subscriber2); + Subscription localSubscription2 = subscriber2.awaitSubscription(); + localSubscription2.request(Long.MAX_VALUE); + subscription.awaitRequestN(10); + assertThat(subscription.requested(), is(10L)); + + // Increase the min subscriber demand to allow queueLimit upstream demand. + localSubscription1.request(Long.MAX_VALUE - 1); + subscription.awaitRequestN(queueLimit); + assertThat(subscription.requested(), is((long) queueLimit)); + + localSubscription1.cancel(); // cancel the subscription that is expected to be min + // Even though we removed the min subscriber we already have queueLimit outstanding demand, so we shouldn't + // exceed queueLimit outstanding demand. + assertThat(subscription.requested(), is((long) queueLimit)); + } + @ParameterizedTest @MethodSource("trueFalseStream") void threeSubscribersOneLateAfterCancel(boolean cancelMax, boolean cancelUpstream) throws InterruptedException {