'
+}
+
+jar {
+ manifest {
+ name = 'rxjava-math'
+ instruction 'Bundle-Vendor', 'Netflix'
+ instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
+ instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
+ instruction 'Fragment-Host', 'com.netflix.rxjava.core'
+ }
+}
diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationAverage.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationAverage.java
new file mode 100644
index 0000000000..25d6bc7376
--- /dev/null
+++ b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationAverage.java
@@ -0,0 +1,348 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.math.operators;
+
+import rx.Observable;
+import rx.Observable.OnSubscribeFunc;
+import rx.Observer;
+import rx.Subscription;
+import rx.functions.Func1;
+import rx.functions.Func2;
+
+/**
+ * A few operators for implementing the averaging operation.
+ *
+ * @see MSDN: Observable.Average
+ */
+public final class OperationAverage {
+ private static final class Tuple2 {
+ private final T current;
+ private final Integer count;
+
+ private Tuple2(T v1, Integer v2) {
+ current = v1;
+ count = v2;
+ }
+ }
+
+ public static Observable average(Observable source) {
+ return source.reduce(new Tuple2(0, 0), new Func2, Integer, Tuple2>() {
+ @Override
+ public Tuple2 call(Tuple2 accu, Integer next) {
+ return new Tuple2(accu.current + next, accu.count + 1);
+ }
+ }).map(new Func1, Integer>() {
+ @Override
+ public Integer call(Tuple2 result) {
+ if (result.count == 0) {
+ throw new IllegalArgumentException("Sequence contains no elements");
+ }
+ return result.current / result.count;
+ }
+ });
+ }
+
+ public static Observable averageLongs(Observable source) {
+ return source.reduce(new Tuple2(0L, 0), new Func2, Long, Tuple2>() {
+ @Override
+ public Tuple2 call(Tuple2 accu, Long next) {
+ return new Tuple2(accu.current + next, accu.count + 1);
+ }
+ }).map(new Func1, Long>() {
+ @Override
+ public Long call(Tuple2 result) {
+ if (result.count == 0) {
+ throw new IllegalArgumentException("Sequence contains no elements");
+ }
+ return result.current / result.count;
+ }
+ });
+ }
+
+ public static Observable averageFloats(Observable source) {
+ return source.reduce(new Tuple2(0.0f, 0), new Func2, Float, Tuple2>() {
+ @Override
+ public Tuple2 call(Tuple2 accu, Float next) {
+ return new Tuple2(accu.current + next, accu.count + 1);
+ }
+ }).map(new Func1, Float>() {
+ @Override
+ public Float call(Tuple2 result) {
+ if (result.count == 0) {
+ throw new IllegalArgumentException("Sequence contains no elements");
+ }
+ return result.current / result.count;
+ }
+ });
+ }
+
+ public static Observable averageDoubles(Observable source) {
+ return source.reduce(new Tuple2(0.0d, 0), new Func2, Double, Tuple2>() {
+ @Override
+ public Tuple2 call(Tuple2 accu, Double next) {
+ return new Tuple2(accu.current + next, accu.count + 1);
+ }
+ }).map(new Func1, Double>() {
+ @Override
+ public Double call(Tuple2 result) {
+ if (result.count == 0) {
+ throw new IllegalArgumentException("Sequence contains no elements");
+ }
+ return result.current / result.count;
+ }
+ });
+ }
+
+ /**
+ * Compute the average by extracting integer values from the source via an
+ * extractor function.
+ *
+ * @param
+ * the source value type
+ */
+ public static final class AverageIntegerExtractor implements OnSubscribeFunc {
+ final Observable extends T> source;
+ final Func1 super T, Integer> valueExtractor;
+
+ public AverageIntegerExtractor(Observable extends T> source, Func1 super T, Integer> valueExtractor) {
+ this.source = source;
+ this.valueExtractor = valueExtractor;
+ }
+
+ @Override
+ public Subscription onSubscribe(Observer super Integer> t1) {
+ return source.subscribe(new AverageObserver(t1));
+ }
+
+ /** Computes the average. */
+ private final class AverageObserver implements Observer {
+ final Observer super Integer> observer;
+ int sum;
+ int count;
+
+ public AverageObserver(Observer super Integer> observer) {
+ this.observer = observer;
+ }
+
+ @Override
+ public void onNext(T args) {
+ sum += valueExtractor.call(args);
+ count++;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ observer.onError(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (count > 0) {
+ try {
+ observer.onNext(sum / count);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return;
+ }
+ observer.onCompleted();
+ } else {
+ observer.onError(new IllegalArgumentException("Sequence contains no elements"));
+ }
+ }
+
+ }
+ }
+
+ /**
+ * Compute the average by extracting long values from the source via an
+ * extractor function.
+ *
+ * @param
+ * the source value type
+ */
+ public static final class AverageLongExtractor implements OnSubscribeFunc {
+ final Observable extends T> source;
+ final Func1 super T, Long> valueExtractor;
+
+ public AverageLongExtractor(Observable extends T> source, Func1 super T, Long> valueExtractor) {
+ this.source = source;
+ this.valueExtractor = valueExtractor;
+ }
+
+ @Override
+ public Subscription onSubscribe(Observer super Long> t1) {
+ return source.subscribe(new AverageObserver(t1));
+ }
+
+ /** Computes the average. */
+ private final class AverageObserver implements Observer {
+ final Observer super Long> observer;
+ long sum;
+ int count;
+
+ public AverageObserver(Observer super Long> observer) {
+ this.observer = observer;
+ }
+
+ @Override
+ public void onNext(T args) {
+ sum += valueExtractor.call(args);
+ count++;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ observer.onError(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (count > 0) {
+ try {
+ observer.onNext(sum / count);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return;
+ }
+ observer.onCompleted();
+ } else {
+ observer.onError(new IllegalArgumentException("Sequence contains no elements"));
+ }
+ }
+
+ }
+ }
+
+ /**
+ * Compute the average by extracting float values from the source via an
+ * extractor function.
+ *
+ * @param
+ * the source value type
+ */
+ public static final class AverageFloatExtractor implements OnSubscribeFunc {
+ final Observable extends T> source;
+ final Func1 super T, Float> valueExtractor;
+
+ public AverageFloatExtractor(Observable extends T> source, Func1 super T, Float> valueExtractor) {
+ this.source = source;
+ this.valueExtractor = valueExtractor;
+ }
+
+ @Override
+ public Subscription onSubscribe(Observer super Float> t1) {
+ return source.subscribe(new AverageObserver(t1));
+ }
+
+ /** Computes the average. */
+ private final class AverageObserver implements Observer {
+ final Observer super Float> observer;
+ float sum;
+ int count;
+
+ public AverageObserver(Observer super Float> observer) {
+ this.observer = observer;
+ }
+
+ @Override
+ public void onNext(T args) {
+ sum += valueExtractor.call(args);
+ count++;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ observer.onError(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (count > 0) {
+ try {
+ observer.onNext(sum / count);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return;
+ }
+ observer.onCompleted();
+ } else {
+ observer.onError(new IllegalArgumentException("Sequence contains no elements"));
+ }
+ }
+
+ }
+ }
+
+ /**
+ * Compute the average by extracting double values from the source via an
+ * extractor function.
+ *
+ * @param
+ * the source value type
+ */
+ public static final class AverageDoubleExtractor implements OnSubscribeFunc {
+ final Observable extends T> source;
+ final Func1 super T, Double> valueExtractor;
+
+ public AverageDoubleExtractor(Observable extends T> source, Func1 super T, Double> valueExtractor) {
+ this.source = source;
+ this.valueExtractor = valueExtractor;
+ }
+
+ @Override
+ public Subscription onSubscribe(Observer super Double> t1) {
+ return source.subscribe(new AverageObserver(t1));
+ }
+
+ /** Computes the average. */
+ private final class AverageObserver implements Observer {
+ final Observer super Double> observer;
+ double sum;
+ int count;
+
+ public AverageObserver(Observer super Double> observer) {
+ this.observer = observer;
+ }
+
+ @Override
+ public void onNext(T args) {
+ sum += valueExtractor.call(args);
+ count++;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ observer.onError(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (count > 0) {
+ try {
+ observer.onNext(sum / count);
+ } catch (Throwable t) {
+ observer.onError(t);
+ return;
+ }
+ observer.onCompleted();
+ } else {
+ observer.onError(new IllegalArgumentException("Sequence contains no elements"));
+ }
+ }
+
+ }
+ }
+}
diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationMinMax.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationMinMax.java
new file mode 100644
index 0000000000..cfa92d4fce
--- /dev/null
+++ b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationMinMax.java
@@ -0,0 +1,147 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.math.operators;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import rx.Observable;
+import rx.functions.Func1;
+import rx.functions.Func2;
+
+/**
+ * Returns the minimum element in an observable sequence.
+ */
+public class OperationMinMax {
+
+ public static > Observable min(
+ Observable source) {
+ return minMax(source, -1L);
+ }
+
+ public static Observable min(Observable source,
+ final Comparator super T> comparator) {
+ return minMax(source, comparator, -1L);
+ }
+
+ public static > Observable> minBy(
+ Observable source, final Func1 selector) {
+ return minMaxBy(source, selector, -1L);
+ }
+
+ public static Observable> minBy(Observable source,
+ final Func1 selector, final Comparator super R> comparator) {
+ return minMaxBy(source, selector, comparator, -1L);
+ }
+
+ public static > Observable max(
+ Observable source) {
+ return minMax(source, 1L);
+ }
+
+ public static Observable max(Observable source,
+ final Comparator super T> comparator) {
+ return minMax(source, comparator, 1L);
+ }
+
+ public static > Observable> maxBy(
+ Observable source, final Func1 selector) {
+ return minMaxBy(source, selector, 1L);
+ }
+
+ public static Observable> maxBy(Observable source,
+ final Func1 selector, final Comparator super R> comparator) {
+ return minMaxBy(source, selector, comparator, 1L);
+ }
+
+ private static > Observable minMax(
+ Observable source, final long flag) {
+ return source.reduce(new Func2() {
+ @Override
+ public T call(T acc, T value) {
+ if (flag * acc.compareTo(value) > 0) {
+ return acc;
+ }
+ return value;
+ }
+ });
+ }
+
+ private static Observable minMax(Observable source,
+ final Comparator super T> comparator, final long flag) {
+ return source.reduce(new Func2() {
+ @Override
+ public T call(T acc, T value) {
+ if (flag * comparator.compare(acc, value) > 0) {
+ return acc;
+ }
+ return value;
+ }
+ });
+ }
+
+ private static > Observable> minMaxBy(
+ Observable source, final Func1 selector, final long flag) {
+ return source.reduce(new ArrayList(),
+ new Func2, T, List>() {
+
+ @Override
+ public List call(List acc, T value) {
+ if (acc.isEmpty()) {
+ acc.add(value);
+ } else {
+ int compareResult = selector.call(acc.get(0))
+ .compareTo(selector.call(value));
+ if (compareResult == 0) {
+ acc.add(value);
+ } else if (flag * compareResult < 0) {
+ acc.clear();
+ acc.add(value);
+ }
+ }
+ return acc;
+ }
+ });
+ }
+
+ private static Observable> minMaxBy(Observable source,
+ final Func1 selector, final Comparator super R> comparator,
+ final long flag) {
+ return source.reduce(new ArrayList(),
+ new Func2, T, List>() {
+
+ @Override
+ public List call(List acc, T value) {
+ if (acc.isEmpty()) {
+ acc.add(value);
+ } else {
+ int compareResult = comparator.compare(
+ selector.call(acc.get(0)),
+ selector.call(value));
+ if (compareResult == 0) {
+ acc.add(value);
+ } else if (flag * compareResult < 0) {
+ acc.clear();
+ acc.add(value);
+ }
+ }
+ return acc;
+ }
+ });
+ }
+
+}
diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationSum.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationSum.java
new file mode 100644
index 0000000000..3d519e4220
--- /dev/null
+++ b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationSum.java
@@ -0,0 +1,89 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.math.operators;
+
+import rx.Observable;
+import rx.functions.Func2;
+
+/**
+ * A few operators for implementing the sum operation.
+ *
+ * @see MSDN:
+ * Observable.Sum
+ */
+public final class OperationSum {
+
+ public static Observable sumIntegers(Observable source) {
+ return source.reduce(0, ACCUM_INT);
+ }
+
+ public static Observable sumLongs(Observable source) {
+ return source.reduce(0l, ACCUM_LONG);
+ }
+
+ public static Observable sumFloats(Observable source) {
+ return source.reduce(0.0f, ACCUM_FLOAT);
+ }
+
+ public static Observable sumDoubles(Observable source) {
+ return source.reduce(0.0d, ACCUM_DOUBLE);
+ }
+
+ public static Observable sumAtLeastOneIntegers(Observable source) {
+ return source.reduce(ACCUM_INT);
+ }
+
+ public static Observable sumAtLeastOneLongs(Observable source) {
+ return source.reduce(ACCUM_LONG);
+ }
+
+ public static Observable sumAtLeastOneFloats(Observable source) {
+ return source.reduce(ACCUM_FLOAT);
+ }
+
+ public static Observable sumAtLeastOneDoubles(Observable source) {
+ return source.reduce(ACCUM_DOUBLE);
+ }
+
+ private static final Func2 ACCUM_INT = new Func2() {
+ @Override
+ public Integer call(Integer accu, Integer next) {
+ return accu + next;
+ }
+ };
+
+ private static final Func2 ACCUM_LONG = new Func2() {
+ @Override
+ public Long call(Long accu, Long next) {
+ return accu + next;
+ }
+ };
+
+ private static final Func2 ACCUM_FLOAT = new Func2() {
+ @Override
+ public Float call(Float accu, Float next) {
+ return accu + next;
+ }
+ };
+
+ private static final Func2 ACCUM_DOUBLE = new Func2() {
+ @Override
+ public Double call(Double accu, Double next) {
+ return accu + next;
+ }
+ };
+}
diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/observables/MathObservable.java b/rxjava-contrib/rxjava-math/src/main/java/rx/observables/MathObservable.java
new file mode 100644
index 0000000000..2614983bf8
--- /dev/null
+++ b/rxjava-contrib/rxjava-math/src/main/java/rx/observables/MathObservable.java
@@ -0,0 +1,378 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.observables;
+
+import java.util.Comparator;
+
+import rx.Observable;
+import rx.functions.Func1;
+import rx.math.operators.OperationAverage;
+import rx.math.operators.OperationMinMax;
+import rx.math.operators.OperationSum;
+
+public class MathObservable {
+
+ private final Observable o;
+
+ private MathObservable(Observable o) {
+ this.o = o;
+ }
+
+ public static MathObservable from(Observable o) {
+ return new MathObservable(o);
+ }
+
+ /**
+ * Returns an Observable that emits the average of the Doubles emitted by the source Observable.
+ *
+ *
+ *
+ * @param source
+ * source Observable to compute the average of
+ * @return an Observable that emits a single item: the average of all the Doubles emitted by the source
+ * Observable
+ * @see RxJava Wiki: averageDouble()
+ * @see MSDN: Observable.Average
+ */
+ public final static Observable averageDouble(Observable source) {
+ return OperationAverage.averageDoubles(source);
+ }
+
+ /**
+ * Returns an Observable that emits the average of the Floats emitted by the source Observable.
+ *
+ *
+ *
+ * @param source
+ * source Observable to compute the average of
+ * @return an Observable that emits a single item: the average of all the Floats emitted by the source
+ * Observable
+ * @see RxJava Wiki: averageFloat()
+ * @see MSDN: Observable.Average
+ */
+ public final static Observable averageFloat(Observable source) {
+ return OperationAverage.averageFloats(source);
+ }
+
+ /**
+ * Returns an Observable that emits the average of the Integers emitted by the source Observable.
+ *
+ *
+ *
+ * @param source
+ * source Observable to compute the average of
+ * @return an Observable that emits a single item: the average of all the Integers emitted by the source
+ * Observable
+ * @throws IllegalArgumentException
+ * if the source Observable emits no items
+ * @see RxJava Wiki: averageInteger()
+ * @see MSDN: Observable.Average
+ */
+ public final static Observable averageInteger(Observable source) {
+ return OperationAverage.average(source);
+ }
+
+ /**
+ * Returns an Observable that emits the average of the Longs emitted by the source Observable.
+ *
+ *
+ *
+ * @param source
+ * source Observable to compute the average of
+ * @return an Observable that emits a single item: the average of all the Longs emitted by the source
+ * Observable
+ * @see RxJava Wiki: averageLong()
+ * @see MSDN: Observable.Average
+ */
+ public final static Observable averageLong(Observable source) {
+ return OperationAverage.averageLongs(source);
+ }
+
+ /**
+ * Returns an Observable that emits the single item emitted by the source Observable with the maximum
+ * numeric value. If there is more than one item with the same maximum value, it emits the last-emitted of
+ * these.
+ *
+ *
+ *
+ * @param source
+ * an Observable to scan for the maximum emitted item
+ * @return an Observable that emits this maximum item
+ * @throws IllegalArgumentException
+ * if the source is empty
+ * @see RxJava Wiki: max()
+ * @see MSDN: Observable.Max
+ */
+ public final static > Observable max(Observable source) {
+ return OperationMinMax.max(source);
+ }
+
+ /**
+ * Returns an Observable that emits the single numerically minimum item emitted by the source Observable.
+ * If there is more than one such item, it returns the last-emitted one.
+ *
+ *
+ *
+ * @param source
+ * an Observable to determine the minimum item of
+ * @return an Observable that emits the minimum item emitted by the source Observable
+ * @throws IllegalArgumentException
+ * if the source is empty
+ * @see MSDN: Observable.Min
+ */
+ public final static > Observable min(Observable source) {
+ return OperationMinMax.min(source);
+ }
+
+ /**
+ * Returns an Observable that emits the sum of all the Doubles emitted by the source Observable.
+ *
+ *
+ *
+ * @param source
+ * the source Observable to compute the sum of
+ * @return an Observable that emits a single item: the sum of all the Doubles emitted by the source
+ * Observable
+ * @see RxJava Wiki: sumDouble()
+ * @see MSDN: Observable.Sum
+ */
+ public final static Observable sumDouble(Observable source) {
+ return OperationSum.sumDoubles(source);
+ }
+
+ /**
+ * Returns an Observable that emits the sum of all the Floats emitted by the source Observable.
+ *
+ *
+ *
+ * @param source
+ * the source Observable to compute the sum of
+ * @return an Observable that emits a single item: the sum of all the Floats emitted by the source
+ * Observable
+ * @see RxJava Wiki: sumFloat()
+ * @see MSDN: Observable.Sum
+ */
+ public final static Observable sumFloat(Observable source) {
+ return OperationSum.sumFloats(source);
+ }
+
+ /**
+ * Returns an Observable that emits the sum of all the Integers emitted by the source Observable.
+ *
+ *
+ *
+ * @param source
+ * source Observable to compute the sum of
+ * @return an Observable that emits a single item: the sum of all the Integers emitted by the source
+ * Observable
+ * @see RxJava Wiki: sumInteger()
+ * @see MSDN: Observable.Sum
+ */
+ public final static Observable sumInteger(Observable source) {
+ return OperationSum.sumIntegers(source);
+ }
+
+ /**
+ * Returns an Observable that emits the sum of all the Longs emitted by the source Observable.
+ *
+ *
+ *
+ * @param source
+ * source Observable to compute the sum of
+ * @return an Observable that emits a single item: the sum of all the Longs emitted by the
+ * source Observable
+ * @see RxJava Wiki: sumLong()
+ * @see MSDN: Observable.Sum
+ */
+ public final static Observable sumLong(Observable source) {
+ return OperationSum.sumLongs(source);
+ }
+
+ /**
+ * Returns an Observable that transforms items emitted by the source Observable into Doubles by using a
+ * function you provide and then emits the Double average of the complete sequence of transformed values.
+ *
+ *
+ *
+ * @param valueExtractor
+ * the function to transform an item emitted by the source Observable into a Double
+ * @return an Observable that emits a single item: the Double average of the complete sequence of items
+ * emitted by the source Observable when transformed into Doubles by the specified function
+ * @see RxJava Wiki: averageDouble()
+ * @see MSDN: Observable.Average
+ */
+ public final Observable averageDouble(Func1 super T, Double> valueExtractor) {
+ return Observable.create(new OperationAverage.AverageDoubleExtractor(o, valueExtractor));
+ }
+
+ /**
+ * Returns an Observable that transforms items emitted by the source Observable into Floats by using a
+ * function you provide and then emits the Float average of the complete sequence of transformed values.
+ *
+ *
+ *
+ * @param valueExtractor
+ * the function to transform an item emitted by the source Observable into a Float
+ * @return an Observable that emits a single item: the Float average of the complete sequence of items
+ * emitted by the source Observable when transformed into Floats by the specified function
+ * @see RxJava Wiki: averageFloat()
+ * @see MSDN: Observable.Average
+ */
+ public final Observable averageFloat(Func1 super T, Float> valueExtractor) {
+ return Observable.create(new OperationAverage.AverageFloatExtractor(o, valueExtractor));
+ }
+
+ /**
+ * Returns an Observable that transforms items emitted by the source Observable into Integers by using a
+ * function you provide and then emits the Integer average of the complete sequence of transformed values.
+ *
+ *
+ *
+ * @param valueExtractor
+ * the function to transform an item emitted by the source Observable into an Integer
+ * @return an Observable that emits a single item: the Integer average of the complete sequence of items
+ * emitted by the source Observable when transformed into Integers by the specified function
+ * @see RxJava Wiki: averageInteger()
+ * @see MSDN: Observable.Average
+ */
+ public final Observable averageInteger(Func1 super T, Integer> valueExtractor) {
+ return Observable.create(new OperationAverage.AverageIntegerExtractor(o, valueExtractor));
+ }
+
+ /**
+ * Returns an Observable that transforms items emitted by the source Observable into Longs by using a
+ * function you provide and then emits the Long average of the complete sequence of transformed values.
+ *
+ *
+ *
+ * @param valueExtractor
+ * the function to transform an item emitted by the source Observable into a Long
+ * @return an Observable that emits a single item: the Long average of the complete sequence of items
+ * emitted by the source Observable when transformed into Longs by the specified function
+ * @see RxJava Wiki: averageLong()
+ * @see MSDN: Observable.Average
+ */
+ public final Observable averageLong(Func1 super T, Long> valueExtractor) {
+ return Observable.create(new OperationAverage.AverageLongExtractor(o, valueExtractor));
+ }
+
+ /**
+ * Returns an Observable that emits the maximum item emitted by the source Observable, according to the
+ * specified comparator. If there is more than one item with the same maximum value, it emits the
+ * last-emitted of these.
+ *
+ *
+ *
+ * @param comparator
+ * the comparer used to compare items
+ * @return an Observable that emits the maximum item emitted by the source Observable, according to the
+ * specified comparator
+ * @throws IllegalArgumentException
+ * if the source is empty
+ * @see RxJava Wiki: max()
+ * @see MSDN: Observable.Max
+ */
+ public final Observable max(Comparator super T> comparator) {
+ return OperationMinMax.max(o, comparator);
+ }
+
+ /**
+ * Returns an Observable that emits the minimum item emitted by the source Observable, according to a
+ * specified comparator. If there is more than one such item, it returns the last-emitted one.
+ *
+ *
+ *
+ * @param comparator
+ * the comparer used to compare elements
+ * @return an Observable that emits the minimum item emitted by the source Observable according to the
+ * specified comparator
+ * @throws IllegalArgumentException
+ * if the source is empty
+ * @see RxJava Wiki: min()
+ * @see MSDN: Observable.Min
+ */
+ public final Observable min(Comparator super T> comparator) {
+ return OperationMinMax.min(o, comparator);
+ }
+
+ /**
+ * Returns an Observable that extracts a Double from each of the items emitted by the source Observable via
+ * a function you specify, and then emits the sum of these Doubles.
+ *
+ *
+ *
+ * @param valueExtractor
+ * the function to extract a Double from each item emitted by the source Observable
+ * @return an Observable that emits the Double sum of the Double values corresponding to the items emitted
+ * by the source Observable as transformed by the provided function
+ * @see RxJava Wiki: sumDouble()
+ * @see MSDN: Observable.Sum
+ */
+ public final Observable sumDouble(Func1 super T, Double> valueExtractor) {
+ return OperationSum.sumAtLeastOneDoubles(o.map(valueExtractor));
+ }
+
+ /**
+ * Returns an Observable that extracts a Float from each of the items emitted by the source Observable via
+ * a function you specify, and then emits the sum of these Floats.
+ *
+ *
+ *
+ * @param valueExtractor
+ * the function to extract a Float from each item emitted by the source Observable
+ * @return an Observable that emits the Float sum of the Float values corresponding to the items emitted by
+ * the source Observable as transformed by the provided function
+ * @see RxJava Wiki: sumFloat()
+ * @see MSDN: Observable.Sum
+ */
+ public final Observable sumFloat(Func1 super T, Float> valueExtractor) {
+ return OperationSum.sumAtLeastOneFloats(o.map(valueExtractor));
+ }
+
+ /**
+ * Returns an Observable that extracts an Integer from each of the items emitted by the source Observable
+ * via a function you specify, and then emits the sum of these Integers.
+ *
+ *
+ *
+ * @param valueExtractor
+ * the function to extract an Integer from each item emitted by the source Observable
+ * @return an Observable that emits the Integer sum of the Integer values corresponding to the items emitted
+ * by the source Observable as transformed by the provided function
+ * @see RxJava Wiki: sumInteger()
+ * @see MSDN: Observable.Sum
+ */
+ public final Observable sumInteger(Func1 super T, Integer> valueExtractor) {
+ return OperationSum.sumAtLeastOneIntegers(o.map(valueExtractor));
+ }
+
+ /**
+ * Returns an Observable that extracts a Long from each of the items emitted by the source Observable via a
+ * function you specify, and then emits the sum of these Longs.
+ *
+ *
+ *
+ * @param valueExtractor
+ * the function to extract a Long from each item emitted by the source Observable
+ * @return an Observable that emits the Long sum of the Long values corresponding to the items emitted by
+ * the source Observable as transformed by the provided function
+ * @see RxJava Wiki: sumLong()
+ * @see MSDN: Observable.Sum
+ */
+ public final Observable sumLong(Func1 super T, Long> valueExtractor) {
+ return OperationSum.sumAtLeastOneLongs(o.map(valueExtractor));
+ }
+}
diff --git a/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationAverageTest.java b/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationAverageTest.java
new file mode 100644
index 0000000000..a77868dc7a
--- /dev/null
+++ b/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationAverageTest.java
@@ -0,0 +1,342 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.math.operators;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+import static rx.math.operators.OperationAverage.*;
+
+import org.junit.Test;
+
+import rx.Observable;
+import rx.Observer;
+import rx.functions.Func1;
+import rx.observables.MathObservable;
+
+public class OperationAverageTest {
+
+ @SuppressWarnings("unchecked")
+ Observer w = mock(Observer.class);
+ @SuppressWarnings("unchecked")
+ Observer wl = mock(Observer.class);
+ @SuppressWarnings("unchecked")
+ Observer wf = mock(Observer.class);
+ @SuppressWarnings("unchecked")
+ Observer wd = mock(Observer.class);
+
+ @Test
+ public void testAverageOfAFewInts() throws Throwable {
+ Observable src = Observable.from(1, 2, 3, 4, 6);
+ average(src).subscribe(w);
+
+ verify(w, times(1)).onNext(anyInt());
+ verify(w).onNext(3);
+ verify(w, never()).onError(any(Throwable.class));
+ verify(w, times(1)).onCompleted();
+ }
+
+ @Test
+ public void testEmptyAverage() throws Throwable {
+ Observable src = Observable.empty();
+ average(src).subscribe(w);
+
+ verify(w, never()).onNext(anyInt());
+ verify(w, times(1)).onError(isA(IllegalArgumentException.class));
+ verify(w, never()).onCompleted();
+ }
+
+ @Test
+ public void testAverageOfAFewLongs() throws Throwable {
+ Observable src = Observable.from(1L, 2L, 3L, 4L, 6L);
+ averageLongs(src).subscribe(wl);
+
+ verify(wl, times(1)).onNext(anyLong());
+ verify(wl).onNext(3L);
+ verify(wl, never()).onError(any(Throwable.class));
+ verify(wl, times(1)).onCompleted();
+ }
+
+ @Test
+ public void testEmptyAverageLongs() throws Throwable {
+ Observable src = Observable.empty();
+ averageLongs(src).subscribe(wl);
+
+ verify(wl, never()).onNext(anyLong());
+ verify(wl, times(1)).onError(isA(IllegalArgumentException.class));
+ verify(wl, never()).onCompleted();
+ }
+
+ @Test
+ public void testAverageOfAFewFloats() throws Throwable {
+ Observable src = Observable.from(1.0f, 2.0f);
+ averageFloats(src).subscribe(wf);
+
+ verify(wf, times(1)).onNext(anyFloat());
+ verify(wf).onNext(1.5f);
+ verify(wf, never()).onError(any(Throwable.class));
+ verify(wf, times(1)).onCompleted();
+ }
+
+ @Test
+ public void testEmptyAverageFloats() throws Throwable {
+ Observable src = Observable.empty();
+ averageFloats(src).subscribe(wf);
+
+ verify(wf, never()).onNext(anyFloat());
+ verify(wf, times(1)).onError(isA(IllegalArgumentException.class));
+ verify(wf, never()).onCompleted();
+ }
+
+ @Test
+ public void testAverageOfAFewDoubles() throws Throwable {
+ Observable src = Observable.from(1.0d, 2.0d);
+ averageDoubles(src).subscribe(wd);
+
+ verify(wd, times(1)).onNext(anyDouble());
+ verify(wd).onNext(1.5d);
+ verify(wd, never()).onError(any(Throwable.class));
+ verify(wd, times(1)).onCompleted();
+ }
+
+ @Test
+ public void testEmptyAverageDoubles() throws Throwable {
+ Observable src = Observable.empty();
+ averageDoubles(src).subscribe(wd);
+
+ verify(wd, never()).onNext(anyDouble());
+ verify(wd, times(1)).onError(isA(IllegalArgumentException.class));
+ verify(wd, never()).onCompleted();
+ }
+
+ void testThrows(Observer