Skip to content

Commit

Permalink
Merge pull request #3241 from akarnokd/PublisherCombineLatest2x
Browse files Browse the repository at this point in the history
Operator combineLatest
  • Loading branch information
akarnokd committed Aug 29, 2015
2 parents bc5012e + ff06115 commit ff0ae2d
Show file tree
Hide file tree
Showing 2 changed files with 533 additions and 32 deletions.
159 changes: 127 additions & 32 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,7 @@ public final <R> Observable<R> flatMap(Function<? super T, ? extends Publisher<?
if (maxConcurrency <= 0) {
throw new IllegalArgumentException("maxConcurrency > 0 required but it was " + maxConcurrency);
}
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
}
validateBufferSize(bufferSize);
if (onSubscribe instanceof PublisherScalarSource) {
PublisherScalarSource<T> scalar = (PublisherScalarSource<T>) onSubscribe;
return create(scalar.flatMap(mapper));
Expand Down Expand Up @@ -443,9 +441,7 @@ public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler);
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
}
validateBufferSize(bufferSize);
return lift(new OperatorObserveOn<>(scheduler, delayError, bufferSize));
}

Expand Down Expand Up @@ -476,9 +472,7 @@ public final ConnectableObservable<T> publish() {
}

public final ConnectableObservable<T> publish(int bufferSize) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
}
validateBufferSize(bufferSize);
return OperatorPublish.create(this, bufferSize);
}

Expand All @@ -487,9 +481,7 @@ public final <R> Observable<R> publish(Function<? super Observable<T>, ? extends
}

public final <R> Observable<R> publish(Function<? super Observable<T>, ? extends Observable<R>> selector, int bufferSize) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
}
validateBufferSize(bufferSize);
Objects.requireNonNull(selector);
return OperatorPublish.create(this, selector, bufferSize);
}
Expand Down Expand Up @@ -940,15 +932,14 @@ public final <K, V> Observable<GroupedObservable<V, K>> groupBy(Function<? super
boolean delayError, int bufferSize) {
Objects.requireNonNull(keySelector);
Objects.requireNonNull(valueSelector);
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
}
validateBufferSize(bufferSize);

return lift(new OperatorGroupBy<>(keySelector, valueSelector, bufferSize, delayError));
}

@SuppressWarnings("unchecked")
private static <T1, T2, R> Function<Object[], R> toFunction(BiFunction<? super T1, ? super T2, ? extends R> biFunction) {
Objects.requireNonNull(biFunction);
return a -> {
if (a.length != 2) {
throw new IllegalArgumentException("Array of size 2 expected but got " + a.length);
Expand Down Expand Up @@ -1036,9 +1027,7 @@ public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends
return empty();
}
Objects.requireNonNull(zipper);
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
}
validateBufferSize(bufferSize);
return create(new PublisherZip<>(sources, null, zipper, bufferSize, delayError));
}

Expand All @@ -1047,9 +1036,7 @@ public static <T, R> Observable<R> zipIterable(Function<? super Object[], ? exte
Iterable<? extends Publisher<? extends T>> sources) {
Objects.requireNonNull(zipper);
Objects.requireNonNull(sources);
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
}
validateBufferSize(bufferSize);
return create(new PublisherZip<>(null, sources, zipper, bufferSize, delayError));
}

Expand Down Expand Up @@ -1462,10 +1449,10 @@ public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler schedule
public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(unit);
Objects.requireNonNull(scheduler);
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
}
return lift(new OperatorSkipLastTimed<>(time, unit, scheduler, bufferSize, delayError));
validateBufferSize(bufferSize);
// the internal buffer holds pairs of (timestamp, value) so double the default buffer size
int s = bufferSize << 1;
return lift(new OperatorSkipLastTimed<>(time, unit, scheduler, s, delayError));
}

public final Observable<T> takeLast(long time, TimeUnit unit) {
Expand Down Expand Up @@ -1499,9 +1486,7 @@ public final Observable<T> takeLast(long time, TimeUnit unit, Scheduler schedule
public final Observable<T> takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(unit);
Objects.requireNonNull(scheduler);
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
}
validateBufferSize(bufferSize);
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}
Expand Down Expand Up @@ -1550,9 +1535,7 @@ public final <R> Observable<R> switchMap(Function<? super T, ? extends Publisher

public final <R> Observable<R> switchMap(Function<? super T, ? extends Publisher<? extends R>> mapper, int bufferSize) {
Objects.requireNonNull(mapper);
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
}
validateBufferSize(bufferSize);
return lift(new OperatorSwitchMap<>(mapper, bufferSize));
}

Expand Down Expand Up @@ -1653,9 +1636,121 @@ public static <T> Observable<Boolean> sequenceEqual(Publisher<? extends T> p1, P
Objects.requireNonNull(p1);
Objects.requireNonNull(p2);
Objects.requireNonNull(isEqual);
validateBufferSize(bufferSize);
return create(new PublisherSequenceEqual<>(p1, p2, isEqual, bufferSize));
}

public static <T, R> Observable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super Object[], ? extends R> combiner) {
return combineLatest(sources, combiner, false, bufferSize());
}

public static <T, R> Observable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super Object[], ? extends R> combiner, boolean delayError) {
return combineLatest(sources, combiner, delayError, bufferSize());
}

@SafeVarargs
public static <T, R> Observable<R> combineLatest(Function<? super Object[], ? extends R> combiner, boolean delayError, int bufferSize, Publisher<? extends T>... sources) {
return combineLatest(sources, combiner, delayError, bufferSize);
}

public static <T, R> Observable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super Object[], ? extends R> combiner, boolean delayError, int bufferSize) {
validateBufferSize(bufferSize);
Objects.requireNonNull(combiner);
if (sources.length == 0) {
return empty();
}
// the queue holds a pair of values so we need to double the capacity
int s = bufferSize << 1;
return create(new PublisherCombineLatest<>(sources, null, combiner, s, delayError));
}

public static <T, R> Observable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[], ? extends R> combiner) {
return combineLatest(sources, combiner, false, bufferSize());
}

public static <T, R> Observable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[], ? extends R> combiner, boolean delayError) {
return combineLatest(sources, combiner, delayError, bufferSize());
}

public static <T, R> Observable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[], ? extends R> combiner, boolean delayError, int bufferSize) {
Objects.requireNonNull(sources);
Objects.requireNonNull(combiner);
validateBufferSize(bufferSize);

// the queue holds a pair of values so we need to double the capacity
int s = bufferSize << 1;
return create(new PublisherCombineLatest<>(null, sources, combiner, s, delayError));
}

private static void validateBufferSize(int bufferSize) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + bufferSize);
}
return create(new PublisherSequenceEqual<>(p1, p2, isEqual, bufferSize));
}

public static <T1, T2, R> Observable<R> combineLatest(
Publisher<? extends T1> p1, Publisher<? extends T2> p2,
BiFunction<? super T1, ? super T2, ? extends R> combiner) {
Function<Object[], R> f = toFunction(combiner);
return combineLatest(f, false, bufferSize(), p1, p2);
}

public static <T1, T2, T3, R> Observable<R> combineLatest(
Publisher<? extends T1> p1, Publisher<? extends T2> p2,
Publisher<? extends T3> p3,
Function3<? super T1, ? super T2, ? super T3, ? extends R> combiner) {
return combineLatest(combiner, false, bufferSize(), p1, p2, p3);
}

public static <T1, T2, T3, T4, R> Observable<R> combineLatest(
Publisher<? extends T1> p1, Publisher<? extends T2> p2,
Publisher<? extends T3> p3, Publisher<? extends T4> p4,
Function4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> combiner) {
return combineLatest(combiner, false, bufferSize(), p1, p2, p3, p4);
}

public static <T1, T2, T3, T4, T5, R> Observable<R> combineLatest(
Publisher<? extends T1> p1, Publisher<? extends T2> p2,
Publisher<? extends T3> p3, Publisher<? extends T4> p4,
Publisher<? extends T5> p5,
Function5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> combiner) {
return combineLatest(combiner, false, bufferSize(), p1, p2, p3, p4, p5);
}

public static <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(
Publisher<? extends T1> p1, Publisher<? extends T2> p2,
Publisher<? extends T3> p3, Publisher<? extends T4> p4,
Publisher<? extends T5> p5, Publisher<? extends T6> p6,
Function6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combiner) {
return combineLatest(combiner, false, bufferSize(), p1, p2, p3, p4, p5, p6);
}

public static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> combineLatest(
Publisher<? extends T1> p1, Publisher<? extends T2> p2,
Publisher<? extends T3> p3, Publisher<? extends T4> p4,
Publisher<? extends T5> p5, Publisher<? extends T6> p6,
Publisher<? extends T7> p7,
Function7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> combiner) {
return combineLatest(combiner, false, bufferSize(), p1, p2, p3, p4, p5, p6, p7);
}

public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> combineLatest(
Publisher<? extends T1> p1, Publisher<? extends T2> p2,
Publisher<? extends T3> p3, Publisher<? extends T4> p4,
Publisher<? extends T5> p5, Publisher<? extends T6> p6,
Publisher<? extends T7> p7, Publisher<? extends T8> p8,
Function8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> combiner) {
return combineLatest(combiner, false, bufferSize(), p1, p2, p3, p4, p5, p6, p7, p8);
}

public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLatest(
Publisher<? extends T1> p1, Publisher<? extends T2> p2,
Publisher<? extends T3> p3, Publisher<? extends T4> p4,
Publisher<? extends T5> p5, Publisher<? extends T6> p6,
Publisher<? extends T7> p7, Publisher<? extends T8> p8,
Publisher<? extends T9> p9,
Function9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> combiner) {
return combineLatest(combiner, false, bufferSize(), p1, p2, p3, p4, p5, p6, p7, p8, p9);
}

}
Loading

0 comments on commit ff0ae2d

Please sign in to comment.