diff --git a/src/main/java/io/reactivex/internal/functions/Functions.java b/src/main/java/io/reactivex/internal/functions/Functions.java index c5d0248c5bf..6aee33fea32 100644 --- a/src/main/java/io/reactivex/internal/functions/Functions.java +++ b/src/main/java/io/reactivex/internal/functions/Functions.java @@ -753,9 +753,9 @@ public static Consumer boundedConsumer(int bufferSize) { public static class BoundedConsumer implements Consumer { - protected final int bufferSize; + final int bufferSize; - protected BoundedConsumer(int bufferSize) { + BoundedConsumer(int bufferSize) { this.bufferSize = bufferSize; } @@ -763,9 +763,5 @@ protected BoundedConsumer(int bufferSize) { public void accept(Subscription s) throws Exception { s.request(bufferSize); } - - public int getBufferSize() { - return bufferSize; - } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java index dac6cb32c01..3d40aef2f9d 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java @@ -118,13 +118,13 @@ public static void subscribe(Publisher o, final Consumer the value type */ - @SuppressWarnings("ResultOfMethodCallIgnored") public static void subscribe(Publisher o, final Consumer onNext, final Consumer onError, final Action onComplete, int bufferSize) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.verifyPositive(bufferSize, "number > 0 required"); - subscribe(o, new BoundedSubscriber(onNext, onError, onComplete, Functions.boundedConsumer(bufferSize))); + subscribe(o, new BoundedSubscriber(onNext, onError, onComplete, Functions.boundedConsumer(bufferSize), + bufferSize)); } } diff --git a/src/main/java/io/reactivex/internal/subscribers/BoundedSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/BoundedSubscriber.java index 25d3c7c1f53..5adbfeb2071 100644 --- a/src/main/java/io/reactivex/internal/subscribers/BoundedSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/BoundedSubscriber.java @@ -41,14 +41,13 @@ public final class BoundedSubscriber extends AtomicReference final int limit; public BoundedSubscriber(Consumer onNext, Consumer onError, - Action onComplete, - Consumer onSubscribe) { + Action onComplete, Consumer onSubscribe, int bufferSize) { super(); this.onNext = onNext; this.onError = onError; this.onComplete = onComplete; this.onSubscribe = onSubscribe; - this.bufferSize = ((Functions.BoundedConsumer) onSubscribe).getBufferSize(); + this.bufferSize = bufferSize; this.limit = bufferSize - (bufferSize >> 2); } diff --git a/src/test/java/io/reactivex/internal/subscribers/BoundedSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/BoundedSubscriberTest.java index 406b533848b..954496890de 100644 --- a/src/test/java/io/reactivex/internal/subscribers/BoundedSubscriberTest.java +++ b/src/test/java/io/reactivex/internal/subscribers/BoundedSubscriberTest.java @@ -1,11 +1,11 @@ /** * Copyright (c) 2016-present, RxJava Contributors. - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. @@ -20,12 +20,16 @@ import io.reactivex.functions.Action; import io.reactivex.functions.Consumer; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.junit.Assert.assertEquals; @@ -53,12 +57,12 @@ public void accept(Throwable throwable) throws Exception { public void run() throws Exception { received.add(1); } - }, new Functions.BoundedConsumer(128) { + }, new Consumer() { @Override public void accept(Subscription subscription) throws Exception { throw new TestException(); } - }); + }, 128); assertFalse(o.isDisposed()); @@ -89,12 +93,12 @@ public void accept(Throwable throwable) throws Exception { public void run() throws Exception { received.add(1); } - }, new Functions.BoundedConsumer(128) { + }, new Consumer() { @Override public void accept(Subscription subscription) throws Exception { - subscription.request(bufferSize); + subscription.request(128); } - }); + }, 128); assertFalse(o.isDisposed()); @@ -128,12 +132,12 @@ public void accept(Throwable throwable) throws Exception { public void run() throws Exception { received.add(1); } - }, new Functions.BoundedConsumer(128) { + }, new Consumer() { @Override public void accept(Subscription subscription) throws Exception { - subscription.request(bufferSize); + subscription.request(128); } - }); + }, 128); assertFalse(o.isDisposed()); @@ -174,12 +178,12 @@ public void accept(Throwable throwable) throws Exception { public void run() throws Exception { throw new TestException(); } - }, new Functions.BoundedConsumer(128) { + }, new Consumer() { @Override public void accept(Subscription subscription) throws Exception { - subscription.request(bufferSize); + subscription.request(128); } - }); + }, 128); assertFalse(o.isDisposed()); @@ -216,12 +220,12 @@ public void accept(Throwable e) throws Exception { public void run() throws Exception { } - }, new Functions.BoundedConsumer(128) { + }, new Consumer() { @Override public void accept(Subscription subscription) throws Exception { - subscription.request(bufferSize); + subscription.request(128); } - }); + }, 128); pp.subscribe(s); @@ -255,12 +259,12 @@ public void accept(Throwable e) throws Exception { @Override public void run() throws Exception { } - }, new Functions.BoundedConsumer(128) { + }, new Consumer() { @Override public void accept(Subscription s) throws Exception { throw new TestException(); } - }); + }, 128); pp.subscribe(s); @@ -270,12 +274,104 @@ public void accept(Subscription s) throws Exception { assertTrue(errors.toString(), errors.get(0) instanceof TestException); } + @Test + public void badSourceOnSubscribe() { + Flowable source = Flowable.fromPublisher(new Publisher() { + @Override + public void subscribe(Subscriber s) { + BooleanSubscription s1 = new BooleanSubscription(); + s.onSubscribe(s1); + BooleanSubscription s2 = new BooleanSubscription(); + s.onSubscribe(s2); + + assertFalse(s1.isCancelled()); + assertTrue(s2.isCancelled()); + + s.onNext(1); + s.onComplete(); + } + }); + + final List received = new ArrayList(); + + BoundedSubscriber o = new BoundedSubscriber(new Consumer() { + @Override + public void accept(Object v) throws Exception { + received.add(v); + } + }, new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + received.add(e); + } + }, new Action() { + @Override + public void run() throws Exception { + received.add(100); + } + }, new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + s.request(128); + } + }, 128); + + source.subscribe(o); + + assertEquals(Arrays.asList(1, 100), received); + } + + @Test + public void badSourceEmitAfterDone() { + Flowable source = Flowable.fromPublisher(new Publisher() { + @Override + public void subscribe(Subscriber s) { + BooleanSubscription s1 = new BooleanSubscription(); + s.onSubscribe(s1); + + s.onNext(1); + s.onComplete(); + s.onNext(2); + s.onError(new TestException()); + s.onComplete(); + } + }); + + final List received = new ArrayList(); + + BoundedSubscriber o = new BoundedSubscriber(new Consumer() { + @Override + public void accept(Object v) throws Exception { + received.add(v); + } + }, new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + received.add(e); + } + }, new Action() { + @Override + public void run() throws Exception { + received.add(100); + } + }, new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + s.request(128); + } + }, 128); + + source.subscribe(o); + + assertEquals(Arrays.asList(1, 100), received); + } + @Test public void onErrorMissingShouldReportNoCustomOnError() { BoundedSubscriber o = new BoundedSubscriber(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, - Functions.boundedConsumer(128)); + Functions.boundedConsumer(128), 128); assertFalse(o.hasCustomOnError()); } @@ -285,7 +381,7 @@ public void customOnErrorShouldReportCustomOnError() { BoundedSubscriber o = new BoundedSubscriber(Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION, - Functions.boundedConsumer(128)); + Functions.boundedConsumer(128), 128); assertTrue(o.hasCustomOnError()); }