Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operation GroupByUntil #511

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import rx.operators.OperationFinally;
import rx.operators.OperationFirstOrDefault;
import rx.operators.OperationGroupBy;
import rx.operators.OperationGroupByUntil;
import rx.operators.OperationInterval;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
Expand Down Expand Up @@ -120,6 +121,7 @@
import rx.util.functions.Func9;
import rx.util.functions.FuncN;
import rx.util.functions.Function;
import rx.util.functions.Functions;

/**
* The Observable interface that implements the Reactive Pattern.
Expand Down Expand Up @@ -5690,4 +5692,28 @@ private boolean isInternalImplementation(Object o) {
}
}

/**
* Groups the elements of an observable sequence according to a specified key selector function until the duration observable expires for the key.
* @param keySelector A function to extract the key for each element.
* @param durationSelector A function to signal the expiration of a group.
* @return A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211932.aspx'>MSDN: Observable.GroupByUntil</a>
*/
public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Func1<T, TKey> keySelector, Func1<GroupedObservable<TKey, T>, Observable<TDuration>> durationSelector) {
return groupByUntil(keySelector, Functions.<T>identity(), durationSelector);
}
/**
* Groups the elements of an observable sequence according to a specified key and value selector function until the duration observable expires for the key.
* @param keySelector A function to extract the key for each element.
* @param valueSelector A function to map each source element to an element in an observable group.
* @param durationSelector A function to signal the expiration of a group.
* @return A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229433.aspx'>MSDN: Observable.GroupByUntil</a>
*/
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<T, TKey> keySelector, Func1<T, TValue> valueSelector, Func1<GroupedObservable<TKey, TValue>, Observable<TDuration>> durationSelector) {
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector));
}

}
259 changes: 259 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.observables.GroupedObservable;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

/**
* Groups the elements of an observable sequence according to a specified key selector, value selector and duration selector function.
*
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211932.aspx'>MSDN: Observable.GroupByUntil</a>
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229433.aspx'>MSDN: Observable.GroupByUntil</a>
*/
public class OperationGroupByUntil<TSource, TKey, TResult, TDuration> implements OnSubscribeFunc<GroupedObservable<TKey, TResult>> {
final Observable<TSource> source;
final Func1<TSource, TKey> keySelector;
final Func1<TSource, TResult> valueSelector;
final Func1<GroupedObservable<TKey, TResult>, Observable<TDuration>> durationSelector;
public OperationGroupByUntil(Observable<TSource> source,
Func1<TSource, TKey> keySelector,
Func1<TSource, TResult> valueSelector,
Func1<GroupedObservable<TKey, TResult>, Observable<TDuration>> durationSelector) {
this.source = source;
this.keySelector = keySelector;
this.valueSelector = valueSelector;
this.durationSelector = durationSelector;
}

@Override
public Subscription onSubscribe(Observer<? super GroupedObservable<TKey, TResult>> t1) {
SerialSubscription cancel = new SerialSubscription();
ResultSink sink = new ResultSink(t1, cancel);
cancel.setSubscription(sink.run());
return cancel;
}
/** The source value sink and group manager. */
class ResultSink implements Observer<TSource> {
/** Guarded by gate. */
protected final Observer<? super GroupedObservable<TKey, TResult>> observer;
protected final Subscription cancel;
protected final CompositeSubscription group = new CompositeSubscription();
protected final Object gate = new Object();
/** Guarded by gate. */
protected final Map<TKey, GroupSubject<TKey, TResult>> map = new HashMap<TKey, GroupSubject<TKey, TResult>>();
public ResultSink(Observer<? super GroupedObservable<TKey, TResult>> observer, Subscription cancel) {
this.observer = observer;
this.cancel = cancel;
}
/** Prepare the subscription tree. */
public Subscription run() {
SerialSubscription toSource = new SerialSubscription();
group.add(toSource);

toSource.setSubscription(source.subscribe(this));

return group;
}

@Override
public void onNext(TSource args) {
TKey key;
TResult value;
try {
key = keySelector.call(args);
value = valueSelector.call(args);
} catch (Throwable t) {
onError(t);
return;
}

GroupSubject<TKey, TResult> g;
boolean newGroup = false;
synchronized (key) {
g = map.get(key);
if (g == null) {
g = create(key);
map.put(key, g);
newGroup = true;
}
}

if (newGroup) {
Observable<TDuration> duration;
try {
duration = durationSelector.call(g);
} catch (Throwable t) {
onError(t);
return;
}

synchronized (gate) {
observer.onNext(g);
}

SerialSubscription durationHandle = new SerialSubscription();
group.add(durationHandle);

DurationObserver durationObserver = new DurationObserver(key, durationHandle);
durationHandle.setSubscription(duration.subscribe(durationObserver));

}

synchronized (gate) {
g.onNext(value);
}
}

@Override
public void onError(Throwable e) {
synchronized (gate) {
List<GroupSubject<TKey, TResult>> gs = new ArrayList<GroupSubject<TKey, TResult>>(map.values());
map.clear();
for (GroupSubject<TKey, TResult> g : gs) {
g.onError(e);
}
observer.onError(e);
}
cancel.unsubscribe();
}

@Override
public void onCompleted() {
synchronized (gate) {
List<GroupSubject<TKey, TResult>> gs = new ArrayList<GroupSubject<TKey, TResult>>(map.values());
map.clear();
for (GroupSubject<TKey, TResult> g : gs) {
g.onCompleted();
}
observer.onCompleted();
}
cancel.unsubscribe();
}
/** Create a new group. */
public GroupSubject<TKey, TResult> create(TKey key) {
PublishSubject<TResult> publish = PublishSubject.create();
return new GroupSubject<TKey, TResult>(key, publish);
}
/** Terminate a group. */
public void expire(TKey key, Subscription handle) {
synchronized (gate) {
GroupSubject<TKey, TResult> g = map.remove(key);
if (g != null) {
g.onCompleted();
}
}
handle.unsubscribe();
}
/** Observe the completion of a group. */
class DurationObserver implements Observer<TDuration> {
final TKey key;
final Subscription handle;
public DurationObserver(TKey key, Subscription handle) {
this.key = key;
this.handle = handle;
}
@Override
public void onNext(TDuration args) {
expire(key, handle);
}

@Override
public void onError(Throwable e) {
ResultSink.this.onError(e);
}

@Override
public void onCompleted() {
expire(key, handle);
}

}
}
protected static <T> OnSubscribeFunc<T> neverSubscribe() {
return new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> t1) {
return Subscriptions.empty();
}
};
}
/** A grouped observable with subject-like behavior. */
public static class GroupSubject<K, V> extends GroupedObservable<K, V> implements Observer<V> {
protected final Subject<V, V> publish;
protected Throwable error;
protected boolean done;
protected final Object sgate = new Object();
public GroupSubject(K key, Subject<V, V> publish) {
super(key, OperationGroupByUntil.<V>neverSubscribe());
this.publish = publish;
}

@Override
public Subscription subscribe(Observer<? super V> observer) {
// handle escaped group subjects
synchronized (sgate) {
if (done) {
if (error != null) {
observer.onError(error);
} else {
observer.onCompleted();
}
return Subscriptions.empty();
}
return publish.subscribe(observer);
}
}

@Override
public void onNext(V args) {
publish.onNext(args);
}

@Override
public void onError(Throwable e) {
synchronized (sgate) {
if (!done) {
done = true;
error = e;
}
}
publish.onError(e);
}

@Override
public void onCompleted() {
synchronized (sgate) {
done = true;
}
publish.onCompleted();
}

}
}
Loading