From 88ea0923a4edde88e98787ec9825ef868819e064 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 5 Nov 2015 09:30:25 +0100 Subject: [PATCH] 1.x: benchmark just() and its optimizations. --- src/perf/java/rx/ScalarJustPerf.java | 197 +++++++++++++++++++++++++++ 1 file changed, 197 insertions(+) create mode 100644 src/perf/java/rx/ScalarJustPerf.java diff --git a/src/perf/java/rx/ScalarJustPerf.java b/src/perf/java/rx/ScalarJustPerf.java new file mode 100644 index 0000000000..24543852ff --- /dev/null +++ b/src/perf/java/rx/ScalarJustPerf.java @@ -0,0 +1,197 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx; + +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import rx.functions.Func1; +import rx.jmh.LatchedObserver; +import rx.schedulers.Schedulers; + +/** + * Benchmark the cost of just and its various optimizations. + *

+ * gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*ScalarJustPerf.*" + *

+ * gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*ScalarJustPerf.*" + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Thread) +public class ScalarJustPerf { + /** A subscriber without a CountDownLatch; use it for synchronous testing only. */ + static final class PlainSubscriber extends Subscriber { + final Blackhole bh; + public PlainSubscriber(Blackhole bh) { + this.bh = bh; + } + + @Override + public void onNext(Integer t) { + bh.consume(t); + } + + @Override + public void onError(Throwable e) { + bh.consume(e); + } + + @Override + public void onCompleted() { + bh.consume(false); + } + } + + /** This is a simple just. */ + Observable simple; + /** + * This is a simple just observed on the computation scheduler. + * The current computation scheduler supports direct scheduling and should have + * lower overhead than a regular createWorker-use-unsubscribe. + */ + Observable observeOn; + /** This is a simple just observed on the IO thread. */ + Observable observeOnIO; + + /** + * This is a simple just subscribed to on the computation scheduler. + * In theory, for non-backpressured just(), this should be the + * same as observeOn. + */ + Observable subscribeOn; + /** This is a simple just subscribed to on the IO scheduler. */ + Observable subscribeOnIO; + + /** This is a just mapped to itself which should skip the operator flatMap completely. */ + Observable justFlatMapJust; + /** + * This is a just mapped to a range of 2 elements; it tests the case where the inner + * Observable isn't a just(). + */ + Observable justFlatMapRange; + + @Setup + public void setup() { + simple = Observable.just(1); + + observeOn = simple.observeOn(Schedulers.computation()); + observeOnIO = simple.observeOn(Schedulers.io()); + + subscribeOn = simple.subscribeOn(Schedulers.computation()); + subscribeOnIO = simple.subscribeOn(Schedulers.io()); + + justFlatMapJust = simple.flatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return Observable.just(v); + } + }); + + justFlatMapRange = simple.flatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return Observable.range(v, 2); + } + }); + } + + /** + * Common routine to create a latched observer, subscribe it to the + * given source and spin-wait for its completion. + *

Don't use this with long sources. The spin-wait is there + * to avoid operating-system level scheduling-wakeup granularity problems with + * short sources. + * @param bh the black hole to sink values and prevent dead code elimination + * @param source the source observable to observe + */ + void runAsync(Blackhole bh, Observable source) { + LatchedObserver lo = new LatchedObserver(bh); + + source.subscribe(lo); + + while (lo.latch.getCount() != 0L); + } + + @Benchmark + public void simple(Blackhole bh) { + PlainSubscriber s = new PlainSubscriber(bh); + simple.subscribe(s); + } + + @Benchmark + public void simpleEscape(Blackhole bh) { + PlainSubscriber s = new PlainSubscriber(bh); + bh.consume(s); + simple.subscribe(s); + } + + @Benchmark + public Object simpleEscapeAll(Blackhole bh) { + PlainSubscriber s = new PlainSubscriber(bh); + bh.consume(s); + return simple.subscribe(s); + } + + @Benchmark + public void observeOn(Blackhole bh) { + runAsync(bh, observeOn); + } + + @Benchmark + public void observeOnIO(Blackhole bh) { + runAsync(bh, observeOnIO); + } + + @Benchmark + public void subscribeOn(Blackhole bh) { + runAsync(bh, subscribeOn); + } + + @Benchmark + public void subscribeOnIO(Blackhole bh) { + runAsync(bh, subscribeOnIO); + } + + @Benchmark + public void justFlatMapJust(Blackhole bh) { + PlainSubscriber s = new PlainSubscriber(bh); + justFlatMapJust.subscribe(s); + } + + @Benchmark + public void justFlatMapJustEscape(Blackhole bh) { + PlainSubscriber s = new PlainSubscriber(bh); + bh.consume(s); + justFlatMapJust.subscribe(s); + } + + @Benchmark + public void justFlatMapRange(Blackhole bh) { + PlainSubscriber s = new PlainSubscriber(bh); + justFlatMapRange.subscribe(s); + } + + @Benchmark + public void justFlatMapRangeEscape(Blackhole bh) { + PlainSubscriber s = new PlainSubscriber(bh); + bh.consume(s); + justFlatMapRange.subscribe(s); + } +}