From 67374198e9dc1e3e00dd56f5c53984af3cc1a721 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Tue, 28 Jan 2014 00:15:02 -0800 Subject: [PATCH] Convert to scan to use lift --- rxjava-core/src/main/java/rx/Observable.java | 26 +- .../main/java/rx/operators/OperationScan.java | 188 +++++------ .../main/java/rx/operators/OperationSum.java | 291 +++--------------- rxjava-core/src/test/java/rx/ReduceTests.java | 2 +- .../java/rx/operators/OperationScanTest.java | 6 +- .../java/rx/operators/OperationSumTest.java | 4 +- 6 files changed, 132 insertions(+), 385 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 544a60d712..a5a265fa8b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2529,14 +2529,6 @@ public final static Observable sequenceEqual(Observable sum(Observable source) { - return OperationSum.sum(source); - } - /** * Returns an Observable that emits the sum of all the Doubles emitted by the source Observable. *

@@ -2583,7 +2575,7 @@ public final static Observable sumFloat(Observable source) { * @see MSDN: Observable.Sum */ public final static Observable sumInteger(Observable source) { - return OperationSum.sum(source); + return OperationSum.sumIntegers(source); } /** @@ -5519,7 +5511,7 @@ public final Observable reduce(Func2 accumulator) { * It should use last() not takeLast(1) since it needs to emit an error if the sequence is * empty. */ - return create(OperationScan.scan(this, accumulator)).last(); + return scan(accumulator).last(); } /** @@ -5547,7 +5539,7 @@ public final Observable reduce(Func2 accumulator) { * @see Wikipedia: Fold (higher-order function) */ public final Observable reduce(R initialValue, Func2 accumulator) { - return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1); + return scan(initialValue, accumulator).takeLast(1); } /** @@ -6148,7 +6140,7 @@ public final Observable sample(Observable sampler) { * @see MSDN: Observable.Scan */ public final Observable scan(Func2 accumulator) { - return create(OperationScan.scan(this, accumulator)); + return lift(OperationScan.scan(accumulator)); } /** @@ -6175,7 +6167,7 @@ public final Observable scan(Func2 accumulator) { * @see MSDN: Observable.Scan */ public final Observable scan(R initialValue, Func2 accumulator) { - return create(OperationScan.scan(this, initialValue, accumulator)); + return lift(OperationScan.scan(initialValue, accumulator)); } /** @@ -7079,7 +7071,7 @@ public final Observable subscribeOn(Scheduler scheduler) { * @see MSDN: Observable.Sum */ public final Observable sumDouble(Func1 valueExtractor) { - return create(new OperationSum.SumDoubleExtractor(this, valueExtractor)); + return OperationSum.sumAtLeastOneDoubles(map(valueExtractor)); } /** @@ -7096,7 +7088,7 @@ public final Observable sumDouble(Func1 valueExtracto * @see MSDN: Observable.Sum */ public final Observable sumFloat(Func1 valueExtractor) { - return create(new OperationSum.SumFloatExtractor(this, valueExtractor)); + return OperationSum.sumAtLeastOneFloats(map(valueExtractor)); } /** @@ -7113,7 +7105,7 @@ public final Observable sumFloat(Func1 valueExtractor) * @see MSDN: Observable.Sum */ public final Observable sumInteger(Func1 valueExtractor) { - return create(new OperationSum.SumIntegerExtractor(this, valueExtractor)); + return OperationSum.sumAtLeastOneIntegers(map(valueExtractor)); } /** @@ -7130,7 +7122,7 @@ public final Observable sumInteger(Func1 valueExtra * @see MSDN: Observable.Sum */ public final Observable sumLong(Func1 valueExtractor) { - return create(new OperationSum.SumLongExtractor(this, valueExtractor)); + return OperationSum.sumAtLeastOneLongs(map(valueExtractor)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationScan.java b/rxjava-core/src/main/java/rx/operators/OperationScan.java index 3c58e053c5..8bbaeb0e59 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationScan.java +++ b/rxjava-core/src/main/java/rx/operators/OperationScan.java @@ -15,10 +15,8 @@ */ package rx.operators; -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscription; +import rx.Observable.Operator; +import rx.Subscriber; import rx.util.functions.Func2; /** @@ -36,7 +34,8 @@ */ public final class OperationScan { /** - * Applies an accumulator function over an observable sequence and returns each intermediate result with the specified source and accumulator. + * Applies an accumulator function over an observable sequence and returns each intermediate + * result with the specified source and accumulator. * * @param sequence * An observable sequence of elements to project. @@ -45,124 +44,97 @@ public final class OperationScan { * @param accumulator * An accumulator function to be invoked on each element from the sequence. * - * @return An observable sequence whose elements are the result of accumulating the output from the list of Observables. - * @see Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource, + * @return An observable sequence whose elements are the result of accumulating the output from + * the list of Observables. + * @see Observable.Scan(TSource, + * TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource, * TAccumulate)) */ - public static OnSubscribeFunc scan(Observable sequence, R initialValue, Func2 accumulator) { - return new Accumulator(sequence, initialValue, accumulator); + public static Operator scan(final R initialValue, final Func2 accumulator) { + return new Operator() { + @Override + public Subscriber call(final Subscriber observer) { + observer.onNext(initialValue); + return new Subscriber(observer) { + private R value = initialValue; + + @Override + public void onNext(T value) { + try { + this.value = accumulator.call(this.value, value); + } catch (Throwable e) { + observer.onError(e); + observer.unsubscribe(); + } + observer.onNext(this.value); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + }; + } + }; } /** - * Applies an accumulator function over an observable sequence and returns each intermediate result with the specified source and accumulator. + * Applies an accumulator function over an observable sequence and returns each intermediate + * result with the specified source and accumulator. * * @param sequence * An observable sequence of elements to project. * @param accumulator * An accumulator function to be invoked on each element from the sequence. * - * @return An observable sequence whose elements are the result of accumulating the output from the list of Observables. - * @see Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource)) + * @return An observable sequence whose elements are the result of accumulating the output from + * the list of Observables. + * @see Observable.Scan(TSource) + * Method (IObservable(TSource), Func(TSource, TSource, TSource)) */ - public static OnSubscribeFunc scan(Observable sequence, Func2 accumulator) { - return new AccuWithoutInitialValue(sequence, accumulator); - } - - private static class AccuWithoutInitialValue implements OnSubscribeFunc { - private final Observable sequence; - private final Func2 accumulatorFunction; - - private AccumulatingObserver accumulatingObserver; - - private AccuWithoutInitialValue(Observable sequence, Func2 accumulator) { - this.sequence = sequence; - this.accumulatorFunction = accumulator; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - return sequence.subscribe(new Observer() { - - // has to be synchronized so that the initial value is always sent only once. - @Override - public synchronized void onNext(T value) { - if (accumulatingObserver == null) { - observer.onNext(value); - accumulatingObserver = new AccumulatingObserver(observer, value, accumulatorFunction); - } else { - accumulatingObserver.onNext(value); + public static Operator scan(final Func2 accumulator) { + return new Operator() { + @Override + public Subscriber call(final Subscriber observer) { + return new Subscriber(observer) { + private boolean first = true; + private T value; + + @Override + public void onNext(T value) { + if (first) { + this.value = value; + first = false; + } + else { + try { + this.value = accumulator.call(this.value, value); + } catch (Throwable e) { + observer.onError(e); + observer.unsubscribe(); + } + } + observer.onNext(this.value); } - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - @Override - public void onCompleted() { - observer.onCompleted(); - } - }); - } - } - - private static class Accumulator implements OnSubscribeFunc { - private final Observable sequence; - private final R initialValue; - private final Func2 accumulatorFunction; - - private Accumulator(Observable sequence, R initialValue, Func2 accumulator) { - this.sequence = sequence; - this.initialValue = initialValue; - this.accumulatorFunction = accumulator; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - observer.onNext(initialValue); - return sequence.subscribe(new AccumulatingObserver(observer, initialValue, accumulatorFunction)); - } - } - - private static class AccumulatingObserver implements Observer { - private final Observer observer; - private final Func2 accumulatorFunction; - - private R acc; - - private AccumulatingObserver(Observer observer, R initialValue, Func2 accumulator) { - this.observer = observer; - this.accumulatorFunction = accumulator; - - this.acc = initialValue; - } + @Override + public void onError(Throwable e) { + observer.onError(e); + } - /** - * We must synchronize this because we can't allow - * multiple threads to execute the 'accumulatorFunction' at the same time because - * the accumulator code very often will be doing mutation of the 'acc' object such as a non-threadsafe HashMap - * - * Because it's synchronized it's using non-atomic variables since everything in this method is single-threaded - */ - @Override - public synchronized void onNext(T value) { - try { - acc = accumulatorFunction.call(acc, value); - observer.onNext(acc); - } catch (Throwable ex) { - observer.onError(ex); + @Override + public void onCompleted() { + observer.onCompleted(); + } + }; } - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onCompleted() { - observer.onCompleted(); - } + }; } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSum.java b/rxjava-core/src/main/java/rx/operators/OperationSum.java index 4a12335e42..65d4c8e4a9 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSum.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSum.java @@ -16,291 +16,74 @@ package rx.operators; import rx.Observable; -import rx.Observer; -import rx.Subscription; -import rx.util.functions.Func1; import rx.util.functions.Func2; /** * A few operators for implementing the sum operation. * - * @see MSDN: Observable.Sum + * @see MSDN: + * Observable.Sum */ public final class OperationSum { - public static Observable sum(Observable source) { - return source.reduce(0, new Func2() { - @Override - public Integer call(Integer accu, Integer next) { - return accu + next; - } - }); + + public static Observable sumIntegers(Observable source) { + return source.reduce(0, ACCUM_INT); } public static Observable sumLongs(Observable source) { - return source.reduce(0L, new Func2() { - @Override - public Long call(Long accu, Long next) { - return accu + next; - } - }); + return source.reduce(0l, ACCUM_LONG); } public static Observable sumFloats(Observable source) { - return source.reduce(0.0f, new Func2() { - @Override - public Float call(Float accu, Float next) { - return accu + next; - } - }); + return source.reduce(0.0f, ACCUM_FLOAT); } public static Observable sumDoubles(Observable source) { - return source.reduce(0.0d, new Func2() { - @Override - public Double call(Double accu, Double next) { - return accu + next; - } - }); + return source.reduce(0.0d, ACCUM_DOUBLE); } - /** - * Compute the sum by extracting integer values from the source via an - * extractor function. - * - * @param - * the source value type - */ - public static final class SumIntegerExtractor implements Observable.OnSubscribeFunc { - final Observable source; - final Func1 valueExtractor; - - public SumIntegerExtractor(Observable source, Func1 valueExtractor) { - this.source = source; - this.valueExtractor = valueExtractor; - } - - @Override - public Subscription onSubscribe(Observer t1) { - return source.subscribe(new SumObserver(t1)); - } - - /** Computes the average. */ - private final class SumObserver implements Observer { - final Observer observer; - int sum; - boolean hasValue; - - public SumObserver(Observer observer) { - this.observer = observer; - } - - @Override - public void onNext(T args) { - sum += valueExtractor.call(args); - hasValue = true; - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onCompleted() { - if (hasValue) { - try { - observer.onNext(sum); - } catch (Throwable t) { - observer.onError(t); - return; - } - observer.onCompleted(); - } else { - observer.onError(new IllegalArgumentException("Sequence contains no elements")); - } - } - - } + public static Observable sumAtLeastOneIntegers(Observable source) { + return source.reduce(ACCUM_INT); } - /** - * Compute the sum by extracting long values from the source via an - * extractor function. - * - * @param - * the source value type - */ - public static final class SumLongExtractor implements Observable.OnSubscribeFunc { - final Observable source; - final Func1 valueExtractor; - - public SumLongExtractor(Observable source, Func1 valueExtractor) { - this.source = source; - this.valueExtractor = valueExtractor; - } - - @Override - public Subscription onSubscribe(Observer t1) { - return source.subscribe(new SumObserver(t1)); - } - - /** Computes the average. */ - private final class SumObserver implements Observer { - final Observer observer; - long sum; - boolean hasValue; - - public SumObserver(Observer observer) { - this.observer = observer; - } - - @Override - public void onNext(T args) { - sum += valueExtractor.call(args); - hasValue = true; - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onCompleted() { - if (hasValue) { - try { - observer.onNext(sum); - } catch (Throwable t) { - observer.onError(t); - return; - } - observer.onCompleted(); - } else { - observer.onError(new IllegalArgumentException("Sequence contains no elements")); - } - } - - } + public static Observable sumAtLeastOneLongs(Observable source) { + return source.reduce(ACCUM_LONG); } - /** - * Compute the sum by extracting float values from the source via an - * extractor function. - * - * @param - * the source value type - */ - public static final class SumFloatExtractor implements Observable.OnSubscribeFunc { - final Observable source; - final Func1 valueExtractor; + public static Observable sumAtLeastOneFloats(Observable source) { + return source.reduce(ACCUM_FLOAT); + } - public SumFloatExtractor(Observable source, Func1 valueExtractor) { - this.source = source; - this.valueExtractor = valueExtractor; - } + public static Observable sumAtLeastOneDoubles(Observable source) { + return source.reduce(ACCUM_DOUBLE); + } + private static final Func2 ACCUM_INT = new Func2() { @Override - public Subscription onSubscribe(Observer t1) { - return source.subscribe(new SumObserver(t1)); + public Integer call(Integer accu, Integer next) { + return accu + next; } + }; - /** Computes the average. */ - private final class SumObserver implements Observer { - final Observer observer; - float sum; - boolean hasValue; - - public SumObserver(Observer observer) { - this.observer = observer; - } - - @Override - public void onNext(T args) { - sum += valueExtractor.call(args); - hasValue = true; - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onCompleted() { - if (hasValue) { - try { - observer.onNext(sum); - } catch (Throwable t) { - observer.onError(t); - return; - } - observer.onCompleted(); - } else { - observer.onError(new IllegalArgumentException("Sequence contains no elements")); - } - } - - } - } - - /** - * Compute the sum by extracting float values from the source via an - * extractor function. - * - * @param - * the source value type - */ - public static final class SumDoubleExtractor implements Observable.OnSubscribeFunc { - final Observable source; - final Func1 valueExtractor; - - public SumDoubleExtractor(Observable source, Func1 valueExtractor) { - this.source = source; - this.valueExtractor = valueExtractor; + private static final Func2 ACCUM_LONG = new Func2() { + @Override + public Long call(Long accu, Long next) { + return accu + next; } + }; + private static final Func2 ACCUM_FLOAT = new Func2() { @Override - public Subscription onSubscribe(Observer t1) { - return source.subscribe(new SumObserver(t1)); + public Float call(Float accu, Float next) { + return accu + next; } + }; - /** Computes the average. */ - private final class SumObserver implements Observer { - final Observer observer; - double sum; - boolean hasValue; - - public SumObserver(Observer observer) { - this.observer = observer; - } - - @Override - public void onNext(T args) { - sum += valueExtractor.call(args); - hasValue = true; - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onCompleted() { - if (hasValue) { - try { - observer.onNext(sum); - } catch (Throwable t) { - observer.onError(t); - return; - } - observer.onCompleted(); - } else { - observer.onError(new IllegalArgumentException("Sequence contains no elements")); - } - } - + private static final Func2 ACCUM_DOUBLE = new Func2() { + @Override + public Double call(Double accu, Double next) { + return accu + next; } - } - + }; } diff --git a/rxjava-core/src/test/java/rx/ReduceTests.java b/rxjava-core/src/test/java/rx/ReduceTests.java index 20d1b8811c..8704f80d61 100644 --- a/rxjava-core/src/test/java/rx/ReduceTests.java +++ b/rxjava-core/src/test/java/rx/ReduceTests.java @@ -52,7 +52,7 @@ public Movie call(Movie t1, Movie t2) { } }; - Observable reduceResult = Observable.create(OperationScan.scan(horrorMovies, chooseSecondMovie)).takeLast(1); + Observable reduceResult = horrorMovies.lift(OperationScan.scan(chooseSecondMovie)).takeLast(1); Observable reduceResult2 = horrorMovies.reduce(chooseSecondMovie); } diff --git a/rxjava-core/src/test/java/rx/operators/OperationScanTest.java b/rxjava-core/src/test/java/rx/operators/OperationScanTest.java index 47a3274e52..2e1613353b 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationScanTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationScanTest.java @@ -41,7 +41,7 @@ public void testScanIntegersWithInitialValue() { Observable observable = Observable.from(1, 2, 3); - Observable m = Observable.create(scan(observable, "", new Func2() { + Observable m = observable.lift(scan("", new Func2() { @Override public String call(String s, Integer n) { @@ -68,7 +68,7 @@ public void testScanIntegersWithoutInitialValue() { Observable observable = Observable.from(1, 2, 3); - Observable m = Observable.create(scan(observable, new Func2() { + Observable m = observable.lift(scan(new Func2() { @Override public Integer call(Integer t1, Integer t2) { @@ -95,7 +95,7 @@ public void testScanIntegersWithoutInitialValueAndOnlyOneValue() { Observable observable = Observable.from(1); - Observable m = Observable.create(scan(observable, new Func2() { + Observable m = observable.lift(scan(new Func2() { @Override public Integer call(Integer t1, Integer t2) { diff --git a/rxjava-core/src/test/java/rx/operators/OperationSumTest.java b/rxjava-core/src/test/java/rx/operators/OperationSumTest.java index 478158eb21..7253e6196f 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSumTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSumTest.java @@ -39,7 +39,7 @@ public class OperationSumTest { @Test public void testSumOfAFewInts() throws Throwable { Observable src = Observable.from(1, 2, 3, 4, 5); - sum(src).subscribe(w); + sumIntegers(src).subscribe(w); verify(w, times(1)).onNext(anyInt()); verify(w).onNext(15); @@ -50,7 +50,7 @@ public void testSumOfAFewInts() throws Throwable { @Test public void testEmptySum() throws Throwable { Observable src = Observable.empty(); - sum(src).subscribe(w); + sumIntegers(src).subscribe(w); verify(w, times(1)).onNext(anyInt()); verify(w).onNext(0);