Skip to content

Commit

Permalink
Add more tests for Publisher.multicast (#2111)
Browse files Browse the repository at this point in the history
Motivation:
Publisher.multicast had recent changes to request more
if the Subscription with minimum demand has been cancelled.
We currently don't have a test for long overflow protection
and respecing queueLimit for max outstanding demand.
  • Loading branch information
Scottmitch authored Feb 27, 2022
1 parent 40563d0 commit f832ea0
Showing 1 changed file with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ void threeSubscribersOneLateQueueData(boolean onError) throws InterruptedExcepti

@Test
void cancelMinSubscriberRequestsMore() throws InterruptedException {
Publisher<Integer> publisher = source.multicast(1, false);
Publisher<Integer> publisher = source.multicast(1);
toSource(publisher).subscribe(subscriber1);
Subscription localSubscription1 = subscriber1.awaitSubscription();
toSource(publisher).subscribe(subscriber2);
Expand All @@ -435,6 +435,32 @@ void cancelMinSubscriberRequestsMore() throws InterruptedException {
subscription.awaitRequestN(1);
}

@Test
void cancelMinSubscriberRespectsQueueLimit() throws InterruptedException {
final int queueLimit = 64;
Publisher<Integer> publisher = source.multicast(2, queueLimit);
toSource(publisher).subscribe(subscriber1);
Subscription localSubscription1 = subscriber1.awaitSubscription();
localSubscription1.request(10);
assertThat(subscription.requested(), is(0L));

toSource(publisher).subscribe(subscriber2);
Subscription localSubscription2 = subscriber2.awaitSubscription();
localSubscription2.request(Long.MAX_VALUE);
subscription.awaitRequestN(10);
assertThat(subscription.requested(), is(10L));

// Increase the min subscriber demand to allow queueLimit upstream demand.
localSubscription1.request(Long.MAX_VALUE - 1);
subscription.awaitRequestN(queueLimit);
assertThat(subscription.requested(), is((long) queueLimit));

localSubscription1.cancel(); // cancel the subscription that is expected to be min
// Even though we removed the min subscriber we already have queueLimit outstanding demand, so we shouldn't
// exceed queueLimit outstanding demand.
assertThat(subscription.requested(), is((long) queueLimit));
}

@ParameterizedTest
@MethodSource("trueFalseStream")
void threeSubscribersOneLateAfterCancel(boolean cancelMax, boolean cancelUpstream) throws InterruptedException {
Expand Down

0 comments on commit f832ea0

Please sign in to comment.