diff --git a/build.gradle b/build.gradle index 6f62543b8d..99bf16483a 100644 --- a/build.gradle +++ b/build.gradle @@ -29,8 +29,8 @@ dependencies { testCompile 'junit:junit:4.12' testCompile 'org.mockito:mockito-core:2.1.0' - perfCompile 'org.openjdk.jmh:jmh-core:1.13' - perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.13' + perfCompile 'org.openjdk.jmh:jmh-core:1.16' + perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.16' testCompile 'org.reactivestreams:reactive-streams-tck:1.0.0' testCompile group: 'org.testng', name: 'testng', version: '6.9.10' diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 4c4cc601e3..d42e542378 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -10516,7 +10516,9 @@ public final Maybe reduce(BiFunction reducer) { @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) public final Single reduce(R seed, BiFunction reducer) { - return RxJavaPlugins.onAssembly(new FlowableSingleSingle(scan(seed, reducer).takeLast(1), null)); // TODO dedicated operator + ObjectHelper.requireNonNull(seed, "seed is null"); + ObjectHelper.requireNonNull(reducer, "reducer is null"); + return RxJavaPlugins.onAssembly(new FlowableReduceSeedSingle(this, seed, reducer)); } /** @@ -10567,7 +10569,9 @@ public final Single reduce(R seed, BiFunction reducer) { @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) public final Single reduceWith(Callable seedSupplier, BiFunction reducer) { - return RxJavaPlugins.onAssembly(new FlowableSingleSingle(scanWith(seedSupplier, reducer).takeLast(1), null)); // TODO dedicated operator + ObjectHelper.requireNonNull(seedSupplier, "seedSupplier is null"); + ObjectHelper.requireNonNull(reducer, "reducer is null"); + return RxJavaPlugins.onAssembly(new FlowableReduceWithSingle(this, seedSupplier, reducer)); } /** diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 08b9e40e61..04d5699576 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -8685,7 +8685,8 @@ public final Observable publish(Function, ? extends @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Maybe reduce(BiFunction reducer) { - return scan(reducer).takeLast(1).singleElement(); + ObjectHelper.requireNonNull(reducer, "reducer is null"); + return RxJavaPlugins.onAssembly(new ObservableReduceMaybe(this, reducer)); } /** @@ -8732,7 +8733,9 @@ public final Maybe reduce(BiFunction reducer) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Single reduce(R seed, BiFunction reducer) { - return RxJavaPlugins.onAssembly(new ObservableSingleSingle(scan(seed, reducer).takeLast(1), null)); + ObjectHelper.requireNonNull(seed, "seed is null"); + ObjectHelper.requireNonNull(reducer, "reducer is null"); + return RxJavaPlugins.onAssembly(new ObservableReduceSeedSingle(this, seed, reducer)); } /** @@ -8779,7 +8782,9 @@ public final Single reduce(R seed, BiFunction reducer) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Single reduceWith(Callable seedSupplier, BiFunction reducer) { - return RxJavaPlugins.onAssembly(new ObservableSingleSingle(scanWith(seedSupplier, reducer).takeLast(1), null)); + ObjectHelper.requireNonNull(seedSupplier, "seedSupplier is null"); + ObjectHelper.requireNonNull(reducer, "reducer is null"); + return RxJavaPlugins.onAssembly(new ObservableReduceWithSingle(this, seedSupplier, reducer)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduceSeedSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduceSeedSingle.java new file mode 100644 index 0000000000..516752c7df --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduceSeedSingle.java @@ -0,0 +1,126 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.BiFunction; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Reduce a sequence of values, starting from a seed value and by using + * an accumulator function and return the last accumulated value. + * + * @param the source value type + * @param the accumulated result type + */ +public final class FlowableReduceSeedSingle extends Single { + + final Publisher source; + + final R seed; + + final BiFunction reducer; + + public FlowableReduceSeedSingle(Publisher source, R seed, BiFunction reducer) { + this.source = source; + this.seed = seed; + this.reducer = reducer; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + source.subscribe(new ReduceSeedObserver(observer, reducer, seed)); + } + + static final class ReduceSeedObserver implements Subscriber, Disposable { + + final SingleObserver actual; + + final BiFunction reducer; + + R value; + + Subscription s; + + public ReduceSeedObserver(SingleObserver actual, BiFunction reducer, R value) { + this.actual = actual; + this.value = value; + this.reducer = reducer; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(T value) { + R v = this.value; + if (v != null) { + try { + this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.cancel(); + onError(ex); + } + } + } + + @Override + public void onError(Throwable e) { + R v = value; + value = null; + if (v != null) { + s = SubscriptionHelper.CANCELLED; + actual.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + R v = value; + value = null; + if (v != null) { + s = SubscriptionHelper.CANCELLED; + actual.onSuccess(v); + } + } + + @Override + public void dispose() { + s.cancel(); + s = SubscriptionHelper.CANCELLED; + } + + @Override + public boolean isDisposed() { + return s == SubscriptionHelper.CANCELLED; + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduceWithSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduceWithSingle.java new file mode 100644 index 0000000000..e23da94119 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReduceWithSingle.java @@ -0,0 +1,61 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import java.util.concurrent.Callable; + +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.BiFunction; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.operators.flowable.FlowableReduceSeedSingle.ReduceSeedObserver; + +/** + * Reduce a sequence of values, starting from a generated seed value and by using + * an accumulator function and return the last accumulated value. + * + * @param the source value type + * @param the accumulated result type + */ +public final class FlowableReduceWithSingle extends Single { + + final Publisher source; + + final Callable seedSupplier; + + final BiFunction reducer; + + public FlowableReduceWithSingle(Publisher source, Callable seedSupplier, BiFunction reducer) { + this.source = source; + this.seedSupplier = seedSupplier; + this.reducer = reducer; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + R seed; + + try { + seed = ObjectHelper.requireNonNull(seedSupplier.call(), "The seedSupplier returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + EmptyDisposable.error(ex, observer); + return; + } + source.subscribe(new ReduceSeedObserver(observer, reducer, seed)); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceMaybe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceMaybe.java new file mode 100644 index 0000000000..c9d016a8c1 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceMaybe.java @@ -0,0 +1,127 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.BiFunction; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Reduce a sequence of values into a single value via an aggregator function and emit the final value or complete + * if the source is empty. + * + * @param the source and result value type + */ +public final class ObservableReduceMaybe extends Maybe { + + final ObservableSource source; + + final BiFunction reducer; + + public ObservableReduceMaybe(ObservableSource source, BiFunction reducer) { + this.source = source; + this.reducer = reducer; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + source.subscribe(new ReduceObserver(observer, reducer)); + } + + static final class ReduceObserver implements Observer, Disposable { + + final MaybeObserver actual; + + final BiFunction reducer; + + boolean done; + + T value; + + Disposable d; + + public ReduceObserver(MaybeObserver observer, BiFunction reducer) { + this.actual = observer; + this.reducer = reducer; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T value) { + if (!done) { + T v = this.value; + + if (v == null) { + this.value = value; + } else { + try { + this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + d.dispose(); + onError(ex); + } + } + } + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPlugins.onError(e); + return; + } + done = true; + value = null; + actual.onError(e); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + T v = value; + value = null; + if (v != null) { + actual.onSuccess(v); + } else { + actual.onComplete(); + } + } + + @Override + public void dispose() { + d.dispose(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceSeedSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceSeedSingle.java new file mode 100644 index 0000000000..a36fe4c672 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceSeedSingle.java @@ -0,0 +1,119 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.BiFunction; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Reduce a sequence of values, starting from a seed value and by using + * an accumulator function and return the last accumulated value. + * + * @param the source value type + * @param the accumulated result type + */ +public final class ObservableReduceSeedSingle extends Single { + + final ObservableSource source; + + final R seed; + + final BiFunction reducer; + + public ObservableReduceSeedSingle(ObservableSource source, R seed, BiFunction reducer) { + this.source = source; + this.seed = seed; + this.reducer = reducer; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + source.subscribe(new ReduceSeedObserver(observer, reducer, seed)); + } + + static final class ReduceSeedObserver implements Observer, Disposable { + + final SingleObserver actual; + + final BiFunction reducer; + + R value; + + Disposable d; + + public ReduceSeedObserver(SingleObserver actual, BiFunction reducer, R value) { + this.actual = actual; + this.value = value; + this.reducer = reducer; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T value) { + R v = this.value; + if (v != null) { + try { + this.value = ObjectHelper.requireNonNull(reducer.apply(v, value), "The reducer returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + d.dispose(); + onError(ex); + } + } + } + + @Override + public void onError(Throwable e) { + R v = value; + value = null; + if (v != null) { + actual.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + R v = value; + value = null; + if (v != null) { + actual.onSuccess(v); + } + } + + @Override + public void dispose() { + d.dispose(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceWithSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceWithSingle.java new file mode 100644 index 0000000000..d4aa507918 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReduceWithSingle.java @@ -0,0 +1,59 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import java.util.concurrent.Callable; + +import io.reactivex.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.BiFunction; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.operators.observable.ObservableReduceSeedSingle.ReduceSeedObserver; + +/** + * Reduce a sequence of values, starting from a generated seed value and by using + * an accumulator function and return the last accumulated value. + * + * @param the source value type + * @param the accumulated result type + */ +public final class ObservableReduceWithSingle extends Single { + + final ObservableSource source; + + final Callable seedSupplier; + + final BiFunction reducer; + + public ObservableReduceWithSingle(ObservableSource source, Callable seedSupplier, BiFunction reducer) { + this.source = source; + this.seedSupplier = seedSupplier; + this.reducer = reducer; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + R seed; + + try { + seed = ObjectHelper.requireNonNull(seedSupplier.call(), "The seedSupplier returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + EmptyDisposable.error(ex, observer); + return; + } + source.subscribe(new ReduceSeedObserver(observer, reducer, seed)); + } +} diff --git a/src/perf/java/io/reactivex/ReducePerf.java b/src/perf/java/io/reactivex/ReducePerf.java new file mode 100644 index 0000000000..826d7cbc4e --- /dev/null +++ b/src/perf/java/io/reactivex/ReducePerf.java @@ -0,0 +1,80 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.functions.BiFunction; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1, jvmArgsAppend = { "-XX:MaxInlineLevel=20" }) +@State(Scope.Thread) +public class ReducePerf implements BiFunction { + @Param({ "1", "1000", "1000000" }) + public int times; + + Single obsSingle; + + Single flowSingle; + + Maybe obsMaybe; + + Maybe flowMaybe; + + @Override + public Integer apply(Integer t1, Integer t2) throws Exception { + return t1 + t2; + } + + @Setup + public void setup() { + Integer[] array = new Integer[times]; + Arrays.fill(array, 777); + + obsSingle = Observable.fromArray(array).reduce(0, this); + + obsMaybe = Observable.fromArray(array).reduce(this); + + flowSingle = Flowable.fromArray(array).reduce(0, this); + + flowMaybe = Flowable.fromArray(array).reduce(this); + } + + @Benchmark + public void obsSingle(Blackhole bh) { + obsSingle.subscribe(new PerfConsumer(bh)); + } + + @Benchmark + public void flowSingle(Blackhole bh) { + flowSingle.subscribe(new PerfConsumer(bh)); + } + + @Benchmark + public void obsMaybe(Blackhole bh) { + obsMaybe.subscribe(new PerfConsumer(bh)); + } + + @Benchmark + public void flowMaybe(Blackhole bh) { + flowMaybe.subscribe(new PerfConsumer(bh)); + } +} \ No newline at end of file diff --git a/src/test/java/io/reactivex/InternalWrongNaming.java b/src/test/java/io/reactivex/InternalWrongNaming.java index e1e43b8ccd..6c1bb13650 100644 --- a/src/test/java/io/reactivex/InternalWrongNaming.java +++ b/src/test/java/io/reactivex/InternalWrongNaming.java @@ -173,6 +173,8 @@ public void flowableNoObserver() throws Exception { "FlowableLastMaybe", "FlowableIgnoreElementsCompletable", "FlowableReduceMaybe", + "FlowableReduceWithSingle", + "FlowableReduceSeedSingle", "FlowableFlatMapCompletable", "FlowableFlatMapCompletableCompletable", "FlowableFlatMapSingle", diff --git a/src/test/java/io/reactivex/processors/DelayedFlowableProcessorTest.java b/src/test/java/io/reactivex/processors/DelayedFlowableProcessorTest.java index 8793be32a7..2c101a03b4 100644 --- a/src/test/java/io/reactivex/processors/DelayedFlowableProcessorTest.java +++ b/src/test/java/io/reactivex/processors/DelayedFlowableProcessorTest.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.processors; import io.reactivex.subscribers.TestSubscriber; diff --git a/src/test/java/io/reactivex/processors/FlowableProcessorTest.java b/src/test/java/io/reactivex/processors/FlowableProcessorTest.java index 058fc13f14..18e850f139 100644 --- a/src/test/java/io/reactivex/processors/FlowableProcessorTest.java +++ b/src/test/java/io/reactivex/processors/FlowableProcessorTest.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.processors; import org.junit.Test;