-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added Operator switchIfEmpty #2091
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,126 @@ | ||||
/** | ||||
* Copyright 2014 Netflix, Inc. | ||||
* | ||||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||||
* you may not use this file except in compliance with the License. | ||||
* You may obtain a copy of the License at | ||||
* | ||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||
* | ||||
* Unless required by applicable law or agreed to in writing, software | ||||
* distributed under the License is distributed on an "AS IS" BASIS, | ||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
* See the License for the specific language governing permissions and | ||||
* limitations under the License. | ||||
*/ | ||||
package rx.internal.operators; | ||||
|
||||
import rx.Observable; | ||||
import rx.Producer; | ||||
import rx.Subscriber; | ||||
|
||||
import java.util.concurrent.atomic.AtomicLong; | ||||
|
||||
/** | ||||
* If the Observable completes without emitting any items, subscribe to an alternate Observable. Allows for similar | ||||
* functionality to {@link rx.internal.operators.OperatorDefaultIfEmpty} except instead of one item being emitted when | ||||
* empty, the results of the given Observable will be emitted. | ||||
*/ | ||||
public class OperatorSwitchIfEmpty<T> implements Observable.Operator<T, T> { | ||||
private final Observable<T> alternate; | ||||
|
||||
public OperatorSwitchIfEmpty(Observable<T> alternate) { | ||||
this.alternate = alternate; | ||||
} | ||||
|
||||
@Override | ||||
public Subscriber<? super T> call(Subscriber<? super T> child) { | ||||
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child); | ||||
child.add(parent); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem with directly adding this to the child is that the child will retain a reference to a now-dead switcher. Instead, I suggest having a SerialSubscription ssub = new SerialSubscription();
SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child, ssub);
ssub.add(parent);
child.add(ssub);
return parent; The ssub.set(alternate.unsafeSubscribe(new Subscriber<T>() { |
||||
return parent; | ||||
} | ||||
|
||||
private class SwitchIfEmptySubscriber extends Subscriber<T> { | ||||
|
||||
boolean empty = true; | ||||
final AtomicLong consumerCapacity = new AtomicLong(0l); | ||||
|
||||
private final Subscriber<? super T> child; | ||||
|
||||
public SwitchIfEmptySubscriber(Subscriber<? super T> child) { | ||||
this.child = child; | ||||
} | ||||
|
||||
@Override | ||||
public void setProducer(final Producer producer) { | ||||
super.setProducer(new Producer() { | ||||
@Override | ||||
public void request(long n) { | ||||
if (empty) { | ||||
consumerCapacity.set(n); | ||||
} | ||||
producer.request(n); | ||||
} | ||||
}); | ||||
} | ||||
|
||||
@Override | ||||
public void onCompleted() { | ||||
if (!empty) { | ||||
child.onCompleted(); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Edited: you should call @Test
public void testSwitchShouldTriggerUnsubscribe() {
final Subscription s = Subscriptions.empty();
Observable.create(new Observable.OnSubscribe<Long>() {
@Override
public void call(final Subscriber<? super Long> subscriber) {
subscriber.add(s);
subscriber.onCompleted();
}
}).switchIfEmpty(Observable.<Long>never()).subscribe();
assertTrue(s.isUnsubscribed());
} In addition, L38 should change to SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber();
child.add(parent);
return child; There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The issue with returning child here is that when the first Observable completes, and is empty, those downstream will see the subscription as unsubscribed, no? Also, backpressure won't work as parent is not what we are returning anymore so we won't see any requests, which then should be passed to the alternate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, a typo here. The return value should be parent. |
||||
} else if (!child.isUnsubscribed()) { | ||||
unsubscribe(); | ||||
subscribeToAlternate(); | ||||
} | ||||
} | ||||
|
||||
private void subscribeToAlternate() { | ||||
child.add(alternate.unsafeSubscribe(new Subscriber<T>() { | ||||
|
||||
@Override | ||||
public void setProducer(final Producer producer) { | ||||
child.setProducer(new Producer() { | ||||
@Override | ||||
public void request(long n) { | ||||
producer.request(n); | ||||
} | ||||
}); | ||||
} | ||||
|
||||
@Override | ||||
public void onStart() { | ||||
final long capacity = consumerCapacity.get(); | ||||
if (capacity > 0) { | ||||
request(capacity); | ||||
} | ||||
} | ||||
|
||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As @akarnokd said, you need to override @Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {
@Override
public void request(long n) {
producer.request(n);
}
});
} So that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a good spot I can look for an example test for this? All tests are green ATM so I want to make sure I have it covered... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is a test: @Test
public void testSwitchRequestAlternativeObservable() {
final List<Integer> items = new ArrayList<Integer>();
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(1);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
items.add(integer);
}
});
assertEquals(Arrays.asList(1), items);
} And its output:
If you do nothing, RxJava/src/main/java/rx/Subscriber.java Line 136 in 3dc4a31
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great, thank you very much for the help. I think I am starting to understand back pressure at least a little better now ;) |
||||
@Override | ||||
public void onCompleted() { | ||||
child.onCompleted(); | ||||
} | ||||
|
||||
@Override | ||||
public void onError(Throwable e) { | ||||
child.onError(e); | ||||
} | ||||
|
||||
@Override | ||||
public void onNext(T t) { | ||||
child.onNext(t); | ||||
} | ||||
})); | ||||
} | ||||
|
||||
@Override | ||||
public void onError(Throwable e) { | ||||
child.onError(e); | ||||
} | ||||
|
||||
@Override | ||||
public void onNext(T t) { | ||||
empty = false; | ||||
child.onNext(t); | ||||
} | ||||
} | ||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
/** | ||
* Copyright 2014 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package rx.internal.operators; | ||
|
||
import org.junit.Test; | ||
import rx.Observable; | ||
import rx.Producer; | ||
import rx.Subscriber; | ||
import rx.Subscription; | ||
import rx.functions.Action0; | ||
import rx.subscriptions.Subscriptions; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertFalse; | ||
import static org.junit.Assert.assertTrue; | ||
|
||
public class OperatorSwitchIfEmptyTest { | ||
|
||
@Test | ||
public void testSwitchWhenNotEmpty() throws Exception { | ||
final AtomicBoolean subscribed = new AtomicBoolean(false); | ||
final Observable<Integer> observable = Observable.just(4).switchIfEmpty(Observable.just(2) | ||
.doOnSubscribe(new Action0() { | ||
@Override | ||
public void call() { | ||
subscribed.set(true); | ||
} | ||
})); | ||
|
||
assertEquals(4, observable.toBlocking().single().intValue()); | ||
assertFalse(subscribed.get()); | ||
} | ||
|
||
@Test | ||
public void testSwitchWhenEmpty() throws Exception { | ||
final Observable<Integer> observable = Observable.<Integer>empty().switchIfEmpty(Observable.from(Arrays.asList(42))); | ||
|
||
assertEquals(42, observable.toBlocking().single().intValue()); | ||
} | ||
|
||
@Test | ||
public void testSwitchWithProducer() throws Exception { | ||
final AtomicBoolean emitted = new AtomicBoolean(false); | ||
Observable<Long> withProducer = Observable.create(new Observable.OnSubscribe<Long>() { | ||
@Override | ||
public void call(final Subscriber<? super Long> subscriber) { | ||
subscriber.setProducer(new Producer() { | ||
@Override | ||
public void request(long n) { | ||
if (n > 0 && !emitted.get()) { | ||
emitted.set(true); | ||
subscriber.onNext(42L); | ||
subscriber.onCompleted(); | ||
} | ||
} | ||
}); | ||
} | ||
}); | ||
|
||
final Observable<Long> observable = Observable.<Long>empty().switchIfEmpty(withProducer); | ||
assertEquals(42, observable.toBlocking().single().intValue()); | ||
} | ||
|
||
@Test | ||
public void testSwitchTriggerUnsubscribe() throws Exception { | ||
final Subscription empty = Subscriptions.empty(); | ||
|
||
Observable<Long> withProducer = Observable.create(new Observable.OnSubscribe<Long>() { | ||
@Override | ||
public void call(final Subscriber<? super Long> subscriber) { | ||
subscriber.add(empty); | ||
subscriber.onNext(42L); | ||
} | ||
}); | ||
|
||
final Subscription sub = Observable.<Long>empty().switchIfEmpty(withProducer).lift(new Observable.Operator<Long, Long>() { | ||
@Override | ||
public Subscriber<? super Long> call(final Subscriber<? super Long> child) { | ||
return new Subscriber<Long>(child) { | ||
@Override | ||
public void onCompleted() { | ||
|
||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
|
||
} | ||
|
||
@Override | ||
public void onNext(Long aLong) { | ||
unsubscribe(); | ||
} | ||
}; | ||
} | ||
}).subscribe(); | ||
|
||
|
||
assertTrue(empty.isUnsubscribed()); | ||
assertTrue(sub.isUnsubscribed()); | ||
} | ||
|
||
@Test | ||
public void testSwitchShouldTriggerUnsubscribe() { | ||
final Subscription s = Subscriptions.empty(); | ||
|
||
Observable.create(new Observable.OnSubscribe<Long>() { | ||
@Override | ||
public void call(final Subscriber<? super Long> subscriber) { | ||
subscriber.add(s); | ||
subscriber.onCompleted(); | ||
} | ||
}).switchIfEmpty(Observable.<Long>never()).subscribe(); | ||
assertTrue(s.isUnsubscribed()); | ||
} | ||
|
||
@Test | ||
public void testSwitchRequestAlternativeObservableWithBackpressure() { | ||
final List<Integer> items = new ArrayList<Integer>(); | ||
|
||
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(new Subscriber<Integer>() { | ||
|
||
@Override | ||
public void onStart() { | ||
request(1); | ||
} | ||
|
||
@Override | ||
public void onCompleted() { | ||
|
||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
|
||
} | ||
|
||
@Override | ||
public void onNext(Integer integer) { | ||
items.add(integer); | ||
} | ||
}); | ||
assertEquals(Arrays.asList(1), items); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to add
@Beta
here