Skip to content

Commit

Permalink
2.x: Improve coverage and fix small mistakes/untaken paths in operato…
Browse files Browse the repository at this point in the history
…rs (#5883)
  • Loading branch information
akarnokd authored Mar 4, 2018
1 parent f6f6d82 commit 0ea2c95
Show file tree
Hide file tree
Showing 46 changed files with 2,713 additions and 178 deletions.
9 changes: 5 additions & 4 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ public static <T> Flowable<T> merge(Publisher<? extends MaybeSource<? extends T>
public static <T> Flowable<T> merge(Publisher<? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
ObjectHelper.requireNonNull(sources, "source is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), false, maxConcurrency, Flowable.bufferSize()));
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), false, maxConcurrency, 1));
}

/**
Expand Down Expand Up @@ -1222,12 +1222,11 @@ public static <T> Flowable<T> mergeDelayError(Iterable<? extends MaybeSource<? e
* {@code source} Publisher
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> mergeDelayError(Publisher<? extends MaybeSource<? extends T>> sources) {
return Flowable.fromPublisher(sources).flatMap((Function)MaybeToPublisher.instance(), true);
return mergeDelayError(sources, Integer.MAX_VALUE);
}


Expand Down Expand Up @@ -1267,7 +1266,9 @@ public static <T> Flowable<T> mergeDelayError(Publisher<? extends MaybeSource<?
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public static <T> Flowable<T> mergeDelayError(Publisher<? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
return Flowable.fromPublisher(sources).flatMap((Function)MaybeToPublisher.instance(), true, maxConcurrency);
ObjectHelper.requireNonNull(sources, "source is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), true, maxConcurrency, 1));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ boolean addInner(InnerSubscriber<T, U> inner) {
void removeInner(InnerSubscriber<T, U> inner) {
for (;;) {
InnerSubscriber<?, ?>[] a = subscribers.get();
if (a == CANCELLED || a == EMPTY) {
int n = a.length;
if (n == 0) {
return;
}
int n = a.length;
int j = -1;
for (int i = 0; i < n; i++) {
if (a[i] == inner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.reactivex.functions.BiFunction;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Reduce a sequence of values, starting from a seed value and by using
Expand Down Expand Up @@ -78,28 +79,36 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T value) {
R v = this.value;
try {
this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
onError(ex);
if (v != null) {
try {
this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
onError(ex);
}
}
}

@Override
public void onError(Throwable e) {
value = null;
s = SubscriptionHelper.CANCELLED;
actual.onError(e);
if (value != null) {
value = null;
s = SubscriptionHelper.CANCELLED;
actual.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}

@Override
public void onComplete() {
R v = value;
value = null;
s = SubscriptionHelper.CANCELLED;
actual.onSuccess(v);
if (v != null) {
value = null;
s = SubscriptionHelper.CANCELLED;
actual.onSuccess(v);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public final class FlowableReplay<T> extends ConnectableFlowable<T> implements H
public static <U, R> Flowable<R> multicastSelector(
final Callable<? extends ConnectableFlowable<U>> connectableFactory,
final Function<? super Flowable<U>, ? extends Publisher<R>> selector) {
return Flowable.unsafeCreate(new MultiCastPublisher<R, U>(connectableFactory, selector));
return new MulticastFlowable<R, U>(connectableFactory, selector);
}

/**
Expand Down Expand Up @@ -1100,17 +1100,17 @@ Node getHead() {
}
}

static final class MultiCastPublisher<R, U> implements Publisher<R> {
static final class MulticastFlowable<R, U> extends Flowable<R> {
private final Callable<? extends ConnectableFlowable<U>> connectableFactory;
private final Function<? super Flowable<U>, ? extends Publisher<R>> selector;

MultiCastPublisher(Callable<? extends ConnectableFlowable<U>> connectableFactory, Function<? super Flowable<U>, ? extends Publisher<R>> selector) {
MulticastFlowable(Callable<? extends ConnectableFlowable<U>> connectableFactory, Function<? super Flowable<U>, ? extends Publisher<R>> selector) {
this.connectableFactory = connectableFactory;
this.selector = selector;
}

@Override
public void subscribe(Subscriber<? super R> child) {
protected void subscribeActual(Subscriber<? super R> child) {
ConnectableFlowable<U> co;
try {
co = ObjectHelper.requireNonNull(connectableFactory.call(), "The connectableFactory returned null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void onSubscribe(Subscription s) {
produced(1);
}
} else {
s.cancel();
a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests"));
return;
}
Expand Down Expand Up @@ -254,11 +255,6 @@ void next() {
}
}

@Override
public boolean accept(Subscriber<? super Flowable<T>> a, Object v) {
// not used by this operator
return false;
}
}

static final class WindowBoundaryInnerSubscriber<T, B> extends DisposableSubscriber<B> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,36 +323,22 @@ static final class WindowOperation<T, B> {
static final class OperatorWindowBoundaryOpenSubscriber<T, B> extends DisposableSubscriber<B> {
final WindowBoundaryMainSubscriber<T, B, ?> parent;

boolean done;

OperatorWindowBoundaryOpenSubscriber(WindowBoundaryMainSubscriber<T, B, ?> parent) {
this.parent = parent;
}

@Override
public void onNext(B t) {
if (done) {
return;
}
parent.open(t);
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
parent.error(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
parent.onComplete();
}
}
Expand All @@ -370,12 +356,8 @@ static final class OperatorWindowBoundaryCloseSubscriber<T, V> extends Disposabl

@Override
public void onNext(V t) {
if (done) {
return;
}
done = true;
cancel();
parent.close(this);
onComplete();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ public void onComplete() {
}
done = true;
parent.onComplete();
// parent.next();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,7 @@ public void onNext(T t) {
tm.dispose();
Disposable task = worker.schedulePeriodically(
new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit);
if (!timer.compareAndSet(tm, task)) {
task.dispose();
}
timer.replace(task);
}
} else {
window = null;
Expand Down Expand Up @@ -549,9 +547,7 @@ void drainLoop() {

Disposable task = worker.schedulePeriodically(
new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit);
if (!timer.compareAndSet(tm, task)) {
task.dispose();
}
timer.replace(task);
}

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,24 +199,6 @@ public Object apply(Object t) throws Exception {
}
}

static final class RepeatWhenOuterHandler
implements Function<Observable<Notification<Object>>, ObservableSource<?>> {
private final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler;

RepeatWhenOuterHandler(Function<? super Observable<Object>, ? extends ObservableSource<?>> handler) {
this.handler = handler;
}

@Override
public ObservableSource<?> apply(Observable<Notification<Object>> no) throws Exception {
return handler.apply(no.map(MapToInt.INSTANCE));
}
}

public static Function<Observable<Notification<Object>>, ObservableSource<?>> repeatWhenHandler(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler) {
return new RepeatWhenOuterHandler(handler);
}

public static <T> Callable<ConnectableObservable<T>> replayCallable(final Observable<T> parent) {
return new ReplayCallable<T>(parent);
}
Expand All @@ -237,42 +219,6 @@ public static <T, R> Function<Observable<T>, ObservableSource<R>> replayFunction
return new ReplayFunction<T, R>(selector, scheduler);
}

enum ErrorMapperFilter implements Function<Notification<Object>, Throwable>, Predicate<Notification<Object>> {
INSTANCE;

@Override
public Throwable apply(Notification<Object> t) throws Exception {
return t.getError();
}

@Override
public boolean test(Notification<Object> t) throws Exception {
return t.isOnError();
}
}

static final class RetryWhenInner
implements Function<Observable<Notification<Object>>, ObservableSource<?>> {
private final Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler;

RetryWhenInner(
Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler) {
this.handler = handler;
}

@Override
public ObservableSource<?> apply(Observable<Notification<Object>> no) throws Exception {
Observable<Throwable> map = no
.takeWhile(ErrorMapperFilter.INSTANCE)
.map(ErrorMapperFilter.INSTANCE);
return handler.apply(map);
}
}

public static <T> Function<Observable<Notification<Object>>, ObservableSource<?>> retryWhenHandler(final Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler) {
return new RetryWhenInner(handler);
}

static final class ZipIterableFunction<T, R>
implements Function<List<ObservableSource<? extends T>>, ObservableSource<? extends R>> {
private final Function<? super Object[], ? extends R> zipper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public void onNext(T value) {
@Override
public void onError(Throwable e) {
R v = value;
value = null;
if (v != null) {
value = null;
actual.onError(e);
} else {
RxJavaPlugins.onError(e);
Expand All @@ -100,8 +100,8 @@ public void onError(Throwable e) {
@Override
public void onComplete() {
R v = value;
value = null;
if (v != null) {
value = null;
actual.onSuccess(v);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,8 @@ static final class OperatorWindowBoundaryCloseObserver<T, V> extends DisposableO

@Override
public void onNext(V t) {
if (done) {
return;
}
done = true;
dispose();
parent.close(this);
onComplete();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ public void onComplete() {
}
done = true;
parent.onComplete();
// parent.next();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,14 @@ final class InstantPeriodicTask implements Callable<Void>, Disposable {

@Override
public Void call() throws Exception {
runner = Thread.currentThread();
try {
runner = Thread.currentThread();
try {
task.run();
setRest(executor.submit(this));
} catch (Throwable ex) {
RxJavaPlugins.onError(ex);
}
} finally {
task.run();
setRest(executor.submit(this));
runner = null;
} catch (Throwable ex) {
runner = null;
RxJavaPlugins.onError(ex);
}
return null;
}
Expand All @@ -86,6 +84,7 @@ void setFirst(Future<?> f) {
Future<?> current = first.get();
if (current == CANCELLED) {
f.cancel(runner != Thread.currentThread());
return;
}
if (first.compareAndSet(current, f)) {
return;
Expand All @@ -98,6 +97,7 @@ void setRest(Future<?> f) {
Future<?> current = rest.get();
if (current == CANCELLED) {
f.cancel(runner != Thread.currentThread());
return;
}
if (rest.compareAndSet(current, f)) {
return;
Expand Down
Loading

0 comments on commit 0ea2c95

Please sign in to comment.