From 6304c01d07bd60a5d76312c8fb0574c5ffc64a8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Sat, 3 Mar 2018 23:03:20 +0100 Subject: [PATCH] 2.x: Improve coverage and fix small mistakes/untaken paths in operators --- src/main/java/io/reactivex/Maybe.java | 9 +- .../operators/flowable/FlowableFlatMap.java | 4 +- .../flowable/FlowableReduceSeedSingle.java | 33 +- .../operators/flowable/FlowableReplay.java | 8 +- .../flowable/FlowableWindowBoundary.java | 6 +- .../FlowableWindowBoundarySelector.java | 20 +- .../FlowableWindowBoundarySupplier.java | 1 - .../flowable/FlowableWindowTimed.java | 8 +- .../observable/ObservableInternalHelper.java | 54 --- .../ObservableReduceSeedSingle.java | 4 +- .../ObservableWindowBoundarySelector.java | 6 +- .../ObservableWindowBoundarySupplier.java | 1 - .../schedulers/InstantPeriodicTask.java | 16 +- .../ScheduledDirectPeriodicTask.java | 12 +- .../internal/schedulers/SchedulerWhen.java | 6 +- .../subscribers/QueueDrainSubscriber.java | 4 +- .../processors/BehaviorProcessor.java | 4 +- .../reactivex/subjects/BehaviorSubject.java | 4 +- .../observers/BlockingMultiObserverTest.java | 135 +++++++ .../DisposableLambdaObserverTest.java | 66 ++++ .../observers/FullArbiterObserverTest.java | 48 +++ .../observers/QueueDrainObserverTest.java | 162 +++++++++ .../flowable/FlowableBufferTest.java | 123 ++++++- .../flowable/FlowableReduceTest.java | 56 +++ .../flowable/FlowableRefCountTest.java | 178 +++++++++- .../flowable/FlowableReplayTest.java | 12 + .../flowable/FlowableSingleTest.java | 33 +- .../FlowableWindowWithFlowableTest.java | 103 +++++- ...lowableWindowWithStartEndFlowableTest.java | 32 ++ .../flowable/FlowableWindowWithTimeTest.java | 81 +++++ .../operators/maybe/MaybeMergeTest.java | 8 + .../maybe/MaybeSwitchIfEmptySingleTest.java | 9 + .../observable/ObservableBufferTest.java | 92 ++++- .../observable/ObservableHideTest.java | 18 + .../ObservableInternalHelperTest.java | 2 - .../observable/ObservableReduceTest.java | 132 +++++++ .../observable/ObservableRefCountTest.java | 176 +++++++++ .../observable/ObservableTakeLastOneTest.java | 9 + .../ObservableWindowWithObservableTest.java | 56 +++ ...vableWindowWithStartEndObservableTest.java | 32 ++ .../ExecutorSchedulerDelayedRunnableTest.java | 56 +++ .../schedulers/InstantPeriodicTaskTest.java | 277 +++++++++++++++ .../schedulers/SchedulerWhenTest.java | 188 +++++++++- .../subscribers/QueueDrainSubscriberTest.java | 336 ++++++++++++++++++ .../processors/BehaviorProcessorTest.java | 166 +++++++-- .../subjects/BehaviorSubjectTest.java | 105 ++++++ 46 files changed, 2713 insertions(+), 178 deletions(-) create mode 100644 src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java create mode 100644 src/test/java/io/reactivex/internal/observers/DisposableLambdaObserverTest.java create mode 100644 src/test/java/io/reactivex/internal/observers/FullArbiterObserverTest.java create mode 100644 src/test/java/io/reactivex/internal/observers/QueueDrainObserverTest.java create mode 100644 src/test/java/io/reactivex/internal/schedulers/ExecutorSchedulerDelayedRunnableTest.java create mode 100644 src/test/java/io/reactivex/internal/schedulers/InstantPeriodicTaskTest.java create mode 100644 src/test/java/io/reactivex/internal/subscribers/QueueDrainSubscriberTest.java diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 1f21b32a89..f3d8cec1a6 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -888,7 +888,7 @@ public static Flowable merge(Publisher public static Flowable merge(Publisher> sources, int maxConcurrency) { ObjectHelper.requireNonNull(sources, "source is null"); ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); - return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), false, maxConcurrency, Flowable.bufferSize())); + return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), false, maxConcurrency, 1)); } /** @@ -1222,12 +1222,11 @@ public static Flowable mergeDelayError(IterableReactiveX operators documentation: Merge */ - @SuppressWarnings({ "unchecked", "rawtypes" }) @BackpressureSupport(BackpressureKind.FULL) @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static Flowable mergeDelayError(Publisher> sources) { - return Flowable.fromPublisher(sources).flatMap((Function)MaybeToPublisher.instance(), true); + return mergeDelayError(sources, Integer.MAX_VALUE); } @@ -1267,7 +1266,9 @@ public static Flowable mergeDelayError(Publisher Flowable mergeDelayError(Publisher> sources, int maxConcurrency) { - return Flowable.fromPublisher(sources).flatMap((Function)MaybeToPublisher.instance(), true, maxConcurrency); + ObjectHelper.requireNonNull(sources, "source is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), true, maxConcurrency, 1)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java index c815fdf111..58e185d12d 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java @@ -185,10 +185,10 @@ boolean addInner(InnerSubscriber inner) { void removeInner(InnerSubscriber inner) { for (;;) { InnerSubscriber[] a = subscribers.get(); - if (a == CANCELLED || a == EMPTY) { + int n = a.length; + if (n == 0) { return; } - int n = a.length; int j = -1; for (int i = 0; i < n; i++) { if (a[i] == inner) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduceSeedSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduceSeedSingle.java index 32cf770953..5201f1b396 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduceSeedSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduceSeedSingle.java @@ -21,6 +21,7 @@ import io.reactivex.functions.BiFunction; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.plugins.RxJavaPlugins; /** * Reduce a sequence of values, starting from a seed value and by using @@ -78,28 +79,36 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T value) { R v = this.value; - try { - this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value"); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.cancel(); - onError(ex); + if (v != null) { + try { + this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.cancel(); + onError(ex); + } } } @Override public void onError(Throwable e) { - value = null; - s = SubscriptionHelper.CANCELLED; - actual.onError(e); + if (value != null) { + value = null; + s = SubscriptionHelper.CANCELLED; + actual.onError(e); + } else { + RxJavaPlugins.onError(e); + } } @Override public void onComplete() { R v = value; - value = null; - s = SubscriptionHelper.CANCELLED; - actual.onSuccess(v); + if (v != null) { + value = null; + s = SubscriptionHelper.CANCELLED; + actual.onSuccess(v); + } } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index fa6f494acf..8d1f2eaaad 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -57,7 +57,7 @@ public final class FlowableReplay extends ConnectableFlowable implements H public static Flowable multicastSelector( final Callable> connectableFactory, final Function, ? extends Publisher> selector) { - return Flowable.unsafeCreate(new MultiCastPublisher(connectableFactory, selector)); + return new MulticastFlowable(connectableFactory, selector); } /** @@ -1100,17 +1100,17 @@ Node getHead() { } } - static final class MultiCastPublisher implements Publisher { + static final class MulticastFlowable extends Flowable { private final Callable> connectableFactory; private final Function, ? extends Publisher> selector; - MultiCastPublisher(Callable> connectableFactory, Function, ? extends Publisher> selector) { + MulticastFlowable(Callable> connectableFactory, Function, ? extends Publisher> selector) { this.connectableFactory = connectableFactory; this.selector = selector; } @Override - public void subscribe(Subscriber child) { + protected void subscribeActual(Subscriber child) { ConnectableFlowable co; try { co = ObjectHelper.requireNonNull(connectableFactory.call(), "The connectableFactory returned null"); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java index 9cf1b1100c..58cec89232 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java @@ -93,6 +93,7 @@ public void onSubscribe(Subscription s) { produced(1); } } else { + s.cancel(); a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests")); return; } @@ -254,11 +255,6 @@ void next() { } } - @Override - public boolean accept(Subscriber> a, Object v) { - // not used by this operator - return false; - } } static final class WindowBoundaryInnerSubscriber extends DisposableSubscriber { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java index 50c7e45dfb..55fee47d51 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java @@ -323,36 +323,22 @@ static final class WindowOperation { static final class OperatorWindowBoundaryOpenSubscriber extends DisposableSubscriber { final WindowBoundaryMainSubscriber parent; - boolean done; - OperatorWindowBoundaryOpenSubscriber(WindowBoundaryMainSubscriber parent) { this.parent = parent; } @Override public void onNext(B t) { - if (done) { - return; - } parent.open(t); } @Override public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - done = true; parent.error(t); } @Override public void onComplete() { - if (done) { - return; - } - done = true; parent.onComplete(); } } @@ -370,12 +356,8 @@ static final class OperatorWindowBoundaryCloseSubscriber extends Disposabl @Override public void onNext(V t) { - if (done) { - return; - } - done = true; cancel(); - parent.close(this); + onComplete(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java index 5286407e7d..f90d74ad2a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java @@ -326,7 +326,6 @@ public void onComplete() { } done = true; parent.onComplete(); -// parent.next(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java index f0bd22429f..460fd7d814 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java @@ -390,9 +390,7 @@ public void onNext(T t) { tm.dispose(); Disposable task = worker.schedulePeriodically( new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit); - if (!timer.compareAndSet(tm, task)) { - task.dispose(); - } + timer.replace(task); } } else { window = null; @@ -549,9 +547,7 @@ void drainLoop() { Disposable task = worker.schedulePeriodically( new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit); - if (!timer.compareAndSet(tm, task)) { - task.dispose(); - } + timer.replace(task); } } else { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java index 7bc6305703..0af4a15f0a 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java @@ -199,24 +199,6 @@ public Object apply(Object t) throws Exception { } } - static final class RepeatWhenOuterHandler - implements Function>, ObservableSource> { - private final Function, ? extends ObservableSource> handler; - - RepeatWhenOuterHandler(Function, ? extends ObservableSource> handler) { - this.handler = handler; - } - - @Override - public ObservableSource apply(Observable> no) throws Exception { - return handler.apply(no.map(MapToInt.INSTANCE)); - } - } - - public static Function>, ObservableSource> repeatWhenHandler(final Function, ? extends ObservableSource> handler) { - return new RepeatWhenOuterHandler(handler); - } - public static Callable> replayCallable(final Observable parent) { return new ReplayCallable(parent); } @@ -237,42 +219,6 @@ public static Function, ObservableSource> replayFunction return new ReplayFunction(selector, scheduler); } - enum ErrorMapperFilter implements Function, Throwable>, Predicate> { - INSTANCE; - - @Override - public Throwable apply(Notification t) throws Exception { - return t.getError(); - } - - @Override - public boolean test(Notification t) throws Exception { - return t.isOnError(); - } - } - - static final class RetryWhenInner - implements Function>, ObservableSource> { - private final Function, ? extends ObservableSource> handler; - - RetryWhenInner( - Function, ? extends ObservableSource> handler) { - this.handler = handler; - } - - @Override - public ObservableSource apply(Observable> no) throws Exception { - Observable map = no - .takeWhile(ErrorMapperFilter.INSTANCE) - .map(ErrorMapperFilter.INSTANCE); - return handler.apply(map); - } - } - - public static Function>, ObservableSource> retryWhenHandler(final Function, ? extends ObservableSource> handler) { - return new RetryWhenInner(handler); - } - static final class ZipIterableFunction implements Function>, ObservableSource> { private final Function zipper; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceSeedSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceSeedSingle.java index bbf31f4057..9006957348 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceSeedSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceSeedSingle.java @@ -89,8 +89,8 @@ public void onNext(T value) { @Override public void onError(Throwable e) { R v = value; - value = null; if (v != null) { + value = null; actual.onError(e); } else { RxJavaPlugins.onError(e); @@ -100,8 +100,8 @@ public void onError(Throwable e) { @Override public void onComplete() { R v = value; - value = null; if (v != null) { + value = null; actual.onSuccess(v); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java index eec057bf22..d0da7e2b0a 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java @@ -338,12 +338,8 @@ static final class OperatorWindowBoundaryCloseObserver extends DisposableO @Override public void onNext(V t) { - if (done) { - return; - } - done = true; dispose(); - parent.close(this); + onComplete(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java index f47d096556..47b0841262 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java @@ -296,7 +296,6 @@ public void onComplete() { } done = true; parent.onComplete(); -// parent.next(); } } } diff --git a/src/main/java/io/reactivex/internal/schedulers/InstantPeriodicTask.java b/src/main/java/io/reactivex/internal/schedulers/InstantPeriodicTask.java index 86086987ab..ec9b2a0d2f 100644 --- a/src/main/java/io/reactivex/internal/schedulers/InstantPeriodicTask.java +++ b/src/main/java/io/reactivex/internal/schedulers/InstantPeriodicTask.java @@ -50,16 +50,14 @@ final class InstantPeriodicTask implements Callable, Disposable { @Override public Void call() throws Exception { + runner = Thread.currentThread(); try { - runner = Thread.currentThread(); - try { - task.run(); - setRest(executor.submit(this)); - } catch (Throwable ex) { - RxJavaPlugins.onError(ex); - } - } finally { + task.run(); + setRest(executor.submit(this)); + runner = null; + } catch (Throwable ex) { runner = null; + RxJavaPlugins.onError(ex); } return null; } @@ -86,6 +84,7 @@ void setFirst(Future f) { Future current = first.get(); if (current == CANCELLED) { f.cancel(runner != Thread.currentThread()); + return; } if (first.compareAndSet(current, f)) { return; @@ -98,6 +97,7 @@ void setRest(Future f) { Future current = rest.get(); if (current == CANCELLED) { f.cancel(runner != Thread.currentThread()); + return; } if (rest.compareAndSet(current, f)) { return; diff --git a/src/main/java/io/reactivex/internal/schedulers/ScheduledDirectPeriodicTask.java b/src/main/java/io/reactivex/internal/schedulers/ScheduledDirectPeriodicTask.java index 080928f722..201847ba75 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ScheduledDirectPeriodicTask.java +++ b/src/main/java/io/reactivex/internal/schedulers/ScheduledDirectPeriodicTask.java @@ -35,14 +35,12 @@ public ScheduledDirectPeriodicTask(Runnable runnable) { public void run() { runner = Thread.currentThread(); try { - try { - runnable.run(); - } catch (Throwable ex) { - lazySet(FINISHED); - RxJavaPlugins.onError(ex); - } - } finally { + runnable.run(); runner = null; + } catch (Throwable ex) { + runner = null; + lazySet(FINISHED); + RxJavaPlugins.onError(ex); } } } diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java index dd91cc11d7..9b97ee6a36 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java @@ -28,8 +28,8 @@ import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; -import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; +import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.processors.FlowableProcessor; import io.reactivex.processors.UnicastProcessor; @@ -116,7 +116,7 @@ public SchedulerWhen(Function>, Completable> comb try { disposable = combine.apply(workerProcessor).subscribe(); } catch (Throwable e) { - Exceptions.propagate(e); + throw ExceptionHelper.wrapOrThrow(e); } } @@ -155,7 +155,7 @@ public Worker createWorker() { static final Disposable DISPOSED = Disposables.disposed(); @SuppressWarnings("serial") - abstract static class ScheduledAction extends AtomicReferenceimplements Disposable { + abstract static class ScheduledAction extends AtomicReference implements Disposable { ScheduledAction() { super(SUBSCRIBED); } diff --git a/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java index b23ff5957c..31d97833c2 100644 --- a/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java @@ -69,7 +69,7 @@ protected final void fastPathEmitMax(U value, boolean delayError, Disposable dis final Subscriber s = actual; final SimplePlainQueue q = queue; - if (wip.get() == 0 && wip.compareAndSet(0, 1)) { + if (fastEnter()) { long r = requested.get(); if (r != 0L) { if (accept(s, value)) { @@ -98,7 +98,7 @@ protected final void fastPathOrderedEmitMax(U value, boolean delayError, Disposa final Subscriber s = actual; final SimplePlainQueue q = queue; - if (wip.get() == 0 && wip.compareAndSet(0, 1)) { + if (fastEnter()) { long r = requested.get(); if (r != 0L) { if (q.isEmpty()) { diff --git a/src/main/java/io/reactivex/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/processors/BehaviorProcessor.java index edd5c620ef..bc0d416160 100644 --- a/src/main/java/io/reactivex/processors/BehaviorProcessor.java +++ b/src/main/java/io/reactivex/processors/BehaviorProcessor.java @@ -459,10 +459,10 @@ boolean add(BehaviorSubscription rs) { void remove(BehaviorSubscription rs) { for (;;) { BehaviorSubscription[] a = subscribers.get(); - if (a == TERMINATED || a == EMPTY) { + int len = a.length; + if (len == 0) { return; } - int len = a.length; int j = -1; for (int i = 0; i < len; i++) { if (a[i] == rs) { diff --git a/src/main/java/io/reactivex/subjects/BehaviorSubject.java b/src/main/java/io/reactivex/subjects/BehaviorSubject.java index 42878adee2..6d0d4641ac 100644 --- a/src/main/java/io/reactivex/subjects/BehaviorSubject.java +++ b/src/main/java/io/reactivex/subjects/BehaviorSubject.java @@ -410,10 +410,10 @@ boolean add(BehaviorDisposable rs) { void remove(BehaviorDisposable rs) { for (;;) { BehaviorDisposable[] a = subscribers.get(); - if (a == TERMINATED || a == EMPTY) { + int len = a.length; + if (len == 0) { return; } - int len = a.length; int j = -1; for (int i = 0; i < len; i++) { if (a[i] == rs) { diff --git a/src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java b/src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java new file mode 100644 index 0000000000..6b4238f8f6 --- /dev/null +++ b/src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java @@ -0,0 +1,135 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.observers; + +import static org.junit.Assert.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.schedulers.Schedulers; + +public class BlockingMultiObserverTest { + + @Test + public void dispose() { + BlockingMultiObserver bmo = new BlockingMultiObserver(); + bmo.dispose(); + + Disposable d = Disposables.empty(); + + bmo.onSubscribe(d); + } + + @Test + public void blockingGetDefault() { + final BlockingMultiObserver bmo = new BlockingMultiObserver(); + + Schedulers.single().scheduleDirect(new Runnable() { + @Override + public void run() { + bmo.onSuccess(1); + } + }, 100, TimeUnit.MILLISECONDS); + + assertEquals(1, bmo.blockingGet(0).intValue()); + } + + @Test + public void blockingAwait() { + final BlockingMultiObserver bmo = new BlockingMultiObserver(); + + Schedulers.single().scheduleDirect(new Runnable() { + @Override + public void run() { + bmo.onSuccess(1); + } + }, 100, TimeUnit.MILLISECONDS); + + assertTrue(bmo.blockingAwait(1, TimeUnit.MINUTES)); + } + + @Test + public void blockingGetDefaultInterrupt() { + final BlockingMultiObserver bmo = new BlockingMultiObserver(); + + Thread.currentThread().interrupt(); + try { + bmo.blockingGet(0); + fail("Should have thrown"); + } catch (RuntimeException ex) { + assertTrue(ex.getCause() instanceof InterruptedException); + } finally { + Thread.interrupted(); + } + } + + @Test + public void blockingGetErrorInterrupt() { + final BlockingMultiObserver bmo = new BlockingMultiObserver(); + + Thread.currentThread().interrupt(); + try { + assertTrue(bmo.blockingGetError() instanceof InterruptedException); + } finally { + Thread.interrupted(); + } + } + + @Test + public void blockingGetErrorTimeoutInterrupt() { + final BlockingMultiObserver bmo = new BlockingMultiObserver(); + + Thread.currentThread().interrupt(); + try { + bmo.blockingGetError(1, TimeUnit.MINUTES); + fail("Should have thrown"); + } catch (RuntimeException ex) { + assertTrue(ex.getCause() instanceof InterruptedException); + } finally { + Thread.interrupted(); + } + } + + @Test + public void blockingGetErrorDelayed() { + final BlockingMultiObserver bmo = new BlockingMultiObserver(); + + Schedulers.single().scheduleDirect(new Runnable() { + @Override + public void run() { + bmo.onError(new TestException()); + } + }, 100, TimeUnit.MILLISECONDS); + + assertTrue(bmo.blockingGetError() instanceof TestException); + } + + @Test + public void blockingGetErrorTimeoutDelayed() { + final BlockingMultiObserver bmo = new BlockingMultiObserver(); + + Schedulers.single().scheduleDirect(new Runnable() { + @Override + public void run() { + bmo.onError(new TestException()); + } + }, 100, TimeUnit.MILLISECONDS); + + assertTrue(bmo.blockingGetError(1, TimeUnit.MINUTES) instanceof TestException); + } +} diff --git a/src/test/java/io/reactivex/internal/observers/DisposableLambdaObserverTest.java b/src/test/java/io/reactivex/internal/observers/DisposableLambdaObserverTest.java new file mode 100644 index 0000000000..c152f15b37 --- /dev/null +++ b/src/test/java/io/reactivex/internal/observers/DisposableLambdaObserverTest.java @@ -0,0 +1,66 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.observers; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.TestHelper; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Action; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; + +public class DisposableLambdaObserverTest { + + @Test + public void doubleOnSubscribe() { + TestHelper.doubleOnSubscribe(new DisposableLambdaObserver( + new TestObserver(), Functions.emptyConsumer(), Functions.EMPTY_ACTION + )); + } + + @Test + public void disposeCrash() { + List errors = TestHelper.trackPluginErrors(); + try { + DisposableLambdaObserver o = new DisposableLambdaObserver( + new TestObserver(), Functions.emptyConsumer(), + new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + } + ); + + o.onSubscribe(Disposables.empty()); + + assertFalse(o.isDisposed()); + + o.dispose(); + + assertTrue(o.isDisposed()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/observers/FullArbiterObserverTest.java b/src/test/java/io/reactivex/internal/observers/FullArbiterObserverTest.java new file mode 100644 index 0000000000..c2b4c64e76 --- /dev/null +++ b/src/test/java/io/reactivex/internal/observers/FullArbiterObserverTest.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.observers; + +import org.junit.Test; + +import io.reactivex.TestHelper; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.TestException; +import io.reactivex.internal.disposables.ObserverFullArbiter; +import io.reactivex.observers.TestObserver; + +public class FullArbiterObserverTest { + + @Test + public void doubleOnSubscribe() { + TestObserver to = new TestObserver(); + ObserverFullArbiter fa = new ObserverFullArbiter(to, null, 16); + FullArbiterObserver fo = new FullArbiterObserver(fa); + to.onSubscribe(fa); + + TestHelper.doubleOnSubscribe(fo); + } + + @Test + public void error() { + TestObserver to = new TestObserver(); + ObserverFullArbiter fa = new ObserverFullArbiter(to, null, 16); + FullArbiterObserver fo = new FullArbiterObserver(fa); + to.onSubscribe(fa); + + fo.onSubscribe(Disposables.empty()); + fo.onError(new TestException()); + + to.assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/observers/QueueDrainObserverTest.java b/src/test/java/io/reactivex/internal/observers/QueueDrainObserverTest.java new file mode 100644 index 0000000000..12959cb898 --- /dev/null +++ b/src/test/java/io/reactivex/internal/observers/QueueDrainObserverTest.java @@ -0,0 +1,162 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.observers; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.observers.TestObserver; + +public class QueueDrainObserverTest { + + static final QueueDrainObserver createUnordered(TestObserver to, final Disposable d) { + return new QueueDrainObserver(to, new SpscArrayQueue(4)) { + @Override + public void onNext(Integer t) { + fastPathEmit(t, false, d); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + public void onSubscribe(Disposable s) { + } + + @Override + public void accept(Observer a, Integer v) { + super.accept(a, v); + a.onNext(v); + } + }; + } + + static final QueueDrainObserver createOrdered(TestObserver to, final Disposable d) { + return new QueueDrainObserver(to, new SpscArrayQueue(4)) { + @Override + public void onNext(Integer t) { + fastPathOrderedEmit(t, false, d); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + public void onSubscribe(Disposable s) { + } + + @Override + public void accept(Observer a, Integer v) { + super.accept(a, v); + a.onNext(v); + } + }; + } + + @Test + public void unorderedSlowPath() { + TestObserver to = new TestObserver(); + Disposable d = Disposables.empty(); + QueueDrainObserver qd = createUnordered(to, d); + to.onSubscribe(Disposables.empty()); + + qd.enter(); + qd.onNext(1); + + to.assertEmpty(); + } + + @Test + public void orderedSlowPath() { + TestObserver to = new TestObserver(); + Disposable d = Disposables.empty(); + QueueDrainObserver qd = createOrdered(to, d); + to.onSubscribe(Disposables.empty()); + + qd.enter(); + qd.onNext(1); + + to.assertEmpty(); + } + + @Test + public void orderedSlowPathNonEmptyQueue() { + TestObserver to = new TestObserver(); + Disposable d = Disposables.empty(); + QueueDrainObserver qd = createOrdered(to, d); + to.onSubscribe(Disposables.empty()); + + qd.queue.offer(0); + qd.onNext(1); + + to.assertValuesOnly(0, 1); + } + + @Test + public void unorderedOnNextRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + TestObserver to = new TestObserver(); + Disposable d = Disposables.empty(); + final QueueDrainObserver qd = createUnordered(to, d); + to.onSubscribe(Disposables.empty()); + + Runnable r1 = new Runnable() { + @Override + public void run() { + qd.onNext(1); + } + }; + + TestHelper.race(r1, r1); + + to.assertValuesOnly(1, 1); + } + } + + @Test + public void orderedOnNextRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + TestObserver to = new TestObserver(); + Disposable d = Disposables.empty(); + final QueueDrainObserver qd = createOrdered(to, d); + to.onSubscribe(Disposables.empty()); + + Runnable r1 = new Runnable() { + @Override + public void run() { + qd.onNext(1); + } + }; + + TestHelper.race(r1, r1); + + to.assertValuesOnly(1, 1); + } + } + +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java index 471fc5836e..a65442a895 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java @@ -33,7 +33,7 @@ import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.processors.PublishProcessor; +import io.reactivex.processors.*; import io.reactivex.schedulers.*; import io.reactivex.subscribers.*; @@ -2436,4 +2436,125 @@ protected void subscribeActual(Subscriber s) { RxJavaPlugins.reset(); } } + + @Test + public void bufferExactBoundaryDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable( + new Function, Flowable>>() { + @Override + public Flowable> apply(Flowable f) + throws Exception { + return f.buffer(Flowable.never()); + } + } + ); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferExactBoundarySecondBufferCrash() { + PublishProcessor pp = PublishProcessor.create(); + PublishProcessor b = PublishProcessor.create(); + + TestSubscriber> to = pp.buffer(b, new Callable>() { + int calls; + @Override + public List call() throws Exception { + if (++calls == 2) { + throw new TestException(); + } + return new ArrayList(); + } + }).test(); + + b.onNext(1); + + to.assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferExactBoundaryBadSource() { + Flowable pp = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + observer.onComplete(); + observer.onNext(1); + observer.onComplete(); + } + }; + + final AtomicReference> ref = new AtomicReference>(); + Flowable b = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + ref.set(observer); + } + }; + + TestSubscriber> ts = pp.buffer(b).test(); + + ref.get().onNext(1); + + ts.assertResult(Collections.emptyList()); + } + + @Test + public void bufferExactBoundaryCancelUpfront() { + PublishProcessor pp = PublishProcessor.create(); + PublishProcessor b = PublishProcessor.create(); + + pp.buffer(b).test(0L, true) + .assertEmpty(); + + assertFalse(pp.hasSubscribers()); + assertFalse(b.hasSubscribers()); + } + + @Test + public void bufferExactBoundaryDisposed() { + Flowable pp = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + + Disposable d = (Disposable)s; + + assertFalse(d.isDisposed()); + + d.dispose(); + + assertTrue(d.isDisposed()); + } + }; + PublishProcessor b = PublishProcessor.create(); + + pp.buffer(b).test(); + } + + @Test + public void bufferBoundaryErrorTwice() { + List errors = TestHelper.trackPluginErrors(); + try { + BehaviorProcessor.createDefault(1) + .buffer(Functions.justCallable(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onError(new TestException("first")); + s.onError(new TestException("second")); + } + })) + .test() + .assertError(TestException.class) + .assertErrorMessage("first") + .assertNotComplete(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "second"); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReduceTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReduceTest.java index e50d8faa57..63e80b3822 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReduceTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReduceTest.java @@ -29,6 +29,7 @@ import io.reactivex.internal.fuseable.HasUpstreamPublisher; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -470,4 +471,59 @@ static String blockingOp(Integer x, Integer y) { } return "x" + x + "y" + y; } + + @Test + public void seedDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowableToSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Flowable o) + throws Exception { + return o.reduce(0, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a; + } + }); + } + }); + } + + @Test + public void seedDisposed() { + TestHelper.checkDisposed(PublishProcessor.create().reduce(0, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a; + } + })); + } + + @Test + public void seedBadSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + observer.onComplete(); + observer.onNext(1); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .reduce(0, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a; + } + }) + .test() + .assertResult(0); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index 8150dc9393..9de049a90d 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -27,11 +27,13 @@ import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.processors.ReplayProcessor; import io.reactivex.schedulers.*; import io.reactivex.subscribers.TestSubscriber; @@ -786,4 +788,178 @@ public void replayIsUnsubscribed() { assertTrue(((Disposable)co).isDisposed()); } + + static final class BadFlowableSubscribe extends ConnectableFlowable { + + @Override + public void connect(Consumer connection) { + try { + connection.accept(Disposables.empty()); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + protected void subscribeActual(Subscriber observer) { + throw new TestException("subscribeActual"); + } + } + + static final class BadFlowableDispose extends ConnectableFlowable implements Disposable { + + @Override + public void dispose() { + throw new TestException("dispose"); + } + + @Override + public boolean isDisposed() { + return false; + } + + @Override + public void connect(Consumer connection) { + try { + connection.accept(Disposables.empty()); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + protected void subscribeActual(Subscriber observer) { + } + } + + static final class BadFlowableConnect extends ConnectableFlowable { + + @Override + public void connect(Consumer connection) { + throw new TestException("connect"); + } + + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + } + } + + @Test + public void badSourceSubscribe() { + BadFlowableSubscribe bo = new BadFlowableSubscribe(); + + try { + bo.refCount() + .test(); + fail("Should have thrown"); + } catch (NullPointerException ex) { + assertTrue(ex.getCause() instanceof TestException); + } + } + + @Test + public void badSourceDispose() { + BadFlowableDispose bf = new BadFlowableDispose(); + + try { + bf.refCount() + .test() + .cancel(); + fail("Should have thrown"); + } catch (TestException expected) { + } + } + + @Test + public void badSourceConnect() { + BadFlowableConnect bf = new BadFlowableConnect(); + + try { + bf.refCount() + .test(); + fail("Should have thrown"); + } catch (NullPointerException ex) { + assertTrue(ex.getCause() instanceof TestException); + } + } + + static final class BadFlowableSubscribe2 extends ConnectableFlowable { + + int count; + + @Override + public void connect(Consumer connection) { + try { + connection.accept(Disposables.empty()); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + protected void subscribeActual(Subscriber observer) { + if (++count == 1) { + observer.onSubscribe(new BooleanSubscription()); + } else { + throw new TestException("subscribeActual"); + } + } + } + + @Test + public void badSourceSubscribe2() { + BadFlowableSubscribe2 bf = new BadFlowableSubscribe2(); + + Flowable o = bf.refCount(); + o.test(); + try { + o.test(); + fail("Should have thrown"); + } catch (NullPointerException ex) { + assertTrue(ex.getCause() instanceof TestException); + } + } + + static final class BadFlowableConnect2 extends ConnectableFlowable + implements Disposable { + + @Override + public void connect(Consumer connection) { + try { + connection.accept(Disposables.empty()); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + observer.onComplete(); + } + + @Override + public void dispose() { + throw new TestException("dispose"); + } + + @Override + public boolean isDisposed() { + return false; + } + } + + @Test + public void badSourceCompleteDisconnect() { + BadFlowableConnect2 bf = new BadFlowableConnect2(); + + try { + bf.refCount() + .test(); + fail("Should have thrown"); + } catch (NullPointerException ex) { + assertTrue(ex.getCause() instanceof TestException); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java index a9c41366ad..8ad3caa14e 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java @@ -1755,4 +1755,16 @@ public Publisher apply(Flowable v) throws Exception { .test() .assertFailureAndMessage(NullPointerException.class, "The selector returned a null Publisher"); } + + @Test + public void multicastSelectorCallableConnectableCrash() { + FlowableReplay.multicastSelector(new Callable>() { + @Override + public ConnectableFlowable call() throws Exception { + throw new TestException(); + } + }, Functions.>identity()) + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java index 67c5c83881..2aff36bc6a 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java @@ -25,10 +25,10 @@ import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.Flowable; import io.reactivex.functions.*; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.subscribers.DefaultSubscriber; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subscribers.*; public class FlowableSingleTest { @@ -760,11 +760,40 @@ public SingleSource apply(Flowable o) throws Exception { } }); + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { + @Override + public Flowable apply(Flowable o) throws Exception { + return o.singleOrError().toFlowable(); + } + }); + TestHelper.checkDoubleOnSubscribeFlowableToMaybe(new Function, MaybeSource>() { @Override public MaybeSource apply(Flowable o) throws Exception { return o.singleElement(); } }); + + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { + @Override + public Flowable apply(Flowable o) throws Exception { + return o.singleElement().toFlowable(); + } + }); + } + + @Test + public void cancelAsFlowable() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.singleOrError().toFlowable().test(); + + assertTrue(pp.hasSubscribers()); + + ts.assertEmpty(); + + ts.cancel(); + + assertFalse(pp.hasSubscribers()); } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java index 3994d5dc31..7ceccf678f 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; -import org.reactivestreams.Subscriber; +import org.reactivestreams.*; import io.reactivex.*; import io.reactivex.exceptions.*; @@ -615,4 +615,105 @@ public Flowable apply(Flowable v) throws Exception { } }, false, 1, 1, 1); } + + @Test + public void boundaryError() { + BehaviorProcessor.createDefault(1) + .window(Functions.justCallable(Flowable.error(new TestException()))) + .test() + .assertValueCount(1) + .assertNotComplete() + .assertError(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void boundaryMissingBackpressure() { + BehaviorProcessor.createDefault(1) + .window(Functions.justCallable(Flowable.error(new TestException()))) + .test(0) + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void boundaryCallableCrashOnCall2() { + BehaviorProcessor.createDefault(1) + .window(new Callable>() { + int calls; + @Override + public Flowable call() throws Exception { + if (++calls == 2) { + throw new TestException(); + } + return Flowable.just(1); + } + }) + .test() + .assertError(TestException.class) + .assertNotComplete(); + } + + @Test + public void boundarySecondMissingBackpressure() { + BehaviorProcessor.createDefault(1) + .window(Functions.justCallable(Flowable.just(1))) + .test(1) + .assertError(MissingBackpressureException.class) + .assertNotComplete(); + } + + @Test + public void oneWindow() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber> ts = BehaviorProcessor.createDefault(1) + .window(Functions.justCallable(pp)) + .take(1) + .test(); + + pp.onNext(1); + + ts + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @SuppressWarnings("unchecked") + @Test + public void boundaryDirectMissingBackpressure() { + BehaviorProcessor.create() + .window(Flowable.error(new TestException())) + .test(0) + .assertFailure(MissingBackpressureException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void boundaryDirectMissingBackpressureNoNullPointerException() { + BehaviorProcessor.createDefault(1) + .window(Flowable.error(new TestException())) + .test(0) + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void boundaryDirectSecondMissingBackpressure() { + BehaviorProcessor.createDefault(1) + .window(Flowable.just(1)) + .test(1) + .assertError(MissingBackpressureException.class) + .assertNotComplete(); + } + + @Test + public void boundaryDirectDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>>() { + @Override + public Publisher> apply(Flowable f) + throws Exception { + return f.window(Flowable.never()).takeLast(1); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java index 5177276bed..73535ff5c2 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java @@ -26,6 +26,7 @@ import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.*; import io.reactivex.schedulers.TestScheduler; import io.reactivex.subscribers.*; @@ -398,4 +399,35 @@ public void mainError() { .test() .assertFailure(TestException.class); } + + @Test + public void windowCloseIngoresCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + BehaviorProcessor.createDefault(1) + .window(BehaviorProcessor.createDefault(1), new Function>() { + @Override + public Publisher apply(Integer f) throws Exception { + return new Flowable() { + @Override + protected void subscribeActual( + Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + s.onError(new TestException()); + } + }; + } + }) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertNotComplete(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java index 74bcd72152..6d48c40f49 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java @@ -806,5 +806,86 @@ public void countRestartsOnTimeTick() { .assertNoErrors() .assertNotComplete(); } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>>() { + @Override + public Publisher> apply(Flowable f) + throws Exception { + return f.window(1, TimeUnit.SECONDS, 1).takeLast(0); + } + }); + } + + @SuppressWarnings("unchecked") + @Test + public void firstWindowMissingBackpressure() { + Flowable.never() + .window(1, TimeUnit.SECONDS, 1) + .test(0L) + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void nextWindowMissingBackpressure() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber> ts = pp.window(1, TimeUnit.SECONDS, 1) + .test(1L); + + pp.onNext(1); + + ts.assertValueCount(1) + .assertError(MissingBackpressureException.class) + .assertNotComplete(); + } + + @Test + public void cancelUpfront() { + Flowable.never() + .window(1, TimeUnit.SECONDS, 1) + .test(0L, true) + .assertEmpty(); + } + + @Test + public void nextWindowMissingBackpressureDrainOnSize() { + final PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber> ts = pp.window(1, TimeUnit.MINUTES, 1) + .subscribeWith(new TestSubscriber>(2) { + int calls; + @Override + public void onNext(Flowable t) { + super.onNext(t); + if (++calls == 2) { + pp.onNext(2); + } + } + }); + + pp.onNext(1); + + ts.assertValueCount(2) + .assertError(MissingBackpressureException.class) + .assertNotComplete(); + } + + @Test + public void nextWindowMissingBackpressureDrainOnTime() { + final PublishProcessor pp = PublishProcessor.create(); + + final TestScheduler sch = new TestScheduler(); + + TestSubscriber> ts = pp.window(1, TimeUnit.MILLISECONDS, sch, 10) + .test(1); + + sch.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + ts.assertValueCount(1) + .assertError(MissingBackpressureException.class) + .assertNotComplete(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeMergeTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeMergeTest.java index 0d9d450a05..176ae9c793 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeMergeTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeMergeTest.java @@ -98,4 +98,12 @@ public Integer call() throws Exception { .assertFailureAndMessage(TestException.class, "2", 0, 0); } } + + @Test + public void scalar() { + Maybe.mergeDelayError( + Flowable.just(Maybe.just(1))) + .test() + .assertResult(1); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeSwitchIfEmptySingleTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeSwitchIfEmptySingleTest.java index cd8db6760f..714ba98272 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeSwitchIfEmptySingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeSwitchIfEmptySingleTest.java @@ -20,6 +20,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.internal.fuseable.HasUpstreamMaybeSource; import io.reactivex.observers.TestObserver; import io.reactivex.processors.PublishProcessor; @@ -102,4 +103,12 @@ public void run() { TestHelper.race(r1, r2); } } + + @SuppressWarnings("rawtypes") + @Test + public void source() { + assertSame(Maybe.empty(), + ((HasUpstreamMaybeSource)(Maybe.empty().switchIfEmpty(Single.just(1)))).source() + ); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java index 7a435a389c..517a99a3c6 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.junit.*; import org.mockito.*; @@ -35,7 +35,7 @@ import io.reactivex.observers.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; -import io.reactivex.subjects.PublishSubject; +import io.reactivex.subjects.*; public class ObservableBufferTest { @@ -1796,4 +1796,92 @@ protected void subscribeActual(Observer s) { RxJavaPlugins.reset(); } } + + @Test + public void bufferExactBoundaryDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable( + new Function, ObservableSource>>() { + @Override + public ObservableSource> apply(Observable f) + throws Exception { + return f.buffer(Observable.never()); + } + } + ); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferExactBoundarySecondBufferCrash() { + PublishSubject ps = PublishSubject.create(); + PublishSubject b = PublishSubject.create(); + + TestObserver> to = ps.buffer(b, new Callable>() { + int calls; + @Override + public List call() throws Exception { + if (++calls == 2) { + throw new TestException(); + } + return new ArrayList(); + } + }).test(); + + b.onNext(1); + + to.assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferExactBoundaryBadSource() { + Observable ps = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onComplete(); + observer.onNext(1); + observer.onComplete(); + } + }; + + final AtomicReference> ref = new AtomicReference>(); + Observable b = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + ref.set(observer); + } + }; + + TestObserver> to = ps.buffer(b).test(); + + ref.get().onNext(1); + + to.assertResult(Collections.emptyList()); + } + + @Test + public void bufferBoundaryErrorTwice() { + List errors = TestHelper.trackPluginErrors(); + try { + BehaviorSubject.createDefault(1) + .buffer(Functions.justCallable(new Observable() { + @Override + protected void subscribeActual(Observer s) { + s.onSubscribe(Disposables.empty()); + s.onError(new TestException("first")); + s.onError(new TestException("second")); + } + })) + .test() + .assertError(TestException.class) + .assertErrorMessage("first") + .assertNotComplete(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "second"); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableHideTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableHideTest.java index 14c6f82c17..d4c2bf002f 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableHideTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableHideTest.java @@ -20,6 +20,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; import io.reactivex.subjects.PublishSubject; public class ObservableHideTest { @@ -42,6 +43,7 @@ public void testHiding() { verify(o).onComplete(); verify(o, never()).onError(any(Throwable.class)); } + @Test public void testHidingError() { PublishSubject src = PublishSubject.create(); @@ -60,4 +62,20 @@ public void testHidingError() { verify(o, never()).onComplete(); verify(o).onError(any(TestException.class)); } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable o) + throws Exception { + return o.hide(); + } + }); + } + + @Test + public void disposed() { + TestHelper.checkDisposed(PublishSubject.create().hide()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableInternalHelperTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableInternalHelperTest.java index 1e18ef043d..e03d651bda 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableInternalHelperTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableInternalHelperTest.java @@ -29,8 +29,6 @@ public void enums() { assertNotNull(ObservableInternalHelper.MapToInt.values()[0]); assertNotNull(ObservableInternalHelper.MapToInt.valueOf("INSTANCE")); - assertNotNull(ObservableInternalHelper.ErrorMapperFilter.values()[0]); - assertNotNull(ObservableInternalHelper.ErrorMapperFilter.valueOf("INSTANCE")); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReduceTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReduceTest.java index 5faf25b837..5dce27dcaa 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReduceTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReduceTest.java @@ -14,13 +14,20 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.util.List; +import java.util.concurrent.Callable; + import org.junit.*; import io.reactivex.*; +import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.PublishSubject; public class ObservableReduceTest { Observer observer; @@ -234,5 +241,130 @@ public void testBackpressureWithInitialValue() throws InterruptedException { assertEquals(21, r.intValue()); } + @Test + public void reduceWithSingle() { + Observable.range(1, 5) + .reduceWith(new Callable() { + @Override + public Integer call() throws Exception { + return 0; + } + }, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test() + .assertResult(15); + } + @Test + public void reduceMaybeDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservableToMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Observable o) + throws Exception { + return o.reduce(new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + }); + } + }); + } + + @Test + public void reduceMaybeCheckDisposed() { + TestHelper.checkDisposed(Observable.just(new Object()).reduce(new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + })); + } + + @Test + public void reduceMaybeBadSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onComplete(); + observer.onNext(1); + observer.onError(new TestException()); + observer.onComplete(); + } + }.reduce(new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + }) + .test() + .assertResult(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void seedDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservableToSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Observable o) + throws Exception { + return o.reduce(0, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a; + } + }); + } + }); + } + + @Test + public void seedDisposed() { + TestHelper.checkDisposed(PublishSubject.create().reduce(0, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a; + } + })); + } + + @Test + public void seedBadSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onComplete(); + observer.onNext(1); + observer.onError(new TestException()); + observer.onComplete(); + } + } + .reduce(0, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a; + } + }) + .test() + .assertResult(0); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index d30435651c..abd93aa73b 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -29,8 +29,10 @@ import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observables.ConnectableObservable; import io.reactivex.observers.TestObserver; import io.reactivex.schedulers.*; @@ -770,4 +772,178 @@ public void replayIsUnsubscribed() { assertTrue(((Disposable)co).isDisposed()); } + + static final class BadObservableSubscribe extends ConnectableObservable { + + @Override + public void connect(Consumer connection) { + try { + connection.accept(Disposables.empty()); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + protected void subscribeActual(Observer observer) { + throw new TestException("subscribeActual"); + } + } + + static final class BadObservableDispose extends ConnectableObservable implements Disposable { + + @Override + public void dispose() { + throw new TestException("dispose"); + } + + @Override + public boolean isDisposed() { + return false; + } + + @Override + public void connect(Consumer connection) { + try { + connection.accept(Disposables.empty()); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + protected void subscribeActual(Observer observer) { + } + } + + static final class BadObservableConnect extends ConnectableObservable { + + @Override + public void connect(Consumer connection) { + throw new TestException("connect"); + } + + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + } + } + + @Test + public void badSourceSubscribe() { + BadObservableSubscribe bo = new BadObservableSubscribe(); + + try { + bo.refCount() + .test(); + fail("Should have thrown"); + } catch (NullPointerException ex) { + assertTrue(ex.getCause() instanceof TestException); + } + } + + @Test + public void badSourceDispose() { + BadObservableDispose bo = new BadObservableDispose(); + + try { + bo.refCount() + .test() + .cancel(); + fail("Should have thrown"); + } catch (TestException expected) { + } + } + + @Test + public void badSourceConnect() { + BadObservableConnect bo = new BadObservableConnect(); + + try { + bo.refCount() + .test(); + fail("Should have thrown"); + } catch (NullPointerException ex) { + assertTrue(ex.getCause() instanceof TestException); + } + } + + static final class BadObservableSubscribe2 extends ConnectableObservable { + + int count; + + @Override + public void connect(Consumer connection) { + try { + connection.accept(Disposables.empty()); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + protected void subscribeActual(Observer observer) { + if (++count == 1) { + observer.onSubscribe(Disposables.empty()); + } else { + throw new TestException("subscribeActual"); + } + } + } + + @Test + public void badSourceSubscribe2() { + BadObservableSubscribe2 bo = new BadObservableSubscribe2(); + + Observable o = bo.refCount(); + o.test(); + try { + o.test(); + fail("Should have thrown"); + } catch (NullPointerException ex) { + assertTrue(ex.getCause() instanceof TestException); + } + } + + static final class BadObservableConnect2 extends ConnectableObservable + implements Disposable { + + @Override + public void connect(Consumer connection) { + try { + connection.accept(Disposables.empty()); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onComplete(); + } + + @Override + public void dispose() { + throw new TestException("dispose"); + } + + @Override + public boolean isDisposed() { + return false; + } + } + + @Test + public void badSourceCompleteDisconnect() { + BadObservableConnect2 bo = new BadObservableConnect2(); + + try { + bo.refCount() + .test(); + fail("Should have thrown"); + } catch (NullPointerException ex) { + assertTrue(ex.getCause() instanceof TestException); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastOneTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastOneTest.java index 15daa252e8..a738d40462 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastOneTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastOneTest.java @@ -20,6 +20,7 @@ import org.junit.Test; import io.reactivex.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.observers.TestObserver; @@ -106,4 +107,12 @@ public ObservableSource apply(Observable f) throws Exception { } }); } + + @Test + public void error() { + Observable.error(new TestException()) + .takeLast(1) + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java index eeaf5be0de..8319a74d97 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java @@ -582,4 +582,60 @@ public ObservableSource apply(Observable v) throws Exception { } }, false, 1, 1, 1); } + + @Test + public void boundaryError() { + BehaviorSubject.createDefault(1) + .window(Functions.justCallable(Observable.error(new TestException()))) + .test() + .assertValueCount(1) + .assertNotComplete() + .assertError(TestException.class); + } + + @Test + public void boundaryCallableCrashOnCall2() { + BehaviorSubject.createDefault(1) + .window(new Callable>() { + int calls; + @Override + public Observable call() throws Exception { + if (++calls == 2) { + throw new TestException(); + } + return Observable.just(1); + } + }) + .test() + .assertError(TestException.class) + .assertNotComplete(); + } + + @Test + public void oneWindow() { + PublishSubject ps = PublishSubject.create(); + + TestObserver> to = BehaviorSubject.createDefault(1) + .window(Functions.justCallable(ps)) + .take(1) + .test(); + + ps.onNext(1); + + to + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void boundaryDirectDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, Observable>>() { + @Override + public Observable> apply(Observable f) + throws Exception { + return f.window(Observable.never()).takeLast(1); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java index 86e234961a..35a8bb3b60 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java @@ -28,6 +28,7 @@ import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.TestScheduler; import io.reactivex.subjects.*; @@ -391,4 +392,35 @@ public Object apply(Observable o) throws Exception { } }, false, 1, 1, (Object[])null); } + + @Test + public void windowCloseIngoresCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + BehaviorSubject.createDefault(1) + .window(BehaviorSubject.createDefault(1), new Function>() { + @Override + public Observable apply(Integer f) throws Exception { + return new Observable() { + @Override + protected void subscribeActual( + Observer s) { + s.onSubscribe(Disposables.empty()); + s.onNext(1); + s.onNext(2); + s.onError(new TestException()); + } + }; + } + }) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertNotComplete(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/schedulers/ExecutorSchedulerDelayedRunnableTest.java b/src/test/java/io/reactivex/internal/schedulers/ExecutorSchedulerDelayedRunnableTest.java new file mode 100644 index 0000000000..0356ed6cbf --- /dev/null +++ b/src/test/java/io/reactivex/internal/schedulers/ExecutorSchedulerDelayedRunnableTest.java @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import io.reactivex.exceptions.TestException; +import io.reactivex.internal.schedulers.ExecutorScheduler.DelayedRunnable; + +public class ExecutorSchedulerDelayedRunnableTest { + + + @Test(expected = TestException.class) + public void delayedRunnableCrash() { + DelayedRunnable dl = new DelayedRunnable(new Runnable() { + @Override + public void run() { + throw new TestException(); + } + }); + dl.run(); + } + + @Test + public void dispose() { + final AtomicInteger count = new AtomicInteger(); + DelayedRunnable dl = new DelayedRunnable(new Runnable() { + @Override + public void run() { + count.incrementAndGet(); + } + }); + + dl.dispose(); + dl.dispose(); + + dl.run(); + + assertEquals(0, count.get()); + } +} diff --git a/src/test/java/io/reactivex/internal/schedulers/InstantPeriodicTaskTest.java b/src/test/java/io/reactivex/internal/schedulers/InstantPeriodicTaskTest.java new file mode 100644 index 0000000000..286be4e33a --- /dev/null +++ b/src/test/java/io/reactivex/internal/schedulers/InstantPeriodicTaskTest.java @@ -0,0 +1,277 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.*; + +import org.junit.Test; + +import io.reactivex.TestHelper; +import io.reactivex.exceptions.TestException; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; + +public class InstantPeriodicTaskTest { + + @Test + public void taskCrash() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + List errors = TestHelper.trackPluginErrors(); + try { + + InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() { + @Override + public void run() { + throw new TestException(); + } + }, exec); + + assertNull(task.call()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + exec.shutdownNow(); + RxJavaPlugins.reset(); + } + } + + @Test + public void dispose() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + + InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() { + @Override + public void run() { + throw new TestException(); + } + }, exec); + + assertFalse(task.isDisposed()); + + task.dispose(); + + assertTrue(task.isDisposed()); + + task.dispose(); + + assertTrue(task.isDisposed()); + } finally { + exec.shutdownNow(); + RxJavaPlugins.reset(); + } + } + + @Test + public void dispose2() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + + InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() { + @Override + public void run() { + throw new TestException(); + } + }, exec); + + task.setFirst(new FutureTask(Functions.EMPTY_RUNNABLE, null)); + task.setRest(new FutureTask(Functions.EMPTY_RUNNABLE, null)); + + assertFalse(task.isDisposed()); + + task.dispose(); + + assertTrue(task.isDisposed()); + + task.dispose(); + + assertTrue(task.isDisposed()); + } finally { + exec.shutdownNow(); + RxJavaPlugins.reset(); + } + } + + @Test + public void dispose2CurrentThread() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + + InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() { + @Override + public void run() { + throw new TestException(); + } + }, exec); + + task.runner = Thread.currentThread(); + + task.setFirst(new FutureTask(Functions.EMPTY_RUNNABLE, null)); + task.setRest(new FutureTask(Functions.EMPTY_RUNNABLE, null)); + + assertFalse(task.isDisposed()); + + task.dispose(); + + assertTrue(task.isDisposed()); + + task.dispose(); + + assertTrue(task.isDisposed()); + } finally { + exec.shutdownNow(); + RxJavaPlugins.reset(); + } + } + + @Test + public void dispose3() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + + InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() { + @Override + public void run() { + throw new TestException(); + } + }, exec); + + task.dispose(); + + FutureTask f1 = new FutureTask(Functions.EMPTY_RUNNABLE, null); + task.setFirst(f1); + + assertTrue(f1.isCancelled()); + + FutureTask f2 = new FutureTask(Functions.EMPTY_RUNNABLE, null); + task.setRest(f2); + + assertTrue(f2.isCancelled()); + } finally { + exec.shutdownNow(); + RxJavaPlugins.reset(); + } + } + + @Test + public void disposeOnCurrentThread() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + + InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() { + @Override + public void run() { + throw new TestException(); + } + }, exec); + + task.runner = Thread.currentThread(); + + task.dispose(); + + FutureTask f1 = new FutureTask(Functions.EMPTY_RUNNABLE, null); + task.setFirst(f1); + + assertTrue(f1.isCancelled()); + + FutureTask f2 = new FutureTask(Functions.EMPTY_RUNNABLE, null); + task.setRest(f2); + + assertTrue(f2.isCancelled()); + } finally { + exec.shutdownNow(); + RxJavaPlugins.reset(); + } + } + + @Test + public void firstCancelRace() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() { + @Override + public void run() { + throw new TestException(); + } + }, exec); + + final FutureTask f1 = new FutureTask(Functions.EMPTY_RUNNABLE, null); + Runnable r1 = new Runnable() { + @Override + public void run() { + task.setFirst(f1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + task.dispose(); + } + }; + + TestHelper.race(r1, r2); + + assertTrue(f1.isCancelled()); + assertTrue(task.isDisposed()); + } + } finally { + exec.shutdownNow(); + RxJavaPlugins.reset(); + } + } + + @Test + public void restCancelRace() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() { + @Override + public void run() { + throw new TestException(); + } + }, exec); + + final FutureTask f1 = new FutureTask(Functions.EMPTY_RUNNABLE, null); + Runnable r1 = new Runnable() { + @Override + public void run() { + task.setRest(f1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + task.dispose(); + } + }; + + TestHelper.race(r1, r2); + + assertTrue(f1.isCancelled()); + assertTrue(task.isDisposed()); + } + } finally { + exec.shutdownNow(); + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java b/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java index 52ccdad85c..bbf97be11d 100644 --- a/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java +++ b/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java @@ -13,20 +13,24 @@ package io.reactivex.internal.schedulers; -import static io.reactivex.Flowable.just; -import static io.reactivex.Flowable.merge; +import static io.reactivex.Flowable.*; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.Assert.*; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Scheduler; +import io.reactivex.*; +import io.reactivex.Scheduler.Worker; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; -import io.reactivex.schedulers.Schedulers; -import io.reactivex.schedulers.TestScheduler; +import io.reactivex.internal.schedulers.SchedulerWhen.*; +import io.reactivex.observers.DisposableCompletableObserver; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.*; import io.reactivex.subscribers.TestSubscriber; public class SchedulerWhenTest { @@ -219,4 +223,174 @@ public Completable apply(Flowable> t) { merge(just(just(1).subscribeOn(limited).observeOn(comp)).repeat(1000)).blockingSubscribe(); } + + @Test + public void subscribedDisposable() { + SchedulerWhen.SUBSCRIBED.dispose(); + assertFalse(SchedulerWhen.SUBSCRIBED.isDisposed()); + } + + @Test(expected = TestException.class) + public void combineCrashInConstructor() { + new SchedulerWhen(new Function>, Completable>() { + @Override + public Completable apply(Flowable> v) + throws Exception { + throw new TestException(); + } + }, Schedulers.single()); + } + + @Test + public void disposed() { + SchedulerWhen sw = new SchedulerWhen(new Function>, Completable>() { + @Override + public Completable apply(Flowable> v) + throws Exception { + return Completable.never(); + } + }, Schedulers.single()); + + assertFalse(sw.isDisposed()); + + sw.dispose(); + + assertTrue(sw.isDisposed()); + } + + @Test + public void scheduledActiondisposedSetRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final ScheduledAction sa = new ScheduledAction() { + + private static final long serialVersionUID = -672980251643733156L; + + @Override + protected Disposable callActual(Worker actualWorker, + CompletableObserver actionCompletable) { + return Disposables.empty(); + } + + }; + + assertFalse(sa.isDisposed()); + + Runnable r1 = new Runnable() { + @Override + public void run() { + sa.dispose(); + } + }; + + TestHelper.race(r1, r1); + + assertTrue(sa.isDisposed()); + } + } + + @Test + public void scheduledActionStates() { + final AtomicInteger count = new AtomicInteger(); + ScheduledAction sa = new ScheduledAction() { + + private static final long serialVersionUID = -672980251643733156L; + + @Override + protected Disposable callActual(Worker actualWorker, + CompletableObserver actionCompletable) { + count.incrementAndGet(); + return Disposables.empty(); + } + + }; + + assertFalse(sa.isDisposed()); + + sa.dispose(); + + assertTrue(sa.isDisposed()); + + sa.dispose(); + + assertTrue(sa.isDisposed()); + + // should not run when disposed + sa.call(Schedulers.single().createWorker(), null); + + assertEquals(0, count.get()); + + // should not run when already scheduled + sa.set(Disposables.empty()); + + sa.call(Schedulers.single().createWorker(), null); + + assertEquals(0, count.get()); + + // disposed while in call + sa = new ScheduledAction() { + + private static final long serialVersionUID = -672980251643733156L; + + @Override + protected Disposable callActual(Worker actualWorker, + CompletableObserver actionCompletable) { + count.incrementAndGet(); + dispose(); + return Disposables.empty(); + } + + }; + + sa.call(Schedulers.single().createWorker(), null); + + assertEquals(1, count.get()); + } + + @Test + public void onCompleteActionRunCrash() { + final AtomicInteger count = new AtomicInteger(); + + OnCompletedAction a = new OnCompletedAction(new Runnable() { + @Override + public void run() { + throw new TestException(); + } + }, new DisposableCompletableObserver() { + + @Override + public void onComplete() { + count.incrementAndGet(); + } + + @Override + public void onError(Throwable e) { + count.decrementAndGet(); + e.printStackTrace(); + } + }); + + try { + a.run(); + fail("Should have thrown"); + } catch (TestException expected) { + + } + + assertEquals(1, count.get()); + } + + @Test + public void queueWorkerDispose() { + QueueWorker qw = new QueueWorker(PublishProcessor.create(), Schedulers.single().createWorker()); + + assertFalse(qw.isDisposed()); + + qw.dispose(); + + assertTrue(qw.isDisposed()); + + qw.dispose(); + + assertTrue(qw.isDisposed()); + } } diff --git a/src/test/java/io/reactivex/internal/subscribers/QueueDrainSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/QueueDrainSubscriberTest.java new file mode 100644 index 0000000000..d0360d6cd9 --- /dev/null +++ b/src/test/java/io/reactivex/internal/subscribers/QueueDrainSubscriberTest.java @@ -0,0 +1,336 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.subscribers; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.junit.Test; +import org.reactivestreams.*; + +import io.reactivex.TestHelper; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.MissingBackpressureException; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subscribers.TestSubscriber; + +public class QueueDrainSubscriberTest { + + static final QueueDrainSubscriber createUnordered(TestSubscriber ts, final Disposable d) { + return new QueueDrainSubscriber(ts, new SpscArrayQueue(4)) { + @Override + public void onNext(Integer t) { + fastPathEmitMax(t, false, d); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + public void onSubscribe(Subscription s) { + } + + @Override + public boolean accept(Subscriber a, Integer v) { + super.accept(a, v); + a.onNext(v); + return true; + } + }; + } + + static final QueueDrainSubscriber createOrdered(TestSubscriber ts, final Disposable d) { + return new QueueDrainSubscriber(ts, new SpscArrayQueue(4)) { + @Override + public void onNext(Integer t) { + fastPathOrderedEmitMax(t, false, d); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + public void onSubscribe(Subscription s) { + } + + @Override + public boolean accept(Subscriber a, Integer v) { + super.accept(a, v); + a.onNext(v); + return true; + } + }; + } + + static final QueueDrainSubscriber createUnorderedReject(TestSubscriber ts, final Disposable d) { + return new QueueDrainSubscriber(ts, new SpscArrayQueue(4)) { + @Override + public void onNext(Integer t) { + fastPathEmitMax(t, false, d); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + public void onSubscribe(Subscription s) { + } + + @Override + public boolean accept(Subscriber a, Integer v) { + super.accept(a, v); + a.onNext(v); + return false; + } + }; + } + + static final QueueDrainSubscriber createOrderedReject(TestSubscriber ts, final Disposable d) { + return new QueueDrainSubscriber(ts, new SpscArrayQueue(4)) { + @Override + public void onNext(Integer t) { + fastPathOrderedEmitMax(t, false, d); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + public void onSubscribe(Subscription s) { + } + + @Override + public boolean accept(Subscriber a, Integer v) { + super.accept(a, v); + a.onNext(v); + return false; + } + }; + } + + @Test + public void unorderedFastPathNoRequest() { + TestSubscriber ts = new TestSubscriber(0); + Disposable d = Disposables.empty(); + QueueDrainSubscriber qd = createUnordered(ts, d); + ts.onSubscribe(new BooleanSubscription()); + + qd.onNext(1); + + ts.assertFailure(MissingBackpressureException.class); + + assertTrue(d.isDisposed()); + } + + @Test + public void orderedFastPathNoRequest() { + TestSubscriber ts = new TestSubscriber(0); + Disposable d = Disposables.empty(); + QueueDrainSubscriber qd = createOrdered(ts, d); + ts.onSubscribe(new BooleanSubscription()); + + qd.onNext(1); + + ts.assertFailure(MissingBackpressureException.class); + + assertTrue(d.isDisposed()); + } + + @Test + public void acceptBadRequest() { + TestSubscriber ts = new TestSubscriber(0); + Disposable d = Disposables.empty(); + QueueDrainSubscriber qd = createUnordered(ts, d); + ts.onSubscribe(new BooleanSubscription()); + + assertTrue(qd.accept(ts, 0)); + + List errors = TestHelper.trackPluginErrors(); + try { + qd.requested(-1); + TestHelper.assertError(errors, 0, IllegalArgumentException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void unorderedFastPathRequest1() { + TestSubscriber ts = new TestSubscriber(1); + Disposable d = Disposables.empty(); + QueueDrainSubscriber qd = createUnordered(ts, d); + ts.onSubscribe(new BooleanSubscription()); + + qd.requested(1); + + qd.onNext(1); + + ts.assertValuesOnly(1); + } + + @Test + public void orderedFastPathRequest1() { + TestSubscriber ts = new TestSubscriber(1); + Disposable d = Disposables.empty(); + QueueDrainSubscriber qd = createOrdered(ts, d); + ts.onSubscribe(new BooleanSubscription()); + + qd.requested(1); + + qd.onNext(1); + + ts.assertValuesOnly(1); + } + + @Test + public void unorderedSlowPath() { + TestSubscriber ts = new TestSubscriber(1); + Disposable d = Disposables.empty(); + QueueDrainSubscriber qd = createUnordered(ts, d); + ts.onSubscribe(new BooleanSubscription()); + + qd.enter(); + qd.onNext(1); + + ts.assertEmpty(); + } + + @Test + public void orderedSlowPath() { + TestSubscriber ts = new TestSubscriber(1); + Disposable d = Disposables.empty(); + QueueDrainSubscriber qd = createOrdered(ts, d); + ts.onSubscribe(new BooleanSubscription()); + + qd.enter(); + qd.onNext(1); + + ts.assertEmpty(); + } + + @Test + public void orderedSlowPathNonEmptyQueue() { + TestSubscriber ts = new TestSubscriber(1); + Disposable d = Disposables.empty(); + QueueDrainSubscriber qd = createOrdered(ts, d); + ts.onSubscribe(new BooleanSubscription()); + + qd.queue.offer(0); + qd.requested(2); + qd.onNext(1); + + ts.assertValuesOnly(0, 1); + } + + @Test + public void unorderedOnNextRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + TestSubscriber ts = new TestSubscriber(1); + Disposable d = Disposables.empty(); + final QueueDrainSubscriber qd = createUnordered(ts, d); + ts.onSubscribe(new BooleanSubscription()); + + qd.requested(Long.MAX_VALUE); + Runnable r1 = new Runnable() { + @Override + public void run() { + qd.onNext(1); + } + }; + + TestHelper.race(r1, r1); + + ts.assertValuesOnly(1, 1); + } + } + + @Test + public void orderedOnNextRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + TestSubscriber ts = new TestSubscriber(1); + Disposable d = Disposables.empty(); + final QueueDrainSubscriber qd = createOrdered(ts, d); + ts.onSubscribe(new BooleanSubscription()); + + qd.requested(Long.MAX_VALUE); + Runnable r1 = new Runnable() { + @Override + public void run() { + qd.onNext(1); + } + }; + + TestHelper.race(r1, r1); + + ts.assertValuesOnly(1, 1); + } + } + + @Test + public void unorderedFastPathReject() { + TestSubscriber ts = new TestSubscriber(1); + Disposable d = Disposables.empty(); + QueueDrainSubscriber qd = createUnorderedReject(ts, d); + ts.onSubscribe(new BooleanSubscription()); + + qd.requested(1); + + qd.onNext(1); + + ts.assertValuesOnly(1); + + assertEquals(1, qd.requested()); + } + + @Test + public void orderedFastPathReject() { + TestSubscriber ts = new TestSubscriber(1); + Disposable d = Disposables.empty(); + QueueDrainSubscriber qd = createOrderedReject(ts, d); + ts.onSubscribe(new BooleanSubscription()); + + qd.requested(1); + + qd.onNext(1); + + ts.assertValuesOnly(1); + + assertEquals(1, qd.requested()); + } +} diff --git a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java index ba52d0c806..fd2677207f 100644 --- a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java +++ b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java @@ -13,31 +13,26 @@ package io.reactivex.processors; -import io.reactivex.Flowable; -import io.reactivex.Scheduler; -import io.reactivex.TestHelper; -import io.reactivex.exceptions.MissingBackpressureException; -import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Function; -import io.reactivex.internal.subscriptions.BooleanSubscription; -import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.schedulers.Schedulers; -import io.reactivex.subscribers.DefaultSubscriber; -import io.reactivex.subscribers.TestSubscriber; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.Mockito; -import org.reactivestreams.Subscriber; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.*; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import org.junit.*; +import org.mockito.*; +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.BehaviorProcessor.BehaviorSubscription; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.*; public class BehaviorProcessorTest extends FlowableProcessorTest { @@ -369,7 +364,7 @@ public void testTakeOneSubscriber() { // FIXME RS subscribers are not allowed to throw // @Test // public void testOnErrorThrowsDoesntPreventDelivery() { -// BehaviorSubject ps = BehaviorSubject.create(); +// BehaviorProcessor ps = BehaviorProcessor.create(); // // ps.subscribe(); // TestSubscriber ts = new TestSubscriber(); @@ -391,7 +386,7 @@ public void testTakeOneSubscriber() { // */ // @Test // public void testOnErrorThrowsDoesntPreventDelivery2() { -// BehaviorSubject ps = BehaviorSubject.create(); +// BehaviorProcessor ps = BehaviorProcessor.create(); // // ps.subscribe(); // ps.subscribe(); @@ -845,4 +840,129 @@ public void run() { } } } + + @Test + public void behaviorDisposableDisposeState() { + BehaviorProcessor bp = BehaviorProcessor.create(); + bp.onNext(1); + + TestSubscriber ts = new TestSubscriber(); + + BehaviorSubscription bs = new BehaviorSubscription(ts, bp); + ts.onSubscribe(bs); + + assertFalse(bs.cancelled); + + bs.cancel(); + + assertTrue(bs.cancelled); + + bs.cancel(); + + assertTrue(bs.cancelled); + + assertTrue(bs.test(2)); + + bs.emitFirst(); + + ts.assertEmpty(); + + bs.emitNext(2, 0); + } + + @Test + public void emitFirstDisposeRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + BehaviorProcessor bp = BehaviorProcessor.create(); + bp.onNext(1); + + TestSubscriber ts = new TestSubscriber(); + + final BehaviorSubscription bs = new BehaviorSubscription(ts, bp); + ts.onSubscribe(bs); + + Runnable r1 = new Runnable() { + @Override + public void run() { + bs.emitFirst(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + bs.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void emitNextDisposeRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + BehaviorProcessor bp = BehaviorProcessor.create(); + bp.onNext(1); + + TestSubscriber ts = new TestSubscriber(); + + final BehaviorSubscription bs = new BehaviorSubscription(ts, bp); + ts.onSubscribe(bs); + + Runnable r1 = new Runnable() { + @Override + public void run() { + bs.emitNext(2, 0); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + bs.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void emittingEmitNext() { + BehaviorProcessor bp = BehaviorProcessor.create(); + bp.onNext(1); + + TestSubscriber ts = new TestSubscriber(); + + final BehaviorSubscription bs = new BehaviorSubscription(ts, bp); + ts.onSubscribe(bs); + + bs.emitting = true; + bs.emitNext(2, 1); + bs.emitNext(3, 2); + + assertNotNull(bs.queue); + } + + @Test + public void badRequest() { + List errors = TestHelper.trackPluginErrors(); + try { + BehaviorProcessor bp = BehaviorProcessor.create(); + bp.onNext(1); + + TestSubscriber ts = new TestSubscriber(); + + final BehaviorSubscription bs = new BehaviorSubscription(ts, bp); + ts.onSubscribe(bs); + + bs.request(-1); + + TestHelper.assertError(errors, 0, IllegalArgumentException.class); + } finally { + RxJavaPlugins.reset(); + } + } + } diff --git a/src/test/java/io/reactivex/subjects/BehaviorSubjectTest.java b/src/test/java/io/reactivex/subjects/BehaviorSubjectTest.java index cc0b6ae05d..1d57599d16 100644 --- a/src/test/java/io/reactivex/subjects/BehaviorSubjectTest.java +++ b/src/test/java/io/reactivex/subjects/BehaviorSubjectTest.java @@ -31,6 +31,7 @@ import io.reactivex.observers.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.BehaviorSubject.BehaviorDisposable; public class BehaviorSubjectTest extends SubjectTest { @@ -771,4 +772,108 @@ public void run() { ts.assertFailure(TestException.class); } } + + @Test + public void behaviorDisposableDisposeState() { + BehaviorSubject bs = BehaviorSubject.create(); + bs.onNext(1); + + TestObserver to = new TestObserver(); + + BehaviorDisposable bd = new BehaviorDisposable(to, bs); + to.onSubscribe(bd); + + assertFalse(bd.isDisposed()); + + bd.dispose(); + + assertTrue(bd.isDisposed()); + + bd.dispose(); + + assertTrue(bd.isDisposed()); + + assertTrue(bd.test(2)); + + bd.emitFirst(); + + to.assertEmpty(); + + bd.emitNext(2, 0); + } + + @Test + public void emitFirstDisposeRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + BehaviorSubject bs = BehaviorSubject.create(); + bs.onNext(1); + + TestObserver to = new TestObserver(); + + final BehaviorDisposable bd = new BehaviorDisposable(to, bs); + to.onSubscribe(bd); + + Runnable r1 = new Runnable() { + @Override + public void run() { + bd.emitFirst(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + bd.dispose(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void emitNextDisposeRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + BehaviorSubject bs = BehaviorSubject.create(); + bs.onNext(1); + + TestObserver to = new TestObserver(); + + final BehaviorDisposable bd = new BehaviorDisposable(to, bs); + to.onSubscribe(bd); + + Runnable r1 = new Runnable() { + @Override + public void run() { + bd.emitNext(2, 0); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + bd.dispose(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void emittingEmitNext() { + BehaviorSubject bs = BehaviorSubject.create(); + bs.onNext(1); + + TestObserver to = new TestObserver(); + + final BehaviorDisposable bd = new BehaviorDisposable(to, bs); + to.onSubscribe(bd); + + bd.emitting = true; + bd.emitNext(2, 1); + bd.emitNext(3, 2); + + assertNotNull(bd.queue); + } }