-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added Operator switchIfEmpty #2091
Conversation
…nd emits the items in an Observable if the source is empty
@Override | ||
public void onCompleted() { | ||
if (!empty) { | ||
child.onCompleted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Edited: you should call unsubscribe()
before subscribeToAlternate();
. Here is the test case:
@Test
public void testSwitchShouldTriggerUnsubscribe() {
final Subscription s = Subscriptions.empty();
Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(final Subscriber<? super Long> subscriber) {
subscriber.add(s);
subscriber.onCompleted();
}
}).switchIfEmpty(Observable.<Long>never()).subscribe();
assertTrue(s.isUnsubscribed());
}
In addition, L38 should change to
SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber();
child.add(parent);
return child;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The issue with returning child here is that when the first Observable completes, and is empty, those downstream will see the subscription as unsubscribed, no?
Also, backpressure won't work as parent is not what we are returning anymore so we won't see any requests, which then should be passed to the alternate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, a typo here. The return value should be parent.
Ensure that we unsubscribe upstream "parent" when switching to alternate. That way upstream will trigger unsubscribe when the first Observable completes. Added test. Child should contain downstream subscriptions - not parent.
I think there is a backpressure issue with L78. Child has set its producer to the original source and its requests will not get routed to the alternative. |
request(capacity); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As @akarnokd said, you need to override setProducer
here, such as
@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {
@Override
public void request(long n) {
producer.request(n);
}
});
}
So that child
can be set to the new Producer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a good spot I can look for an example test for this? All tests are green ATM so I want to make sure I have it covered...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is a test:
@Test
public void testSwitchRequestAlternativeObservable() {
final List<Integer> items = new ArrayList<Integer>();
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(1);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
items.add(integer);
}
});
assertEquals(Arrays.asList(1), items);
}
And its output:
java.lang.AssertionError:
Expected :[1]
Actual :[1, 2, 3]
If you do nothing, unsafeSubscribe(new Subscriber()...)
will request Long.MAX_VALUE.
See
RxJava/src/main/java/rx/Subscriber.java
Line 136 in 3dc4a31
if (toRequest == Long.MIN_VALUE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, thank you very much for the help. I think I am starting to understand back pressure at least a little better now ;)
@Override | ||
public Subscriber<? super T> call(Subscriber<? super T> child) { | ||
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child); | ||
child.add(parent); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with directly adding this to the child is that the child will retain a reference to a now-dead switcher. Instead, I suggest having a SerialSubscription
:
SerialSubscription ssub = new SerialSubscription();
SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child, ssub);
ssub.add(parent);
child.add(ssub);
return parent;
The subscribeToAlternate
should now start with:
ssub.set(alternate.unsafeSubscribe(new Subscriber<T>() {
Like defaultIfEmpty but subscribes to and emits the items in an Observable if the source is empty.
Fixes #1878