diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java index 30c9443319..07f7e8e7d4 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java @@ -16,6 +16,7 @@ import java.util.Iterator; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.NonNull; import io.reactivex.annotations.Nullable; import org.reactivestreams.*; @@ -37,8 +38,10 @@ public final class FlowableCombineLatest extends Flowable { + @Nullable final Publisher[] array; + @Nullable final Iterable> iterable; final Function combiner; @@ -47,8 +50,8 @@ public final class FlowableCombineLatest final boolean delayErrors; - public FlowableCombineLatest(Publisher[] array, - Function combiner, + public FlowableCombineLatest(@NonNull Publisher[] array, + @NonNull Function combiner, int bufferSize, boolean delayErrors) { this.array = array; this.iterable = null; @@ -57,8 +60,8 @@ public FlowableCombineLatest(Publisher[] array, this.delayErrors = delayErrors; } - public FlowableCombineLatest(Iterable> iterable, - Function combiner, + public FlowableCombineLatest(@NonNull Iterable> iterable, + @NonNull Function combiner, int bufferSize, boolean delayErrors) { this.array = null; this.iterable = iterable; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java index 629a50a4ac..e130c7e775 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java @@ -15,6 +15,8 @@ import java.util.Arrays; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.disposables.Disposable; @@ -33,21 +35,22 @@ * @param the output type */ public final class FlowableWithLatestFromMany extends AbstractFlowableWithUpstream { - + @Nullable final Publisher[] otherArray; + @Nullable final Iterable> otherIterable; final Function combiner; - public FlowableWithLatestFromMany(Publisher source, Publisher[] otherArray, Function combiner) { + public FlowableWithLatestFromMany(@NonNull Publisher source, @NonNull Publisher[] otherArray, Function combiner) { super(source); this.otherArray = otherArray; this.otherIterable = null; this.combiner = combiner; } - public FlowableWithLatestFromMany(Publisher source, Iterable> otherIterable, Function combiner) { + public FlowableWithLatestFromMany(@NonNull Publisher source, @NonNull Iterable> otherIterable, @NonNull Function combiner) { super(source); this.otherArray = null; this.otherIterable = otherIterable; diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java index af3027ca00..21cee4e184 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java @@ -101,7 +101,7 @@ public void onSuccess(T value) { this.it = iter; - if (outputFused && iter != null) { + if (outputFused) { a.onNext(null); a.onComplete(); return; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java index 21d5c8f975..dd58b0123f 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java @@ -16,6 +16,8 @@ import java.util.concurrent.atomic.*; import io.reactivex.*; +import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.Nullable; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; @@ -33,20 +35,23 @@ */ public final class ObservableWithLatestFromMany extends AbstractObservableWithUpstream { + @Nullable final ObservableSource[] otherArray; + @Nullable final Iterable> otherIterable; + @NonNull final Function combiner; - public ObservableWithLatestFromMany(ObservableSource source, ObservableSource[] otherArray, Function combiner) { + public ObservableWithLatestFromMany(@NonNull ObservableSource source, @NonNull ObservableSource[] otherArray, @NonNull Function combiner) { super(source); this.otherArray = otherArray; this.otherIterable = null; this.combiner = combiner; } - public ObservableWithLatestFromMany(ObservableSource source, Iterable> otherIterable, Function combiner) { + public ObservableWithLatestFromMany(@NonNull ObservableSource source, @NonNull Iterable> otherIterable, @NonNull Function combiner) { super(source); this.otherArray = null; this.otherIterable = otherIterable; diff --git a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java index 5faaa555a5..f2a4dcf7d3 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java @@ -199,7 +199,7 @@ public Disposable schedule(@NonNull Runnable action) { return EmptyDisposable.INSTANCE; } - return poolWorker.scheduleActual(action, 0, null, serial); + return poolWorker.scheduleActual(action, 0, TimeUnit.MILLISECONDS, serial); } @NonNull @Override diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java index ea45d78e60..12f121135b 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java @@ -17,6 +17,7 @@ import io.reactivex.Scheduler; import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.Nullable; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; import io.reactivex.plugins.RxJavaPlugins; @@ -106,7 +107,8 @@ public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDel * @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled * @return the ScheduledRunnable instance */ - public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) { + @NonNull + public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); @@ -126,7 +128,9 @@ public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, Time } sr.setFuture(f); } catch (RejectedExecutionException ex) { - parent.remove(sr); + if (parent != null) { + parent.remove(sr); + } RxJavaPlugins.onError(ex); } diff --git a/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java b/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java index 74cf9e181c..0fde9e592b 100644 --- a/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java @@ -115,4 +115,22 @@ public void run() { assertEquals(0, calls[0]); } + + /** + * Regression test to ensure there is no NPE when the worker has been disposed + */ + @Test + public void npeRegression() throws Exception { + Scheduler s = getScheduler(); + NewThreadWorker w = (NewThreadWorker) s.createWorker(); + w.dispose(); + + //This method used to throw a NPE when the worker has been disposed and the parent is null + w.scheduleActual(new Runnable() { + @Override + public void run() { + } + }, 0, TimeUnit.MILLISECONDS, null); + + } }