Skip to content

Commit

Permalink
New Bind Signature and GroupBy Operator
Browse files Browse the repository at this point in the history
- Changed `bind` signature to match the variant discussed at #746 (comment)
- Updated code to new signature.
- Re-implemented GroupBy operator with `bind`
  • Loading branch information
benjchristensen committed Jan 21, 2014
1 parent 723d935 commit 02ccc4d
Show file tree
Hide file tree
Showing 28 changed files with 530 additions and 544 deletions.
105 changes: 42 additions & 63 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFlatMap;
import rx.operators.OperationGroupBy;
import rx.operators.OperatorGroupBy;
import rx.operators.OperationGroupByUntil;
import rx.operators.OperationGroupJoin;
import rx.operators.OperationInterval;
Expand Down Expand Up @@ -159,7 +159,7 @@
*/
public class Observable<T> {

final Action2<Observer<? super T>, OperatorSubscription> f;
final Action1<Operator<? super T>> f;

/**
* Observable with Function to execute when subscribed to.
Expand All @@ -171,7 +171,7 @@ public class Observable<T> {
* @param onSubscribe
* {@link OnSubscribeFunc} to be executed when {@link #subscribe(Observer)} is called
*/
protected Observable(Action2<Observer<? super T>, OperatorSubscription> f) {
protected Observable(Action1<Operator<? super T>> f) {
this.f = f;
}

Expand All @@ -189,31 +189,6 @@ public static interface OnSubscribeFunc<T> extends Function {
public Subscription onSubscribe(Observer<? super T> t1);
}

public static class OperatorSubscription implements Subscription {

private final CompositeSubscription cs = new CompositeSubscription();

@Override
public void unsubscribe() {
cs.unsubscribe();
}

public static OperatorSubscription create(Subscription s) {
OperatorSubscription _s = new OperatorSubscription();
_s.add(s);
return _s;
}

public boolean isUnsubscribed() {
return cs.isUnsubscribed();
}

public void add(Subscription s) {
cs.add(s);
}

}

private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

/**
Expand Down Expand Up @@ -260,27 +235,27 @@ protected Observable(Action2<Observer<? super T>, OperatorSubscription> f) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#create">RxJava Wiki: create()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.create.aspx">MSDN: Observable.Create</a>
*/
public final static <T> Observable<T> create(final Action2<Observer<? super T>, OperatorSubscription> f) {
public final static <T> Observable<T> create(final Action1<Operator<? super T>> f) {
return new Observable<T>(f);
}

public final static <T> Observable<T> create(final OnSubscribeFunc<T> func) {
return new Observable<T>(new Action2<Observer<? super T>, OperatorSubscription>() {
return new Observable<T>(new Action1<Operator<? super T>>() {

@Override
public void call(Observer<? super T> o, OperatorSubscription s) {
s.add(func.onSubscribe(o));
public void call(Operator<? super T> o) {
o.add(func.onSubscribe(o));
}

});
}

public <R> Observable<R> bind(final Func2<Observer<? super R>, OperatorSubscription, Observer<? super T>> bind) {
return new Observable<R>(new Action2<Observer<? super R>, OperatorSubscription>() {
public <R> Observable<R> bind(final Func1<Operator<? super R>, Operator<? super T>> bind) {
return new Observable<R>(new Action1<Operator<? super R>>() {

@Override
public void call(Observer<? super R> o, OperatorSubscription s) {
f.call(bind.call(o, s), s);
public void call(Operator<? super R> o) {
subscribe(bind.call(o));
}
});
}
Expand Down Expand Up @@ -2878,7 +2853,7 @@ public final static <T> Observable<T> synchronize(Observable<T> source) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229435.aspx">MSDN: Observable.Timer</a>
*/
public final static Observable<Long> timer(long initialDelay, long period, TimeUnit unit) {
return timer(initialDelay, period, unit, Schedulers.threadPoolForComputation());
return timer(initialDelay, period, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -2917,7 +2892,7 @@ public final static Observable<Long> timer(long initialDelay, long period, TimeU
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#timer">RxJava wiki: timer()</a>
*/
public final static Observable<Long> timer(long delay, TimeUnit unit) {
return timer(delay, unit, Schedulers.threadPoolForComputation());
return timer(delay, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -4299,7 +4274,7 @@ public final <U> Observable<T> delay(Func1<? super T, ? extends Observable<U>> i
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229810.aspx">MSDN: Observable.Delay</a>
*/
public final Observable<T> delay(long delay, TimeUnit unit) {
return OperationDelay.delay(this, delay, unit, Schedulers.threadPoolForComputation());
return OperationDelay.delay(this, delay, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -4336,7 +4311,7 @@ public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)
* amount
*/
public final Observable<T> delaySubscription(long delay, TimeUnit unit) {
return delaySubscription(delay, unit, Schedulers.threadPoolForComputation());
return delaySubscription(delay, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -4795,7 +4770,7 @@ public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? e
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupBy</a>
*/
public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
return create(OperationGroupBy.groupBy(this, keySelector));
return bind(new OperatorGroupBy<K, T>(keySelector));
}

/**
Expand All @@ -4819,7 +4794,7 @@ public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupBy</a>
*/
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
return create(OperationGroupBy.groupBy(this, keySelector, elementSelector));
return null;

This comment has been minimized.

Copy link
@daveray

daveray Feb 20, 2014

Contributor

Will this be reinstated?

This comment has been minimized.

Copy link
@benjchristensen

benjchristensen Feb 20, 2014

Author Member

Ha, that's funny that it is still sitting there returning null. Good catch!

I remember why I didn't do this ... I was trying to understand why we need this special groupBy when I can just map over the results.

Any good reason to keep this?

This comment has been minimized.

Copy link
@benjchristensen

benjchristensen Feb 20, 2014

Author Member
}

/**
Expand Down Expand Up @@ -5936,7 +5911,7 @@ public final Subject<T, T> call() {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228952.aspx">MSDN: Observable.Replay</a>
*/
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, int bufferSize, long time, TimeUnit unit) {
return replay(selector, bufferSize, time, unit, Schedulers.threadPoolForComputation());
return replay(selector, bufferSize, time, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -6036,7 +6011,7 @@ public final Subject<T, T> call() {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229526.aspx">MSDN: Observable.Replay</a>
*/
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, long time, TimeUnit unit) {
return replay(selector, time, unit, Schedulers.threadPoolForComputation());
return replay(selector, time, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -6139,7 +6114,7 @@ public final ConnectableObservable<T> replay(int bufferSize) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229874.aspx">MSDN: Observable.Replay</a>
*/
public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit) {
return replay(bufferSize, time, unit, Schedulers.threadPoolForComputation());
return replay(bufferSize, time, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -6209,7 +6184,7 @@ public final ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229232.aspx">MSDN: Observable.Replay</a>
*/
public final ConnectableObservable<T> replay(long time, TimeUnit unit) {
return replay(time, unit, Schedulers.threadPoolForComputation());
return replay(time, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -6516,7 +6491,7 @@ public final Observable<T> skip(int num) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#skip">RxJava Wiki: skip()</a>
*/
public final Observable<T> skip(long time, TimeUnit unit) {
return skip(time, unit, Schedulers.threadPoolForComputation());
return skip(time, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -6578,7 +6553,7 @@ public final Observable<T> skipLast(int count) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211750.aspx">MSDN: Observable.SkipLast</a>
*/
public final Observable<T> skipLast(long time, TimeUnit unit) {
return skipLast(time, unit, Schedulers.threadPoolForComputation());
return skipLast(time, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -6929,8 +6904,8 @@ public final Observable<T> startWith(T[] values, Scheduler scheduler) {
}

// TODO should this be called `observe` instead of `subscribe`?
public final void subscribe(Observer<? super T> o, Func0<OperatorSubscription> sf) {
f.call(o, sf.call());
public final void subscribe(Operator<? super T> o) {
f.call(o);
}

/**
Expand Down Expand Up @@ -7166,7 +7141,7 @@ public final Subscription subscribe(final Action1<? super T> onNext, Scheduler s
*/
public final Subscription subscribe(Observer<? super T> observer) {
// allow the hook to intercept and/or decorate
Action2<Observer<? super T>, OperatorSubscription> onSubscribeFunction = hook.onSubscribeStart(this, f);
Action1<Operator<? super T>> onSubscribeFunction = hook.onSubscribeStart(this, f);
// validate and proceed
if (observer == null) {
throw new IllegalArgumentException("observer can not be null");
Expand All @@ -7176,17 +7151,20 @@ public final Subscription subscribe(Observer<? super T> observer) {
// the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception
}
try {
OperatorSubscription os = new OperatorSubscription();
Operator<? super T> op = null;
/**
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
if (isInternalImplementation(observer)) {
onSubscribeFunction.call(observer, os);
op = Operator.create(observer, new CompositeSubscription());
onSubscribeFunction.call(op);
} else {
SafeObservableSubscription subscription = new SafeObservableSubscription(os);
onSubscribeFunction.call(new SafeObserver<T>(subscription, observer), os);
// TODO this doesn't seem correct any longer with the Operator and injecting of CompositeSubscription
SafeObservableSubscription subscription = new SafeObservableSubscription(op);
op = Operator.create(new SafeObserver<T>(subscription, observer), new CompositeSubscription());
onSubscribeFunction.call(op);
}
return hook.onSubscribeReturn(this, os);
return hook.onSubscribeReturn(this, op);
} catch (OnErrorNotImplementedException e) {
// special handling when onError is not implemented ... we just rethrow
throw e;
Expand Down Expand Up @@ -7426,7 +7404,7 @@ public final Observable<T> take(final int num) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#take">RxJava Wiki: take()</a>
*/
public final Observable<T> take(long time, TimeUnit unit) {
return take(time, unit, Schedulers.threadPoolForComputation());
return take(time, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -7517,7 +7495,7 @@ public final Observable<T> takeLast(final int count) {
* were emitted in a specified window of time before the Observable completed.
*/
public final Observable<T> takeLast(int count, long time, TimeUnit unit) {
return takeLast(count, time, unit, Schedulers.threadPoolForComputation());
return takeLast(count, time, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -7560,7 +7538,7 @@ public final Observable<T> takeLast(int count, long time, TimeUnit unit, Schedul
* the window of time before the Observable completed specified by {@code time}
*/
public final Observable<T> takeLast(long time, TimeUnit unit) {
return takeLast(time, unit, Schedulers.threadPoolForComputation());
return takeLast(time, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -8735,10 +8713,10 @@ public final <T2, R> Observable<R> zip(Observable<? extends T2> other, Func2<? s
*/
private static class NeverObservable<T> extends Observable<T> {
public NeverObservable() {
super(new Action2<Observer<? super T>, OperatorSubscription>() {
super(new Action1<Operator<? super T>>() {

@Override
public void call(Observer<? super T> observer, OperatorSubscription t2) {
public void call(Operator<? super T> observer) {
// do nothing
}

Expand All @@ -8756,7 +8734,7 @@ public void call(Observer<? super T> observer, OperatorSubscription t2) {
private static class ThrowObservable<T> extends Observable<T> {

public ThrowObservable(final Throwable exception) {
super(new Action2<Observer<? super T>, OperatorSubscription>() {
super(new Action1<Operator<? super T>>() {

/**
* Accepts an {@link Observer} and calls its {@link Observer#onError onError} method.
Expand All @@ -8766,14 +8744,15 @@ public ThrowObservable(final Throwable exception) {
* @return a reference to the subscription
*/
@Override
public void call(Observer<? super T> observer, OperatorSubscription t2) {
public void call(Operator<? super T> observer) {
observer.onError(exception);
}

});
}
}

@SuppressWarnings("rawtypes")
private final static ConcurrentHashMap<Class, Boolean> internalClassMap = new ConcurrentHashMap<Class, Boolean>();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

import rx.Observable;
import rx.Observer;
import rx.Operator;
import rx.Subscription;
import rx.operators.OperationRefCount;
import rx.util.functions.Action2;
import rx.util.functions.Action1;

/**
* A ConnectableObservable resembles an ordinary {@link Observable}, except that it does not begin
Expand All @@ -38,7 +39,7 @@

public abstract class ConnectableObservable<T> extends Observable<T> {

protected ConnectableObservable(Action2<Observer<? super T>, OperatorSubscription> onSubscribe) {
protected ConnectableObservable(Action1<Operator<? super T>> onSubscribe) {
super(onSubscribe);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package rx.observables;

import rx.Observable;
import rx.Observer;
import rx.util.functions.Action2;
import rx.Operator;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

/**
Expand All @@ -33,7 +33,7 @@
public class GroupedObservable<K, T> extends Observable<T> {
private final K key;

public GroupedObservable(K key, Action2<Observer<? super T>, OperatorSubscription> onSubscribe) {
public GroupedObservable(K key, Action1<Operator<? super T>> onSubscribe) {
super(onSubscribe);
this.key = key;
}
Expand Down
Loading

0 comments on commit 02ccc4d

Please sign in to comment.