diff --git a/rxjava-contrib/rxjava-math/build.gradle b/rxjava-contrib/rxjava-math/build.gradle new file mode 100644 index 0000000000..cbc8cca804 --- /dev/null +++ b/rxjava-contrib/rxjava-math/build.gradle @@ -0,0 +1,31 @@ +apply plugin: 'osgi' + +sourceCompatibility = JavaVersion.VERSION_1_6 +targetCompatibility = JavaVersion.VERSION_1_6 + +dependencies { + compile project(':rxjava-core') + testCompile project(":rxjava-core").sourceSets.test.output + provided 'junit:junit-dep:4.10' + provided 'org.mockito:mockito-core:1.8.5' +} + +javadoc { + options { + doclet = "org.benjchristensen.doclet.DocletExclude" + docletpath = [rootProject.file('./gradle/doclet-exclude.jar')] + stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css') + windowTitle = "RxJava Javadoc ${project.version}" + } + options.addStringOption('top').value = '

RxJava

' +} + +jar { + manifest { + name = 'rxjava-math' + instruction 'Bundle-Vendor', 'Netflix' + instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava' + instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*' + instruction 'Fragment-Host', 'com.netflix.rxjava.core' + } +} diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationAverage.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationAverage.java new file mode 100644 index 0000000000..25d6bc7376 --- /dev/null +++ b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationAverage.java @@ -0,0 +1,348 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.math.operators; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.functions.Func1; +import rx.functions.Func2; + +/** + * A few operators for implementing the averaging operation. + * + * @see MSDN: Observable.Average + */ +public final class OperationAverage { + private static final class Tuple2 { + private final T current; + private final Integer count; + + private Tuple2(T v1, Integer v2) { + current = v1; + count = v2; + } + } + + public static Observable average(Observable source) { + return source.reduce(new Tuple2(0, 0), new Func2, Integer, Tuple2>() { + @Override + public Tuple2 call(Tuple2 accu, Integer next) { + return new Tuple2(accu.current + next, accu.count + 1); + } + }).map(new Func1, Integer>() { + @Override + public Integer call(Tuple2 result) { + if (result.count == 0) { + throw new IllegalArgumentException("Sequence contains no elements"); + } + return result.current / result.count; + } + }); + } + + public static Observable averageLongs(Observable source) { + return source.reduce(new Tuple2(0L, 0), new Func2, Long, Tuple2>() { + @Override + public Tuple2 call(Tuple2 accu, Long next) { + return new Tuple2(accu.current + next, accu.count + 1); + } + }).map(new Func1, Long>() { + @Override + public Long call(Tuple2 result) { + if (result.count == 0) { + throw new IllegalArgumentException("Sequence contains no elements"); + } + return result.current / result.count; + } + }); + } + + public static Observable averageFloats(Observable source) { + return source.reduce(new Tuple2(0.0f, 0), new Func2, Float, Tuple2>() { + @Override + public Tuple2 call(Tuple2 accu, Float next) { + return new Tuple2(accu.current + next, accu.count + 1); + } + }).map(new Func1, Float>() { + @Override + public Float call(Tuple2 result) { + if (result.count == 0) { + throw new IllegalArgumentException("Sequence contains no elements"); + } + return result.current / result.count; + } + }); + } + + public static Observable averageDoubles(Observable source) { + return source.reduce(new Tuple2(0.0d, 0), new Func2, Double, Tuple2>() { + @Override + public Tuple2 call(Tuple2 accu, Double next) { + return new Tuple2(accu.current + next, accu.count + 1); + } + }).map(new Func1, Double>() { + @Override + public Double call(Tuple2 result) { + if (result.count == 0) { + throw new IllegalArgumentException("Sequence contains no elements"); + } + return result.current / result.count; + } + }); + } + + /** + * Compute the average by extracting integer values from the source via an + * extractor function. + * + * @param + * the source value type + */ + public static final class AverageIntegerExtractor implements OnSubscribeFunc { + final Observable source; + final Func1 valueExtractor; + + public AverageIntegerExtractor(Observable source, Func1 valueExtractor) { + this.source = source; + this.valueExtractor = valueExtractor; + } + + @Override + public Subscription onSubscribe(Observer t1) { + return source.subscribe(new AverageObserver(t1)); + } + + /** Computes the average. */ + private final class AverageObserver implements Observer { + final Observer observer; + int sum; + int count; + + public AverageObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onNext(T args) { + sum += valueExtractor.call(args); + count++; + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onCompleted() { + if (count > 0) { + try { + observer.onNext(sum / count); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onCompleted(); + } else { + observer.onError(new IllegalArgumentException("Sequence contains no elements")); + } + } + + } + } + + /** + * Compute the average by extracting long values from the source via an + * extractor function. + * + * @param + * the source value type + */ + public static final class AverageLongExtractor implements OnSubscribeFunc { + final Observable source; + final Func1 valueExtractor; + + public AverageLongExtractor(Observable source, Func1 valueExtractor) { + this.source = source; + this.valueExtractor = valueExtractor; + } + + @Override + public Subscription onSubscribe(Observer t1) { + return source.subscribe(new AverageObserver(t1)); + } + + /** Computes the average. */ + private final class AverageObserver implements Observer { + final Observer observer; + long sum; + int count; + + public AverageObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onNext(T args) { + sum += valueExtractor.call(args); + count++; + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onCompleted() { + if (count > 0) { + try { + observer.onNext(sum / count); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onCompleted(); + } else { + observer.onError(new IllegalArgumentException("Sequence contains no elements")); + } + } + + } + } + + /** + * Compute the average by extracting float values from the source via an + * extractor function. + * + * @param + * the source value type + */ + public static final class AverageFloatExtractor implements OnSubscribeFunc { + final Observable source; + final Func1 valueExtractor; + + public AverageFloatExtractor(Observable source, Func1 valueExtractor) { + this.source = source; + this.valueExtractor = valueExtractor; + } + + @Override + public Subscription onSubscribe(Observer t1) { + return source.subscribe(new AverageObserver(t1)); + } + + /** Computes the average. */ + private final class AverageObserver implements Observer { + final Observer observer; + float sum; + int count; + + public AverageObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onNext(T args) { + sum += valueExtractor.call(args); + count++; + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onCompleted() { + if (count > 0) { + try { + observer.onNext(sum / count); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onCompleted(); + } else { + observer.onError(new IllegalArgumentException("Sequence contains no elements")); + } + } + + } + } + + /** + * Compute the average by extracting double values from the source via an + * extractor function. + * + * @param + * the source value type + */ + public static final class AverageDoubleExtractor implements OnSubscribeFunc { + final Observable source; + final Func1 valueExtractor; + + public AverageDoubleExtractor(Observable source, Func1 valueExtractor) { + this.source = source; + this.valueExtractor = valueExtractor; + } + + @Override + public Subscription onSubscribe(Observer t1) { + return source.subscribe(new AverageObserver(t1)); + } + + /** Computes the average. */ + private final class AverageObserver implements Observer { + final Observer observer; + double sum; + int count; + + public AverageObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onNext(T args) { + sum += valueExtractor.call(args); + count++; + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onCompleted() { + if (count > 0) { + try { + observer.onNext(sum / count); + } catch (Throwable t) { + observer.onError(t); + return; + } + observer.onCompleted(); + } else { + observer.onError(new IllegalArgumentException("Sequence contains no elements")); + } + } + + } + } +} diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationMinMax.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationMinMax.java new file mode 100644 index 0000000000..cfa92d4fce --- /dev/null +++ b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationMinMax.java @@ -0,0 +1,147 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.math.operators; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import rx.Observable; +import rx.functions.Func1; +import rx.functions.Func2; + +/** + * Returns the minimum element in an observable sequence. + */ +public class OperationMinMax { + + public static > Observable min( + Observable source) { + return minMax(source, -1L); + } + + public static Observable min(Observable source, + final Comparator comparator) { + return minMax(source, comparator, -1L); + } + + public static > Observable> minBy( + Observable source, final Func1 selector) { + return minMaxBy(source, selector, -1L); + } + + public static Observable> minBy(Observable source, + final Func1 selector, final Comparator comparator) { + return minMaxBy(source, selector, comparator, -1L); + } + + public static > Observable max( + Observable source) { + return minMax(source, 1L); + } + + public static Observable max(Observable source, + final Comparator comparator) { + return minMax(source, comparator, 1L); + } + + public static > Observable> maxBy( + Observable source, final Func1 selector) { + return minMaxBy(source, selector, 1L); + } + + public static Observable> maxBy(Observable source, + final Func1 selector, final Comparator comparator) { + return minMaxBy(source, selector, comparator, 1L); + } + + private static > Observable minMax( + Observable source, final long flag) { + return source.reduce(new Func2() { + @Override + public T call(T acc, T value) { + if (flag * acc.compareTo(value) > 0) { + return acc; + } + return value; + } + }); + } + + private static Observable minMax(Observable source, + final Comparator comparator, final long flag) { + return source.reduce(new Func2() { + @Override + public T call(T acc, T value) { + if (flag * comparator.compare(acc, value) > 0) { + return acc; + } + return value; + } + }); + } + + private static > Observable> minMaxBy( + Observable source, final Func1 selector, final long flag) { + return source.reduce(new ArrayList(), + new Func2, T, List>() { + + @Override + public List call(List acc, T value) { + if (acc.isEmpty()) { + acc.add(value); + } else { + int compareResult = selector.call(acc.get(0)) + .compareTo(selector.call(value)); + if (compareResult == 0) { + acc.add(value); + } else if (flag * compareResult < 0) { + acc.clear(); + acc.add(value); + } + } + return acc; + } + }); + } + + private static Observable> minMaxBy(Observable source, + final Func1 selector, final Comparator comparator, + final long flag) { + return source.reduce(new ArrayList(), + new Func2, T, List>() { + + @Override + public List call(List acc, T value) { + if (acc.isEmpty()) { + acc.add(value); + } else { + int compareResult = comparator.compare( + selector.call(acc.get(0)), + selector.call(value)); + if (compareResult == 0) { + acc.add(value); + } else if (flag * compareResult < 0) { + acc.clear(); + acc.add(value); + } + } + return acc; + } + }); + } + +} diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationSum.java b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationSum.java new file mode 100644 index 0000000000..3d519e4220 --- /dev/null +++ b/rxjava-contrib/rxjava-math/src/main/java/rx/math/operators/OperationSum.java @@ -0,0 +1,89 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.math.operators; + +import rx.Observable; +import rx.functions.Func2; + +/** + * A few operators for implementing the sum operation. + * + * @see MSDN: + * Observable.Sum + */ +public final class OperationSum { + + public static Observable sumIntegers(Observable source) { + return source.reduce(0, ACCUM_INT); + } + + public static Observable sumLongs(Observable source) { + return source.reduce(0l, ACCUM_LONG); + } + + public static Observable sumFloats(Observable source) { + return source.reduce(0.0f, ACCUM_FLOAT); + } + + public static Observable sumDoubles(Observable source) { + return source.reduce(0.0d, ACCUM_DOUBLE); + } + + public static Observable sumAtLeastOneIntegers(Observable source) { + return source.reduce(ACCUM_INT); + } + + public static Observable sumAtLeastOneLongs(Observable source) { + return source.reduce(ACCUM_LONG); + } + + public static Observable sumAtLeastOneFloats(Observable source) { + return source.reduce(ACCUM_FLOAT); + } + + public static Observable sumAtLeastOneDoubles(Observable source) { + return source.reduce(ACCUM_DOUBLE); + } + + private static final Func2 ACCUM_INT = new Func2() { + @Override + public Integer call(Integer accu, Integer next) { + return accu + next; + } + }; + + private static final Func2 ACCUM_LONG = new Func2() { + @Override + public Long call(Long accu, Long next) { + return accu + next; + } + }; + + private static final Func2 ACCUM_FLOAT = new Func2() { + @Override + public Float call(Float accu, Float next) { + return accu + next; + } + }; + + private static final Func2 ACCUM_DOUBLE = new Func2() { + @Override + public Double call(Double accu, Double next) { + return accu + next; + } + }; +} diff --git a/rxjava-contrib/rxjava-math/src/main/java/rx/observables/MathObservable.java b/rxjava-contrib/rxjava-math/src/main/java/rx/observables/MathObservable.java new file mode 100644 index 0000000000..2614983bf8 --- /dev/null +++ b/rxjava-contrib/rxjava-math/src/main/java/rx/observables/MathObservable.java @@ -0,0 +1,378 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observables; + +import java.util.Comparator; + +import rx.Observable; +import rx.functions.Func1; +import rx.math.operators.OperationAverage; +import rx.math.operators.OperationMinMax; +import rx.math.operators.OperationSum; + +public class MathObservable { + + private final Observable o; + + private MathObservable(Observable o) { + this.o = o; + } + + public static MathObservable from(Observable o) { + return new MathObservable(o); + } + + /** + * Returns an Observable that emits the average of the Doubles emitted by the source Observable. + *

+ * + * + * @param source + * source Observable to compute the average of + * @return an Observable that emits a single item: the average of all the Doubles emitted by the source + * Observable + * @see RxJava Wiki: averageDouble() + * @see MSDN: Observable.Average + */ + public final static Observable averageDouble(Observable source) { + return OperationAverage.averageDoubles(source); + } + + /** + * Returns an Observable that emits the average of the Floats emitted by the source Observable. + *

+ * + * + * @param source + * source Observable to compute the average of + * @return an Observable that emits a single item: the average of all the Floats emitted by the source + * Observable + * @see RxJava Wiki: averageFloat() + * @see MSDN: Observable.Average + */ + public final static Observable averageFloat(Observable source) { + return OperationAverage.averageFloats(source); + } + + /** + * Returns an Observable that emits the average of the Integers emitted by the source Observable. + *

+ * + * + * @param source + * source Observable to compute the average of + * @return an Observable that emits a single item: the average of all the Integers emitted by the source + * Observable + * @throws IllegalArgumentException + * if the source Observable emits no items + * @see RxJava Wiki: averageInteger() + * @see MSDN: Observable.Average + */ + public final static Observable averageInteger(Observable source) { + return OperationAverage.average(source); + } + + /** + * Returns an Observable that emits the average of the Longs emitted by the source Observable. + *

+ * + * + * @param source + * source Observable to compute the average of + * @return an Observable that emits a single item: the average of all the Longs emitted by the source + * Observable + * @see RxJava Wiki: averageLong() + * @see MSDN: Observable.Average + */ + public final static Observable averageLong(Observable source) { + return OperationAverage.averageLongs(source); + } + + /** + * Returns an Observable that emits the single item emitted by the source Observable with the maximum + * numeric value. If there is more than one item with the same maximum value, it emits the last-emitted of + * these. + *

+ * + * + * @param source + * an Observable to scan for the maximum emitted item + * @return an Observable that emits this maximum item + * @throws IllegalArgumentException + * if the source is empty + * @see RxJava Wiki: max() + * @see MSDN: Observable.Max + */ + public final static > Observable max(Observable source) { + return OperationMinMax.max(source); + } + + /** + * Returns an Observable that emits the single numerically minimum item emitted by the source Observable. + * If there is more than one such item, it returns the last-emitted one. + *

+ * + * + * @param source + * an Observable to determine the minimum item of + * @return an Observable that emits the minimum item emitted by the source Observable + * @throws IllegalArgumentException + * if the source is empty + * @see MSDN: Observable.Min + */ + public final static > Observable min(Observable source) { + return OperationMinMax.min(source); + } + + /** + * Returns an Observable that emits the sum of all the Doubles emitted by the source Observable. + *

+ * + * + * @param source + * the source Observable to compute the sum of + * @return an Observable that emits a single item: the sum of all the Doubles emitted by the source + * Observable + * @see RxJava Wiki: sumDouble() + * @see MSDN: Observable.Sum + */ + public final static Observable sumDouble(Observable source) { + return OperationSum.sumDoubles(source); + } + + /** + * Returns an Observable that emits the sum of all the Floats emitted by the source Observable. + *

+ * + * + * @param source + * the source Observable to compute the sum of + * @return an Observable that emits a single item: the sum of all the Floats emitted by the source + * Observable + * @see RxJava Wiki: sumFloat() + * @see MSDN: Observable.Sum + */ + public final static Observable sumFloat(Observable source) { + return OperationSum.sumFloats(source); + } + + /** + * Returns an Observable that emits the sum of all the Integers emitted by the source Observable. + *

+ * + * + * @param source + * source Observable to compute the sum of + * @return an Observable that emits a single item: the sum of all the Integers emitted by the source + * Observable + * @see RxJava Wiki: sumInteger() + * @see MSDN: Observable.Sum + */ + public final static Observable sumInteger(Observable source) { + return OperationSum.sumIntegers(source); + } + + /** + * Returns an Observable that emits the sum of all the Longs emitted by the source Observable. + *

+ * + * + * @param source + * source Observable to compute the sum of + * @return an Observable that emits a single item: the sum of all the Longs emitted by the + * source Observable + * @see RxJava Wiki: sumLong() + * @see MSDN: Observable.Sum + */ + public final static Observable sumLong(Observable source) { + return OperationSum.sumLongs(source); + } + + /** + * Returns an Observable that transforms items emitted by the source Observable into Doubles by using a + * function you provide and then emits the Double average of the complete sequence of transformed values. + *

+ * + * + * @param valueExtractor + * the function to transform an item emitted by the source Observable into a Double + * @return an Observable that emits a single item: the Double average of the complete sequence of items + * emitted by the source Observable when transformed into Doubles by the specified function + * @see RxJava Wiki: averageDouble() + * @see MSDN: Observable.Average + */ + public final Observable averageDouble(Func1 valueExtractor) { + return Observable.create(new OperationAverage.AverageDoubleExtractor(o, valueExtractor)); + } + + /** + * Returns an Observable that transforms items emitted by the source Observable into Floats by using a + * function you provide and then emits the Float average of the complete sequence of transformed values. + *

+ * + * + * @param valueExtractor + * the function to transform an item emitted by the source Observable into a Float + * @return an Observable that emits a single item: the Float average of the complete sequence of items + * emitted by the source Observable when transformed into Floats by the specified function + * @see RxJava Wiki: averageFloat() + * @see MSDN: Observable.Average + */ + public final Observable averageFloat(Func1 valueExtractor) { + return Observable.create(new OperationAverage.AverageFloatExtractor(o, valueExtractor)); + } + + /** + * Returns an Observable that transforms items emitted by the source Observable into Integers by using a + * function you provide and then emits the Integer average of the complete sequence of transformed values. + *

+ * + * + * @param valueExtractor + * the function to transform an item emitted by the source Observable into an Integer + * @return an Observable that emits a single item: the Integer average of the complete sequence of items + * emitted by the source Observable when transformed into Integers by the specified function + * @see RxJava Wiki: averageInteger() + * @see MSDN: Observable.Average + */ + public final Observable averageInteger(Func1 valueExtractor) { + return Observable.create(new OperationAverage.AverageIntegerExtractor(o, valueExtractor)); + } + + /** + * Returns an Observable that transforms items emitted by the source Observable into Longs by using a + * function you provide and then emits the Long average of the complete sequence of transformed values. + *

+ * + * + * @param valueExtractor + * the function to transform an item emitted by the source Observable into a Long + * @return an Observable that emits a single item: the Long average of the complete sequence of items + * emitted by the source Observable when transformed into Longs by the specified function + * @see RxJava Wiki: averageLong() + * @see MSDN: Observable.Average + */ + public final Observable averageLong(Func1 valueExtractor) { + return Observable.create(new OperationAverage.AverageLongExtractor(o, valueExtractor)); + } + + /** + * Returns an Observable that emits the maximum item emitted by the source Observable, according to the + * specified comparator. If there is more than one item with the same maximum value, it emits the + * last-emitted of these. + *

+ * + * + * @param comparator + * the comparer used to compare items + * @return an Observable that emits the maximum item emitted by the source Observable, according to the + * specified comparator + * @throws IllegalArgumentException + * if the source is empty + * @see RxJava Wiki: max() + * @see MSDN: Observable.Max + */ + public final Observable max(Comparator comparator) { + return OperationMinMax.max(o, comparator); + } + + /** + * Returns an Observable that emits the minimum item emitted by the source Observable, according to a + * specified comparator. If there is more than one such item, it returns the last-emitted one. + *

+ * + * + * @param comparator + * the comparer used to compare elements + * @return an Observable that emits the minimum item emitted by the source Observable according to the + * specified comparator + * @throws IllegalArgumentException + * if the source is empty + * @see RxJava Wiki: min() + * @see MSDN: Observable.Min + */ + public final Observable min(Comparator comparator) { + return OperationMinMax.min(o, comparator); + } + + /** + * Returns an Observable that extracts a Double from each of the items emitted by the source Observable via + * a function you specify, and then emits the sum of these Doubles. + *

+ * + * + * @param valueExtractor + * the function to extract a Double from each item emitted by the source Observable + * @return an Observable that emits the Double sum of the Double values corresponding to the items emitted + * by the source Observable as transformed by the provided function + * @see RxJava Wiki: sumDouble() + * @see MSDN: Observable.Sum + */ + public final Observable sumDouble(Func1 valueExtractor) { + return OperationSum.sumAtLeastOneDoubles(o.map(valueExtractor)); + } + + /** + * Returns an Observable that extracts a Float from each of the items emitted by the source Observable via + * a function you specify, and then emits the sum of these Floats. + *

+ * + * + * @param valueExtractor + * the function to extract a Float from each item emitted by the source Observable + * @return an Observable that emits the Float sum of the Float values corresponding to the items emitted by + * the source Observable as transformed by the provided function + * @see RxJava Wiki: sumFloat() + * @see MSDN: Observable.Sum + */ + public final Observable sumFloat(Func1 valueExtractor) { + return OperationSum.sumAtLeastOneFloats(o.map(valueExtractor)); + } + + /** + * Returns an Observable that extracts an Integer from each of the items emitted by the source Observable + * via a function you specify, and then emits the sum of these Integers. + *

+ * + * + * @param valueExtractor + * the function to extract an Integer from each item emitted by the source Observable + * @return an Observable that emits the Integer sum of the Integer values corresponding to the items emitted + * by the source Observable as transformed by the provided function + * @see RxJava Wiki: sumInteger() + * @see MSDN: Observable.Sum + */ + public final Observable sumInteger(Func1 valueExtractor) { + return OperationSum.sumAtLeastOneIntegers(o.map(valueExtractor)); + } + + /** + * Returns an Observable that extracts a Long from each of the items emitted by the source Observable via a + * function you specify, and then emits the sum of these Longs. + *

+ * + * + * @param valueExtractor + * the function to extract a Long from each item emitted by the source Observable + * @return an Observable that emits the Long sum of the Long values corresponding to the items emitted by + * the source Observable as transformed by the provided function + * @see RxJava Wiki: sumLong() + * @see MSDN: Observable.Sum + */ + public final Observable sumLong(Func1 valueExtractor) { + return OperationSum.sumAtLeastOneLongs(o.map(valueExtractor)); + } +} diff --git a/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationAverageTest.java b/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationAverageTest.java new file mode 100644 index 0000000000..a77868dc7a --- /dev/null +++ b/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationAverageTest.java @@ -0,0 +1,342 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.math.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static rx.math.operators.OperationAverage.*; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.functions.Func1; +import rx.observables.MathObservable; + +public class OperationAverageTest { + + @SuppressWarnings("unchecked") + Observer w = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer wl = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer wf = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer wd = mock(Observer.class); + + @Test + public void testAverageOfAFewInts() throws Throwable { + Observable src = Observable.from(1, 2, 3, 4, 6); + average(src).subscribe(w); + + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(3); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testEmptyAverage() throws Throwable { + Observable src = Observable.empty(); + average(src).subscribe(w); + + verify(w, never()).onNext(anyInt()); + verify(w, times(1)).onError(isA(IllegalArgumentException.class)); + verify(w, never()).onCompleted(); + } + + @Test + public void testAverageOfAFewLongs() throws Throwable { + Observable src = Observable.from(1L, 2L, 3L, 4L, 6L); + averageLongs(src).subscribe(wl); + + verify(wl, times(1)).onNext(anyLong()); + verify(wl).onNext(3L); + verify(wl, never()).onError(any(Throwable.class)); + verify(wl, times(1)).onCompleted(); + } + + @Test + public void testEmptyAverageLongs() throws Throwable { + Observable src = Observable.empty(); + averageLongs(src).subscribe(wl); + + verify(wl, never()).onNext(anyLong()); + verify(wl, times(1)).onError(isA(IllegalArgumentException.class)); + verify(wl, never()).onCompleted(); + } + + @Test + public void testAverageOfAFewFloats() throws Throwable { + Observable src = Observable.from(1.0f, 2.0f); + averageFloats(src).subscribe(wf); + + verify(wf, times(1)).onNext(anyFloat()); + verify(wf).onNext(1.5f); + verify(wf, never()).onError(any(Throwable.class)); + verify(wf, times(1)).onCompleted(); + } + + @Test + public void testEmptyAverageFloats() throws Throwable { + Observable src = Observable.empty(); + averageFloats(src).subscribe(wf); + + verify(wf, never()).onNext(anyFloat()); + verify(wf, times(1)).onError(isA(IllegalArgumentException.class)); + verify(wf, never()).onCompleted(); + } + + @Test + public void testAverageOfAFewDoubles() throws Throwable { + Observable src = Observable.from(1.0d, 2.0d); + averageDoubles(src).subscribe(wd); + + verify(wd, times(1)).onNext(anyDouble()); + verify(wd).onNext(1.5d); + verify(wd, never()).onError(any(Throwable.class)); + verify(wd, times(1)).onCompleted(); + } + + @Test + public void testEmptyAverageDoubles() throws Throwable { + Observable src = Observable.empty(); + averageDoubles(src).subscribe(wd); + + verify(wd, never()).onNext(anyDouble()); + verify(wd, times(1)).onError(isA(IllegalArgumentException.class)); + verify(wd, never()).onCompleted(); + } + + void testThrows(Observer o, Class errorClass) { + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + verify(o, times(1)).onError(any(errorClass)); + } + + void testValue(Observer o, N value) { + verify(o, times(1)).onNext(value); + verify(o, times(1)).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testIntegerAverageSelector() { + Observable source = Observable.from("a", "bb", "ccc", "dddd"); + Func1 length = new Func1() { + @Override + public Integer call(String t1) { + return t1.length(); + } + }; + + Observable result = MathObservable.from(source).averageInteger(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testValue(o, 2); + } + + @Test + public void testLongAverageSelector() { + Observable source = Observable.from("a", "bb", "ccc", "dddd"); + Func1 length = new Func1() { + @Override + public Long call(String t1) { + return (long) t1.length(); + } + }; + + Observable result = MathObservable.from(source).averageLong(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testValue(o, 2L); + } + + @Test + public void testFloatAverageSelector() { + Observable source = Observable.from("a", "bb", "ccc", "dddd"); + Func1 length = new Func1() { + @Override + public Float call(String t1) { + return (float) t1.length(); + } + }; + + Observable result = MathObservable.from(source).averageFloat(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testValue(o, 2.5f); + } + + @Test + public void testDoubleAverageSelector() { + Observable source = Observable.from("a", "bb", "ccc", "dddd"); + Func1 length = new Func1() { + @Override + public Double call(String t1) { + return (double) t1.length(); + } + }; + + Observable result = MathObservable.from(source).averageDouble(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testValue(o, 2.5d); + } + + @Test + public void testIntegerAverageSelectorEmpty() { + Observable source = Observable.empty(); + Func1 length = new Func1() { + @Override + public Integer call(String t1) { + return t1.length(); + } + }; + + Observable result = MathObservable.from(source).averageInteger(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, IllegalArgumentException.class); + } + + @Test + public void testLongAverageSelectorEmpty() { + Observable source = Observable.empty(); + Func1 length = new Func1() { + @Override + public Long call(String t1) { + return (long) t1.length(); + } + }; + + Observable result = MathObservable.from(source).averageLong(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, IllegalArgumentException.class); + } + + @Test + public void testFloatAverageSelectorEmpty() { + Observable source = Observable.empty(); + Func1 length = new Func1() { + @Override + public Float call(String t1) { + return (float) t1.length(); + } + }; + + Observable result = MathObservable.from(source).averageFloat(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, IllegalArgumentException.class); + } + + @Test + public void testDoubleAverageSelectorEmpty() { + Observable source = Observable.empty(); + Func1 length = new Func1() { + @Override + public Double call(String t1) { + return (double) t1.length(); + } + }; + + Observable result = MathObservable.from(source).averageDouble(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, IllegalArgumentException.class); + } + + @Test + public void testIntegerAverageSelectorThrows() { + Observable source = Observable.from("a"); + Func1 length = new Func1() { + @Override + public Integer call(String t1) { + throw new CustomException(); + } + }; + + Observable result = MathObservable.from(source).averageInteger(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, CustomException.class); + } + + @Test + public void testLongAverageSelectorThrows() { + Observable source = Observable.from("a"); + Func1 length = new Func1() { + @Override + public Long call(String t1) { + throw new CustomException(); + } + }; + + Observable result = MathObservable.from(source).averageLong(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, CustomException.class); + } + + @Test + public void testFloatAverageSelectorThrows() { + Observable source = Observable.from("a"); + Func1 length = new Func1() { + @Override + public Float call(String t1) { + throw new CustomException(); + } + }; + + Observable result = MathObservable.from(source).averageFloat(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, CustomException.class); + } + + @Test + public void testDoubleAverageSelectorThrows() { + Observable source = Observable.from("a"); + Func1 length = new Func1() { + @Override + public Double call(String t1) { + throw new CustomException(); + } + }; + + Observable result = MathObservable.from(source).averageDouble(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, CustomException.class); + } + + static class CustomException extends RuntimeException { + } +} diff --git a/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationMinMaxTest.java b/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationMinMaxTest.java new file mode 100644 index 0000000000..63eee7b28f --- /dev/null +++ b/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationMinMaxTest.java @@ -0,0 +1,354 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.math.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static rx.math.operators.OperationMinMax.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observer; +import rx.functions.Func1; + +public class OperationMinMaxTest { + @Test + public void testMin() { + Observable observable = min(Observable.from(2, 3, 1, 4)); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinWithEmpty() { + Observable observable = min(Observable. empty()); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinWithComparator() { + Observable observable = min(Observable.from(2, 3, 1, 4), + new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(4); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinWithComparatorAndEmpty() { + Observable observable = min(Observable. empty(), + new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinBy() { + Observable> observable = minBy( + Observable.from("1", "2", "3", "4", "5", "6"), + new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(Arrays.asList("2", "4", "6")); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinByWithEmpty() { + Observable> observable = minBy( + Observable. empty(), new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(new ArrayList()); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinByWithComparator() { + Observable> observable = minBy( + Observable.from("1", "2", "3", "4", "5", "6"), + new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }, new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(Arrays.asList("1", "3", "5")); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinByWithComparatorAndEmpty() { + Observable> observable = minBy( + Observable. empty(), new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }, new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(new ArrayList()); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMax() { + Observable observable = max(Observable.from(2, 3, 1, 4)); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(4); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxWithEmpty() { + Observable observable = max(Observable. empty()); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxWithComparator() { + Observable observable = max(Observable.from(2, 3, 1, 4), + new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxWithComparatorAndEmpty() { + Observable observable = max(Observable. empty(), + new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxBy() { + Observable> observable = maxBy( + Observable.from("1", "2", "3", "4", "5", "6"), + new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(Arrays.asList("1", "3", "5")); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxByWithEmpty() { + Observable> observable = maxBy( + Observable. empty(), new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(new ArrayList()); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxByWithComparator() { + Observable> observable = maxBy( + Observable.from("1", "2", "3", "4", "5", "6"), + new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }, new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(Arrays.asList("2", "4", "6")); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxByWithComparatorAndEmpty() { + Observable> observable = maxBy( + Observable. empty(), new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }, new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(new ArrayList()); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } +} diff --git a/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationSumTest.java b/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationSumTest.java new file mode 100644 index 0000000000..43e2805d43 --- /dev/null +++ b/rxjava-contrib/rxjava-math/src/test/java/rx/math/operators/OperationSumTest.java @@ -0,0 +1,346 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.math.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static rx.math.operators.OperationSum.*; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.functions.Func1; +import rx.observables.MathObservable; + +public class OperationSumTest { + + @SuppressWarnings("unchecked") + Observer w = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer wl = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer wf = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer wd = mock(Observer.class); + + @Test + public void testSumOfAFewInts() throws Throwable { + Observable src = Observable.from(1, 2, 3, 4, 5); + sumIntegers(src).subscribe(w); + + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(15); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testEmptySum() throws Throwable { + Observable src = Observable.empty(); + sumIntegers(src).subscribe(w); + + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(0); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testSumOfAFewLongs() throws Throwable { + Observable src = Observable.from(1L, 2L, 3L, 4L, 5L); + sumLongs(src).subscribe(wl); + + verify(wl, times(1)).onNext(anyLong()); + verify(wl).onNext(15L); + verify(wl, never()).onError(any(Throwable.class)); + verify(wl, times(1)).onCompleted(); + } + + @Test + public void testEmptySumLongs() throws Throwable { + Observable src = Observable.empty(); + sumLongs(src).subscribe(wl); + + verify(wl, times(1)).onNext(anyLong()); + verify(wl).onNext(0L); + verify(wl, never()).onError(any(Throwable.class)); + verify(wl, times(1)).onCompleted(); + } + + @Test + public void testSumOfAFewFloats() throws Throwable { + Observable src = Observable.from(1.0f); + sumFloats(src).subscribe(wf); + + verify(wf, times(1)).onNext(anyFloat()); + verify(wf).onNext(1.0f); + verify(wf, never()).onError(any(Throwable.class)); + verify(wf, times(1)).onCompleted(); + } + + @Test + public void testEmptySumFloats() throws Throwable { + Observable src = Observable.empty(); + sumFloats(src).subscribe(wf); + + verify(wf, times(1)).onNext(anyFloat()); + verify(wf).onNext(0.0f); + verify(wf, never()).onError(any(Throwable.class)); + verify(wf, times(1)).onCompleted(); + } + + @Test + public void testSumOfAFewDoubles() throws Throwable { + Observable src = Observable.from(0.0d, 1.0d, 0.5d); + sumDoubles(src).subscribe(wd); + + verify(wd, times(1)).onNext(anyDouble()); + verify(wd).onNext(1.5d); + verify(wd, never()).onError(any(Throwable.class)); + verify(wd, times(1)).onCompleted(); + } + + @Test + public void testEmptySumDoubles() throws Throwable { + Observable src = Observable.empty(); + sumDoubles(src).subscribe(wd); + + verify(wd, times(1)).onNext(anyDouble()); + verify(wd).onNext(0.0d); + verify(wd, never()).onError(any(Throwable.class)); + verify(wd, times(1)).onCompleted(); + } + + void testThrows(Observer o, Class errorClass) { + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + verify(o, times(1)).onError(any(errorClass)); + } + + void testValue(Observer o, N value) { + verify(o, times(1)).onNext(value); + verify(o, times(1)).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testIntegerSumSelector() { + Observable source = Observable.from("a", "bb", "ccc", "dddd"); + Func1 length = new Func1() { + @Override + public Integer call(String t1) { + return t1.length(); + } + }; + + Observable result = MathObservable.from(source).sumInteger(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testValue(o, 10); + } + + @Test + public void testLongSumSelector() { + Observable source = Observable.from("a", "bb", "ccc", "dddd"); + Func1 length = new Func1() { + @Override + public Long call(String t1) { + return (long) t1.length(); + } + }; + + Observable result = MathObservable.from(source).sumLong(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testValue(o, 10L); + } + + @Test + public void testFloatSumSelector() { + Observable source = Observable.from("a", "bb", "ccc", "dddd"); + Func1 length = new Func1() { + @Override + public Float call(String t1) { + return (float) t1.length(); + } + }; + + Observable result = MathObservable.from(source).sumFloat(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testValue(o, 10f); + } + + @Test + public void testDoubleSumSelector() { + Observable source = Observable.from("a", "bb", "ccc", "dddd"); + Func1 length = new Func1() { + @Override + public Double call(String t1) { + return (double) t1.length(); + } + }; + + Observable result = MathObservable.from(source).sumDouble(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testValue(o, 10d); + } + + @Test + public void testIntegerSumSelectorEmpty() { + Observable source = Observable.empty(); + Func1 length = new Func1() { + @Override + public Integer call(String t1) { + return t1.length(); + } + }; + + Observable result = MathObservable.from(source).sumInteger(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, IllegalArgumentException.class); + } + + @Test + public void testLongSumSelectorEmpty() { + Observable source = Observable.empty(); + Func1 length = new Func1() { + @Override + public Long call(String t1) { + return (long) t1.length(); + } + }; + + Observable result = MathObservable.from(source).sumLong(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, IllegalArgumentException.class); + } + + @Test + public void testFloatSumSelectorEmpty() { + Observable source = Observable.empty(); + Func1 length = new Func1() { + @Override + public Float call(String t1) { + return (float) t1.length(); + } + }; + + Observable result = MathObservable.from(source).sumFloat(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, IllegalArgumentException.class); + } + + @Test + public void testDoubleSumSelectorEmpty() { + Observable source = Observable.empty(); + Func1 length = new Func1() { + @Override + public Double call(String t1) { + return (double) t1.length(); + } + }; + + Observable result = MathObservable.from(source).sumDouble(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, IllegalArgumentException.class); + } + + @Test + public void testIntegerSumSelectorThrows() { + Observable source = Observable.from("a"); + Func1 length = new Func1() { + @Override + public Integer call(String t1) { + throw new CustomException(); + } + }; + + Observable result = MathObservable.from(source).sumInteger(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, CustomException.class); + } + + @Test + public void testLongSumSelectorThrows() { + Observable source = Observable.from("a"); + Func1 length = new Func1() { + @Override + public Long call(String t1) { + throw new CustomException(); + } + }; + + Observable result = MathObservable.from(source).sumLong(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, CustomException.class); + } + + @Test + public void testFloatSumSelectorThrows() { + Observable source = Observable.from("a"); + Func1 length = new Func1() { + @Override + public Float call(String t1) { + throw new CustomException(); + } + }; + + Observable result = MathObservable.from(source).sumFloat(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, CustomException.class); + } + + @Test + public void testDoubleSumSelectorThrows() { + Observable source = Observable.from("a"); + Func1 length = new Func1() { + @Override + public Double call(String t1) { + throw new CustomException(); + } + }; + + Observable result = MathObservable.from(source).sumDouble(length); + Observer o = mock(Observer.class); + result.subscribe(o); + + testThrows(o, CustomException.class); + } + + static class CustomException extends RuntimeException { + } +} diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index c244fb5001..92ae1c7c40 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -441,6 +441,7 @@ public final static Observable average(Observable source) { * Observable * @see RxJava Wiki: averageDouble() * @see MSDN: Observable.Average + * @deprecated Use rxjava-math module instead */ public final static Observable averageDouble(Observable source) { return OperationAverage.averageDoubles(source); @@ -457,6 +458,7 @@ public final static Observable averageDouble(Observable source) * Observable * @see RxJava Wiki: averageFloat() * @see MSDN: Observable.Average + * @deprecated Use rxjava-math module instead */ public final static Observable averageFloat(Observable source) { return OperationAverage.averageFloats(source); @@ -475,6 +477,7 @@ public final static Observable averageFloat(Observable source) { * if the source Observable emits no items * @see RxJava Wiki: averageInteger() * @see MSDN: Observable.Average + * @deprecated Use rxjava-math module instead */ public final static Observable averageInteger(Observable source) { return OperationAverage.average(source); @@ -491,6 +494,7 @@ public final static Observable averageInteger(Observable sourc * Observable * @see RxJava Wiki: averageLong() * @see MSDN: Observable.Average + * @deprecated Use rxjava-math module instead */ public final static Observable averageLong(Observable source) { return OperationAverage.averageLongs(source); @@ -1595,6 +1599,7 @@ public final static Observable just(T value, Scheduler scheduler) { * if the source is empty * @see RxJava Wiki: max() * @see MSDN: Observable.Max + * @deprecated Use rxjava-math module instead */ public final static > Observable max(Observable source) { return OperationMinMax.max(source); @@ -2309,6 +2314,7 @@ public final static Observable mergeDelayError(Observable t1 * @throws IllegalArgumentException * if the source is empty * @see MSDN: Observable.Min + * @deprecated Use rxjava-math module instead */ public final static > Observable min(Observable source) { return OperationMinMax.min(source); @@ -2485,7 +2491,7 @@ public final Boolean call(T first, T second) { public final static Observable sequenceEqual(Observable first, Observable second, Func2 equality) { return OperationSequenceEqual.sequenceEqual(first, second, equality); } - + /** * Returns an Observable that emits the sum of all the Doubles emitted by the source Observable. *

@@ -2497,6 +2503,7 @@ public final static Observable sequenceEqual(ObservableRxJava Wiki: sumDouble() * @see MSDN: Observable.Sum + * @deprecated Use rxjava-math module instead */ public final static Observable sumDouble(Observable source) { return OperationSum.sumDoubles(source); @@ -2513,6 +2520,7 @@ public final static Observable sumDouble(Observable source) { * Observable * @see RxJava Wiki: sumFloat() * @see MSDN: Observable.Sum + * @deprecated Use rxjava-math module instead */ public final static Observable sumFloat(Observable source) { return OperationSum.sumFloats(source); @@ -2529,6 +2537,7 @@ public final static Observable sumFloat(Observable source) { * Observable * @see RxJava Wiki: sumInteger() * @see MSDN: Observable.Sum + * @deprecated Use rxjava-math module instead */ public final static Observable sumInteger(Observable source) { return OperationSum.sumIntegers(source); @@ -2545,6 +2554,7 @@ public final static Observable sumInteger(Observable source) { * source Observable * @see RxJava Wiki: sumLong() * @see MSDN: Observable.Sum + * @deprecated Use rxjava-math module instead */ public final static Observable sumLong(Observable source) { return OperationSum.sumLongs(source); @@ -3413,6 +3423,7 @@ public final Observable asObservable() { * emitted by the source Observable when transformed into Doubles by the specified function * @see RxJava Wiki: averageDouble() * @see MSDN: Observable.Average + * @deprecated Use rxjava-math module instead */ public final Observable averageDouble(Func1 valueExtractor) { return create(new OperationAverage.AverageDoubleExtractor(this, valueExtractor)); @@ -3430,6 +3441,7 @@ public final Observable averageDouble(Func1 valueExtr * emitted by the source Observable when transformed into Floats by the specified function * @see RxJava Wiki: averageFloat() * @see MSDN: Observable.Average + * @deprecated Use rxjava-math module instead */ public final Observable averageFloat(Func1 valueExtractor) { return create(new OperationAverage.AverageFloatExtractor(this, valueExtractor)); @@ -3447,6 +3459,7 @@ public final Observable averageFloat(Func1 valueExtract * emitted by the source Observable when transformed into Integers by the specified function * @see RxJava Wiki: averageInteger() * @see MSDN: Observable.Average + * @deprecated Use rxjava-math module instead */ public final Observable averageInteger(Func1 valueExtractor) { return create(new OperationAverage.AverageIntegerExtractor(this, valueExtractor)); @@ -3464,6 +3477,7 @@ public final Observable averageInteger(Func1 valueE * emitted by the source Observable when transformed into Longs by the specified function * @see RxJava Wiki: averageLong() * @see MSDN: Observable.Average + * @deprecated Use rxjava-math module instead */ public final Observable averageLong(Func1 valueExtractor) { return create(new OperationAverage.AverageLongExtractor(this, valueExtractor)); @@ -4843,6 +4857,7 @@ public final Observable> materialize() { * if the source is empty * @see RxJava Wiki: max() * @see MSDN: Observable.Max + * @deprecated Use rxjava-math module instead */ public final Observable max(Comparator comparator) { return OperationMinMax.max(this, comparator); @@ -5011,6 +5026,7 @@ public final Observable mergeMapIterable(Func1RxJava Wiki: min() * @see MSDN: Observable.Min + * @deprecated Use rxjava-math module instead */ public final Observable min(Comparator comparator) { return OperationMinMax.min(this, comparator); @@ -7053,6 +7069,7 @@ public final Observable subscribeOn(Scheduler scheduler) { * by the source Observable as transformed by the provided function * @see RxJava Wiki: sumDouble() * @see MSDN: Observable.Sum + * @deprecated Use rxjava-math module instead */ public final Observable sumDouble(Func1 valueExtractor) { return OperationSum.sumAtLeastOneDoubles(map(valueExtractor)); @@ -7070,6 +7087,7 @@ public final Observable sumDouble(Func1 valueExtracto * the source Observable as transformed by the provided function * @see RxJava Wiki: sumFloat() * @see MSDN: Observable.Sum + * @deprecated Use rxjava-math module instead */ public final Observable sumFloat(Func1 valueExtractor) { return OperationSum.sumAtLeastOneFloats(map(valueExtractor)); @@ -7087,6 +7105,7 @@ public final Observable sumFloat(Func1 valueExtractor) * by the source Observable as transformed by the provided function * @see RxJava Wiki: sumInteger() * @see MSDN: Observable.Sum + * @deprecated Use rxjava-math module instead */ public final Observable sumInteger(Func1 valueExtractor) { return OperationSum.sumAtLeastOneIntegers(map(valueExtractor)); @@ -7104,6 +7123,7 @@ public final Observable sumInteger(Func1 valueExtra * the source Observable as transformed by the provided function * @see RxJava Wiki: sumLong() * @see MSDN: Observable.Sum + * @deprecated Use rxjava-math module instead */ public final Observable sumLong(Func1 valueExtractor) { return OperationSum.sumAtLeastOneLongs(map(valueExtractor)); diff --git a/settings.gradle b/settings.gradle index 5be4134cef..5a9e3e8cb0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -9,6 +9,7 @@ include 'rxjava-core', \ 'rxjava-contrib:rxjava-android', \ 'rxjava-contrib:rxjava-apache-http', \ 'rxjava-contrib:rxjava-string', \ +'rxjava-contrib:rxjava-math', \ 'rxjava-contrib:rxjava-debug', \ 'rxjava-contrib:rxjava-async-util', \ 'rxjava-contrib:rxjava-computation-expressions'