Skip to content

Commit

Permalink
Fix Scan/Reduce/Collect Factory Ambiguity
Browse files Browse the repository at this point in the history
This puts the seed factory on `collect` and removes it from `scan` and `reduce` due to ambiguity.
See ReactiveX#1883 and ReactiveX#1881
  • Loading branch information
benjchristensen committed Nov 15, 2014
1 parent c5fd708 commit c63c76b
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 75 deletions.
78 changes: 10 additions & 68 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3459,7 +3459,7 @@ public final <R> Observable<R> cast(final Class<R> klass) {
* <dd>{@code collect} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param state
* @param stateFactory
* the mutable data structure that will collect the items
* @param collector
* a function that accepts the {@code state} and an emitted item, and modifies {@code state}
Expand All @@ -3468,7 +3468,7 @@ public final <R> Observable<R> cast(final Class<R> klass) {
* into a single mutable data structure
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#collect">RxJava wiki: collect</a>
*/
public final <R> Observable<R> collect(R state, final Action2<R, ? super T> collector) {
public final <R> Observable<R> collect(Func0<R> stateFactory, final Action2<R, ? super T> collector) {
Func2<R, T, R> accumulator = new Func2<R, T, R>() {

@Override
Expand All @@ -3478,7 +3478,14 @@ public final R call(R state, T value) {
}

};
return reduce(state, accumulator);

/*
* Discussion and confirmation of implementation at
* https://github.com/ReactiveX/RxJava/issues/423#issuecomment-27642532
*
* It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty.
*/
return lift(new OperatorScan<R, T>(stateFactory, accumulator)).last();
}

/**
Expand Down Expand Up @@ -5293,40 +5300,6 @@ public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> acc
return scan(initialValue, accumulator).takeLast(1);
}

/**
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
* Observable and a specified seed value, then feeds the result of that function along with the second item
* emitted by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the final result from the final call to your function as its sole item.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/reduceSeed.png" alt="">
* <p>
* This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," "accumulate,"
* "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method
* that does a similar operation on lists.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because by intent it will receive all values and reduce
* them to a single {@code onNext}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code reduce} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param initialValueFactory
* factory to produce the initial (seed) accumulator item each time the Observable is subscribed to
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source Observable, the
* result of which will be used in the next accumulator call
* @return an Observable that emits a single item that is the result of accumulating the output from the
* items emitted by the source Observable
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce">RxJava wiki: reduce</a>
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public final <R> Observable<R> reduce(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
return scan(initialValueFactory, accumulator).takeLast(1);
}


/**
* Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
* <p>
Expand Down Expand Up @@ -6359,37 +6332,6 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
return lift(new OperatorScan<R, T>(initialValue, accumulator));
}

/**
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
* Observable and a seed value, then feeds the result of that function along with the second item emitted by
* the source Observable into the same function, and so on until all items have been emitted by the source
* Observable, emitting the result of each of these iterations.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/scanSeed.png" alt="">
* <p>
* This sort of function is sometimes called an accumulator.
* <p>
* Note that the Observable that results from this method will emit the item returned from
* {@code initialValueFactory} as its first emitted item.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code scan} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param initialValueFactory
* factory to produce the initial (seed) accumulator item each time the Observable is subscribed to
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source Observable, whose
* result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the
* next accumulator call
* @return an Observable that emits the item returned from {@code initialValueFactory} followed by the
* results of each call to the accumulator function
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#scan">RxJava wiki: scan</a>
*/
public final <R> Observable<R> scan(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
return lift(new OperatorScan<R, T>(initialValueFactory, accumulator));
}

/**
* Forces an Observable's emissions and notifications to be serialized and for it to obey the Rx contract
Expand Down
31 changes: 28 additions & 3 deletions src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.ConnectableObservable;
Expand Down Expand Up @@ -965,23 +966,47 @@ public void testRangeWithScheduler() {

@Test
public void testCollectToList() {
List<Integer> list = Observable.just(1, 2, 3).collect(new ArrayList<Integer>(), new Action2<List<Integer>, Integer>() {
Observable<List<Integer>> o = Observable.just(1, 2, 3).collect(new Func0<List<Integer>>() {

@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}

}, new Action2<List<Integer>, Integer>() {

@Override
public void call(List<Integer> list, Integer v) {
list.add(v);
}
}).toBlocking().last();
});

List<Integer> list = o.toBlocking().last();

assertEquals(3, list.size());
assertEquals(1, list.get(0).intValue());
assertEquals(2, list.get(1).intValue());
assertEquals(3, list.get(2).intValue());

// test multiple subscribe
List<Integer> list2 = o.toBlocking().last();

assertEquals(3, list2.size());
assertEquals(1, list2.get(0).intValue());
assertEquals(2, list2.get(1).intValue());
assertEquals(3, list2.get(2).intValue());
}

@Test
public void testCollectToString() {
String value = Observable.just(1, 2, 3).collect(new StringBuilder(), new Action2<StringBuilder, Integer>() {
String value = Observable.just(1, 2, 3).collect(new Func0<StringBuilder>() {

@Override
public StringBuilder call() {
return new StringBuilder();
}

}, new Action2<StringBuilder, Integer>() {

@Override
public void call(StringBuilder sb, Integer v) {
Expand Down
11 changes: 7 additions & 4 deletions src/test/java/rx/internal/operators/OperatorScanTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
Expand Down Expand Up @@ -269,22 +270,24 @@ public void onNext(Integer t) {
assertEquals(101, count.get());
}

/**
* This uses the public API collect which uses scan under the covers.
*/
@Test
public void testSeedFactory() {
Observable<List<Integer>> o = Observable.range(1, 10)
.scan(new Func0<List<Integer>>() {
.collect(new Func0<List<Integer>>() {

@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}

}, new Func2<List<Integer>, Integer, List<Integer>>() {
}, new Action2<List<Integer>, Integer>() {

@Override
public List<Integer> call(List<Integer> list, Integer t2) {
public void call(List<Integer> list, Integer t2) {
list.add(t2);
return list;
}

}).takeLast(1);
Expand Down

0 comments on commit c63c76b

Please sign in to comment.