Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Operator switchIfEmpty #2091

Merged
merged 3 commits into from
Feb 3, 2015
Merged

Conversation

alexwen
Copy link

@alexwen alexwen commented Dec 19, 2014

Like defaultIfEmpty but subscribes to and emits the items in an Observable if the source is empty.

Fixes #1878

…nd emits the items in an Observable if the source is empty
@Override
public void onCompleted() {
if (!empty) {
child.onCompleted();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edited: you should call unsubscribe() before subscribeToAlternate();. Here is the test case:

    @Test
    public void testSwitchShouldTriggerUnsubscribe() {
        final Subscription s = Subscriptions.empty();

        Observable.create(new Observable.OnSubscribe<Long>() {
            @Override
            public void call(final Subscriber<? super Long> subscriber) {
                subscriber.add(s);
                subscriber.onCompleted();
            }
        }).switchIfEmpty(Observable.<Long>never()).subscribe();
        assertTrue(s.isUnsubscribed());
    }

In addition, L38 should change to

SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber();
child.add(parent);
return child;

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with returning child here is that when the first Observable completes, and is empty, those downstream will see the subscription as unsubscribed, no?

Also, backpressure won't work as parent is not what we are returning anymore so we won't see any requests, which then should be passed to the alternate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, a typo here. The return value should be parent.

Ensure that we unsubscribe upstream "parent" when switching to alternate. That way upstream will trigger unsubscribe when the first Observable completes. Added test.
Child should contain downstream subscriptions - not parent.
@akarnokd
Copy link
Member

I think there is a backpressure issue with L78. Child has set its producer to the original source and its requests will not get routed to the alternative.

request(capacity);
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @akarnokd said, you need to override setProducer here, such as

            @Override
            public void setProducer(final Producer producer) {
                child.setProducer(new Producer() {
                    @Override
                    public void request(long n) {
                        producer.request(n);
                    }
                });
            }

So that child can be set to the new Producer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a good spot I can look for an example test for this? All tests are green ATM so I want to make sure I have it covered...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a test:

    @Test
    public void testSwitchRequestAlternativeObservable() {
        final List<Integer> items = new ArrayList<Integer>();

        Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(new Subscriber<Integer>() {

            @Override
            public void onStart() {
                request(1);
            }

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                items.add(integer);
            }
        });
        assertEquals(Arrays.asList(1), items);
    }

And its output:

java.lang.AssertionError: 
Expected :[1]
Actual   :[1, 2, 3]

If you do nothing, unsafeSubscribe(new Subscriber()...) will request Long.MAX_VALUE.
See

if (toRequest == Long.MIN_VALUE) {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thank you very much for the help. I think I am starting to understand back pressure at least a little better now ;)

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child);
child.add(parent);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with directly adding this to the child is that the child will retain a reference to a now-dead switcher. Instead, I suggest having a SerialSubscription:

SerialSubscription ssub = new SerialSubscription();
SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child, ssub);
ssub.add(parent);
child.add(ssub);
return parent;

The subscribeToAlternate should now start with:

ssub.set(alternate.unsafeSubscribe(new Subscriber<T>() {

@akarnokd akarnokd mentioned this pull request Feb 3, 2015
@akarnokd akarnokd merged commit f2a59c6 into ReactiveX:1.x Feb 3, 2015
@alexwen alexwen deleted the switch_if_empty branch February 3, 2015 15:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants