Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Window with Observable: fixed unsubscription and behavior #3039

Merged
merged 1 commit into from
Jul 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -9060,7 +9060,7 @@ public final <U, R> Observable<R> withLatestFrom(Observable<? extends U> other,
* @see <a href="http://reactivex.io/documentation/operators/window.html">ReactiveX operators documentation: Window</a>
*/
public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector) {
return lift(new OperatorWindowWithObservable<T, TClosing>(closingSelector));
return lift(new OperatorWindowWithObservableFactory<T, TClosing>(closingSelector));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
*/
package rx.internal.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import rx.Observable;
import java.util.*;

import rx.*;
import rx.Observable.Operator;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Func0;
import rx.observers.SerializedSubscriber;
import rx.observers.Subscribers;

/**
* Creates non-overlapping windows of items where each window is terminated by
Expand All @@ -34,36 +31,21 @@
* @param <U> the boundary value type
*/
public final class OperatorWindowWithObservable<T, U> implements Operator<Observable<T>, T> {
final Func0<? extends Observable<? extends U>> otherFactory;
final Observable<U> other;

public OperatorWindowWithObservable(Func0<? extends Observable<? extends U>> otherFactory) {
this.otherFactory = otherFactory;
}
public OperatorWindowWithObservable(final Observable<U> other) {
this.otherFactory = new Func0<Observable<U>>() {

@Override
public Observable<U> call() {
return other;
}

};
this.other = other;
}

@Override
public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {

Observable<? extends U> other;
try {
other = otherFactory.call();
} catch (Throwable e) {
child.onError(e);
return Subscribers.empty();
}

SourceSubscriber<T> sub = new SourceSubscriber<T>(child);
BoundarySubscriber<T, U> bs = new BoundarySubscriber<T, U>(child, sub);

child.add(sub);
child.add(bs);

sub.replaceWindow();

other.unsafeSubscribe(bs);
Expand All @@ -88,7 +70,6 @@ static final class SourceSubscriber<T> extends Subscriber<T> {
List<Object> queue;

public SourceSubscriber(Subscriber<? super Observable<T>> child) {
super(child);
this.child = new SerializedSubscriber<Observable<T>>(child);
this.guard = new Object();
}
Expand Down Expand Up @@ -288,7 +269,6 @@ void error(Throwable e) {
static final class BoundarySubscriber<T, U> extends Subscriber<U> {
final SourceSubscriber<T> sub;
public BoundarySubscriber(Subscriber<?> child, SourceSubscriber<T> sub) {
super(child);
this.sub = sub;
}

Expand Down
Loading