Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator Window and other changes #1138

Merged
merged 2 commits into from
May 5, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7238,7 +7238,7 @@ public final Observable<T> unsubscribeOn(Scheduler scheduler) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector) {
return create(OperationWindow.window(this, closingSelector));
return lift(new OperatorWindowWithObservable<T, TClosing>(closingSelector));
}

/**
Expand All @@ -7255,7 +7255,7 @@ public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observa
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(int count) {
return create(OperationWindow.window(this, count));
return lift(new OperatorWindowWithSize<T>(count, count));
}

/**
Expand All @@ -7275,7 +7275,7 @@ public final Observable<Observable<T>> window(int count) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(int count, int skip) {
return create(OperationWindow.window(this, count, skip));
return lift(new OperatorWindowWithSize<T>(count, skip));
}

/**
Expand All @@ -7297,7 +7297,7 @@ public final Observable<Observable<T>> window(int count, int skip) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit unit) {
return create(OperationWindow.window(this, timespan, timeshift, unit));
return lift(new OperatorWindowWithTime<T>(timespan, timeshift, unit, Integer.MAX_VALUE, Schedulers.computation()));
}

/**
Expand All @@ -7321,7 +7321,7 @@ public final Observable<Observable<T>> window(long timespan, long timeshift, Tim
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) {
return create(OperationWindow.window(this, timespan, timeshift, unit, scheduler));
return lift(new OperatorWindowWithTime<T>(timespan, timeshift, unit, Integer.MAX_VALUE, scheduler));
}

/**
Expand All @@ -7342,7 +7342,7 @@ public final Observable<Observable<T>> window(long timespan, long timeshift, Tim
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(long timespan, TimeUnit unit) {
return create(OperationWindow.window(this, timespan, unit));
return lift(new OperatorWindowWithTime<T>(timespan, timespan, unit, Integer.MAX_VALUE, Schedulers.computation()));
}

/**
Expand All @@ -7367,7 +7367,7 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, int count) {
return create(OperationWindow.window(this, timespan, unit, count));
return lift(new OperatorWindowWithTime<T>(timespan, timespan, unit, count, Schedulers.computation()));
}

/**
Expand All @@ -7394,7 +7394,7 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit, int
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, int count, Scheduler scheduler) {
return create(OperationWindow.window(this, timespan, unit, count, scheduler));
return lift(new OperatorWindowWithTime<T>(timespan, timespan, unit, count, scheduler));
}

/**
Expand All @@ -7417,7 +7417,7 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit, int
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler) {
return create(OperationWindow.window(this, timespan, unit, scheduler));
return lift(new OperatorWindowWithTime<T>(timespan, timespan, unit, Integer.MAX_VALUE, scheduler));
}

/**
Expand All @@ -7437,7 +7437,7 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Sche
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
*/
public final <TOpening, TClosing> Observable<Observable<T>> window(Observable<? extends TOpening> windowOpenings, Func1<? super TOpening, ? extends Observable<? extends TClosing>> closingSelector) {
return create(OperationWindow.window(this, windowOpenings, closingSelector));
return lift(new OperatorWindowWithStartEndObservable<T, TOpening, TClosing>(windowOpenings, closingSelector));
}

/**
Expand All @@ -7455,7 +7455,7 @@ public final <TOpening, TClosing> Observable<Observable<T>> window(Observable<?
* where the boundary of each window is determined by the items emitted from the {@code boundary} Observable
*/
public final <U> Observable<Observable<T>> window(Observable<U> boundary) {
return create(OperationWindow.window(this, boundary));
return lift(new OperatorWindowWithObservable<T, U>(boundary));
}

/**
Expand Down
113 changes: 57 additions & 56 deletions rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
package rx.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.observers.Subscribers;
import rx.subscriptions.Subscriptions;

/**
* A solution to the "time gap" problem that occurs with `groupBy` and `pivot` => https://github.com/Netflix/RxJava/issues/844
Expand All @@ -43,44 +47,72 @@
public class BufferUntilSubscriber<T> extends Observable<T> implements Observer<T> {

public static <T> BufferUntilSubscriber<T> create() {
return new BufferUntilSubscriber<T>(new AtomicReference<Observer<? super T>>(new BufferedObserver<T>()));
State<T> state = new State<T>();
return new BufferUntilSubscriber<T>(state);
}

private final AtomicReference<Observer<? super T>> observerRef;
/** The common state. */
static final class State<T> {
/** Lite notifications of type T. */
final NotificationLite<T> nl = NotificationLite.instance();
/** The first observer or the one which buffers until the first arrives. */
final AtomicReference<Observer<? super T>> observerRef = new AtomicReference<Observer<? super T>>(new BufferedObserver<T>());
/** Allow a single subscriber only. */
final AtomicBoolean first = new AtomicBoolean();
}

static final class OnSubscribeAction<T> implements OnSubscribe<T> {
final State<T> state;

private BufferUntilSubscriber(final AtomicReference<Observer<? super T>> observerRef) {
super(new OnSubscribe<T>() {
public OnSubscribeAction(State<T> state) {
this.state = state;
}

@Override
public void call(Subscriber<? super T> s) {
@Override
public void call(final Subscriber<? super T> s) {
if (state.first.compareAndSet(false, true)) {
// drain queued notifications before subscription
// we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer
BufferedObserver<T> buffered = (BufferedObserver<T>) observerRef.get();
Object o = null;
BufferedObserver<? super T> buffered = (BufferedObserver<? super T>)state.observerRef.get();
Object o;
while ((o = buffered.buffer.poll()) != null) {
emit(s, o);
state.nl.accept(s, o);
}
// register real observer for pass-thru ... and drain any further events received on first notification
observerRef.set(new PassThruObserver<T>(s, buffered.buffer, observerRef));
state.observerRef.set(new PassThruObserver<T>(s, buffered.buffer, state.observerRef));
s.add(Subscriptions.create(new Action0() {
@Override
public void call() {
state.observerRef.set(Subscribers.empty());
}
}));
} else {
s.onError(new IllegalStateException("Only one subscriber allowed!"));
}

});
this.observerRef = observerRef;
}

}
final State<T> state;

private BufferUntilSubscriber(State<T> state) {
super(new OnSubscribeAction<T>(state));
this.state = state;
}

@Override
public void onCompleted() {
observerRef.get().onCompleted();
state.observerRef.get().onCompleted();
}

@Override
public void onError(Throwable e) {
observerRef.get().onError(e);
state.observerRef.get().onError(e);
}

@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public void onNext(T t) {
observerRef.get().onNext(t);
state.observerRef.get().onNext(t);
}

/**
Expand All @@ -97,6 +129,7 @@ private static class PassThruObserver<T> extends Subscriber<T> {
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
private final ConcurrentLinkedQueue<Object> buffer;
private final AtomicReference<Observer<? super T>> observerRef;
private final NotificationLite<T> nl = NotificationLite.instance();

PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer, AtomicReference<Observer<? super T>> observerRef) {
this.actual = actual;
Expand All @@ -123,67 +156,35 @@ public void onNext(T t) {
}

private void drainIfNeededAndSwitchToActual() {
Object o = null;
Object o;
while ((o = buffer.poll()) != null) {
emit(this, o);
nl.accept(this, o);
}
// now we can safely change over to the actual and get rid of the pass-thru
observerRef.set(actual);
// but only if not unsubscribed
observerRef.compareAndSet(this, actual);
}

}

private static class BufferedObserver<T> extends Subscriber<T> {
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
private final NotificationLite<T> nl = NotificationLite.instance();

@Override
public void onCompleted() {
buffer.add(COMPLETE_SENTINEL);
buffer.add(nl.completed());
}

@Override
public void onError(Throwable e) {
buffer.add(new ErrorSentinel(e));
buffer.add(nl.error(e));
}

@Override
public void onNext(T t) {
if (t == null) {
buffer.add(NULL_SENTINEL);
} else {
buffer.add(t);
}
}

}

private final static <T> void emit(Observer<T> s, Object v) {
if (v instanceof Sentinel) {
if (v == NULL_SENTINEL) {
s.onNext(null);
} else if (v == COMPLETE_SENTINEL) {
s.onCompleted();
} else if (v instanceof ErrorSentinel) {
s.onError(((ErrorSentinel) v).e);
}
} else {
s.onNext((T) v);
buffer.add(nl.next(t));
}
}

private static class Sentinel {

}

private static Sentinel NULL_SENTINEL = new Sentinel();
private static Sentinel COMPLETE_SENTINEL = new Sentinel();

private static class ErrorSentinel extends Sentinel {
final Throwable e;

ErrorSentinel(Throwable e) {
this.e = e;
}
}

}
Loading