Skip to content

Commit

Permalink
Fix for back pressure on the alternate subscription.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Wenckus committed Dec 22, 2014
1 parent 6e6c771 commit f2a59c6
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
11 changes: 11 additions & 0 deletions src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ public void onCompleted() {

private void subscribeToAlternate() {
child.add(alternate.unsafeSubscribe(new Subscriber<T>() {

@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {
@Override
public void request(long n) {
producer.request(n);
}
});
}

@Override
public void onStart() {
final long capacity = consumerCapacity.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -56,13 +58,15 @@ public void testSwitchWhenEmpty() throws Exception {

@Test
public void testSwitchWithProducer() throws Exception {
final AtomicBoolean emitted = new AtomicBoolean(false);
Observable<Long> withProducer = Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(final Subscriber<? super Long> subscriber) {
subscriber.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0) {
if (n > 0 && !emitted.get()) {
emitted.set(true);
subscriber.onNext(42L);
subscriber.onCompleted();
}
Expand Down Expand Up @@ -127,4 +131,33 @@ public void call(final Subscriber<? super Long> subscriber) {
}).switchIfEmpty(Observable.<Long>never()).subscribe();
assertTrue(s.isUnsubscribed());
}

@Test
public void testSwitchRequestAlternativeObservableWithBackpressure() {
final List<Integer> items = new ArrayList<Integer>();

Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(1);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer integer) {
items.add(integer);
}
});
assertEquals(Arrays.asList(1), items);
}
}

0 comments on commit f2a59c6

Please sign in to comment.