Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reentrant scheduling2 #648

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,52 @@ public long now() {
public int degreeOfParallelism() {
return Runtime.getRuntime().availableProcessors();
}
/**
* Schedule a task to be run immediately.
* @param r the task to run immediately
* @return the subscription to cancel the schedule
*/
public Subscription scheduleRunnable(final Runnable r) {
return schedule(new Action0() {
@Override
public void call() {
r.run();
}
});
}

/**
* Schedule a task to be run after the delay time.
* @param r the task to schedule
* @param delayTime the time to delay the execution
* @param unit the time unit
* @return the subscription to cancel the schedule
*/
public Subscription scheduleRunnable(final Runnable r, long delayTime, TimeUnit unit) {
return schedule(new Action0() {
@Override
public void call() {
r.run();
}
}, delayTime, unit);
}

/**
* Schedule a task to be run after the delay time and after
* each period.
* @param r the task to schedule
* @param initialDelay the initial delay of the schedule
* @param period the between period of the schedule
* @param unit the time unit
* @return the subscription to cancel the schedule
*/
public Subscription scheduleRunnable(final Runnable r, long initialDelay, long period, TimeUnit unit) {
return schedulePeriodically(new Action0() {
@Override
public void call() {
r.run();
}
}, initialDelay, period, unit);

}
}
137 changes: 79 additions & 58 deletions rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.IncrementalSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func2;

Expand All @@ -47,18 +48,17 @@ public ExecutorScheduler(ScheduledExecutorService executor) {
@Override
public <T> Subscription schedulePeriodically(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
if (executor instanceof ScheduledExecutorService) {
final CompositeSubscription subscriptions = new CompositeSubscription();
CompositeSubscription subscription = new CompositeSubscription();
final IncrementalSubscription scheduleSub = new IncrementalSubscription();
final IncrementalSubscription actionSub = new IncrementalSubscription();
subscription.add(scheduleSub);
subscription.add(actionSub);

ScheduledFuture<?> f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Subscription s = action.call(ExecutorScheduler.this, state);
subscriptions.add(s);
}
}, initialDelay, period, unit);
final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription);

subscriptions.add(Subscriptions.from(f));
return subscriptions;
_scheduler.schedulePeriodically(state, action, initialDelay, period, unit);

return subscription;

} else {
return super.schedulePeriodically(state, action, initialDelay, period, unit);
Expand All @@ -67,81 +67,102 @@ public void run() {

@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
final Scheduler _scheduler = this;
CompositeSubscription subscription = new CompositeSubscription();
final IncrementalSubscription scheduleSub = new IncrementalSubscription();
final IncrementalSubscription actionSub = new IncrementalSubscription();
subscription.add(scheduleSub);
subscription.add(actionSub);

final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription);

_scheduler.schedule(state, action, delayTime, unit);

return subscription;
}

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
// all subscriptions that may need to be unsubscribed
final CompositeSubscription subscription = new CompositeSubscription(discardableAction);
CompositeSubscription subscription = new CompositeSubscription();
final IncrementalSubscription scheduleSub = new IncrementalSubscription();
final IncrementalSubscription actionSub = new IncrementalSubscription();
subscription.add(scheduleSub);
subscription.add(actionSub);

final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription);

_scheduler.schedule(state, action);

return subscription;
}

@Override
public Subscription scheduleRunnable(Runnable r, long delayTime, TimeUnit unit) {
if (executor instanceof ScheduledExecutorService) {
// we are a ScheduledExecutorService so can do proper scheduling
ScheduledFuture<?> f = ((ScheduledExecutorService) executor).schedule(new Runnable() {
@Override
public void run() {
// when the delay has passed we now do the work on the actual scheduler
Subscription s = discardableAction.call(_scheduler);
// add the subscription to the CompositeSubscription so it is unsubscribed
subscription.add(s);
}
}, delayTime, unit);
ScheduledFuture<?> f = ((ScheduledExecutorService) executor).schedule(r, delayTime, unit);
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.from(f));
return Subscriptions.from(f);
} else {
// we are not a ScheduledExecutorService so can't directly schedule
if (delayTime == 0) {
// no delay so put on the thread-pool right now
Subscription s = schedule(state, action);
// add the subscription to the CompositeSubscription so it is unsubscribed
subscription.add(s);
return scheduleRunnable(r);
} else {
// there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService
// to handle the scheduling and once it's ready then execute on this Executor
ScheduledFuture<?> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {

@Override
public void run() {
// now execute on the real Executor (by using the other overload that schedules for immediate execution)
Subscription s = _scheduler.schedule(state, action);
// add the subscription to the CompositeSubscription so it is unsubscribed
subscription.add(s);
}
}, delayTime, unit);
ScheduledFuture<?> f = GenericScheduledExecutorService.getInstance().schedule(r, delayTime, unit);
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.from(f));
return Subscriptions.from(f);
}
}
return subscription;
}

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
final Scheduler _scheduler = this;
// all subscriptions that may need to be unsubscribed
final CompositeSubscription subscription = new CompositeSubscription(discardableAction);

// work to be done on a thread
Runnable r = new Runnable() {
@Override
public void run() {
Subscription s = discardableAction.call(_scheduler);
// add the subscription to the CompositeSubscription so it is unsubscribed
subscription.add(s);
}
};

public Subscription scheduleRunnable(Runnable r) {
// submit for immediate execution
if (executor instanceof ExecutorService) {
// we are an ExecutorService so get a Future back that supports unsubscribe
Future<?> f = ((ExecutorService) executor).submit(r);
// add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.from(f));
return Subscriptions.from(f);
} else {
// we are the lowest common denominator so can't unsubscribe once we execute
executor.execute(r);
return Subscriptions.empty();
}
}

return subscription;
@Override
public Subscription scheduleRunnable(final Runnable r, long initialDelay,
final long period, final TimeUnit unit) {
if (executor instanceof ScheduledExecutorService) {
ScheduledFuture<?> f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(r, initialDelay, period, unit);

return Subscriptions.from(f);
} else {
final IncrementalSubscription fs = new IncrementalSubscription();
Runnable rerun = new Runnable() {
@Override
public void run() {
if (!fs.isUnsubscribed()) {
long time = System.nanoTime();
r.run();
long delta = Math.max(0L, System.nanoTime() - time);
long periodNanos = Math.max(0L, unit.toNanos(period) - delta);

long index = fs.nextIndex();
Subscription s = scheduleRunnable(this, periodNanos, TimeUnit.NANOSECONDS);
fs.compareExchange(index, s, false);
}
}
};

long index = fs.nextIndex();

Subscription s = scheduleRunnable(rerun, initialDelay, unit);
fs.compareExchange(index, s, false);
return fs;
}
}

}
154 changes: 154 additions & 0 deletions rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.schedulers;

import java.util.concurrent.TimeUnit;
import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.IncrementalSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Do not re-enter the main scheduler's schedule() method as it will
* unnecessarily chain the subscriptions of every invocation.
*/
public final class ReentrantScheduler extends Scheduler {
final Scheduler parent;
final IncrementalSubscription scheduleSub;
final IncrementalSubscription actionSub;
final CompositeSubscription composite;

public ReentrantScheduler(
Scheduler parent,
IncrementalSubscription scheduleSub,
IncrementalSubscription actionSub,
CompositeSubscription composite) {
this.parent = parent;
this.scheduleSub = scheduleSub;
this.actionSub = actionSub;
this.composite = composite;
}

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
if (composite.isUnsubscribed()) {
// don't bother scheduling a task which wouldn't run anyway
return Subscriptions.empty();
}
long index = actionSub.nextIndex();
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);

actionSub.compareExchange(index, discardableAction, false);

Runnable r = new RunTask(discardableAction);

long sindex = scheduleSub.nextIndex();
Subscription s = parent.scheduleRunnable(r);
scheduleSub.compareExchange(sindex, s, false);

return s;
}

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
if (composite.isUnsubscribed()) {
// don't bother scheduling a task which wouldn't run anyway
return Subscriptions.empty();
}

long index = actionSub.nextIndex();
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
actionSub.compareExchange(index, discardableAction, false);

Runnable r = new RunTask(discardableAction);

long sindex = scheduleSub.nextIndex();
Subscription s = parent.scheduleRunnable(r, delayTime, unit);
scheduleSub.compareExchange(sindex, s, false);

return s;
}

@Override
public <T> Subscription schedulePeriodically(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
if (composite.isUnsubscribed()) {
// don't bother scheduling a task which wouldn't run anyway
return Subscriptions.empty();
}

long index = actionSub.nextIndex();
final PeriodicAction<T> periodicAction = new PeriodicAction<T>(state, action);
actionSub.compareExchange(index, periodicAction, false);

Runnable r = new RunTask(periodicAction);

long sindex = scheduleSub.nextIndex();
Subscription s = parent.scheduleRunnable(r, initialDelay, period, unit);
scheduleSub.compareExchange(sindex, s, false);

return s;
}
/** The task runner. */
private final class RunTask implements Runnable {
final Func1<Scheduler, Subscription> action;

public RunTask(Func1<Scheduler, Subscription> action) {
this.action = action;
}

@Override
public void run() {
long index = actionSub.nextIndex();
Subscription s = action.call(ReentrantScheduler.this);
actionSub.compareExchange(index, s, false);
}

}
/**
* An action that calls the underlying function in a periodic environment.
* @param <T> the state value type
*/
private static final class PeriodicAction<T> implements Subscription, Func1<Scheduler, Subscription> {
final T state;
final Func2<? super Scheduler, ? super T, ? extends Subscription> underlying;
final SerialSubscription ssub;

public PeriodicAction(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> underlying) {
this.state = state;
this.underlying = underlying;
this.ssub = new SerialSubscription();
}

@Override
public Subscription call(Scheduler scheduler) {
if (!ssub.isUnsubscribed()) {
Subscription s = underlying.call(scheduler, state);
ssub.setSubscription(s);
return ssub;
}
return Subscriptions.empty();
}

@Override
public void unsubscribe() {
ssub.unsubscribe();
}
}
}
Loading