From 63c2abf2be0a2f460c1b3164635028b66ebb91e3 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 3 Feb 2015 22:15:26 +0100 Subject: [PATCH 1/3] Adding ScheduledAction to public API --- .../internal/schedulers/NewThreadWorker.java | 49 ++- .../internal/schedulers/ScheduledAction.java | 158 --------- .../rx/schedulers/CachedThreadScheduler.java | 2 +- .../rx/schedulers/EventLoopsScheduler.java | 2 +- .../java/rx/schedulers/ScheduledAction.java | 313 ++++++++++++++++++ .../rx/schedulers/NewThreadSchedulerTest.java | 85 ++++- 6 files changed, 441 insertions(+), 168 deletions(-) delete mode 100644 src/main/java/rx/internal/schedulers/ScheduledAction.java create mode 100644 src/main/java/rx/schedulers/ScheduledAction.java diff --git a/src/main/java/rx/internal/schedulers/NewThreadWorker.java b/src/main/java/rx/internal/schedulers/NewThreadWorker.java index 41144795cb..aca5fed5d8 100644 --- a/src/main/java/rx/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/rx/internal/schedulers/NewThreadWorker.java @@ -26,24 +26,51 @@ import rx.internal.util.RxThreadFactory; import rx.plugins.*; import rx.subscriptions.Subscriptions; +import rx.schedulers.ScheduledAction; /** - * @warn class description missing + * Represents a {@code Scheduler.Worker} which creates a new, single threaded {@code ScheduledExecutorService} with each instance. + * Calling {@code unsubscribe()} on the returned {@code Scheduler.Worker} + * shuts down the underlying {@code ScheduledExecutorService} with its {@code shutdownNow}, cancelling + * any pending or running tasks. + *

+ * This class can be embedded/extended to build various kinds of {@code Scheduler}s, but doesn't + * track submitted tasks directly because the termination of the underlying {@code ScheduledExecutorService} + * via {@code shutdownNow()} ensures all pending or running tasks are cancelled. However, since + * uses of this class may require additional task tracking, the {@code NewThreadWorker} exposes the + * {@link #scheduleActual(Action0, long, TimeUnit)} method which returns a {@link rx.schedulers.ScheduledAction} + * directly. See {@code ScheduledAction} for further details on the usage of the class. + *

System-wide properties: + *

+ * @see rx.schedulers.ScheduledAction */ public class NewThreadWorker extends Scheduler.Worker implements Subscription { + /** The underlying executor service. */ private final ScheduledExecutorService executor; + /** The hook to decorate each submitted task. */ private final RxJavaSchedulersHook schedulersHook; + /** Indicates the unsubscribed state of the worker. */ volatile boolean isUnsubscribed; /** The purge frequency in milliseconds. */ private static final String FREQUENCY_KEY = "rx.scheduler.jdk6.purge-frequency-millis"; /** Force the use of purge (true/false). */ private static final String PURGE_FORCE_KEY = "rx.scheduler.jdk6.purge-force"; + /** The thread name prefix for the purge thread. */ private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-"; /** Forces the use of purge even if setRemoveOnCancelPolicy is available. */ private static final boolean PURGE_FORCE; /** The purge frequency in milliseconds. */ public static final int PURGE_FREQUENCY; + /** Tracks the instantiated executors for periodic purging. */ private static final ConcurrentHashMap EXECUTORS; + /** References the executor service which purges the registered executors periodically. */ private static final AtomicReference PURGE; static { EXECUTORS = new ConcurrentHashMap(); @@ -129,7 +156,11 @@ public static boolean tryEnableCancelPolicy(ScheduledExecutorService exec) { return false; } - /* package */ + /** + * Constructs a new {@code NewThreadWorker} and uses the given {@code ThreadFactory} for + * the underlying {@code ScheduledExecutorService}. + * @param threadFactory the thread factory to use + */ public NewThreadWorker(ThreadFactory threadFactory) { ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak @@ -155,11 +186,15 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit } /** - * @warn javadoc missing - * @param action - * @param delayTime - * @param unit - * @return + * Schedules the given action on the underlying executor and returns a {@code ScheduledAction} + * instance that allows tracking the task. + *

The aim of this method to allow direct access to the created ScheduledAction from + * other {@code Scheduler} implementations building upon a {@code NewThreadWorker}. Note that the method + * doesn't check if the worker instance has been unsubscribed or not for performance reasons. + * @param action the action to schedule + * @param delayTime the delay time in scheduling the action, negative value indicates an immediate scheduling + * @param unit the time unit for the {@code delayTime} parameter + * @return a new {@code ScheduledAction} instance */ public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { Action0 decoratedAction = schedulersHook.onSchedule(action); diff --git a/src/main/java/rx/internal/schedulers/ScheduledAction.java b/src/main/java/rx/internal/schedulers/ScheduledAction.java deleted file mode 100644 index 24240096c9..0000000000 --- a/src/main/java/rx/internal/schedulers/ScheduledAction.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package rx.internal.schedulers; - -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import rx.Subscription; -import rx.exceptions.OnErrorNotImplementedException; -import rx.functions.Action0; -import rx.plugins.RxJavaPlugins; -import rx.subscriptions.CompositeSubscription; - -/** - * A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the - * {@code Subscriber} in respect of an {@code Observer}. - */ -public final class ScheduledAction extends AtomicReference implements Runnable, Subscription { - /** */ - private static final long serialVersionUID = -3962399486978279857L; - final CompositeSubscription cancel; - final Action0 action; - - public ScheduledAction(Action0 action) { - this.action = action; - this.cancel = new CompositeSubscription(); - } - - @Override - public void run() { - try { - lazySet(Thread.currentThread()); - action.call(); - } catch (Throwable e) { - // nothing to do but print a System error as this is fatal and there is nowhere else to throw this - IllegalStateException ie = null; - if (e instanceof OnErrorNotImplementedException) { - ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e); - } else { - ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e); - } - RxJavaPlugins.getInstance().getErrorHandler().handleError(ie); - Thread thread = Thread.currentThread(); - thread.getUncaughtExceptionHandler().uncaughtException(thread, ie); - } finally { - unsubscribe(); - } - } - - @Override - public boolean isUnsubscribed() { - return cancel.isUnsubscribed(); - } - - @Override - public void unsubscribe() { - if (!cancel.isUnsubscribed()) { - cancel.unsubscribe(); - } - } - - /** - * Adds a general Subscription to this {@code ScheduledAction} that will be unsubscribed - * if the underlying {@code action} completes or the this scheduled action is cancelled. - * - * @param s the Subscription to add - */ - public void add(Subscription s) { - cancel.add(s); - } - - /** - * Adds the given Future to the unsubscription composite in order to support - * cancelling the underlying task in the executor framework. - * @param f the future to add - */ - public void add(final Future f) { - cancel.add(new FutureCompleter(f)); - } - - /** - * Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is - * cancelled or terminates, it can remove itself from this parent. - * - * @param parent - * the parent {@code CompositeSubscription} to add - */ - public void addParent(CompositeSubscription parent) { - cancel.add(new Remover(this, parent)); - } - - /** - * Cancels the captured future if the caller of the call method - * is not the same as the runner of the outer ScheduledAction to - * prevent unnecessary self-interrupting if the unsubscription - * happens from the same thread. - */ - private final class FutureCompleter implements Subscription { - private final Future f; - - private FutureCompleter(Future f) { - this.f = f; - } - - @Override - public void unsubscribe() { - if (ScheduledAction.this.get() != Thread.currentThread()) { - f.cancel(true); - } else { - f.cancel(false); - } - } - @Override - public boolean isUnsubscribed() { - return f.isCancelled(); - } - } - - /** Remove a child subscription from a composite when unsubscribing. */ - private static final class Remover extends AtomicBoolean implements Subscription { - /** */ - private static final long serialVersionUID = 247232374289553518L; - final Subscription s; - final CompositeSubscription parent; - - public Remover(Subscription s, CompositeSubscription parent) { - this.s = s; - this.parent = parent; - } - - @Override - public boolean isUnsubscribed() { - return s.isUnsubscribed(); - } - - @Override - public void unsubscribe() { - if (compareAndSet(false, true)) { - parent.remove(s); - } - } - - } -} diff --git a/src/main/java/rx/schedulers/CachedThreadScheduler.java b/src/main/java/rx/schedulers/CachedThreadScheduler.java index f1cd815b64..b43d8d876d 100644 --- a/src/main/java/rx/schedulers/CachedThreadScheduler.java +++ b/src/main/java/rx/schedulers/CachedThreadScheduler.java @@ -19,7 +19,7 @@ import rx.Subscription; import rx.functions.Action0; import rx.internal.schedulers.NewThreadWorker; -import rx.internal.schedulers.ScheduledAction; +import rx.schedulers.ScheduledAction; import rx.internal.util.RxThreadFactory; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; diff --git a/src/main/java/rx/schedulers/EventLoopsScheduler.java b/src/main/java/rx/schedulers/EventLoopsScheduler.java index 004fbeea78..1126279233 100644 --- a/src/main/java/rx/schedulers/EventLoopsScheduler.java +++ b/src/main/java/rx/schedulers/EventLoopsScheduler.java @@ -19,7 +19,7 @@ import rx.Subscription; import rx.functions.Action0; import rx.internal.schedulers.NewThreadWorker; -import rx.internal.schedulers.ScheduledAction; +import rx.schedulers.ScheduledAction; import rx.internal.util.RxThreadFactory; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; diff --git a/src/main/java/rx/schedulers/ScheduledAction.java b/src/main/java/rx/schedulers/ScheduledAction.java new file mode 100644 index 0000000000..9b28f9f4af --- /dev/null +++ b/src/main/java/rx/schedulers/ScheduledAction.java @@ -0,0 +1,313 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.schedulers; + +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import rx.Subscription; +import rx.exceptions.OnErrorNotImplementedException; +import rx.functions.Action0; +import rx.plugins.RxJavaPlugins; +import rx.subscriptions.CompositeSubscription; + +/** + * A {@code Runnable} that executes an {@code Action0}, allows associating resources with it and can be unsubscribed. + *

System-wide properties: + *

    + *
  • {@code io.reactivex.rxjava.scheduler.interrupt-on-unsubscribe} + *
    Use {@code Future.cancel(true)} to interrupt a running action? {@code "true"} (default) or {@code "false"}.
    + *
  • + *
+ *

Usage

+ * The main focus of this class to be submitted to an {@link java.util.concurrent.ExecutorService} and + * to manage the cancellation with the returned {@link java.util.concurrent.Future} and (optionally) + * with a parent {@link rx.subscriptions.CompositeSubscription}. + *

+ * For example, when a new {@link rx.Scheduler.Worker} is to be implemented upon a custom scheduler, + * the following set of calls need to happen in order to ensure the cancellation and tracking requirements + * of the {@code Scheduler.Worker} is met: + *

+ * final CompositeSubscription parent = new CompositeSubscription();
+ * final ExecutorService executor = ...
+ * //...
+ * @Override
+ * public Subscription schedule(Action0 action) {
+ *     ScheduledAction sa = new ScheduledAction(action);
+ *     
+ *     // setup the tracking relation between the worker and the action
+ *     parent.add(sa);
+ *     sa.addParent(parent);
+ *     
+ *     // schedule the action
+ *     Future f = executor.submit(sa);
+ *     
+ *     // link the future with the action
+ *     sa.add(f);
+ *     
+ *     return sa;
+ * }
+ * @Override
+ * public void unsubscribe() {
+ *     // this will cancel all pending or running tasks
+ *     parent.unsubscribe(); 
+ * }
+ * 
+ *

+ * Depening on the lifecycle of the {@code ExecutorService}, one can avoid tracking the {@code ScheduledAction}s + * in case the service is shut down in the {@code unsubscribe()} call via {@code executor.shutdownNow()} since + * this call cancels all pending tasks and interrupts running tasks. The implementation simplifies to the following: + *

+ * final ExecutorService executor = ...
+ * //...
+ * @Override
+ * public Subscription schedule(Action0 action) {
+ *     ScheduledAction sa = new ScheduledAction(action);
+ *     
+ *     // schedule the action
+ *     Future f = executor.submit(sa);
+ *     
+ *     // link the future with the action
+ *     sa.add(f);
+ *     
+ *     return sa;
+ * }
+ * @Override
+ * public void unsubscribe() {
+ *     // this will cancel all pending or running tasks
+ *     executor.shutdownNow();
+ * }
+ * 
+ *

+ * In case a {@code Scheduler} implementation wants to decorate an existing scheduler, it becomes necessary + * to track {@code ScheduledAction}s on the decorator's level. The following code demonstrates + * the way of doing it: + *

+ * final CompositeSubscription outerParent = new CompositeSubscription();
+ * final Scheduler.Worker actualWorker = ...
+ * // ...
+ * @Override
+ * public Subscription schedule(Action0 action) {
+ *     Subscription s = actualWorker.schedule(action);
+ *     
+ *     // verify if it is indeed a Scheduled action
+ *     if (s instanceof ScheduledAction) {
+ *         ScheduledAction sa = (ScheduledAction)s;
+ *         // order here is important: add the action first
+ *         outerParent.add(sa);
+ *         // so in case it has already finished, this addParent() will remove it immediately
+ *         sa.addParent(outerParent);
+ *     } else {
+ *         throw new IllegalStateException("The worker didn't return a ScheduledAction");
+ *     }
+ *     
+ *     return s;
+ * }
+ * @Override
+ * public void unsubscribe() {
+ *     outerParent.unsubscribe();
+ *     // release the actual worker in some way
+ *     // actualWorker.unsubscribe();
+ *     // somePool.returnObject(actualWorker);
+ * }
+ * 
+ * Note, however, if the {@code actualWorker} above didn't return a ScheduledAction, there is no + * good way of untracking the returned {@code Subscription} (i.e., when to call {@code outerParent.remove(s)}). + */ +public final class ScheduledAction implements Runnable, Subscription { + /** Indicates if the ScheduledActions should be interrupted if cancelled from another thread. */ + static final boolean INTERRUPT_ON_UNSUBSCRIBE; + /** Key to the INTERRUPT_ON_UNSUBSCRIBE flag. */ + static final String KEY_INTERRUPT_ON_UNSUBSCRIBE = "io.reactivex.rxjava.scheduler.interrupt-on-unsubscribe"; + static { + String value = System.getProperty(KEY_INTERRUPT_ON_UNSUBSCRIBE); + INTERRUPT_ON_UNSUBSCRIBE = value == null || "true".equalsIgnoreCase(value); + } + /** The composite to allow unsubscribing associated resources. */ + final CompositeSubscription cancel; + /** The actual action to call. */ + final Action0 action; + /** Holds the thread executing the action. Using the highest order bit to indicate if unsubscribe should interrupt or not. */ + volatile Thread thread; + /** Updater to the {@link #thread} field. */ + static final AtomicReferenceFieldUpdater THREAD + = AtomicReferenceFieldUpdater.newUpdater(ScheduledAction.class, Thread.class, "thread"); + /** The interruptible flag (0, 1). */ + volatile int interruptible; + /** Updater to the {@link #interruptible} field. */ + static final AtomicIntegerFieldUpdater INTERRUPTIBLE + = AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "interruptible"); + /** + * Creates a new instance of ScheduledAction by wrapping an existing Action0 instance + * and allows interruption on unsubscription. + * @param action the action to wrap + */ + public ScheduledAction(Action0 action) { + this(action, INTERRUPT_ON_UNSUBSCRIBE); + } + /** + * Creates a new instance of ScheduledAction by wrapping an existing Action0 instance + * and with the interrupt policy. + * @param action the action to wrap + * @param interruptOnUnsubscribe allows interrupting the action when unsubscribe happens + * from a different thread the action is running on + */ + public ScheduledAction(Action0 action, boolean interruptOnUnsubscribe) { + this.action = action; + this.cancel = new CompositeSubscription(); + INTERRUPTIBLE.lazySet(this, interruptOnUnsubscribe ? 1 : 0); + } + + @Override + public void run() { + try { + saveCurrentThread(); + action.call(); + } catch (Throwable e) { + // nothing to do but print a System error as this is fatal and there is nowhere else to throw this + IllegalStateException ie = null; + if (e instanceof OnErrorNotImplementedException) { + ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e); + } else { + ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e); + } + RxJavaPlugins.getInstance().getErrorHandler().handleError(ie); + Thread thread = Thread.currentThread(); + thread.getUncaughtExceptionHandler().uncaughtException(thread, ie); + } finally { + unsubscribe(); + } + } + + @Override + public boolean isUnsubscribed() { + return cancel.isUnsubscribed(); + } + + @Override + public void unsubscribe() { + if (!cancel.isUnsubscribed()) { + cancel.unsubscribe(); + } + } + + /** + * Adds a general Subscription to this {@code ScheduledAction} that will be unsubscribed + * if the underlying {@code action} completes or the this scheduled action is cancelled. + * + * @param s the Subscription to add + */ + public void add(Subscription s) { + cancel.add(s); + } + + /** + * Adds the given Future to the unsubscription composite in order to support + * cancelling the underlying task in the executor framework. + * @param f the future to add + */ + public void add(final Future f) { + cancel.add(new FutureCompleter(f)); + } + + /** + * Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is + * cancelled or terminates, it can remove itself from this parent. + * + * @param parent + * the parent {@code CompositeSubscription} to add + */ + public void addParent(CompositeSubscription parent) { + cancel.add(new Remover(this, parent)); + } + + /** + * Sets the interrupt-on-unsubscribe policy for this ScheduledAction. + * @param allow {@code true} if unsubscribing this action from another thread should + * interrupt the thread the action is running on. + */ + public void setInterruptOnUnsubscribe(boolean allow) { + INTERRUPTIBLE.lazySet(this, allow ? 1 : 0); + } + /** + * Returns the current state of the interrupt-on-unsubscribe policy. + * @return {@code true} if unsubscribing this action from another thread will + * interrupt the thread the action is running on + */ + public boolean getInterruptOnUnsubscribe() { + return interruptible != 0; + } + + /** Atomically sets the current thread identifier and preserves the interruption allowed flag. */ + private void saveCurrentThread() { + THREAD.lazySet(this, Thread.currentThread()); + } + + /** + * Cancels the captured future if the caller of the call method + * is not the same as the runner of the outer ScheduledAction to + * prevent unnecessary self-interrupting if the unsubscription + * happens from the same thread. + */ + private final class FutureCompleter implements Subscription { + private final Future f; + + private FutureCompleter(Future f) { + this.f = f; + } + + @Override + public void unsubscribe() { + if (thread != Thread.currentThread()) { + f.cancel(interruptible != 0); + } else { + f.cancel(false); + } + } + @Override + public boolean isUnsubscribed() { + return f.isCancelled(); + } + } + + /** Remove a child subscription from a composite when unsubscribing. */ + private static final class Remover extends AtomicBoolean implements Subscription { + /** */ + private static final long serialVersionUID = 247232374289553518L; + final Subscription s; + final CompositeSubscription parent; + + public Remover(Subscription s, CompositeSubscription parent) { + this.s = s; + this.parent = parent; + } + + @Override + public boolean isUnsubscribed() { + return s.isUnsubscribed(); + } + + @Override + public void unsubscribe() { + if (compareAndSet(false, true)) { + parent.remove(s); + } + } + + } +} diff --git a/src/test/java/rx/schedulers/NewThreadSchedulerTest.java b/src/test/java/rx/schedulers/NewThreadSchedulerTest.java index 1c10843af0..dbd25a549e 100644 --- a/src/test/java/rx/schedulers/NewThreadSchedulerTest.java +++ b/src/test/java/rx/schedulers/NewThreadSchedulerTest.java @@ -26,7 +26,7 @@ import rx.Scheduler; import rx.functions.Action0; -import rx.internal.schedulers.ScheduledAction; +import rx.schedulers.ScheduledAction; import rx.subscriptions.Subscriptions; public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { @@ -60,6 +60,7 @@ public void call() { try { run.await(); } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); exception.set(ex); } } @@ -83,4 +84,86 @@ public void call() { worker.unsubscribe(); } } + @Test(timeout = 3000) + public void testDisableInterrupt() throws InterruptedException { + Scheduler.Worker worker = Schedulers.newThread().createWorker(); + try { + final CountDownLatch run = new CountDownLatch(1); + final CountDownLatch wait = new CountDownLatch(1); + final CountDownLatch done = new CountDownLatch(1); + final AtomicReference exception = new AtomicReference(); + final AtomicBoolean interruptFlag = new AtomicBoolean(); + + ScheduledAction sa = (ScheduledAction)worker.schedule(new Action0() { + @Override + public void call() { + try { + run.countDown(); + wait.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + exception.set(ex); + interruptFlag.set(true); + } + done.countDown(); + } + }); + + sa.setInterruptOnUnsubscribe(false); + + run.await(); + + sa.unsubscribe(); + + wait.countDown(); + + done.await(); + + Assert.assertEquals(null, exception.get()); + Assert.assertFalse("Interrupted?!", interruptFlag.get()); + } finally { + worker.unsubscribe(); + } + } + @Test(timeout = 3000) + public void testEnableInterrupt() throws InterruptedException { + Scheduler.Worker worker = Schedulers.newThread().createWorker(); + try { + final CountDownLatch run = new CountDownLatch(1); + final CountDownLatch wait = new CountDownLatch(1); + final CountDownLatch done = new CountDownLatch(1); + final AtomicReference exception = new AtomicReference(); + final AtomicBoolean interruptFlag = new AtomicBoolean(); + + ScheduledAction sa = (ScheduledAction)worker.schedule(new Action0() { + @Override + public void call() { + try { + run.countDown(); + wait.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + exception.set(ex); + interruptFlag.set(true); + } + done.countDown(); + } + }); + + sa.setInterruptOnUnsubscribe(true); + + run.await(); + + sa.unsubscribe(); + + wait.countDown(); + + done.await(); + + Assert.assertNotSame(null, exception.get()); + Assert.assertTrue("Not Interrupted?!", interruptFlag.get()); + } finally { + worker.unsubscribe(); + } + } } From 28bb887e2ee9b92e35703d69caed282580209786 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 3 Feb 2015 22:26:50 +0100 Subject: [PATCH 2/3] Changed system-parameter name. --- src/main/java/rx/internal/schedulers/NewThreadWorker.java | 4 ++-- src/main/java/rx/schedulers/ScheduledAction.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/rx/internal/schedulers/NewThreadWorker.java b/src/main/java/rx/internal/schedulers/NewThreadWorker.java index aca5fed5d8..583c9d4fde 100644 --- a/src/main/java/rx/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/rx/internal/schedulers/NewThreadWorker.java @@ -42,10 +42,10 @@ * directly. See {@code ScheduledAction} for further details on the usage of the class. *

System-wide properties: *

    - *
  • {@code io.reactivex.rxjava.scheduler.jdk6.purge-frequency-millis} + *
  • {@code rx.scheduler.jdk6.purge-frequency-millis} *
    Specifies the purge frequency (in milliseconds) to remove cancelled tasks on a JDK 6 {@code ScheduledExecutorService}. * Default is 1000 milliseconds. The purge Thread name is prefixed by {@code "RxSchedulerPurge-"}.
    - *
  • {@code io.reactivex.rxjava.scheduler.jdk6.purge-force} + *
  • {@code rx.scheduler.jdk6.purge-force} *
    Forces the use of {@code purge()} on JDK 7+ instead of the O(log n) {@code remove()} when a task is cancelled. {@code "true"} or {@code "false"} (default).
    *
  • *
diff --git a/src/main/java/rx/schedulers/ScheduledAction.java b/src/main/java/rx/schedulers/ScheduledAction.java index 9b28f9f4af..19e78b0bb5 100644 --- a/src/main/java/rx/schedulers/ScheduledAction.java +++ b/src/main/java/rx/schedulers/ScheduledAction.java @@ -30,7 +30,7 @@ * A {@code Runnable} that executes an {@code Action0}, allows associating resources with it and can be unsubscribed. *

System-wide properties: *

    - *
  • {@code io.reactivex.rxjava.scheduler.interrupt-on-unsubscribe} + *
  • {@code rx.scheduler.interrupt-on-unsubscribe} *
    Use {@code Future.cancel(true)} to interrupt a running action? {@code "true"} (default) or {@code "false"}.
    *
  • *
@@ -133,7 +133,7 @@ public final class ScheduledAction implements Runnable, Subscription { /** Indicates if the ScheduledActions should be interrupted if cancelled from another thread. */ static final boolean INTERRUPT_ON_UNSUBSCRIBE; /** Key to the INTERRUPT_ON_UNSUBSCRIBE flag. */ - static final String KEY_INTERRUPT_ON_UNSUBSCRIBE = "io.reactivex.rxjava.scheduler.interrupt-on-unsubscribe"; + static final String KEY_INTERRUPT_ON_UNSUBSCRIBE = "rx.scheduler.interrupt-on-unsubscribe"; static { String value = System.getProperty(KEY_INTERRUPT_ON_UNSUBSCRIBE); INTERRUPT_ON_UNSUBSCRIBE = value == null || "true".equalsIgnoreCase(value); From 8de298b7792e1f2b2602a8dcd237858229738295 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 4 Feb 2015 08:38:20 +0100 Subject: [PATCH 3/3] Experimental annotation --- src/main/java/rx/schedulers/ScheduledAction.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/rx/schedulers/ScheduledAction.java b/src/main/java/rx/schedulers/ScheduledAction.java index 19e78b0bb5..824af0b931 100644 --- a/src/main/java/rx/schedulers/ScheduledAction.java +++ b/src/main/java/rx/schedulers/ScheduledAction.java @@ -16,11 +16,10 @@ package rx.schedulers; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.*; import rx.Subscription; +import rx.annotations.Experimental; import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action0; import rx.plugins.RxJavaPlugins; @@ -129,6 +128,7 @@ * Note, however, if the {@code actualWorker} above didn't return a ScheduledAction, there is no * good way of untracking the returned {@code Subscription} (i.e., when to call {@code outerParent.remove(s)}). */ +@Experimental public final class ScheduledAction implements Runnable, Subscription { /** Indicates if the ScheduledActions should be interrupted if cancelled from another thread. */ static final boolean INTERRUPT_ON_UNSUBSCRIBE;