From e52627c088a13ab1bf085ee92e5c6299ecd2db80 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 18 Feb 2017 11:42:37 +0100 Subject: [PATCH 1/2] 2.x: benchmark the new strict/interop mode --- .../io/reactivex/PerfInteropConsumer.java | 62 +++++++++++++++++++ src/perf/java/io/reactivex/StrictPerf.java | 51 +++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 src/perf/java/io/reactivex/PerfInteropConsumer.java create mode 100644 src/perf/java/io/reactivex/StrictPerf.java diff --git a/src/perf/java/io/reactivex/PerfInteropConsumer.java b/src/perf/java/io/reactivex/PerfInteropConsumer.java new file mode 100644 index 0000000000..2ec7b801a5 --- /dev/null +++ b/src/perf/java/io/reactivex/PerfInteropConsumer.java @@ -0,0 +1,62 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.*; + +import io.reactivex.disposables.Disposable; + +/** + * A multi-type synchronous consumer that doesn't implement FlowableSubscriber and + * thus should be treated by Flowable as a candidate for strict interop. + */ +public final class PerfInteropConsumer implements Subscriber, Observer, +SingleObserver, CompletableObserver, MaybeObserver { + + final Blackhole bh; + + public PerfInteropConsumer(Blackhole bh) { + this.bh = bh; + } + + @Override + public void onSuccess(Object value) { + bh.consume(value); + } + + @Override + public void onSubscribe(Disposable d) { + } + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Object t) { + bh.consume(t); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + @Override + public void onComplete() { + bh.consume(true); + } +} diff --git a/src/perf/java/io/reactivex/StrictPerf.java b/src/perf/java/io/reactivex/StrictPerf.java new file mode 100644 index 0000000000..3d879b1698 --- /dev/null +++ b/src/perf/java/io/reactivex/StrictPerf.java @@ -0,0 +1,51 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class StrictPerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable source; + + @Setup + public void setup() { + Integer[] array = new Integer[count]; + Arrays.fill(array, 777); + + source = Flowable.fromArray(array); + } + + @Benchmark + public void internal(Blackhole bh) { + source.subscribe(new PerfConsumer(bh)); + } + + @Benchmark + public void external(Blackhole bh) { + source.subscribe(new PerfInteropConsumer(bh)); + } +} From 95ed71a3a1a433ba849f06b785fa9b6d182f3e0c Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 18 Feb 2017 14:00:44 +0100 Subject: [PATCH 2/2] consider CPU usage --- src/perf/java/io/reactivex/StrictPerf.java | 72 +++++++++++++++++++++- 1 file changed, 70 insertions(+), 2 deletions(-) diff --git a/src/perf/java/io/reactivex/StrictPerf.java b/src/perf/java/io/reactivex/StrictPerf.java index 3d879b1698..aaba2d2107 100644 --- a/src/perf/java/io/reactivex/StrictPerf.java +++ b/src/perf/java/io/reactivex/StrictPerf.java @@ -18,6 +18,7 @@ import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.*; @BenchmarkMode(Mode.Throughput) @Warmup(iterations = 5) @@ -29,6 +30,9 @@ public class StrictPerf { @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) public int count; + @Param({ "1", "10", "100", "1000", "10000" }) + public int cpu; + Flowable source; @Setup @@ -41,11 +45,75 @@ public void setup() { @Benchmark public void internal(Blackhole bh) { - source.subscribe(new PerfConsumer(bh)); + source.subscribe(new InternalConsumer(bh, cpu)); } @Benchmark public void external(Blackhole bh) { - source.subscribe(new PerfInteropConsumer(bh)); + source.subscribe(new ExternalConsumer(bh, cpu)); + } + + static final class InternalConsumer implements FlowableSubscriber { + final Blackhole bh; + + final int cycles; + + InternalConsumer(Blackhole bh, int cycles) { + this.bh = bh; + this.cycles = cycles; + } + + @Override + public void onNext(Object t) { + bh.consume(t); + Blackhole.consumeCPU(cycles); + } + + @Override + public void onError(Throwable t) { + bh.consume(t); + } + + @Override + public void onComplete() { + bh.consume(true); + } + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + } + + static final class ExternalConsumer implements Subscriber { + final Blackhole bh; + + final int cycles; + + ExternalConsumer(Blackhole bh, int cycles) { + this.bh = bh; + this.cycles = cycles; + } + + @Override + public void onNext(Object t) { + bh.consume(t); + Blackhole.consumeCPU(cycles); + } + + @Override + public void onError(Throwable t) { + bh.consume(t); + } + + @Override + public void onComplete() { + bh.consume(true); + } + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } } }