Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Scan/Reduce/Collect Lambda Ambiguity #1883

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 20 additions & 71 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import rx.internal.operators.*;
import rx.internal.util.ScalarSynchronousObservable;
import rx.internal.util.UtilityFunctions;

import rx.observables.*;
import rx.observers.SafeSubscriber;
import rx.plugins.*;
Expand Down Expand Up @@ -3459,16 +3458,16 @@ public final <R> Observable<R> cast(final Class<R> klass) {
* <dd>{@code collect} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param state
* the mutable data structure that will collect the items
* @param stateFactory
* factory for the mutable data structure that will collect the items
* @param collector
* a function that accepts the {@code state} and an emitted item, and modifies {@code state}
* accordingly
* @return an Observable that emits the result of collecting the values emitted by the source Observable
* into a single mutable data structure
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#collect">RxJava wiki: collect</a>
*/
public final <R> Observable<R> collect(R state, final Action2<R, ? super T> collector) {
public final <R> Observable<R> collect(Func0<R> stateFactory, final Action2<R, ? super T> collector) {
Func2<R, T, R> accumulator = new Func2<R, T, R>() {

@Override
Expand All @@ -3478,7 +3477,7 @@ public final R call(R state, T value) {
}

};
return reduce(state, accumulator);
return reduce(stateFactory, accumulator);
}

/**
Expand Down Expand Up @@ -3565,7 +3564,14 @@ public final Boolean call(T t1) {
* @see #countLong()
*/
public final Observable<Integer> count() {
return reduce(0, new Func2<Integer, T, Integer>() {
return reduce(new Func0<Integer>() {

@Override
public Integer call() {
return 0;
}

}, new Func2<Integer, T, Integer>() {
@Override
public final Integer call(Integer t1, T t2) {
return t1 + 1;
Expand All @@ -3592,7 +3598,14 @@ public final Integer call(Integer t1, T t2) {
* @see #count()
*/
public final Observable<Long> countLong() {
return reduce(0L, new Func2<Long, T, Long>() {
return reduce(new Func0<Long>() {

@Override
public Long call() {
return 0L;
}

}, new Func2<Long, T, Long>() {
@Override
public final Long call(Long t1, T t2) {
return t1 + 1;
Expand Down Expand Up @@ -5260,39 +5273,6 @@ public final Observable<T> reduce(Func2<T, T, T> accumulator) {
return scan(accumulator).last();
}

/**
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
* Observable and a specified seed value, then feeds the result of that function along with the second item
* emitted by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the final result from the final call to your function as its sole item.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/reduceSeed.png" alt="">
* <p>
* This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," "accumulate,"
* "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method
* that does a similar operation on lists.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because by intent it will receive all values and reduce
* them to a single {@code onNext}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code reduce} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param initialValue
* the initial (seed) accumulator value
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source Observable, the
* result of which will be used in the next accumulator call
* @return an Observable that emits a single item that is the result of accumulating the output from the
* items emitted by the source Observable
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce">RxJava wiki: reduce</a>
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
return scan(initialValue, accumulator).takeLast(1);
}

/**
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
* Observable and a specified seed value, then feeds the result of that function along with the second item
Expand Down Expand Up @@ -6329,37 +6309,6 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
return lift(new OperatorScan<T, T>(accumulator));
}

/**
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
* Observable and a seed value, then feeds the result of that function along with the second item emitted by
* the source Observable into the same function, and so on until all items have been emitted by the source
* Observable, emitting the result of each of these iterations.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/scanSeed.png" alt="">
* <p>
* This sort of function is sometimes called an accumulator.
* <p>
* Note that the Observable that results from this method will emit {@code initialValue} as its first
* emitted item.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code scan} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param initialValue
* the initial (seed) accumulator item
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source Observable, whose
* result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the
* next accumulator call
* @return an Observable that emits {@code initialValue} followed by the results of each call to the
* accumulator function
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#scan">RxJava wiki: scan</a>
*/
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
return lift(new OperatorScan<R, T>(initialValue, accumulator));
}

/**
* Returns an Observable that applies a specified accumulator function to the first item emitted by a source
* Observable and a seed value, then feeds the result of that function along with the second item emitted by
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/rx/internal/operators/OnSubscribeRedo.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -105,7 +106,14 @@ public RetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {

@Override
public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> ts) {
return ts.scan(Notification.createOnNext(0), new Func2<Notification<Integer>, Notification<?>, Notification<Integer>>() {
return ts.scan(new Func0<Notification<Integer>>() {

@Override
public Notification<Integer> call() {
return Notification.createOnNext(0);
}

}, new Func2<Notification<Integer>, Notification<?>, Notification<Integer>>() {
@SuppressWarnings("unchecked")
@Override
public Notification<Integer> call(Notification<Integer> n, Notification<?> term) {
Expand Down
23 changes: 19 additions & 4 deletions src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.ConnectableObservable;
Expand Down Expand Up @@ -277,7 +278,7 @@ public void call(Integer t1) {
@Test
public void testReduceWithEmptyObservableAndSeed() {
Observable<Integer> observable = Observable.range(1, 0);
int value = observable.reduce(1, new Func2<Integer, Integer, Integer>() {
int value = observable.startWith(1).reduce(new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
Expand All @@ -292,7 +293,7 @@ public Integer call(Integer t1, Integer t2) {
@Test
public void testReduceWithInitialValue() {
Observable<Integer> observable = Observable.just(1, 2, 3, 4);
observable.reduce(50, new Func2<Integer, Integer, Integer>() {
observable.startWith(50).reduce(new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
Expand Down Expand Up @@ -965,7 +966,14 @@ public void testRangeWithScheduler() {

@Test
public void testCollectToList() {
List<Integer> list = Observable.just(1, 2, 3).collect(new ArrayList<Integer>(), new Action2<List<Integer>, Integer>() {
List<Integer> list = Observable.just(1, 2, 3).collect(new Func0<List<Integer>>() {

@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}

}, new Action2<List<Integer>, Integer>() {

@Override
public void call(List<Integer> list, Integer v) {
Expand All @@ -981,7 +989,14 @@ public void call(List<Integer> list, Integer v) {

@Test
public void testCollectToString() {
String value = Observable.just(1, 2, 3).collect(new StringBuilder(), new Action2<StringBuilder, Integer>() {
String value = Observable.just(1, 2, 3).collect(new Func0<StringBuilder>() {

@Override
public StringBuilder call() {
return new StringBuilder();
}

}, new Action2<StringBuilder, Integer>() {

@Override
public void call(StringBuilder sb, Integer v) {
Expand Down
10 changes: 9 additions & 1 deletion src/test/java/rx/ScanTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import rx.EventStream.Event;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func2;

public class ScanTests {
Expand All @@ -30,7 +31,14 @@ public class ScanTests {
public void testUnsubscribeScan() {

EventStream.getEventStream("HTTP-ClusterB", 20)
.scan(new HashMap<String, String>(), new Func2<Map<String, String>, Event, Map<String, String>>() {
.scan(new Func0<Map<String, String>>() {

@Override
public Map<String, String> call() {
return new HashMap<String, String>();
}

}, new Func2<Map<String, String>, Event, Map<String, String>>() {

@Override
public Map<String, String> call(Map<String, String> accum, Event perInstanceEvent) {
Expand Down
10 changes: 9 additions & 1 deletion src/test/java/rx/ZipTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import rx.CovarianceTest.Result;
import rx.EventStream.Event;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.FuncN;
Expand All @@ -57,7 +58,14 @@ public String call(Event e) {

@Override
public Observable<Map<String, String>> call(final GroupedObservable<String, Event> ge) {
return ge.scan(new HashMap<String, String>(), new Func2<Map<String, String>, Event, Map<String, String>>() {
return ge.scan(new Func0<Map<String, String>>() {

@Override
public Map<String, String> call() {
return new HashMap<String, String>();
}

}, new Func2<Map<String, String>, Event, Map<String, String>>() {

@Override
public Map<String, String> call(Map<String, String> accum, Event perInstanceEvent) {
Expand Down
18 changes: 13 additions & 5 deletions src/test/java/rx/internal/operators/OperatorReduceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import rx.Observable;
import rx.Observer;
import rx.exceptions.TestException;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.internal.util.UtilityFunctions;
Expand All @@ -53,7 +54,7 @@ public Integer call(Integer t1, Integer t2) {
@Test
public void testAggregateAsIntSum() {

Observable<Integer> result = Observable.just(1, 2, 3, 4, 5).reduce(0, sum).map(UtilityFunctions.<Integer> identity());
Observable<Integer> result = Observable.just(1, 2, 3, 4, 5).reduce(sum).map(UtilityFunctions.<Integer> identity());

result.subscribe(observer);

Expand All @@ -66,7 +67,7 @@ public void testAggregateAsIntSum() {
public void testAggregateAsIntSumSourceThrows() {
Observable<Integer> result = Observable.concat(Observable.just(1, 2, 3, 4, 5),
Observable.<Integer> error(new TestException()))
.reduce(0, sum).map(UtilityFunctions.<Integer> identity());
.reduce(sum).map(UtilityFunctions.<Integer> identity());

result.subscribe(observer);

Expand All @@ -85,7 +86,7 @@ public Integer call(Integer t1, Integer t2) {
};

Observable<Integer> result = Observable.just(1, 2, 3, 4, 5)
.reduce(0, sumErr).map(UtilityFunctions.<Integer> identity());
.reduce(sumErr).map(UtilityFunctions.<Integer> identity());

result.subscribe(observer);

Expand All @@ -106,7 +107,7 @@ public Integer call(Integer t1) {
};

Observable<Integer> result = Observable.just(1, 2, 3, 4, 5)
.reduce(0, sum).map(error);
.reduce(sum).map(error);

result.subscribe(observer);

Expand All @@ -127,7 +128,14 @@ public void testBackpressureWithNoInitialValue() throws InterruptedException {
@Test
public void testBackpressureWithInitialValue() throws InterruptedException {
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6);
Observable<Integer> reduced = source.reduce(0, sum);
Observable<Integer> reduced = source.reduce(new Func0<Integer>() {

@Override
public Integer call() {
return 0;
}

}, sum);

Integer r = reduced.toBlocking().first();
assertEquals(21, r.intValue());
Expand Down
29 changes: 25 additions & 4 deletions src/test/java/rx/internal/operators/OperatorScanTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,14 @@ public void testScanIntegersWithInitialValue() {

Observable<Integer> observable = Observable.just(1, 2, 3);

Observable<String> m = observable.scan("", new Func2<String, Integer, String>() {
Observable<String> m = observable.scan(new Func0<String>() {

@Override
public String call() {
return "";
}

}, new Func2<String, Integer, String>() {

@Override
public String call(String s, Integer n) {
Expand Down Expand Up @@ -131,7 +138,7 @@ public Integer call(Integer t1, Integer t2) {
@Test
public void shouldNotEmitUntilAfterSubscription() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.range(1, 100).scan(0, new Func2<Integer, Integer, Integer>() {
Observable.range(1, 100).scan(new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
Expand All @@ -155,7 +162,14 @@ public Boolean call(Integer t1) {
public void testBackpressureWithInitialValue() {
final AtomicInteger count = new AtomicInteger();
Observable.range(1, 100)
.scan(0, new Func2<Integer, Integer, Integer>() {
.scan(new Func0<Integer>() {

@Override
public Integer call() {
return 0;
}

}, new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
Expand Down Expand Up @@ -237,7 +251,14 @@ public void onNext(Integer t) {
public void testNoBackpressureWithInitialValue() {
final AtomicInteger count = new AtomicInteger();
Observable.range(1, 100)
.scan(0, new Func2<Integer, Integer, Integer>() {
.scan(new Func0<Integer>() {

@Override
public Integer call() {
return 0;
}

}, new Func2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer t1, Integer t2) {
Expand Down