From 9a1832ef0716cce511f911e38ace1c484266d4bb Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 7 Sep 2013 19:49:03 +0200 Subject: [PATCH 1/4] implemented count operator (#32) --- rxjava-core/src/main/java/rx/Observable.java | 15 +++++++ .../src/test/java/rx/ObservableTests.java | 40 ++++++++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 1a72f61501..318f0de971 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2043,6 +2043,21 @@ public Observable reduce(Func2 accumulator return create(OperationScan.scan(this, accumulator)).takeLast(1); } + /** + * Returns an Observable that counts the total number of elements in the source Observable. + * @return an Observable emitting the number of counted elements of the source Observable + * as its single item. + * @see MSDN: Observable.Count + */ + public Observable count() { + return reduce(0, new Func2() { + @Override + public Integer call(Integer t1, T t2) { + return t1 + 1; + } + }); + } + /** * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable that will replay all of its items and notifications to any future {@link Observer}. diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index ba11919040..22047c7ff0 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -27,7 +27,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import rx.Observable.OnSubscribeFunc; @@ -69,10 +68,47 @@ public Subscription onSubscribe(Observer Observer) { verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, times(1)).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Throwable.class)); + verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } + @Test + public void testCountAFewItems() { + Observable observable = Observable.from("a", "b", "c", "d"); + observable.count().subscribe(w); + // we should be called only once + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(4); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testCountZeroItems() { + Observable observable = Observable.empty(); + observable.count().subscribe(w); + // we should be called only once + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(0); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testCountError() { + Observable o = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer obsv) { + obsv.onError(new RuntimeException()); + return Subscriptions.empty(); + } + }); + o.count().subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, never()).onCompleted(); + verify(w, times(1)).onError(any(RuntimeException.class)); + } + @Test public void testReduce() { Observable observable = Observable.from(1, 2, 3, 4); From 849d7fea251dac11e4ef032f8493678fd0d628e0 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 7 Sep 2013 20:24:56 +0200 Subject: [PATCH 2/4] added sum operations --- rxjava-core/src/main/java/rx/Observable.java | 37 +++++ .../main/java/rx/operators/OperationSum.java | 142 ++++++++++++++++++ 2 files changed, 179 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationSum.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 318f0de971..440c3f6e7f 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -50,6 +50,7 @@ import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSubscribeOn; +import rx.operators.OperationSum; import rx.operators.OperationSwitch; import rx.operators.OperationSynchronize; import rx.operators.OperationTake; @@ -2058,6 +2059,42 @@ public Integer call(Integer t1, T t2) { }); } + /** + * Returns an Observable that sums up the elements in the source Observable. + * @param source + * Source observable to compute the sum of. + * @return an Observable emitting the sum of all the elements of the source Observable + * as its single item. + * @see MSDN: Observable.Sum + */ + public static Observable sum(Observable source) { + return OperationSum.sum(source); + } + + /** + * @see #sum(Observable) + * @see MSDN: Observable.Sum + */ + public static Observable sumLongs(Observable source) { + return OperationSum.sumLongs(source); + } + + /** + * @see #sum(Observable) + * @see MSDN: Observable.Sum + */ + public static Observable sumFloats(Observable source) { + return OperationSum.sumFloats(source); + } + + /** + * @see #sum(Observable) + * @see MSDN: Observable.Sum + */ + public static Observable sumDoubles(Observable source) { + return OperationSum.sumDoubles(source); + } + /** * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable that will replay all of its items and notifications to any future {@link Observer}. diff --git a/rxjava-core/src/main/java/rx/operators/OperationSum.java b/rxjava-core/src/main/java/rx/operators/OperationSum.java new file mode 100644 index 0000000000..cda8944b33 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSum.java @@ -0,0 +1,142 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Mockito.*; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.util.functions.Func2; + +/** + * A few operators for implementing the sum operation. + */ +public final class OperationSum { + public static Observable sum(Observable source) { + return source.reduce(0, new Func2() { + @Override + public Integer call(Integer accu, Integer next) { + return accu + next; + } + }); + } + + public static Observable sumLongs(Observable source) { + return source.reduce(0L, new Func2() { + @Override + public Long call(Long accu, Long next) { + return accu + next; + } + }); + } + + public static Observable sumFloats(Observable source) { + return source.reduce(0.0f, new Func2() { + @Override + public Float call(Float accu, Float next) { + return accu + next; + } + }); + } + + public static Observable sumDoubles(Observable source) { + return source.reduce(0.0d, new Func2() { + @Override + public Double call(Double accu, Double next) { + return accu + next; + } + }); + } + + public static class UnitTest { + @SuppressWarnings("unchecked") + Observer w = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer wl = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer wf = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer wd = mock(Observer.class); + + @Test + public void testSumOfAFewInts() throws Throwable { + Observable src = Observable.from(1, 2, 3, 4, 5); + sum(src).subscribe(w); + + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(15); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testEmptySum() throws Throwable { + Observable src = Observable.from(); + sum(src).subscribe(w); + + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(0); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testSumOfAFewFloats() throws Throwable { + Observable src = Observable.from(1.0f); + sumFloats(src).subscribe(wf); + + verify(wf, times(1)).onNext(anyFloat()); + verify(wf).onNext(1.0f); + verify(wf, never()).onError(any(Throwable.class)); + verify(wf, times(1)).onCompleted(); + } + + @Test + public void testEmptySumFloats() throws Throwable { + Observable src = Observable.from(); + sumFloats(src).subscribe(wf); + + verify(wf, times(1)).onNext(anyFloat()); + verify(wf).onNext(0.0f); + verify(wf, never()).onError(any(Throwable.class)); + verify(wf, times(1)).onCompleted(); + } + + @Test + public void testSumOfAFewDoubles() throws Throwable { + Observable src = Observable.from(0.0d, 1.0d, 0.5d); + sumDoubles(src).subscribe(wd); + + verify(wd, times(1)).onNext(anyDouble()); + verify(wd).onNext(1.5d); + verify(wd, never()).onError(any(Throwable.class)); + verify(wd, times(1)).onCompleted(); + } + + @Test + public void testEmptySumDoubles() throws Throwable { + Observable src = Observable.from(); + sumDoubles(src).subscribe(wd); + + verify(wd, times(1)).onNext(anyDouble()); + verify(wd).onNext(0.0d); + verify(wd, never()).onError(any(Throwable.class)); + verify(wd, times(1)).onCompleted(); + } + } +} From 3ff761ef88597e979bfa9c841d3098433b7363e7 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 7 Sep 2013 20:49:52 +0200 Subject: [PATCH 3/4] added test against long sum, too (#83) --- .../main/java/rx/operators/OperationSum.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationSum.java b/rxjava-core/src/main/java/rx/operators/OperationSum.java index cda8944b33..22dda862cf 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSum.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSum.java @@ -95,6 +95,28 @@ public void testEmptySum() throws Throwable { verify(w, times(1)).onCompleted(); } + @Test + public void testSumOfAFewLongs() throws Throwable { + Observable src = Observable.from(1L, 2L, 3L, 4L, 5L); + sumLongs(src).subscribe(wl); + + verify(wl, times(1)).onNext(anyLong()); + verify(wl).onNext(15L); + verify(wl, never()).onError(any(Throwable.class)); + verify(wl, times(1)).onCompleted(); + } + + @Test + public void testEmptySumLongs() throws Throwable { + Observable src = Observable.from(); + sumLongs(src).subscribe(wl); + + verify(wl, times(1)).onNext(anyLong()); + verify(wl).onNext(0L); + verify(wl, never()).onError(any(Throwable.class)); + verify(wl, times(1)).onCompleted(); + } + @Test public void testSumOfAFewFloats() throws Throwable { Observable src = Observable.from(1.0f); From cf7f9f7a5acb57d5e96b147a4dd837f38600f313 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 7 Sep 2013 21:06:40 +0200 Subject: [PATCH 4/4] implemented average operation (#25) --- rxjava-core/src/main/java/rx/Observable.java | 38 ++++ .../java/rx/operators/OperationAverage.java | 198 ++++++++++++++++++ .../main/java/rx/operators/OperationSum.java | 1 + 3 files changed, 237 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationAverage.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 440c3f6e7f..359836ee66 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -27,6 +27,7 @@ import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; import rx.operators.OperationAll; +import rx.operators.OperationAverage; import rx.operators.OperationBuffer; import rx.operators.OperationCache; import rx.operators.OperationCombineLatest; @@ -2095,6 +2096,43 @@ public static Observable sumDoubles(Observable source) { return OperationSum.sumDoubles(source); } + /** + * Returns an Observable that computes the average of all elements in the source Observable. + * For an empty source, it causes an ArithmeticException. + * @param source + * Source observable to compute the average of. + * @return an Observable emitting the averageof all the elements of the source Observable + * as its single item. + * @see MSDN: Observable.Average + */ + public static Observable average(Observable source) { + return OperationAverage.average(source); + } + + /** + * @see #average(Observable) + * @see MSDN: Observable.Average + */ + public static Observable averageLongs(Observable source) { + return OperationAverage.averageLongs(source); + } + + /** + * @see #average(Observable) + * @see MSDN: Observable.Average + */ + public static Observable averageFloats(Observable source) { + return OperationAverage.averageFloats(source); + } + + /** + * @see #average(Observable) + * @see MSDN: Observable.Average + */ + public static Observable averageDoubles(Observable source) { + return OperationAverage.averageDoubles(source); + } + /** * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable that will replay all of its items and notifications to any future {@link Observer}. diff --git a/rxjava-core/src/main/java/rx/operators/OperationAverage.java b/rxjava-core/src/main/java/rx/operators/OperationAverage.java new file mode 100644 index 0000000000..deb9b774e5 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationAverage.java @@ -0,0 +1,198 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Mockito.*; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +/** + * A few operators for implementing the averaging operation. + * @see MSDN: Observable.Average + */ +public final class OperationAverage { + private static final class Tuple2 { + private final T current; + private final Integer count; + + private Tuple2(T v1, Integer v2) { + current = v1; + count = v2; + } + } + + public static Observable average(Observable source) { + return source.reduce(new Tuple2(0, 0), new Func2, Integer, Tuple2>() { + @Override + public Tuple2 call(Tuple2 accu, Integer next) { + return new Tuple2(accu.current + next, accu.count + 1); + } + }).map(new Func1, Integer>() { + @Override + public Integer call(Tuple2 result) { + return result.current / result.count; // may throw DivisionByZero, this should be correct... + } + }); + } + + public static Observable averageLongs(Observable source) { + return source.reduce(new Tuple2(0L, 0), new Func2, Long, Tuple2>() { + @Override + public Tuple2 call(Tuple2 accu, Long next) { + return new Tuple2(accu.current + next, accu.count + 1); + } + }).map(new Func1, Long>() { + @Override + public Long call(Tuple2 result) { + return result.current / result.count; // may throw DivisionByZero, this should be correct... + } + }); + } + + public static Observable averageFloats(Observable source) { + return source.reduce(new Tuple2(0.0f, 0), new Func2, Float, Tuple2>() { + @Override + public Tuple2 call(Tuple2 accu, Float next) { + return new Tuple2(accu.current + next, accu.count + 1); + } + }).map(new Func1, Float>() { + @Override + public Float call(Tuple2 result) { + if (result.count == 0) { + throw new ArithmeticException("divide by zero"); + } + return result.current / result.count; + } + }); + } + + public static Observable averageDoubles(Observable source) { + return source.reduce(new Tuple2(0.0d, 0), new Func2, Double, Tuple2>() { + @Override + public Tuple2 call(Tuple2 accu, Double next) { + return new Tuple2(accu.current + next, accu.count + 1); + } + }).map(new Func1, Double>() { + @Override + public Double call(Tuple2 result) { + if (result.count == 0) { + throw new ArithmeticException("divide by zero"); + } + return result.current / result.count; + } + }); + } + + public static class UnitTest { + @SuppressWarnings("unchecked") + Observer w = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer wl = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer wf = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer wd = mock(Observer.class); + + @Test + public void testAverageOfAFewInts() throws Throwable { + Observable src = Observable.from(1, 2, 3, 4, 6); + average(src).subscribe(w); + + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(3); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testEmptyAverage() throws Throwable { + Observable src = Observable.from(); + average(src).subscribe(w); + + verify(w, never()).onNext(anyInt()); + verify(w, times(1)).onError(any(ArithmeticException.class)); + verify(w, never()).onCompleted(); + } + + @Test + public void testAverageOfAFewLongs() throws Throwable { + Observable src = Observable.from(1L, 2L, 3L, 4L, 6L); + averageLongs(src).subscribe(wl); + + verify(wl, times(1)).onNext(anyLong()); + verify(wl).onNext(3L); + verify(wl, never()).onError(any(Throwable.class)); + verify(wl, times(1)).onCompleted(); + } + + @Test + public void testEmptyAverageLongs() throws Throwable { + Observable src = Observable.from(); + averageLongs(src).subscribe(wl); + + verify(wl, never()).onNext(anyLong()); + verify(wl, times(1)).onError(any(ArithmeticException.class)); + verify(wl, never()).onCompleted(); + } + + @Test + public void testAverageOfAFewFloats() throws Throwable { + Observable src = Observable.from(1.0f, 2.0f); + averageFloats(src).subscribe(wf); + + verify(wf, times(1)).onNext(anyFloat()); + verify(wf).onNext(1.5f); + verify(wf, never()).onError(any(Throwable.class)); + verify(wf, times(1)).onCompleted(); + } + + @Test + public void testEmptyAverageFloats() throws Throwable { + Observable src = Observable.from(); + averageFloats(src).subscribe(wf); + + verify(wf, never()).onNext(anyFloat()); + verify(wf, times(1)).onError(any(ArithmeticException.class)); + verify(wf, never()).onCompleted(); + } + + @Test + public void testAverageOfAFewDoubles() throws Throwable { + Observable src = Observable.from(1.0d, 2.0d); + averageDoubles(src).subscribe(wd); + + verify(wd, times(1)).onNext(anyDouble()); + verify(wd).onNext(1.5d); + verify(wd, never()).onError(any(Throwable.class)); + verify(wd, times(1)).onCompleted(); + } + + @Test + public void testEmptyAverageDoubles() throws Throwable { + Observable src = Observable.from(); + averageDoubles(src).subscribe(wd); + + verify(wd, never()).onNext(anyDouble()); + verify(wd, times(1)).onError(any(ArithmeticException.class)); + verify(wd, never()).onCompleted(); + } + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationSum.java b/rxjava-core/src/main/java/rx/operators/OperationSum.java index 22dda862cf..97abf8f618 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSum.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSum.java @@ -25,6 +25,7 @@ /** * A few operators for implementing the sum operation. + * @see MSDN: Observable.Sum */ public final class OperationSum { public static Observable sum(Observable source) {