From e6bb70ceaf9fd1bf4893a37bf165088bfe1fcca7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 22 Nov 2013 21:46:13 +0100 Subject: [PATCH] Operation GroupByUntil --- rxjava-core/src/main/java/rx/Observable.java | 26 ++ .../rx/operators/OperationGroupByUntil.java | 259 +++++++++++++++ .../operators/OperationGroupByUntilTest.java | 305 ++++++++++++++++++ 3 files changed, 590 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperationGroupByUntilTest.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 56a4664654..2daf88573f 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -50,6 +50,7 @@ import rx.operators.OperationFinally; import rx.operators.OperationFirstOrDefault; import rx.operators.OperationGroupBy; +import rx.operators.OperationGroupByUntil; import rx.operators.OperationInterval; import rx.operators.OperationLast; import rx.operators.OperationMap; @@ -120,6 +121,7 @@ import rx.util.functions.Func9; import rx.util.functions.FuncN; import rx.util.functions.Function; +import rx.util.functions.Functions; /** * The Observable interface that implements the Reactive Pattern. @@ -5690,4 +5692,28 @@ private boolean isInternalImplementation(Object o) { } } + /** + * Groups the elements of an observable sequence according to a specified key selector function until the duration observable expires for the key. + * @param keySelector A function to extract the key for each element. + * @param durationSelector A function to signal the expiration of a group. + * @return A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. + * + * @see MSDN: Observable.GroupByUntil + */ + public Observable> groupByUntil(Func1 keySelector, Func1, Observable> durationSelector) { + return groupByUntil(keySelector, Functions.identity(), durationSelector); + } + /** + * Groups the elements of an observable sequence according to a specified key and value selector function until the duration observable expires for the key. + * @param keySelector A function to extract the key for each element. + * @param valueSelector A function to map each source element to an element in an observable group. + * @param durationSelector A function to signal the expiration of a group. + * @return A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. + * + * @see MSDN: Observable.GroupByUntil + */ + public Observable> groupByUntil(Func1 keySelector, Func1 valueSelector, Func1, Observable> durationSelector) { + return create(new OperationGroupByUntil(this, keySelector, valueSelector, durationSelector)); + } + } diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java b/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java new file mode 100644 index 0000000000..4450bcbc28 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java @@ -0,0 +1,259 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.observables.GroupedObservable; +import rx.subjects.PublishSubject; +import rx.subjects.Subject; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.SerialSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; + +/** + * Groups the elements of an observable sequence according to a specified key selector, value selector and duration selector function. + * + * @see MSDN: Observable.GroupByUntil + * @see MSDN: Observable.GroupByUntil + */ +public class OperationGroupByUntil implements OnSubscribeFunc> { + final Observable source; + final Func1 keySelector; + final Func1 valueSelector; + final Func1, Observable> durationSelector; + public OperationGroupByUntil(Observable source, + Func1 keySelector, + Func1 valueSelector, + Func1, Observable> durationSelector) { + this.source = source; + this.keySelector = keySelector; + this.valueSelector = valueSelector; + this.durationSelector = durationSelector; + } + + @Override + public Subscription onSubscribe(Observer> t1) { + SerialSubscription cancel = new SerialSubscription(); + ResultSink sink = new ResultSink(t1, cancel); + cancel.setSubscription(sink.run()); + return cancel; + } + /** The source value sink and group manager. */ + class ResultSink implements Observer { + /** Guarded by gate. */ + protected final Observer> observer; + protected final Subscription cancel; + protected final CompositeSubscription group = new CompositeSubscription(); + protected final Object gate = new Object(); + /** Guarded by gate. */ + protected final Map> map = new HashMap>(); + public ResultSink(Observer> observer, Subscription cancel) { + this.observer = observer; + this.cancel = cancel; + } + /** Prepare the subscription tree. */ + public Subscription run() { + SerialSubscription toSource = new SerialSubscription(); + group.add(toSource); + + toSource.setSubscription(source.subscribe(this)); + + return group; + } + + @Override + public void onNext(TSource args) { + TKey key; + TResult value; + try { + key = keySelector.call(args); + value = valueSelector.call(args); + } catch (Throwable t) { + onError(t); + return; + } + + GroupSubject g; + boolean newGroup = false; + synchronized (key) { + g = map.get(key); + if (g == null) { + g = create(key); + map.put(key, g); + newGroup = true; + } + } + + if (newGroup) { + Observable duration; + try { + duration = durationSelector.call(g); + } catch (Throwable t) { + onError(t); + return; + } + + synchronized (gate) { + observer.onNext(g); + } + + SerialSubscription durationHandle = new SerialSubscription(); + group.add(durationHandle); + + DurationObserver durationObserver = new DurationObserver(key, durationHandle); + durationHandle.setSubscription(duration.subscribe(durationObserver)); + + } + + synchronized (gate) { + g.onNext(value); + } + } + + @Override + public void onError(Throwable e) { + synchronized (gate) { + List> gs = new ArrayList>(map.values()); + map.clear(); + for (GroupSubject g : gs) { + g.onError(e); + } + observer.onError(e); + } + cancel.unsubscribe(); + } + + @Override + public void onCompleted() { + synchronized (gate) { + List> gs = new ArrayList>(map.values()); + map.clear(); + for (GroupSubject g : gs) { + g.onCompleted(); + } + observer.onCompleted(); + } + cancel.unsubscribe(); + } + /** Create a new group. */ + public GroupSubject create(TKey key) { + PublishSubject publish = PublishSubject.create(); + return new GroupSubject(key, publish); + } + /** Terminate a group. */ + public void expire(TKey key, Subscription handle) { + synchronized (gate) { + GroupSubject g = map.remove(key); + if (g != null) { + g.onCompleted(); + } + } + handle.unsubscribe(); + } + /** Observe the completion of a group. */ + class DurationObserver implements Observer { + final TKey key; + final Subscription handle; + public DurationObserver(TKey key, Subscription handle) { + this.key = key; + this.handle = handle; + } + @Override + public void onNext(TDuration args) { + expire(key, handle); + } + + @Override + public void onError(Throwable e) { + ResultSink.this.onError(e); + } + + @Override + public void onCompleted() { + expire(key, handle); + } + + } + } + protected static OnSubscribeFunc neverSubscribe() { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer t1) { + return Subscriptions.empty(); + } + }; + } + /** A grouped observable with subject-like behavior. */ + public static class GroupSubject extends GroupedObservable implements Observer { + protected final Subject publish; + protected Throwable error; + protected boolean done; + protected final Object sgate = new Object(); + public GroupSubject(K key, Subject publish) { + super(key, OperationGroupByUntil.neverSubscribe()); + this.publish = publish; + } + + @Override + public Subscription subscribe(Observer observer) { + // handle escaped group subjects + synchronized (sgate) { + if (done) { + if (error != null) { + observer.onError(error); + } else { + observer.onCompleted(); + } + return Subscriptions.empty(); + } + return publish.subscribe(observer); + } + } + + @Override + public void onNext(V args) { + publish.onNext(args); + } + + @Override + public void onError(Throwable e) { + synchronized (sgate) { + if (!done) { + done = true; + error = e; + } + } + publish.onError(e); + } + + @Override + public void onCompleted() { + synchronized (sgate) { + done = true; + } + publish.onCompleted(); + } + + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationGroupByUntilTest.java b/rxjava-core/src/test/java/rx/operators/OperationGroupByUntilTest.java new file mode 100644 index 0000000000..4eca6c6cea --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationGroupByUntilTest.java @@ -0,0 +1,305 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; +import static org.mockito.Mockito.*; +import org.mockito.MockitoAnnotations; +import rx.Observable; +import rx.Observer; +import rx.observables.GroupedObservable; +import rx.util.functions.Action1; +import rx.util.functions.Func1; +import rx.util.functions.Functions; + +/** + * + */ +public class OperationGroupByUntilTest { + @Mock + Observer observer; + Func1 just(final R value) { + return new Func1() { + @Override + public R call(T t1) { + return value; + } + }; + } + Func1 fail(T dummy) { + return new Func1() { + @Override + public T call(Integer t1) { + throw new RuntimeException("Forced failure"); + } + }; + } + Func1 fail2(R dummy2) { + return new Func1() { + @Override + public R call(T t1) { + throw new RuntimeException("Forced failure"); + } + }; + } + Func1 dbl = new Func1() { + @Override + public Integer call(Integer t1) { + return t1 * 2; + } + }; + Func1 identity = Functions.identity(); + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + @Test + public void normalBehavior() { + Observable source = Observable.from(Arrays.asList( + " foo", + " FoO ", + "baR ", + "foO ", + " Baz ", + " qux ", + " bar", + " BAR ", + "FOO ", + "baz ", + " bAZ ", + " fOo " + )); + + Func1, Observable> duration = new Func1, Observable>() { + @Override + public Observable call(GroupedObservable t1) { + return t1.skip(2); + } + }; + Func1, String> getkey = new Func1, String>() { + + @Override + public String call(GroupedObservable t1) { + return t1.getKey(); + } + + }; + Func1 keysel = new Func1() { + @Override + public String call(String t1) { + return t1.trim().toLowerCase(); + } + }; + Func1 valuesel = new Func1() { + @Override + public String call(String t1) { + return t1 + t1; + } + }; + + Observable m = source.groupByUntil( + keysel, valuesel, + duration).map(getkey); + + m.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("foo"); + inOrder.verify(observer, times(1)).onNext("bar"); + inOrder.verify(observer, times(1)).onNext("baz"); + inOrder.verify(observer, times(1)).onNext("qux"); + inOrder.verify(observer, times(1)).onNext("foo"); + inOrder.verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + @Test + public void behaveAsGroupBy() { + Observable source = Observable.from(0, 1, 2, 3, 4, 5, 6); + + Func1, Observable> duration = just(Observable.never()); + + Observable> m = source.groupByUntil( + identity, dbl, + duration); + + final Map actual = new HashMap(); + + m.subscribe(new Action1>() { + @Override + public void call(final GroupedObservable t1) { + t1.subscribe(new Action1() { + @Override + public void call(Integer t2) { + actual.put(t1.getKey(), t2); + } + }); + } + }); + + Map expected = new HashMap(); + for (int i = 0; i < 7; i++) { + expected.put(i, i * 2); + } + + Assert.assertEquals(expected, actual); + } + @Test + public void keySelectorThrows() { + Observable source = Observable.from(0, 1, 2, 3, 4, 5, 6); + + Func1, Observable> duration = just(Observable.never()); + + Observable> m = source.groupByUntil( + fail(0), dbl, + duration); + + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void valueSelectorThrows() { + Observable source = Observable.from(0, 1, 2, 3, 4, 5, 6); + + Func1, Observable> duration = just(Observable.never()); + + Observable> m = source.groupByUntil( + identity, fail(0), + duration); + + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void durationSelectorThrows() { + Observable source = Observable.from(0, 1, 2, 3, 4, 5, 6); + + Func1, Observable> duration = fail2((Observable)null); + + Observable> m = source.groupByUntil( + identity, dbl, + duration); + + m.subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + + } + @Test + public void durationThrows() { + Observable source = Observable.from(0, 1, 2, 3, 4, 5, 6); + + Func1, Integer> getkey = new Func1, Integer>() { + + @Override + public Integer call(GroupedObservable t1) { + return t1.getKey(); + } + + }; + Func1, Observable> duration = just(Observable.error(new RuntimeException("Forced failure"))); + + Observable m = source.groupByUntil( + identity, dbl, + duration).map(getkey); + + m.subscribe(observer); + + verify(observer, times(1)).onNext(0); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onNext(1); + verify(observer, never()).onNext(2); + verify(observer, never()).onNext(3); + verify(observer, never()).onNext(4); + verify(observer, never()).onNext(5); + verify(observer, never()).onNext(6); + verify(observer, never()).onCompleted(); + + } + @Test + public void innerEscapeCompleted() { + Observable source = Observable.from(0); + + final AtomicReference> inner = new AtomicReference>(); + + Func1, Observable> duration = just(Observable.never()); + + Observable> m = source.groupByUntil(identity, dbl, duration); + + m.subscribe(new Action1>() { + @Override + public void call(GroupedObservable t1) { + inner.set(t1); + } + }); + + inner.get().subscribe(observer); + + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + } + + @Test + public void innerEscapeError() { + Observable source = Observable.concat(Observable.from(0), Observable.error(new RuntimeException("Forced failure"))); + + final AtomicReference> inner = new AtomicReference>(); + + Func1, Observable> duration = just(Observable.never()); + + Observable> m = source.groupByUntil(identity, dbl, duration); + + m.subscribe(new Observer>() { + @Override + public void onNext(GroupedObservable t1) { + inner.set(t1); + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onCompleted() { + } + + }); + + inner.get().subscribe(observer); + + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + verify(observer, never()).onNext(any()); + } +}