diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index a5a265fa8b..cb5b82fd46 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -72,7 +72,7 @@ import rx.operators.OperationReplay; import rx.operators.OperationRetry; import rx.operators.OperationSample; -import rx.operators.OperationScan; +import rx.operators.OperatorScan; import rx.operators.OperationSequenceEqual; import rx.operators.OperationSingle; import rx.operators.OperationSkip; @@ -6140,7 +6140,7 @@ public final Observable sample(Observable sampler) { * @see MSDN: Observable.Scan */ public final Observable scan(Func2 accumulator) { - return lift(OperationScan.scan(accumulator)); + return lift(new OperatorScan(accumulator)); } /** @@ -6167,7 +6167,7 @@ public final Observable scan(Func2 accumulator) { * @see MSDN: Observable.Scan */ public final Observable scan(R initialValue, Func2 accumulator) { - return lift(OperationScan.scan(initialValue, accumulator)); + return lift(new OperatorScan(initialValue, accumulator)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationScan.java b/rxjava-core/src/main/java/rx/operators/OperationScan.java deleted file mode 100644 index 8bbaeb0e59..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationScan.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import rx.Observable.Operator; -import rx.Subscriber; -import rx.util.functions.Func2; - -/** - * Returns an Observable that applies a function to the first item emitted by a source Observable, - * then feeds the result of that function along with the second item emitted by an Observable into - * the same function, and so on until all items have been emitted by the source Observable, - * emitting the result of each of these iterations. - *

- * - *

- * This sort of function is sometimes called an accumulator. - *

- * Note that when you pass a seed to scan() the resulting Observable will emit that - * seed as its first emitted item. - */ -public final class OperationScan { - /** - * Applies an accumulator function over an observable sequence and returns each intermediate - * result with the specified source and accumulator. - * - * @param sequence - * An observable sequence of elements to project. - * @param initialValue - * The initial (seed) accumulator value. - * @param accumulator - * An accumulator function to be invoked on each element from the sequence. - * - * @return An observable sequence whose elements are the result of accumulating the output from - * the list of Observables. - * @see Observable.Scan(TSource, - * TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource, - * TAccumulate)) - */ - public static Operator scan(final R initialValue, final Func2 accumulator) { - return new Operator() { - @Override - public Subscriber call(final Subscriber observer) { - observer.onNext(initialValue); - return new Subscriber(observer) { - private R value = initialValue; - - @Override - public void onNext(T value) { - try { - this.value = accumulator.call(this.value, value); - } catch (Throwable e) { - observer.onError(e); - observer.unsubscribe(); - } - observer.onNext(this.value); - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onCompleted() { - observer.onCompleted(); - } - }; - } - }; - } - - /** - * Applies an accumulator function over an observable sequence and returns each intermediate - * result with the specified source and accumulator. - * - * @param sequence - * An observable sequence of elements to project. - * @param accumulator - * An accumulator function to be invoked on each element from the sequence. - * - * @return An observable sequence whose elements are the result of accumulating the output from - * the list of Observables. - * @see Observable.Scan(TSource) - * Method (IObservable(TSource), Func(TSource, TSource, TSource)) - */ - public static Operator scan(final Func2 accumulator) { - return new Operator() { - @Override - public Subscriber call(final Subscriber observer) { - return new Subscriber(observer) { - private boolean first = true; - private T value; - - @Override - public void onNext(T value) { - if (first) { - this.value = value; - first = false; - } - else { - try { - this.value = accumulator.call(this.value, value); - } catch (Throwable e) { - observer.onError(e); - observer.unsubscribe(); - } - } - observer.onNext(this.value); - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onCompleted() { - observer.onCompleted(); - } - }; - } - }; - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorScan.java b/rxjava-core/src/main/java/rx/operators/OperatorScan.java new file mode 100644 index 0000000000..c3b4a3d22d --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorScan.java @@ -0,0 +1,117 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable.Operator; +import rx.Subscriber; +import rx.util.functions.Func2; + +/** + * Returns an Observable that applies a function to the first item emitted by a source Observable, + * then feeds the result of that function along with the second item emitted by an Observable into + * the same function, and so on until all items have been emitted by the source Observable, + * emitting the result of each of these iterations. + *

+ * + *

+ * This sort of function is sometimes called an accumulator. + *

+ * Note that when you pass a seed to scan() the resulting Observable will emit that + * seed as its first emitted item. + */ +public final class OperatorScan implements Operator { + + private final R initialValue; + private final Func2 accumulator; + // sentinel if we don't receive an initial value + private static final Object NO_INITIAL_VALUE = new Object(); + + /** + * Applies an accumulator function over an observable sequence and returns each intermediate + * result with the specified source and accumulator. + * + * @param sequence + * An observable sequence of elements to project. + * @param initialValue + * The initial (seed) accumulator value. + * @param accumulator + * An accumulator function to be invoked on each element from the sequence. + * + * @return An observable sequence whose elements are the result of accumulating the output from + * the list of Observables. + * @see Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource, + * TAccumulate)) + */ + public OperatorScan(R initialValue, Func2 accumulator) { + this.initialValue = initialValue; + this.accumulator = accumulator; + } + + /** + * Applies an accumulator function over an observable sequence and returns each intermediate + * result with the specified source and accumulator. + * + * @param sequence + * An observable sequence of elements to project. + * @param accumulator + * An accumulator function to be invoked on each element from the sequence. + * + * @return An observable sequence whose elements are the result of accumulating the output from + * the list of Observables. + * @see Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource)) + */ + @SuppressWarnings("unchecked") + public OperatorScan(final Func2 accumulator) { + this((R) NO_INITIAL_VALUE, accumulator); + } + + @Override + public Subscriber call(final Subscriber observer) { + if (initialValue != NO_INITIAL_VALUE) { + observer.onNext(initialValue); + } + return new Subscriber(observer) { + private R value = initialValue; + + @SuppressWarnings("unchecked") + @Override + public void onNext(T value) { + if (this.value == NO_INITIAL_VALUE) { + // if there is NO_INITIAL_VALUE then we know it is type T for both so cast T to R + this.value = (R) value; + } else { + try { + this.value = accumulator.call(this.value, value); + } catch (Throwable e) { + observer.onError(e); + observer.unsubscribe(); + } + } + observer.onNext(this.value); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + }; + } +} diff --git a/rxjava-core/src/test/java/rx/ReduceTests.java b/rxjava-core/src/test/java/rx/ReduceTests.java index 8704f80d61..63cce95c2f 100644 --- a/rxjava-core/src/test/java/rx/ReduceTests.java +++ b/rxjava-core/src/test/java/rx/ReduceTests.java @@ -21,7 +21,7 @@ import rx.CovarianceTest.HorrorMovie; import rx.CovarianceTest.Movie; -import rx.operators.OperationScan; +import rx.operators.OperatorScan; import rx.util.functions.Func2; public class ReduceTests { @@ -52,7 +52,7 @@ public Movie call(Movie t1, Movie t2) { } }; - Observable reduceResult = horrorMovies.lift(OperationScan.scan(chooseSecondMovie)).takeLast(1); + Observable reduceResult = horrorMovies.scan(chooseSecondMovie).takeLast(1); Observable reduceResult2 = horrorMovies.reduce(chooseSecondMovie); } diff --git a/rxjava-core/src/test/java/rx/operators/OperationScanTest.java b/rxjava-core/src/test/java/rx/operators/OperatorScanTest.java similarity index 89% rename from rxjava-core/src/test/java/rx/operators/OperationScanTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorScanTest.java index 2e1613353b..0a653af315 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationScanTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorScanTest.java @@ -17,7 +17,7 @@ import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.OperationScan.*; +import static rx.operators.OperatorScan.*; import org.junit.Before; import org.junit.Test; @@ -27,7 +27,7 @@ import rx.Observer; import rx.util.functions.Func2; -public class OperationScanTest { +public class OperatorScanTest { @Before public void before() { @@ -41,14 +41,14 @@ public void testScanIntegersWithInitialValue() { Observable observable = Observable.from(1, 2, 3); - Observable m = observable.lift(scan("", new Func2() { + Observable m = observable.scan("", new Func2() { @Override public String call(String s, Integer n) { return s + n.toString(); } - })); + }); m.subscribe(observer); verify(observer, never()).onError(any(Throwable.class)); @@ -68,14 +68,14 @@ public void testScanIntegersWithoutInitialValue() { Observable observable = Observable.from(1, 2, 3); - Observable m = observable.lift(scan(new Func2() { + Observable m = observable.scan(new Func2() { @Override public Integer call(Integer t1, Integer t2) { return t1 + t2; } - })); + }); m.subscribe(observer); verify(observer, never()).onError(any(Throwable.class)); @@ -95,14 +95,14 @@ public void testScanIntegersWithoutInitialValueAndOnlyOneValue() { Observable observable = Observable.from(1); - Observable m = observable.lift(scan(new Func2() { + Observable m = observable.scan(new Func2() { @Override public Integer call(Integer t1, Integer t2) { return t1 + t2; } - })); + }); m.subscribe(observer); verify(observer, never()).onError(any(Throwable.class));