Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scan/Reduce with Seed Factory #1835

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5429,6 +5429,41 @@ public final Observable<T> reduce(Func2<T, T, T> accumulator) {
public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
return scan(initialValue, accumulator).takeLast(1);
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a source
* Observable and a specified seed value, then feeds the result of that function along with the second item
* emitted by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the final result from the final call to your function as its sole item.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/reduceSeed.png" alt="">
* <p>
* This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," "accumulate,"
* "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method
* that does a similar operation on lists.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because by intent it will receive all values and reduce
* them to a single {@code onNext}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code reduce} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param initialValueFactory
* factory to produce the initial (seed) accumulator item each time the Observable is subscribed to
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source Observable, the
* result of which will be used in the next accumulator call
* @return an Observable that emits a single item that is the result of accumulating the output from the
* items emitted by the source Observable
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce">RxJava wiki: reduce</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154.aspx">MSDN: Observable.Aggregate</a>
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public final <R> Observable<R> reduce(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
return scan(initialValueFactory, accumulator).takeLast(1);
}


/**
* Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
Expand Down Expand Up @@ -6491,6 +6526,38 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
return lift(new OperatorScan<R, T>(initialValue, accumulator));
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a source
* Observable and a seed value, then feeds the result of that function along with the second item emitted by
* the source Observable into the same function, and so on until all items have been emitted by the source
* Observable, emitting the result of each of these iterations.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/scanSeed.png" alt="">
* <p>
* This sort of function is sometimes called an accumulator.
* <p>
* Note that the Observable that results from this method will emit {@code initialValue} as its first
* emitted item.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code scan} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param initialValueFactory
* factory to produce the initial (seed) accumulator item each time the Observable is subscribed to
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source Observable, whose
* result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the
* next accumulator call
* @return an Observable that emits {@code initialValue} followed by the results of each call to the
* accumulator function
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#scan">RxJava wiki: scan</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665.aspx">MSDN: Observable.Scan</a>
*/
public final <R> Observable<R> scan(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
return lift(new OperatorScan<R, T>(initialValueFactory, accumulator));
}

/**
* Forces an Observable's emissions and notifications to be serialized and for it to obey the Rx contract
Expand Down
20 changes: 17 additions & 3 deletions src/main/java/rx/internal/operators/OperatorScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func0;
import rx.functions.Func2;
import rx.internal.util.UtilityFunctions;

/**
* Returns an Observable that applies a function to the first item emitted by a source Observable, then feeds
Expand All @@ -38,7 +40,7 @@
*/
public final class OperatorScan<R, T> implements Operator<R, T> {

private final R initialValue;
private final Func0<R> initialValueFactory;
private final Func2<R, ? super T, R> accumulator;
// sentinel if we don't receive an initial value
private static final Object NO_INITIAL_VALUE = new Object();
Expand All @@ -54,8 +56,19 @@ public final class OperatorScan<R, T> implements Operator<R, T> {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212007.aspx">Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource,
* TAccumulate))</a>
*/
public OperatorScan(R initialValue, Func2<R, ? super T, R> accumulator) {
this.initialValue = initialValue;
public OperatorScan(final R initialValue, Func2<R, ? super T, R> accumulator) {
this(new Func0<R>() {

@Override
public R call() {
return initialValue;
}

}, accumulator);
}

public OperatorScan(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
this.initialValueFactory = initialValueFactory;
this.accumulator = accumulator;
}

Expand All @@ -75,6 +88,7 @@ public OperatorScan(final Func2<R, ? super T, R> accumulator) {
@Override
public Subscriber<? super T> call(final Subscriber<? super R> child) {
return new Subscriber<T>(child) {
private final R initialValue = initialValueFactory.call();
private R value = initialValue;
boolean initialized = false;

Expand Down
31 changes: 30 additions & 1 deletion src/test/java/rx/internal/operators/OperatorScanTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
package rx.internal.operators;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
Expand All @@ -24,6 +25,9 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
Expand All @@ -33,6 +37,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observers.TestSubscriber;
Expand Down Expand Up @@ -263,4 +268,28 @@ public void onNext(Integer t) {
// we only expect to receive 101 as we'll receive all 100 + the initial value
assertEquals(101, count.get());
}

@Test
public void testSeedFactory() {
Observable<List<Integer>> o = Observable.range(1, 10)
.scan(new Func0<List<Integer>>() {

@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}

}, new Func2<List<Integer>, Integer, List<Integer>>() {

@Override
public List<Integer> call(List<Integer> list, Integer t2) {
list.add(t2);
return list;
}

}).takeLast(1);

assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single());
assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single());
}
}