Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding ScheduledAction to public API #2592

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,51 @@
import rx.internal.util.RxThreadFactory;
import rx.plugins.*;
import rx.subscriptions.Subscriptions;
import rx.schedulers.ScheduledAction;

/**
* @warn class description missing
* Represents a {@code Scheduler.Worker} which creates a new, single threaded {@code ScheduledExecutorService} with each instance.
* Calling {@code unsubscribe()} on the returned {@code Scheduler.Worker}
* shuts down the underlying {@code ScheduledExecutorService} with its {@code shutdownNow}, cancelling
* any pending or running tasks.
* <p>
* This class can be embedded/extended to build various kinds of {@code Scheduler}s, but doesn't
* track submitted tasks directly because the termination of the underlying {@code ScheduledExecutorService}
* via {@code shutdownNow()} ensures all pending or running tasks are cancelled. However, since
* uses of this class may require additional task tracking, the {@code NewThreadWorker} exposes the
* {@link #scheduleActual(Action0, long, TimeUnit)} method which returns a {@link rx.schedulers.ScheduledAction}
* directly. See {@code ScheduledAction} for further details on the usage of the class.
* <p><b>System-wide properties:</b>
* <ul>
* <li>{@code rx.scheduler.jdk6.purge-frequency-millis}
* <dd>Specifies the purge frequency (in milliseconds) to remove cancelled tasks on a JDK 6 {@code ScheduledExecutorService}.
* Default is 1000 milliseconds. The purge Thread name is prefixed by {@code "RxSchedulerPurge-"}.</br>
* <li>{@code rx.scheduler.jdk6.purge-force}
* <dd>Forces the use of {@code purge()} on JDK 7+ instead of the O(log n) {@code remove()} when a task is cancelled. {@code "true"} or {@code "false"} (default).</br>
* </li>
* </ul>
* @see rx.schedulers.ScheduledAction
*/
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
/** The underlying executor service. */
private final ScheduledExecutorService executor;
/** The hook to decorate each submitted task. */
private final RxJavaSchedulersHook schedulersHook;
/** Indicates the unsubscribed state of the worker. */
volatile boolean isUnsubscribed;
/** The purge frequency in milliseconds. */
private static final String FREQUENCY_KEY = "rx.scheduler.jdk6.purge-frequency-millis";
/** Force the use of purge (true/false). */
private static final String PURGE_FORCE_KEY = "rx.scheduler.jdk6.purge-force";
/** The thread name prefix for the purge thread. */
private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-";
/** Forces the use of purge even if setRemoveOnCancelPolicy is available. */
private static final boolean PURGE_FORCE;
/** The purge frequency in milliseconds. */
public static final int PURGE_FREQUENCY;
/** Tracks the instantiated executors for periodic purging. */
private static final ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> EXECUTORS;
/** References the executor service which purges the registered executors periodically. */
private static final AtomicReference<ScheduledExecutorService> PURGE;
static {
EXECUTORS = new ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor>();
Expand Down Expand Up @@ -129,7 +156,11 @@ public static boolean tryEnableCancelPolicy(ScheduledExecutorService exec) {
return false;
}

/* package */
/**
* Constructs a new {@code NewThreadWorker} and uses the given {@code ThreadFactory} for
* the underlying {@code ScheduledExecutorService}.
* @param threadFactory the thread factory to use
*/
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
Expand All @@ -155,11 +186,15 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
}

/**
* @warn javadoc missing
* @param action
* @param delayTime
* @param unit
* @return
* Schedules the given action on the underlying executor and returns a {@code ScheduledAction}
* instance that allows tracking the task.
* <p>The aim of this method to allow direct access to the created ScheduledAction from
* other {@code Scheduler} implementations building upon a {@code NewThreadWorker}. Note that the method
* doesn't check if the worker instance has been unsubscribed or not for performance reasons.
* @param action the action to schedule
* @param delayTime the delay time in scheduling the action, negative value indicates an immediate scheduling
* @param unit the time unit for the {@code delayTime} parameter
* @return a new {@code ScheduledAction} instance
*/
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
Expand Down
158 changes: 0 additions & 158 deletions src/main/java/rx/internal/schedulers/ScheduledAction.java

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/java/rx/schedulers/CachedThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.NewThreadWorker;
import rx.internal.schedulers.ScheduledAction;
import rx.schedulers.ScheduledAction;
import rx.internal.util.RxThreadFactory;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/schedulers/EventLoopsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.NewThreadWorker;
import rx.internal.schedulers.ScheduledAction;
import rx.schedulers.ScheduledAction;
import rx.internal.util.RxThreadFactory;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
Expand Down
Loading