Skip to content

Commit

Permalink
Merge pull request #2655 from akarnokd/Issue2654
Browse files Browse the repository at this point in the history
SwitchOnNext: fix upstream producer replacing the ops own producer
  • Loading branch information
akarnokd committed Feb 16, 2015
2 parents bc1ed77 + 2c2051f commit 94de8ee
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 8 deletions.
5 changes: 3 additions & 2 deletions src/main/java/rx/internal/operators/OperatorSwitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public static <T> OperatorSwitch<T> instance() {
private OperatorSwitch() { }
@Override
public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super T> child) {
return new SwitchSubscriber<T>(child);
SwitchSubscriber<T> sws = new SwitchSubscriber<T>(child);
child.add(sws);
return sws;
}

private static final class SwitchSubscriber<T> extends Subscriber<Observable<? extends T>> {
Expand All @@ -75,7 +77,6 @@ private static final class SwitchSubscriber<T> extends Subscriber<Observable<? e
volatile boolean infinite = false;

public SwitchSubscriber(Subscriber<? super T> child) {
super(child);
s = new SerializedSubscriber<T>(child);
ssub = new SerialSubscription();
child.add(ssub);
Expand Down
56 changes: 50 additions & 6 deletions src/test/java/rx/internal/operators/OperatorSwitchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,25 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;

import rx.*;
import rx.Observable;
import rx.Observer;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;

Expand Down Expand Up @@ -530,4 +532,46 @@ public void call(final Subscriber<? super Observable<Integer>> subscriber) {
).take(1).subscribe();
assertTrue("Switch doesn't propagate 'unsubscribe'", isUnsubscribed.get());
}
/** The upstream producer hijacked the switch producer stopping the requests aimed at the inner observables. */
@Test
public void testIssue2654() {
Observable<String> oneItem = Observable.just("Hello").mergeWith(Observable.<String>never());

Observable<String> src = oneItem.switchMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(final String s) {
return Observable.just(s)
.mergeWith(Observable.interval(10, TimeUnit.MILLISECONDS)
.map(new Func1<Long, String>() {
@Override
public String call(Long i) {
return s + " " + i;
}
})).take(250);
}
})
.share()
;

TestSubscriber<String> ts = new TestSubscriber<String>() {
@Override
public void onNext(String t) {
super.onNext(t);
if (getOnNextEvents().size() == 250) {
onCompleted();
unsubscribe();
}
}
};
src.subscribe(ts);

ts.awaitTerminalEvent(10, TimeUnit.SECONDS);

System.out.println("> testIssue2654: " + ts.getOnNextEvents().size());

ts.assertTerminalEvent();
ts.assertNoErrors();

Assert.assertEquals(250, ts.getOnNextEvents().size());
}
}

0 comments on commit 94de8ee

Please sign in to comment.