Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Perf Tests #1399

Merged
merged 1 commit into from
Jul 2, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions rxjava-core/src/perf/java/rx/PerfBaseline.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ public void observableConsumption(Input input) throws InterruptedException {
public void observableViaRange(Input input) throws InterruptedException {
input.observable.subscribe(input.observer);
}

@Benchmark
public void observableConsumptionUnsafe(Input input) throws InterruptedException {
input.firehose.unsafeSubscribe(input.newSubscriber());
}

@Benchmark
public void observableViaRangeUnsafe(Input input) throws InterruptedException {
input.observable.unsafeSubscribe(input.newSubscriber());
}

@Benchmark
public void iterableViaForLoopConsumption(Input input) throws InterruptedException {
Expand Down
25 changes: 23 additions & 2 deletions rxjava-core/src/perf/java/rx/jmh/InputWithIncrementingInteger.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public abstract class InputWithIncrementingInteger {
public Iterable<Integer> iterable;
public Observable<Integer> observable;
public Observable<Integer> firehose;
private Blackhole bh;
public Blackhole bh;
public Observer<Integer> observer;

public abstract int getSize();

@Setup
public void setup(final Blackhole bh) {
this.bh = bh;
Expand Down Expand Up @@ -106,4 +106,25 @@ public LatchedObserver<Integer> newLatchedObserver() {
return new LatchedObserver<Integer>(bh);
}

public Subscriber<Integer> newSubscriber() {
return new Subscriber<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer t) {
bh.consume(t);
}

};
}

}
62 changes: 47 additions & 15 deletions rxjava-core/src/perf/java/rx/operators/OperatorMergePerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
Expand All @@ -39,21 +39,25 @@
@OutputTimeUnit(TimeUnit.SECONDS)
public class OperatorMergePerf {

@State(Scope.Thread)
public static class Input extends InputWithIncrementingInteger {

@Param({ "1", "1000" })
public int size;
// flatMap
@Benchmark
public void oneStreamOfNthatMergesIn1(final InputMillion input) throws InterruptedException {
Observable<Observable<Integer>> os = Observable.range(1, input.size).map(new Func1<Integer, Observable<Integer>>() {

@Override
public int getSize() {
return size;
}
@Override
public Observable<Integer> call(Integer i) {
return Observable.just(i);
}

});
LatchedObserver<Integer> o = input.newLatchedObserver();
Observable.merge(os).subscribe(o);
o.latch.await();
}

// flatMap
@Benchmark
public void merge1SyncStreamOfN(final Input input) throws InterruptedException {
public void merge1SyncStreamOfN(final InputMillion input) throws InterruptedException {
Observable<Observable<Integer>> os = Observable.just(1).map(new Func1<Integer, Observable<Integer>>() {

@Override
Expand All @@ -66,9 +70,9 @@ public Observable<Integer> call(Integer i) {
Observable.merge(os).subscribe(o);
o.latch.await();
}

@Benchmark
public void mergeNSyncStreamsOfN(final Input input) throws InterruptedException {
public void mergeNSyncStreamsOfN(final InputThousand input) throws InterruptedException {
Observable<Observable<Integer>> os = input.observable.map(new Func1<Integer, Observable<Integer>>() {

@Override
Expand All @@ -83,7 +87,7 @@ public Observable<Integer> call(Integer i) {
}

@Benchmark
public void mergeNAsyncStreamsOfN(final Input input) throws InterruptedException {
public void mergeNAsyncStreamsOfN(final InputThousand input) throws InterruptedException {
Observable<Observable<Integer>> os = input.observable.map(new Func1<Integer, Observable<Integer>>() {

@Override
Expand All @@ -98,7 +102,7 @@ public Observable<Integer> call(Integer i) {
}

@Benchmark
public void mergeTwoAsyncStreamsOfN(final Input input) throws InterruptedException {
public void mergeTwoAsyncStreamsOfN(final InputThousand input) throws InterruptedException {
LatchedObserver<Integer> o = input.newLatchedObserver();
Observable<Integer> ob = Observable.range(0, input.size).subscribeOn(Schedulers.computation());
Observable.merge(ob, ob).subscribe(o);
Expand All @@ -115,6 +119,7 @@ public void mergeNSyncStreamsOf1(final InputForMergeN input) throws InterruptedE
@State(Scope.Thread)
public static class InputForMergeN {
@Param({ "1", "100", "1000" })
// @Param({ "1000" })
public int size;

private Blackhole bh;
Expand All @@ -134,4 +139,31 @@ public LatchedObserver<Integer> newLatchedObserver() {
}
}

@State(Scope.Thread)
public static class InputMillion extends InputWithIncrementingInteger {

@Param({ "1", "1000", "1000000" })
// @Param({ "1000" })
public int size;

@Override
public int getSize() {
return size;
}

}

@State(Scope.Thread)
public static class InputThousand extends InputWithIncrementingInteger {

@Param({ "1", "1000" })
// @Param({ "1000" })
public int size;

@Override
public int getSize() {
return size;
}

}
}