Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OperationSwitch notify onComplete() too early. #443

Merged
merged 3 commits into from
Oct 22, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 175 additions & 44 deletions rxjava-core/src/main/java/rx/operators/OperationSwitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -30,24 +34,30 @@
import rx.Observer;
import rx.Subscription;
import rx.concurrency.TestScheduler;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

/**
* Transforms an Observable that emits Observables into a single Observable that emits the items
* emitted by the most recently published of those Observables.
* Transforms an Observable that emits Observables into a single Observable that
* emits the items emitted by the most recently published of those Observables.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/switchDo.png">
* <img width="640" src=
* "https://github.com/Netflix/RxJava/wiki/images/rx-operators/switchDo.png">
*/
public final class OperationSwitch {

/**
* This function transforms an {@link Observable} sequence of {@link Observable} sequences into a single {@link Observable} sequence which produces values from the most recently published
* {@link Observable}.
* This function transforms an {@link Observable} sequence of
* {@link Observable} sequences into a single {@link Observable} sequence
* which produces values from the most recently published {@link Observable}
* .
*
* @param sequences
* The {@link Observable} sequence consisting of {@link Observable} sequences.
* The {@link Observable} sequence consisting of
* {@link Observable} sequences.
* @return A {@link Func1} which does this transformation.
*/
public static <T> OnSubscribeFunc<T> switchDo(final Observable<? extends Observable<? extends T>> sequences) {
Expand All @@ -69,69 +79,113 @@ public Switch(Observable<? extends Observable<? extends T>> sequences) {

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
SafeObservableSubscription subscription = new SafeObservableSubscription();
subscription.wrap(sequences.subscribe(new SwitchObserver<T>(observer, subscription)));
return subscription;
SafeObservableSubscription parent;
parent = new SafeObservableSubscription();

MultipleAssignmentSubscription child;
child = new MultipleAssignmentSubscription();

parent.wrap(sequences.subscribe(new SwitchObserver<T>(observer, parent, child)));

return new CompositeSubscription(parent, child);
}
}

private static class SwitchObserver<T> implements Observer<Observable<? extends T>> {

private final Observer<? super T> observer;
private final SafeObservableSubscription parent;
private final AtomicReference<Subscription> subsequence = new AtomicReference<Subscription>();
private final Object gate;
private final Observer<? super T> observer;
private final SafeObservableSubscription parent;
private final MultipleAssignmentSubscription child;
private long latest;
private boolean stopped;
private boolean hasLatest;

public SwitchObserver(Observer<? super T> observer, SafeObservableSubscription parent) {
public SwitchObserver(Observer<? super T> observer, SafeObservableSubscription parent,
MultipleAssignmentSubscription child) {
this.observer = observer;
this.parent = parent;
}

@Override
public void onCompleted() {
unsubscribeFromSubSequence();
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
unsubscribeFromSubSequence();
observer.onError(e);
this.child = child;
this.gate = new Object();
}

@Override
public void onNext(Observable<? extends T> args) {
unsubscribeFromSubSequence();
final long id;
synchronized (gate) {
id = ++latest;
this.hasLatest = true;
}

subsequence.set(args.subscribe(new Observer<T>() {
final SafeObservableSubscription sub;
sub = new SafeObservableSubscription();
sub.wrap(args.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
// Do nothing.
public void onNext(T args) {
synchronized (gate) {
if (latest == id) {
SwitchObserver.this.observer.onNext(args);
}
}
}

@Override
public void onError(Throwable e) {
parent.unsubscribe();
observer.onError(e);
synchronized (gate) {
sub.unsubscribe();
if (latest == id) {
SwitchObserver.this.observer.onError(e);
SwitchObserver.this.parent.unsubscribe();
}
}
}

@Override
public void onNext(T args) {
observer.onNext(args);
public void onCompleted() {
synchronized (gate) {
sub.unsubscribe();
if (latest == id) {
SwitchObserver.this.hasLatest = false;
}

if (stopped) {
SwitchObserver.this.observer.onCompleted();
SwitchObserver.this.parent.unsubscribe();
}

}
}

}));

this.child.setSubscription(sub);
}

private void unsubscribeFromSubSequence() {
Subscription previousSubscription = subsequence.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
@Override
public void onError(Throwable e) {
synchronized (gate) {
this.observer.onError(e);
}

this.parent.unsubscribe();
}

@Override
public void onCompleted() {
synchronized (gate) {
this.stopped = true;
if (!this.hasLatest) {
this.observer.onCompleted();
this.parent.unsubscribe();
}
}
}

}

public static class UnitTest {

private TestScheduler scheduler;
private TestScheduler scheduler;
private Observer<String> observer;

@Before
Expand All @@ -141,6 +195,83 @@ public void before() {
observer = mock(Observer.class);
}

@Test
public void testSwitchWhenOuterCompleteBeforeInner() {
Observable<Observable<String>> source = Observable.create(new OnSubscribeFunc<Observable<String>>() {
@Override
public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
publishNext(observer, 50, Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 70, "one");
publishNext(observer, 100, "two");
publishCompleted(observer, 200);
return Subscriptions.empty();
}
}));
publishCompleted(observer, 60);

return Subscriptions.empty();
}
});

Observable<String> sampled = Observable.create(OperationSwitch.switchDo(source));
sampled.subscribe(observer);

InOrder inOrder = inOrder(observer);

scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(2)).onNext(anyString());
inOrder.verify(observer, times(1)).onCompleted();
}

@Test
public void testSwitchWhenInnerCompleteBeforeOuter() {
Observable<Observable<String>> source = Observable.create(new OnSubscribeFunc<Observable<String>>() {
@Override
public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
publishNext(observer, 10, Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 0, "one");
publishNext(observer, 10, "two");
publishCompleted(observer, 20);
return Subscriptions.empty();
}
}));

publishNext(observer, 100, Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 0, "three");
publishNext(observer, 10, "four");
publishCompleted(observer, 20);
return Subscriptions.empty();
}
}));
publishCompleted(observer, 200);

return Subscriptions.empty();
}
});

Observable<String> sampled = Observable.create(OperationSwitch.switchDo(source));
sampled.subscribe(observer);

InOrder inOrder = inOrder(observer);

scheduler.advanceTimeTo(150, TimeUnit.MILLISECONDS);
inOrder.verify(observer, never()).onCompleted();
inOrder.verify(observer, times(1)).onNext("one");
inOrder.verify(observer, times(1)).onNext("two");
inOrder.verify(observer, times(1)).onNext("three");
inOrder.verify(observer, times(1)).onNext("four");

scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS);
inOrder.verify(observer, never()).onNext(anyString());
inOrder.verify(observer, times(1)).onCompleted();
}

@Test
public void testSwitchWithComplete() {
Observable<Observable<String>> source = Observable.create(new OnSubscribeFunc<Observable<String>>() {
Expand All @@ -149,7 +280,7 @@ public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
publishNext(observer, 50, Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 50, "one");
publishNext(observer, 60, "one");
publishNext(observer, 100, "two");
return Subscriptions.empty();
}
Expand Down Expand Up @@ -196,8 +327,8 @@ public Subscription onSubscribe(Observer<? super String> observer) {
verify(observer, never()).onError(any(Throwable.class));

scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS);
inOrder.verify(observer, never()).onNext(anyString());
verify(observer, times(1)).onCompleted();
inOrder.verify(observer, times(1)).onNext("four");
verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}

Expand Down