Skip to content

Commit

Permalink
Merge pull request #2760 from akarnokd/WithLatestFrom
Browse files Browse the repository at this point in the history
Operator: WithLatestFrom
  • Loading branch information
benjchristensen committed Feb 21, 2015
2 parents 48e4db0 + 844cc95 commit c41b37d
Show file tree
Hide file tree
Showing 4 changed files with 437 additions and 1 deletion.
25 changes: 25 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8804,6 +8804,31 @@ public final Observable<T> unsubscribeOn(Scheduler scheduler) {
return lift(new OperatorUnsubscribeOn<T>(scheduler));
}

/**
* Merges the specified observable sequence into this Observable sequence by using the resultSelector
* function only when the source observable sequence (this instance) produces an element.
* <code><pre>
* ----A-------B------C-----> o1
*
* --0----1-2----3-4--------> o2
*
* | | |
* V V V
*
* (A,0) (B,2) (C,4)
* </pre></code>
* @param other the other observable sequence
* @param resultSelector the function to call when this Observable emits an element and the other
* observable sequence has already emitted a value.
* @return an Observable that merges the specified observable sequence into this Observable sequence
* by using the resultSelector function only when the source observable sequence
* (this instance) produces an element
*/
@Experimental
public final <U, R> Observable<R> withLatestFrom(Observable<? extends U> other, Func2<? super T, ? super U, ? extends R> resultSelector) {
return lift(new OperatorWithLatestFrom<T, U, R>(other, resultSelector));
}

/**
* Returns an Observable that emits windows of items it collects from the source Observable. The resulting
* Observable emits connected, non-overlapping windows. It emits the current window and opens a new one
Expand Down
103 changes: 103 additions & 0 deletions src/main/java/rx/internal/operators/OperatorWithLatestFrom.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicReference;

import rx.*;
import rx.Observable.Operator;
import rx.functions.Func2;
import rx.observers.SerializedSubscriber;

/**
* Combines values from two sources only when the main source emits.
* @param <T> the element type of the main observable
* @param <U> the element type of the other observable that is merged into the main
* @param <R> the result element type
*/
public final class OperatorWithLatestFrom<T, U, R> implements Operator<R, T> {
final Func2<? super T, ? super U, ? extends R> resultSelector;
final Observable<? extends U> other;
/** Indicates the other has not yet emitted a value. */
static final Object EMPTY = new Object();

public OperatorWithLatestFrom(Observable<? extends U> other, Func2<? super T, ? super U, ? extends R> resultSelector) {
this.other = other;
this.resultSelector = resultSelector;
}
@Override
public Subscriber<? super T> call(Subscriber<? super R> child) {
// onError and onCompleted may happen either from the main or from other.
final SerializedSubscriber<R> s = new SerializedSubscriber<R>(child, false);
child.add(s);

final AtomicReference<Object> current = new AtomicReference<Object>(EMPTY);

final Subscriber<T> subscriber = new Subscriber<T>(s, true) {
@Override
public void onNext(T t) {
Object o = current.get();
if (o != EMPTY) {
try {
@SuppressWarnings("unchecked")
U u = (U)o;
R result = resultSelector.call(t, u);

s.onNext(result);
} catch (Throwable e) {
onError(e);
return;
}
}
}
@Override
public void onError(Throwable e) {
s.onError(e);
s.unsubscribe();
}
@Override
public void onCompleted() {
s.onCompleted();
s.unsubscribe();
}
};

Subscriber<U> otherSubscriber = new Subscriber<U>() {
@Override
public void onNext(U t) {
current.set(t);
}
@Override
public void onError(Throwable e) {
s.onError(e);
s.unsubscribe();
}
@Override
public void onCompleted() {
if (current.get() == EMPTY) {
s.onCompleted();
s.unsubscribe();
}
}
};
s.add(subscriber);
s.add(otherSubscriber);

other.unsafeSubscribe(otherSubscriber);

return subscriber;
}
}
12 changes: 11 additions & 1 deletion src/main/java/rx/observers/SerializedSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,17 @@ public class SerializedSubscriber<T> extends Subscriber<T> {
private final Observer<T> s;

public SerializedSubscriber(Subscriber<? super T> s) {
super(s);
this(s, true);
}
/**
* Constructor for wrapping and serializing a subscriber optionally sharing the same underlying subscription
* list.
* @param s the subscriber to wrap and serialize
* @param shareSubscriptions if {@code true}, the same subscription list is shared between this
* subscriber and {@code s}.
*/
public SerializedSubscriber(Subscriber<? super T> s, boolean shareSubscriptions) {
super(s, shareSubscriptions);
this.s = new SerializedObserver<T>(s);
}

Expand Down
Loading

0 comments on commit c41b37d

Please sign in to comment.