From 78572f35e7ae1d41523ae888656d91f8ac76f3d0 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 2 Jul 2014 09:45:10 -0700 Subject: [PATCH] Update Perf Tests matching with work being done for 0.20 to allow comparisons --- .../src/perf/java/rx/PerfBaseline.java | 10 +++ .../rx/jmh/InputWithIncrementingInteger.java | 25 +++++++- .../java/rx/operators/OperatorMergePerf.java | 62 ++++++++++++++----- 3 files changed, 80 insertions(+), 17 deletions(-) diff --git a/rxjava-core/src/perf/java/rx/PerfBaseline.java b/rxjava-core/src/perf/java/rx/PerfBaseline.java index 6242b70645..828f690b7a 100644 --- a/rxjava-core/src/perf/java/rx/PerfBaseline.java +++ b/rxjava-core/src/perf/java/rx/PerfBaseline.java @@ -39,6 +39,16 @@ public void observableConsumption(Input input) throws InterruptedException { public void observableViaRange(Input input) throws InterruptedException { input.observable.subscribe(input.observer); } + + @Benchmark + public void observableConsumptionUnsafe(Input input) throws InterruptedException { + input.firehose.unsafeSubscribe(input.newSubscriber()); + } + + @Benchmark + public void observableViaRangeUnsafe(Input input) throws InterruptedException { + input.observable.unsafeSubscribe(input.newSubscriber()); + } @Benchmark public void iterableViaForLoopConsumption(Input input) throws InterruptedException { diff --git a/rxjava-core/src/perf/java/rx/jmh/InputWithIncrementingInteger.java b/rxjava-core/src/perf/java/rx/jmh/InputWithIncrementingInteger.java index e069b71c90..f86bc28117 100644 --- a/rxjava-core/src/perf/java/rx/jmh/InputWithIncrementingInteger.java +++ b/rxjava-core/src/perf/java/rx/jmh/InputWithIncrementingInteger.java @@ -32,11 +32,11 @@ public abstract class InputWithIncrementingInteger { public Iterable iterable; public Observable observable; public Observable firehose; - private Blackhole bh; + public Blackhole bh; public Observer observer; public abstract int getSize(); - + @Setup public void setup(final Blackhole bh) { this.bh = bh; @@ -106,4 +106,25 @@ public LatchedObserver newLatchedObserver() { return new LatchedObserver(bh); } + public Subscriber newSubscriber() { + return new Subscriber() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Integer t) { + bh.consume(t); + } + + }; + } + } diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorMergePerf.java b/rxjava-core/src/perf/java/rx/operators/OperatorMergePerf.java index 687d81e789..7b770bee9e 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorMergePerf.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorMergePerf.java @@ -19,8 +19,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; @@ -39,21 +39,25 @@ @OutputTimeUnit(TimeUnit.SECONDS) public class OperatorMergePerf { - @State(Scope.Thread) - public static class Input extends InputWithIncrementingInteger { - - @Param({ "1", "1000" }) - public int size; + // flatMap + @Benchmark + public void oneStreamOfNthatMergesIn1(final InputMillion input) throws InterruptedException { + Observable> os = Observable.range(1, input.size).map(new Func1>() { - @Override - public int getSize() { - return size; - } + @Override + public Observable call(Integer i) { + return Observable.just(i); + } + }); + LatchedObserver o = input.newLatchedObserver(); + Observable.merge(os).subscribe(o); + o.latch.await(); } + // flatMap @Benchmark - public void merge1SyncStreamOfN(final Input input) throws InterruptedException { + public void merge1SyncStreamOfN(final InputMillion input) throws InterruptedException { Observable> os = Observable.just(1).map(new Func1>() { @Override @@ -66,9 +70,9 @@ public Observable call(Integer i) { Observable.merge(os).subscribe(o); o.latch.await(); } - + @Benchmark - public void mergeNSyncStreamsOfN(final Input input) throws InterruptedException { + public void mergeNSyncStreamsOfN(final InputThousand input) throws InterruptedException { Observable> os = input.observable.map(new Func1>() { @Override @@ -83,7 +87,7 @@ public Observable call(Integer i) { } @Benchmark - public void mergeNAsyncStreamsOfN(final Input input) throws InterruptedException { + public void mergeNAsyncStreamsOfN(final InputThousand input) throws InterruptedException { Observable> os = input.observable.map(new Func1>() { @Override @@ -98,7 +102,7 @@ public Observable call(Integer i) { } @Benchmark - public void mergeTwoAsyncStreamsOfN(final Input input) throws InterruptedException { + public void mergeTwoAsyncStreamsOfN(final InputThousand input) throws InterruptedException { LatchedObserver o = input.newLatchedObserver(); Observable ob = Observable.range(0, input.size).subscribeOn(Schedulers.computation()); Observable.merge(ob, ob).subscribe(o); @@ -115,6 +119,7 @@ public void mergeNSyncStreamsOf1(final InputForMergeN input) throws InterruptedE @State(Scope.Thread) public static class InputForMergeN { @Param({ "1", "100", "1000" }) + // @Param({ "1000" }) public int size; private Blackhole bh; @@ -134,4 +139,31 @@ public LatchedObserver newLatchedObserver() { } } + @State(Scope.Thread) + public static class InputMillion extends InputWithIncrementingInteger { + + @Param({ "1", "1000", "1000000" }) + // @Param({ "1000" }) + public int size; + + @Override + public int getSize() { + return size; + } + + } + + @State(Scope.Thread) + public static class InputThousand extends InputWithIncrementingInteger { + + @Param({ "1", "1000" }) + // @Param({ "1000" }) + public int size; + + @Override + public int getSize() { + return size; + } + + } }