Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Operator switchIfEmpty #2091

Merged
merged 3 commits into from
Feb 3, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3736,6 +3736,27 @@ public final Observable<T> defaultIfEmpty(T defaultValue) {
return lift(new OperatorDefaultIfEmpty<T>(defaultValue));
}

/**
* Returns an Observable that emits the items emitted by the source Observable or the items of an alternate Observable if the source Observable
* is empty.
* <p/>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchIfEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Beta:</b></dt>
* <dd>{@code switchIfEmpty} is currently in {@link rx.annotations.Beta} and subject to change.</dd>
* </dl>
*
* @param alternate
* the alternate Observable to subscribe to if the source does not emit any items
* @return an Observable that emits the items emitted by the source Observable or the items of an alternate Observable if the source Observable
* is empty.
*/
@Beta
public final Observable<T> switchIfEmpty(Observable<T> alternate) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add @Beta here

return lift(new OperatorSwitchIfEmpty<T>(alternate));
}

/**
* Returns an Observable that delays the subscription to and emissions from the souce Observable via another
* Observable on a per-item basis.
Expand Down
126 changes: 126 additions & 0 deletions src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import rx.Observable;
import rx.Producer;
import rx.Subscriber;

import java.util.concurrent.atomic.AtomicLong;

/**
* If the Observable completes without emitting any items, subscribe to an alternate Observable. Allows for similar
* functionality to {@link rx.internal.operators.OperatorDefaultIfEmpty} except instead of one item being emitted when
* empty, the results of the given Observable will be emitted.
*/
public class OperatorSwitchIfEmpty<T> implements Observable.Operator<T, T> {
private final Observable<T> alternate;

public OperatorSwitchIfEmpty(Observable<T> alternate) {
this.alternate = alternate;
}

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child);
child.add(parent);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with directly adding this to the child is that the child will retain a reference to a now-dead switcher. Instead, I suggest having a SerialSubscription:

SerialSubscription ssub = new SerialSubscription();
SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child, ssub);
ssub.add(parent);
child.add(ssub);
return parent;

The subscribeToAlternate should now start with:

ssub.set(alternate.unsafeSubscribe(new Subscriber<T>() {

return parent;
}

private class SwitchIfEmptySubscriber extends Subscriber<T> {

boolean empty = true;
final AtomicLong consumerCapacity = new AtomicLong(0l);

private final Subscriber<? super T> child;

public SwitchIfEmptySubscriber(Subscriber<? super T> child) {
this.child = child;
}

@Override
public void setProducer(final Producer producer) {
super.setProducer(new Producer() {
@Override
public void request(long n) {
if (empty) {
consumerCapacity.set(n);
}
producer.request(n);
}
});
}

@Override
public void onCompleted() {
if (!empty) {
child.onCompleted();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edited: you should call unsubscribe() before subscribeToAlternate();. Here is the test case:

    @Test
    public void testSwitchShouldTriggerUnsubscribe() {
        final Subscription s = Subscriptions.empty();

        Observable.create(new Observable.OnSubscribe<Long>() {
            @Override
            public void call(final Subscriber<? super Long> subscriber) {
                subscriber.add(s);
                subscriber.onCompleted();
            }
        }).switchIfEmpty(Observable.<Long>never()).subscribe();
        assertTrue(s.isUnsubscribed());
    }

In addition, L38 should change to

SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber();
child.add(parent);
return child;

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with returning child here is that when the first Observable completes, and is empty, those downstream will see the subscription as unsubscribed, no?

Also, backpressure won't work as parent is not what we are returning anymore so we won't see any requests, which then should be passed to the alternate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, a typo here. The return value should be parent.

} else if (!child.isUnsubscribed()) {
unsubscribe();
subscribeToAlternate();
}
}

private void subscribeToAlternate() {
child.add(alternate.unsafeSubscribe(new Subscriber<T>() {

@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {
@Override
public void request(long n) {
producer.request(n);
}
});
}

@Override
public void onStart() {
final long capacity = consumerCapacity.get();
if (capacity > 0) {
request(capacity);
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @akarnokd said, you need to override setProducer here, such as

            @Override
            public void setProducer(final Producer producer) {
                child.setProducer(new Producer() {
                    @Override
                    public void request(long n) {
                        producer.request(n);
                    }
                });
            }

So that child can be set to the new Producer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a good spot I can look for an example test for this? All tests are green ATM so I want to make sure I have it covered...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a test:

    @Test
    public void testSwitchRequestAlternativeObservable() {
        final List<Integer> items = new ArrayList<Integer>();

        Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(new Subscriber<Integer>() {

            @Override
            public void onStart() {
                request(1);
            }

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                items.add(integer);
            }
        });
        assertEquals(Arrays.asList(1), items);
    }

And its output:

java.lang.AssertionError: 
Expected :[1]
Actual   :[1, 2, 3]

If you do nothing, unsafeSubscribe(new Subscriber()...) will request Long.MAX_VALUE.
See

if (toRequest == Long.MIN_VALUE) {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thank you very much for the help. I think I am starting to understand back pressure at least a little better now ;)

@Override
public void onCompleted() {
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(T t) {
child.onNext(t);
}
}));
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(T t) {
empty = false;
child.onNext(t);
}
}
}
163 changes: 163 additions & 0 deletions src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import org.junit.Test;
import rx.Observable;
import rx.Producer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class OperatorSwitchIfEmptyTest {

@Test
public void testSwitchWhenNotEmpty() throws Exception {
final AtomicBoolean subscribed = new AtomicBoolean(false);
final Observable<Integer> observable = Observable.just(4).switchIfEmpty(Observable.just(2)
.doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.set(true);
}
}));

assertEquals(4, observable.toBlocking().single().intValue());
assertFalse(subscribed.get());
}

@Test
public void testSwitchWhenEmpty() throws Exception {
final Observable<Integer> observable = Observable.<Integer>empty().switchIfEmpty(Observable.from(Arrays.asList(42)));

assertEquals(42, observable.toBlocking().single().intValue());
}

@Test
public void testSwitchWithProducer() throws Exception {
final AtomicBoolean emitted = new AtomicBoolean(false);
Observable<Long> withProducer = Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(final Subscriber<? super Long> subscriber) {
subscriber.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0 && !emitted.get()) {
emitted.set(true);
subscriber.onNext(42L);
subscriber.onCompleted();
}
}
});
}
});

final Observable<Long> observable = Observable.<Long>empty().switchIfEmpty(withProducer);
assertEquals(42, observable.toBlocking().single().intValue());
}

@Test
public void testSwitchTriggerUnsubscribe() throws Exception {
final Subscription empty = Subscriptions.empty();

Observable<Long> withProducer = Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(final Subscriber<? super Long> subscriber) {
subscriber.add(empty);
subscriber.onNext(42L);
}
});

final Subscription sub = Observable.<Long>empty().switchIfEmpty(withProducer).lift(new Observable.Operator<Long, Long>() {
@Override
public Subscriber<? super Long> call(final Subscriber<? super Long> child) {
return new Subscriber<Long>(child) {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Long aLong) {
unsubscribe();
}
};
}
}).subscribe();


assertTrue(empty.isUnsubscribed());
assertTrue(sub.isUnsubscribed());
}

@Test
public void testSwitchShouldTriggerUnsubscribe() {
final Subscription s = Subscriptions.empty();

Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(final Subscriber<? super Long> subscriber) {
subscriber.add(s);
subscriber.onCompleted();
}
}).switchIfEmpty(Observable.<Long>never()).subscribe();
assertTrue(s.isUnsubscribed());
}

@Test
public void testSwitchRequestAlternativeObservableWithBackpressure() {
final List<Integer> items = new ArrayList<Integer>();

Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(1);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer integer) {
items.add(integer);
}
});
assertEquals(Arrays.asList(1), items);
}
}