From 758fe33b8d6031209a39fe5f2a87cd27240a443c Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 19 Feb 2015 10:22:24 +0100 Subject: [PATCH 1/2] Operator WithLatestFrom --- src/main/java/rx/Observable.java | 24 ++ .../operators/OperatorWithLatestFrom.java | 103 ++++++ .../rx/observers/SerializedSubscriber.java | 12 +- .../operators/OperatorWithLatestFromTest.java | 298 ++++++++++++++++++ 4 files changed, 436 insertions(+), 1 deletion(-) create mode 100644 src/main/java/rx/internal/operators/OperatorWithLatestFrom.java create mode 100644 src/test/java/rx/internal/operators/OperatorWithLatestFromTest.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 3520f5d808..9c602bdd07 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -8769,6 +8769,30 @@ public final Observable unsubscribeOn(Scheduler scheduler) { return lift(new OperatorUnsubscribeOn(scheduler)); } + /** + * Merges the specified observable sequence into this Observable sequence by using the resultSelector + * function only when the source observable sequence (this instance) produces an element. + *
+     * ----A-------B------C----->  o1
+     *
+     * --0----1-2----3-4-------->  o2
+     *
+     *     |       |      |
+     *     V       V      V
+     *
+     *   (A,0)   (B,2)  (C,4)
+     * 
+ * @param other the other observable sequence + * @param resultSelector the function to call when this Observable emits an element and the other + * observable sequence has already emitted a value. + * @return an Observable that merges the specified observable sequence into this Observable sequence + * by using the resultSelector function only when the source observable sequence + * (this instance) produces an element + */ + public final Observable withLatestFrom(Observable other, Func2 resultSelector) { + return lift(new OperatorWithLatestFrom(other, resultSelector)); + } + /** * Returns an Observable that emits windows of items it collects from the source Observable. The resulting * Observable emits connected, non-overlapping windows. It emits the current window and opens a new one diff --git a/src/main/java/rx/internal/operators/OperatorWithLatestFrom.java b/src/main/java/rx/internal/operators/OperatorWithLatestFrom.java new file mode 100644 index 0000000000..4bf610b6b1 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorWithLatestFrom.java @@ -0,0 +1,103 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.operators; + +import java.util.concurrent.atomic.AtomicReference; + +import rx.*; +import rx.Observable.Operator; +import rx.functions.Func2; +import rx.observers.SerializedSubscriber; + +/** + * Combines values from two sources only when the main source emits. + * @param the element type of the main observable + * @param the element type of the other observable that is merged into the main + * @param the result element type + */ +public final class OperatorWithLatestFrom implements Operator { + final Func2 resultSelector; + final Observable other; + /** Indicates the other has not yet emitted a value. */ + static final Object EMPTY = new Object(); + + public OperatorWithLatestFrom(Observable other, Func2 resultSelector) { + this.other = other; + this.resultSelector = resultSelector; + } + @Override + public Subscriber call(Subscriber child) { + // onError and onCompleted may happen either from the main or from other. + final SerializedSubscriber s = new SerializedSubscriber(child, false); + child.add(s); + + final AtomicReference current = new AtomicReference(EMPTY); + + final Subscriber subscriber = new Subscriber(s, true) { + @Override + public void onNext(T t) { + Object o = current.get(); + if (o != EMPTY) { + try { + @SuppressWarnings("unchecked") + U u = (U)o; + R result = resultSelector.call(t, u); + + s.onNext(result); + } catch (Throwable e) { + onError(e); + return; + } + } + } + @Override + public void onError(Throwable e) { + s.onError(e); + s.unsubscribe(); + } + @Override + public void onCompleted() { + s.onCompleted(); + s.unsubscribe(); + } + }; + + Subscriber otherSubscriber = new Subscriber() { + @Override + public void onNext(U t) { + current.set(t); + } + @Override + public void onError(Throwable e) { + s.onError(e); + s.unsubscribe(); + } + @Override + public void onCompleted() { + if (current.get() == EMPTY) { + s.onCompleted(); + s.unsubscribe(); + } + } + }; + s.add(subscriber); + s.add(otherSubscriber); + + other.unsafeSubscribe(otherSubscriber); + + return subscriber; + } +} diff --git a/src/main/java/rx/observers/SerializedSubscriber.java b/src/main/java/rx/observers/SerializedSubscriber.java index c9bde2b6aa..e277506bcd 100644 --- a/src/main/java/rx/observers/SerializedSubscriber.java +++ b/src/main/java/rx/observers/SerializedSubscriber.java @@ -37,7 +37,17 @@ public class SerializedSubscriber extends Subscriber { private final Observer s; public SerializedSubscriber(Subscriber s) { - super(s); + this(s, true); + } + /** + * Constructor for wrapping and serializing a subscriber optionally sharing the same underlying subscription + * list. + * @param s the subscriber to wrap and serialize + * @param shareSubscriptions if {@code true}, the same subscription list is shared between this + * subscriber and {@code s}. + */ + public SerializedSubscriber(Subscriber s, boolean shareSubscriptions) { + super(s, shareSubscriptions); this.s = new SerializedObserver(s); } diff --git a/src/test/java/rx/internal/operators/OperatorWithLatestFromTest.java b/src/test/java/rx/internal/operators/OperatorWithLatestFromTest.java new file mode 100644 index 0000000000..a172158115 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorWithLatestFromTest.java @@ -0,0 +1,298 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.operators; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.*; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observer; +import rx.exceptions.TestException; +import rx.functions.Func2; +import rx.observers.TestSubscriber; +import rx.subjects.PublishSubject; + +public class OperatorWithLatestFromTest { + static final Func2 COMBINER = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return (t1 << 8) + t2; + } + }; + static final Func2 COMBINER_ERROR = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + throw new TestException("Forced failure"); + } + }; + @Test + public void testSimple() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + Observable result = source.withLatestFrom(other, COMBINER); + + result.subscribe(o); + + source.onNext(1); + inOrder.verify(o, never()).onNext(anyInt()); + + other.onNext(1); + inOrder.verify(o, never()).onNext(anyInt()); + + source.onNext(2); + inOrder.verify(o).onNext((2 << 8) + 1); + + other.onNext(2); + inOrder.verify(o, never()).onNext(anyInt()); + + other.onCompleted(); + inOrder.verify(o, never()).onCompleted(); + + source.onNext(3); + inOrder.verify(o).onNext((3 << 8) + 2); + + source.onCompleted(); + inOrder.verify(o).onCompleted(); + + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testEmptySource() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable result = source.withLatestFrom(other, COMBINER); + + TestSubscriber ts = new TestSubscriber(); + + result.subscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(other.hasObservers()); + + other.onNext(1); + + source.onCompleted(); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + assertEquals(0, ts.getOnNextEvents().size()); + + assertFalse(source.hasObservers()); + assertFalse(other.hasObservers()); + } + + @Test + public void testEmptyOther() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable result = source.withLatestFrom(other, COMBINER); + + TestSubscriber ts = new TestSubscriber(); + + result.subscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(other.hasObservers()); + + source.onNext(1); + + source.onCompleted(); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + assertEquals(0, ts.getOnNextEvents().size()); + + assertFalse(source.hasObservers()); + assertFalse(other.hasObservers()); + } + + + @Test + public void testUnsubscription() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable result = source.withLatestFrom(other, COMBINER); + + TestSubscriber ts = new TestSubscriber(); + + result.subscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(other.hasObservers()); + + other.onNext(1); + source.onNext(1); + + ts.unsubscribe(); + + ts.assertReceivedOnNext(Arrays.asList((1 << 8) + 1)); + ts.assertNoErrors(); + assertEquals(0, ts.getOnCompletedEvents().size()); + + assertFalse(source.hasObservers()); + assertFalse(other.hasObservers()); + } + + @Test + public void testSourceThrows() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable result = source.withLatestFrom(other, COMBINER); + + TestSubscriber ts = new TestSubscriber(); + + result.subscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(other.hasObservers()); + + other.onNext(1); + source.onNext(1); + + source.onError(new TestException()); + + ts.assertTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList((1 << 8) + 1)); + assertEquals(1, ts.getOnErrorEvents().size()); + assertTrue(ts.getOnErrorEvents().get(0) instanceof TestException); + + assertFalse(source.hasObservers()); + assertFalse(other.hasObservers()); + } + @Test + public void testOtherThrows() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable result = source.withLatestFrom(other, COMBINER); + + TestSubscriber ts = new TestSubscriber(); + + result.subscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(other.hasObservers()); + + other.onNext(1); + source.onNext(1); + + other.onError(new TestException()); + + ts.assertTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList((1 << 8) + 1)); + assertEquals(1, ts.getOnErrorEvents().size()); + assertTrue(ts.getOnErrorEvents().get(0) instanceof TestException); + + assertFalse(source.hasObservers()); + assertFalse(other.hasObservers()); + } + + @Test + public void testFunctionThrows() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable result = source.withLatestFrom(other, COMBINER_ERROR); + + TestSubscriber ts = new TestSubscriber(); + + result.subscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(other.hasObservers()); + + other.onNext(1); + source.onNext(1); + + ts.assertTerminalEvent(); + assertEquals(0, ts.getOnNextEvents().size()); + assertEquals(1, ts.getOnErrorEvents().size()); + assertTrue(ts.getOnErrorEvents().get(0) instanceof TestException); + + assertFalse(source.hasObservers()); + assertFalse(other.hasObservers()); + } + + @Test + public void testNoDownstreamUnsubscribe() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable result = source.withLatestFrom(other, COMBINER); + + TestSubscriber ts = new TestSubscriber(); + + result.unsafeSubscribe(ts); + + source.onCompleted(); + + assertFalse(ts.isUnsubscribed()); + } + @Test + public void testBackpressure() { + Observable source = Observable.range(1, 10); + PublishSubject other = PublishSubject.create(); + + Observable result = source.withLatestFrom(other, COMBINER); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onStart() { + request(0); + } + }; + + result.subscribe(ts); + + ts.requestMore(1); + + ts.assertReceivedOnNext(Collections.emptyList()); + + other.onNext(1); + + ts.requestMore(1); + + ts.assertReceivedOnNext(Arrays.asList((2 << 8) + 1)); + + ts.requestMore(5); + ts.assertReceivedOnNext(Arrays.asList( + (2 << 8) + 1, (3 << 8) + 1, (4 << 8) + 1, (5 << 8) + 1, + (6 << 8) + 1, (7 << 8) + 1 + )); + + ts.unsubscribe(); + + assertFalse("Other has observers!", other.hasObservers()); + + ts.assertNoErrors(); + } +} From 844cc95842cbdcfc54fd00a87f8f4e8f5b552312 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 19 Feb 2015 13:43:47 +0100 Subject: [PATCH 2/2] Experimental annotation. --- src/main/java/rx/Observable.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 9c602bdd07..b2987239e5 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -8789,6 +8789,7 @@ public final Observable unsubscribeOn(Scheduler scheduler) { * by using the resultSelector function only when the source observable sequence * (this instance) produces an element */ + @Experimental public final Observable withLatestFrom(Observable other, Func2 resultSelector) { return lift(new OperatorWithLatestFrom(other, resultSelector)); }