Skip to content

Commit

Permalink
Optimized scalar observeOn/subscribeOn
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Feb 23, 2015
1 parent dded0d2 commit cea2d9d
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 7 deletions.
6 changes: 6 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5175,6 +5175,9 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
* @see #subscribeOn
*/
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
}

Expand Down Expand Up @@ -7597,6 +7600,9 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
* @see #observeOn
*/
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.schedulers;
package rx.internal.schedulers;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.NewThreadWorker;
import rx.internal.schedulers.ScheduledAction;
import rx.internal.util.RxThreadFactory;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/* package */class EventLoopsScheduler extends Scheduler {
public class EventLoopsScheduler extends Scheduler {
/** Manages a fixed number of workers. */
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
Expand Down Expand Up @@ -76,14 +74,25 @@ public PoolWorker getEventLoop() {
* Create a scheduler with pool size equal to the available processor
* count and using least-recent worker selection policy.
*/
EventLoopsScheduler() {
public EventLoopsScheduler() {
pool = new FixedSchedulerPool();
}

@Override
public Worker createWorker() {
return new EventLoopWorker(pool.getEventLoop());
}

/**
* Schedules the action directly on one of the event loop workers
* without the additional infrastructure and checking.
* @param action the action to schedule
* @return the subscription
*/
public Subscription scheduleDirect(Action0 action) {
PoolWorker pw = pool.getEventLoop();
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS);
}

private static class EventLoopWorker extends Scheduler.Worker {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
Expand Down
71 changes: 69 additions & 2 deletions src/main/java/rx/internal/util/ScalarSynchronousObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package rx.internal.util;

import rx.Observable;
import rx.Subscriber;
import rx.*;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.internal.schedulers.EventLoopsScheduler;

public final class ScalarSynchronousObservable<T> extends Observable<T> {

Expand Down Expand Up @@ -49,5 +51,70 @@ public void call(Subscriber<? super T> s) {
public T get() {
return t;
}
/**
* Customized observeOn/subscribeOn implementation which emits the scalar
* value directly or with less overhead on the specified scheduler.
* @param scheduler the target scheduler
* @return the new observable
*/
public Observable<T> scalarScheduleOn(Scheduler scheduler) {
if (scheduler instanceof EventLoopsScheduler) {
EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
return create(new DirectScheduledEmission<T>(es, t));
}
return create(new NormalScheduledEmission<T>(scheduler, t));
}

/** Optimized observeOn for scalar value observed on the EventLoopsScheduler. */
static final class DirectScheduledEmission<T> implements OnSubscribe<T> {
private final EventLoopsScheduler es;
private final T value;
DirectScheduledEmission(EventLoopsScheduler es, T value) {
this.es = es;
this.value = value;
}
@Override
public void call(final Subscriber<? super T> child) {
child.add(es.scheduleDirect(new ScalarSynchronousAction<T>(child, value)));
}
}
/** Emits a scalar value on a general scheduler. */
static final class NormalScheduledEmission<T> implements OnSubscribe<T> {
private final Scheduler scheduler;
private final T value;

NormalScheduledEmission(Scheduler scheduler, T value) {
this.scheduler = scheduler;
this.value = value;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
Worker worker = scheduler.createWorker();
subscriber.add(worker);
worker.schedule(new ScalarSynchronousAction<T>(subscriber, value));
}
}
/** Action that emits a single value when called. */
static final class ScalarSynchronousAction<T> implements Action0 {
private final Subscriber<? super T> subscriber;
private final T value;

private ScalarSynchronousAction(Subscriber<? super T> subscriber,
T value) {
this.subscriber = subscriber;
this.value = value;
}

@Override
public void call() {
try {
subscriber.onNext(value);
} catch (Throwable t) {
subscriber.onError(t);
return;
}
subscriber.onCompleted();
}
}
}
1 change: 1 addition & 0 deletions src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.schedulers;

import rx.Scheduler;
import rx.internal.schedulers.EventLoopsScheduler;
import rx.plugins.RxJavaPlugins;

import java.util.concurrent.Executor;
Expand Down

0 comments on commit cea2d9d

Please sign in to comment.