From 3a4a0ecde004627dc29c548bc67a65da34615062 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 19 Dec 2013 23:13:44 +0100 Subject: [PATCH 1/4] Fixed OutOfMemoryError with CPU scheduler in recursive mode. --- .../java/rx/schedulers/ExecutorScheduler.java | 117 +++++++------ .../rx/schedulers/ReentrantScheduler.java | 151 ++++++++++++++++ .../schedulers/ReentrantSchedulerHelper.java | 54 ++++++ .../rx/subscriptions/ForwardSubscription.java | 162 ++++++++++++++++++ .../src/test/java/rx/SchedulersTest.java | 14 +- .../rx/schedulers/ReentrantSchedulerTest.java | 104 +++++++++++ .../ForwardSubscriptionTest.java | 120 +++++++++++++ 7 files changed, 661 insertions(+), 61 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java create mode 100644 rxjava-core/src/main/java/rx/schedulers/ReentrantSchedulerHelper.java create mode 100644 rxjava-core/src/main/java/rx/subscriptions/ForwardSubscription.java create mode 100644 rxjava-core/src/test/java/rx/schedulers/ReentrantSchedulerTest.java create mode 100644 rxjava-core/src/test/java/rx/subscriptions/ForwardSubscriptionTest.java diff --git a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java index 563e609612..dcf61cf0a4 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -25,6 +25,7 @@ import rx.Scheduler; import rx.Subscription; import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.ForwardSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; @@ -33,7 +34,7 @@ *

* Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a system-wide Timer will be used to handle delayed events. */ -public class ExecutorScheduler extends Scheduler { +public class ExecutorScheduler extends Scheduler implements ReentrantSchedulerHelper { private final Executor executor; public ExecutorScheduler(Executor executor) { @@ -47,18 +48,17 @@ public ExecutorScheduler(ScheduledExecutorService executor) { @Override public Subscription schedulePeriodically(final T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { if (executor instanceof ScheduledExecutorService) { - final CompositeSubscription subscriptions = new CompositeSubscription(); + CompositeSubscription subscription = new CompositeSubscription(); + final ForwardSubscription scheduleSub = new ForwardSubscription(); + final ForwardSubscription actionSub = new ForwardSubscription(); + subscription.add(scheduleSub); + subscription.add(actionSub); - ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - Subscription s = action.call(ExecutorScheduler.this, state); - subscriptions.add(s); - } - }, initialDelay, period, unit); + final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription); - subscriptions.add(Subscriptions.from(f)); - return subscriptions; + _scheduler.schedulePeriodically(state, action, initialDelay, period, unit); + + return subscription; } else { return super.schedulePeriodically(state, action, initialDelay, period, unit); @@ -67,81 +67,80 @@ public void run() { @Override public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { - final DiscardableAction discardableAction = new DiscardableAction(state, action); - final Scheduler _scheduler = this; + CompositeSubscription subscription = new CompositeSubscription(); + final ForwardSubscription scheduleSub = new ForwardSubscription(); + final ForwardSubscription actionSub = new ForwardSubscription(); + subscription.add(scheduleSub); + subscription.add(actionSub); + + final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription); + + _scheduler.schedule(state, action, delayTime, unit); + + return subscription; + } + + @Override + public Subscription schedule(T state, Func2 action) { // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); + CompositeSubscription subscription = new CompositeSubscription(); + final ForwardSubscription scheduleSub = new ForwardSubscription(); + final ForwardSubscription actionSub = new ForwardSubscription(); + subscription.add(scheduleSub); + subscription.add(actionSub); + + final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription); + + _scheduler.schedule(state, action); + return subscription; + } + + @Override + public void scheduleTask(Runnable r, ForwardSubscription out, long delayTime, TimeUnit unit) { + Subscription before = out.getSubscription(); if (executor instanceof ScheduledExecutorService) { // we are a ScheduledExecutorService so can do proper scheduling - ScheduledFuture f = ((ScheduledExecutorService) executor).schedule(new Runnable() { - @Override - public void run() { - // when the delay has passed we now do the work on the actual scheduler - Subscription s = discardableAction.call(_scheduler); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); - } - }, delayTime, unit); + ScheduledFuture f = ((ScheduledExecutorService) executor).schedule(r, delayTime, unit); // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); + out.compareExchange(before, Subscriptions.from(f)); } else { // we are not a ScheduledExecutorService so can't directly schedule if (delayTime == 0) { // no delay so put on the thread-pool right now - Subscription s = schedule(state, action); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); + scheduleTask(r, out); } else { // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService // to handle the scheduling and once it's ready then execute on this Executor - ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { - - @Override - public void run() { - // now execute on the real Executor (by using the other overload that schedules for immediate execution) - Subscription s = _scheduler.schedule(state, action); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); - } - }, delayTime, unit); + ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(r, delayTime, unit); // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); + out.compareExchange(before, Subscriptions.from(f)); } } - return subscription; } - + @Override - public Subscription schedule(T state, Func2 action) { - final DiscardableAction discardableAction = new DiscardableAction(state, action); - final Scheduler _scheduler = this; - // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); - - // work to be done on a thread - Runnable r = new Runnable() { - @Override - public void run() { - Subscription s = discardableAction.call(_scheduler); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); - } - }; - + public void scheduleTask(Runnable r, ForwardSubscription out) { + Subscription before = out.getSubscription(); // submit for immediate execution if (executor instanceof ExecutorService) { // we are an ExecutorService so get a Future back that supports unsubscribe Future f = ((ExecutorService) executor).submit(r); // add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); + out.compareExchange(before, Subscriptions.from(f)); } else { // we are the lowest common denominator so can't unsubscribe once we execute executor.execute(r); + out.compareExchange(before, Subscriptions.empty()); } + } - return subscription; + @Override + public void scheduleTask(Runnable r, ForwardSubscription out, long initialDelay, long period, TimeUnit unit) { + Subscription before = out.getSubscription(); + ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(r, initialDelay, period, unit); + out.compareExchange(before, Subscriptions.from(f)); } - + } diff --git a/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java new file mode 100644 index 0000000000..950d517585 --- /dev/null +++ b/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java @@ -0,0 +1,151 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import java.util.concurrent.TimeUnit; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.ForwardSubscription; +import rx.subscriptions.SerialSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +/** + * Do not re-enter the main scheduler's schedule() method as it will + * unnecessarily chain the subscriptions of every invocation. + */ +public final class ReentrantScheduler extends Scheduler { + final ReentrantSchedulerHelper scheduler; + final ForwardSubscription scheduleSub; + final ForwardSubscription actionSub; + final CompositeSubscription composite; + + public ReentrantScheduler( + ReentrantSchedulerHelper scheduler, + ForwardSubscription scheduleSub, + ForwardSubscription actionSub, + CompositeSubscription composite) { + this.scheduler = scheduler; + this.scheduleSub = scheduleSub; + this.actionSub = actionSub; + this.composite = composite; + } + + @Override + public Subscription schedule(T state, Func2 action) { + if (composite.isUnsubscribed()) { + // don't bother scheduling a task which wouldn't run anyway + return Subscriptions.empty(); + } + Subscription before = actionSub.getSubscription(); + final DiscardableAction discardableAction = new DiscardableAction(state, action); + + actionSub.compareExchange(before, discardableAction); + + Runnable r = new Runnable() { + @Override + public void run() { + Subscription sbefore = actionSub.getSubscription(); + Subscription s = discardableAction.call(ReentrantScheduler.this); + actionSub.compareExchange(sbefore, s); + } + }; + + scheduler.scheduleTask(r, scheduleSub); + + return composite; + } + + @Override + public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { + if (composite.isUnsubscribed()) { + // don't bother scheduling a task which wouldn't run anyway + return Subscriptions.empty(); + } + + Subscription before = actionSub.getSubscription(); + final DiscardableAction discardableAction = new DiscardableAction(state, action); + actionSub.compareExchange(before, discardableAction); + + Runnable r = new Runnable() { + @Override + public void run() { + Subscription sbefore = actionSub.getSubscription(); + Subscription s = discardableAction.call(ReentrantScheduler.this); + actionSub.compareExchange(sbefore, s); + } + }; + scheduler.scheduleTask(r, scheduleSub, delayTime, unit); + + return composite; + } + + @Override + public Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit) { + if (composite.isUnsubscribed()) { + // don't bother scheduling a task which wouldn't run anyway + return Subscriptions.empty(); + } + + Subscription before = actionSub.getSubscription(); + final PeriodicAction periodicAction = new PeriodicAction(state, action); + actionSub.compareExchange(before, periodicAction); + + Runnable r = new Runnable() { + @Override + public void run() { + Subscription sbefore = actionSub.getSubscription(); + Subscription s = periodicAction.call(ReentrantScheduler.this); + actionSub.compareExchange(sbefore, s); + } + }; + scheduler.scheduleTask(r, scheduleSub, initialDelay, period, unit); + + return composite; + } + /** + * An action that calls the underlying function in a periodic environment. + * @param the state value type + */ + private static final class PeriodicAction implements Subscription, Func1 { + final T state; + final Func2 underlying; + final SerialSubscription ssub; + + public PeriodicAction(T state, Func2 underlying) { + this.state = state; + this.underlying = underlying; + this.ssub = new SerialSubscription(); + } + + @Override + public Subscription call(Scheduler scheduler) { + if (!ssub.isUnsubscribed()) { + Subscription s = underlying.call(scheduler, state); + ssub.setSubscription(s); + return ssub; + } + return Subscriptions.empty(); + } + + @Override + public void unsubscribe() { + ssub.unsubscribe(); + } + } +} diff --git a/rxjava-core/src/main/java/rx/schedulers/ReentrantSchedulerHelper.java b/rxjava-core/src/main/java/rx/schedulers/ReentrantSchedulerHelper.java new file mode 100644 index 0000000000..717d60581e --- /dev/null +++ b/rxjava-core/src/main/java/rx/schedulers/ReentrantSchedulerHelper.java @@ -0,0 +1,54 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import java.util.concurrent.TimeUnit; +import rx.subscriptions.ForwardSubscription; + +/** + * Simple scheduler API used by the ReentrantScheduler to + * communicate with the actual scheduler implementation. + */ +public interface ReentrantSchedulerHelper { + /** + * Schedule a task to be run immediately and update the subscription + * describing the schedule. + * @param r the task to run immediately + * @param out the subscription holding the current schedule subscription + */ + void scheduleTask(Runnable r, ForwardSubscription out); + + /** + * Schedule a task to be run after the delay time and update the subscription + * describing the schedule. + * @param r the task to schedule + * @param out the subscription holding the current schedule subscription + * @param delayTime the time to delay the execution + * @param unit the time unit + */ + void scheduleTask(Runnable r, ForwardSubscription out, long delayTime, TimeUnit unit); + + /** + * Schedule a task to be run after the delay time and after + * each period, then update the subscription describing the schedule. + * @param r the task to schedule + * @param out the subscription holding the current schedule subscription + * @param initialDelay the initial delay of the schedule + * @param period the between period of the schedule + * @param unit the time unit + */ + void scheduleTask(Runnable r, ForwardSubscription out, long initialDelay, long period, TimeUnit unit); +} diff --git a/rxjava-core/src/main/java/rx/subscriptions/ForwardSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/ForwardSubscription.java new file mode 100644 index 0000000000..7a64b260de --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/ForwardSubscription.java @@ -0,0 +1,162 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import java.util.concurrent.atomic.AtomicReference; +import rx.Subscription; + +/** + * A subscription that holds another subscription and + * allows swapping it in compare-and-swap style and does + * not unsubscribe any replaced values by default. + *

+ * Overloads are provided to perform the unsubscription on + * the old value if required. + */ +public class ForwardSubscription implements Subscription { + /** The atomic reference. */ + final AtomicReference reference = new AtomicReference(); + /** The unsubscription sentinel. */ + private static final Subscription UNSUBSCRIBE_SENTINEL = new Subscription() { + @Override + public void unsubscribe() { + } + }; + /** + * Creates an empty ForwardSubscription. + */ + public ForwardSubscription() { + + } + /** + * Creates a ForwardSubscription with the initial subscription. + * @param initial the initial subscription + */ + public ForwardSubscription(Subscription initial) { + reference.set(initial); + } + /** + * Returns true if this subscription has been unsubscribed. + * @return true if this subscription has been unsubscribed + */ + public boolean isUnsubscribed() { + return reference.get() == UNSUBSCRIBE_SENTINEL; + } + /** + * Returns the current maintained subscription. + * @return the current maintained subscription + */ + public Subscription getSubscription() { + Subscription s = reference.get(); + if (s == UNSUBSCRIBE_SENTINEL) { + return Subscriptions.empty(); + } + return s; + } + /** + * Atomically replace the current subscription but + * don't unsubscribe the old value. + * @param newValue the new subscription to set + */ + public void setSubscription(Subscription newValue) { + setSubscription(newValue, false); + } + /** + * Atomically replace the current subscription and + * unsubscribe the old value id required. + * @param newValue the new subscription to set + */ + public void setSubscription(Subscription newValue, boolean unsubscribeOld) { + Subscription s = replace(newValue); + if (unsubscribeOld && s != null) { + s.unsubscribe(); + } + } + /** + * Atomically replace a new subscription and return the old one. + *

+ * If this subscription is unsubscribed, the newValue subscription + * is unsubscribed and an empty subscription is returned. + * @param newValue the new subscription + * @return the old subscription or empty if this ForwardSubscription is unsubscribed + */ + public Subscription replace(Subscription newValue) { + do { + Subscription old = reference.get(); + if (old == UNSUBSCRIBE_SENTINEL) { + if (newValue != null) { + newValue.unsubscribe(); + } + return Subscriptions.empty(); + } + if (reference.compareAndSet(old, newValue)) { + return old; + } + } while (true); + } + /** + * Atomically change the subscription only if it is the expected value + * but don't unsubscribe the old value. + * If this subscription is unsubscribed, the newValue is immediately + * unsubscribed. + * @param expected the expected subscription + * @param newValue the new subscription + * @return true if successfully replaced, false if this + * subscription is unsubscribed or it didn't contain + * the expected subscription. + */ + public boolean compareExchange(Subscription expected, Subscription newValue) { + return compareExchange(expected, newValue, false); + } + /** + * Atomically change the subscription only if it is the expected value + * and unsubscribe the old one if required. + * @param expected the expected subscription + * @param newValue the new subscription + * @param unsubscribeOld indicates to unsubscribe the old subscription if the exchange succeeded. + * @return true if successfully replaced, false if this + * subscription is unsubscribed or it didn't contain + * the expected subscription. + */ + public boolean compareExchange(Subscription expected, Subscription newValue, boolean unsubscribeOld) { + do { + Subscription old = reference.get(); + if (old == UNSUBSCRIBE_SENTINEL) { + if (newValue != null) { + newValue.unsubscribe(); + } + return false; + } + if (old != expected) { + return false; + } + if (reference.compareAndSet(old, newValue)) { + if (unsubscribeOld && old != null) { + old.unsubscribe(); + } + return true; + } + } while (true); + } + @Override + public void unsubscribe() { + Subscription s = reference.getAndSet(UNSUBSCRIBE_SENTINEL); + if (s != null) { + s.unsubscribe(); + } + } + +} \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/SchedulersTest.java b/rxjava-core/src/test/java/rx/SchedulersTest.java index 62e74f5798..f91dbbc908 100644 --- a/rxjava-core/src/test/java/rx/SchedulersTest.java +++ b/rxjava-core/src/test/java/rx/SchedulersTest.java @@ -327,12 +327,13 @@ public void testRecursiveScheduler2() throws InterruptedException { // use latches instead of Thread.sleep final CountDownLatch latch = new CountDownLatch(10); final CountDownLatch completionLatch = new CountDownLatch(1); + final BooleanSubscription cancel = new BooleanSubscription(); - Observable obs = Observable.create(new OnSubscribeFunc() { + Observable obs = Observable.create(new Observable.OnSubscribeFunc() { @Override public Subscription onSubscribe(final Observer observer) { - return Schedulers.threadPoolForComputation().schedule(new BooleanSubscription(), new Func2() { + return Schedulers.threadPoolForComputation().schedule(cancel, new Func2() { @Override public Subscription call(Scheduler scheduler, BooleanSubscription cancel) { if (cancel.isUnsubscribed()) { @@ -378,6 +379,15 @@ public void onNext(Integer args) { fail("Timed out waiting on onNext latch"); } + + // wait some turn to let the action run + Thread.sleep(100); + + cancel.unsubscribe(); + + // allow seeing the cancellation + Thread.sleep(100); + // now unsubscribe and ensure it stops the recursive loop subscribe.unsubscribe(); System.out.println("unsubscribe"); diff --git a/rxjava-core/src/test/java/rx/schedulers/ReentrantSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/ReentrantSchedulerTest.java new file mode 100644 index 0000000000..9bb03d07a7 --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/ReentrantSchedulerTest.java @@ -0,0 +1,104 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import org.junit.Test; +import rx.Observable; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action1; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +public class ReentrantSchedulerTest { + @Test + public void testReentrantSchedulerIsProvided() throws InterruptedException { + final AtomicReference ref = new AtomicReference(); + final CountDownLatch cdl = new CountDownLatch(1); + Scheduler scheduler = Schedulers.threadPoolForComputation(); + scheduler.schedule(1, new Func2() { + + @Override + public Subscription call(Scheduler t1, Integer t2) { + ref.set(t1); + cdl.countDown(); + return Subscriptions.empty(); + } + }); + + if (!cdl.await(1000, TimeUnit.MILLISECONDS)) { + fail("Should have countdown the latch!"); + } + + assertTrue(ref.get() instanceof ReentrantScheduler); + } + + @Test + public void testReentrantSchedulerIsProvided2() throws InterruptedException { + final AtomicReference ref = new AtomicReference(); + final CountDownLatch cdl = new CountDownLatch(1); + Scheduler scheduler = Schedulers.threadPoolForComputation(); + scheduler.schedule(1, new Func2() { + + @Override + public Subscription call(Scheduler t1, Integer t2) { + ref.set(t1); + cdl.countDown(); + return Subscriptions.empty(); + } + }, 100, TimeUnit.MILLISECONDS); + + if (!cdl.await(1000, TimeUnit.MILLISECONDS)) { + fail("Should have countdown the latch!"); + } + + assertTrue(ref.get() instanceof ReentrantScheduler); + } + + @Test + public void testReentrantSchedulerIsProvided3() throws InterruptedException { + final AtomicReference ref = new AtomicReference(); + final CountDownLatch cdl = new CountDownLatch(1); + Scheduler scheduler = Schedulers.threadPoolForComputation(); + Subscription s = scheduler.schedulePeriodically(1, new Func2() { + int count; + @Override + public Subscription call(Scheduler t1, Integer t2) { + if (count++ == 3) { + cdl.countDown(); + ref.set(t1); + } + return Subscriptions.empty(); + } + }, 100, 100, TimeUnit.MILLISECONDS); + + if (!cdl.await(5000, TimeUnit.MILLISECONDS)) { + fail("Should have countdown the latch!"); + } + + s.unsubscribe(); + + assertTrue(ref.get() instanceof ReentrantScheduler); + } +} diff --git a/rxjava-core/src/test/java/rx/subscriptions/ForwardSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/ForwardSubscriptionTest.java new file mode 100644 index 0000000000..56eb1e8745 --- /dev/null +++ b/rxjava-core/src/test/java/rx/subscriptions/ForwardSubscriptionTest.java @@ -0,0 +1,120 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Test; +import rx.Subscription; + +public class ForwardSubscriptionTest { + @Test + public void testSimple() { + BooleanSubscription b1 = new BooleanSubscription(); + BooleanSubscription b2 = new BooleanSubscription(); + + ForwardSubscription fs = new ForwardSubscription(b1); + + assertFalse(fs.isUnsubscribed()); + + Subscription old = fs.replace(b2); + + assertEquals(old, b1); + assertFalse(b1.isUnsubscribed()); + + fs.unsubscribe(); + + assertTrue(fs.isUnsubscribed()); + assertTrue(b2.isUnsubscribed()); + assertFalse(b1.isUnsubscribed()); + } + @Test + public void testExchange() { + BooleanSubscription b1 = new BooleanSubscription(); + BooleanSubscription b2 = new BooleanSubscription(); + + ForwardSubscription fs = new ForwardSubscription(); + + assertTrue(fs.compareExchange(null, b1)); + + assertFalse(fs.compareExchange(null, b2)); + assertFalse(b2.isUnsubscribed()); + + assertTrue(fs.compareExchange(b1, b2)); + + assertFalse(b1.isUnsubscribed()); + } + @Test + public void testSetAndUnsubscribe() { + BooleanSubscription b1 = new BooleanSubscription(); + BooleanSubscription b2 = new BooleanSubscription(); + + ForwardSubscription fs = new ForwardSubscription(b1); + + fs.setSubscription(b2); + assertFalse(b1.isUnsubscribed()); + + fs.setSubscription(b1, true); + + assertTrue(b2.isUnsubscribed()); + } + @Test + public void testExchangeAndUnsubscribe() { + BooleanSubscription b1 = new BooleanSubscription(); + BooleanSubscription b2 = new BooleanSubscription(); + + ForwardSubscription fs = new ForwardSubscription(); + + assertTrue(fs.compareExchange(null, b1, true)); + + assertFalse(fs.compareExchange(null, b2, true)); + + assertFalse(b1.isUnsubscribed()); + assertFalse(b2.isUnsubscribed()); + + assertTrue(fs.compareExchange(b1, b2, true)); + + assertTrue(b1.isUnsubscribed()); + } + @Test + public void testSetUnsubscribed() { + BooleanSubscription b1 = new BooleanSubscription(); + ForwardSubscription fs = new ForwardSubscription(); + fs.unsubscribe(); + fs.setSubscription(b1); + + assertTrue(b1.isUnsubscribed()); + } + @Test + public void testExchangeUnsubscribed() { + BooleanSubscription b1 = new BooleanSubscription(); + ForwardSubscription fs = new ForwardSubscription(); + fs.unsubscribe(); + fs.compareExchange(null, b1); + + assertTrue(b1.isUnsubscribed()); + } + @Test + public void testExchangeUnsubscribed2() { + BooleanSubscription b1 = new BooleanSubscription(); + ForwardSubscription fs = new ForwardSubscription(); + fs.unsubscribe(); + fs.compareExchange(null, b1, true); + + assertTrue(b1.isUnsubscribed()); + } +} From 3dd937be47db94a91f7b08acf58a2e7c3a386ba8 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 20 Dec 2013 19:19:17 +0100 Subject: [PATCH 2/4] Simplified and refactored the helper interface. --- .../java/rx/schedulers/ExecutorScheduler.java | 54 +++++++---- .../rx/schedulers/ReentrantScheduler.java | 90 +++++++++++++------ .../schedulers/ReentrantSchedulerHelper.java | 54 ----------- 3 files changed, 100 insertions(+), 98 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/schedulers/ReentrantSchedulerHelper.java diff --git a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java index dcf61cf0a4..0f0112482f 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -24,6 +24,7 @@ import rx.Scheduler; import rx.Subscription; +import rx.schedulers.ReentrantScheduler.ReentrantSchedulerHelper; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.ForwardSubscription; import rx.subscriptions.Subscriptions; @@ -34,8 +35,10 @@ *

* Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a system-wide Timer will be used to handle delayed events. */ -public class ExecutorScheduler extends Scheduler implements ReentrantSchedulerHelper { +public class ExecutorScheduler extends Scheduler { private final Executor executor; + /** The reentrant scheduler helper. */ + private final ReentrantSchedulerHelper helper = new ESReentrantSchedulerHelper(); public ExecutorScheduler(Executor executor) { this.executor = executor; @@ -54,7 +57,7 @@ public Subscription schedulePeriodically(final T state, final Func2 Subscription schedule(final T state, final Func2 Subscription schedule(T state, Func2 f = ((ScheduledExecutorService) executor).schedule(r, delayTime, unit); // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens - out.compareExchange(before, Subscriptions.from(f)); + return Subscriptions.from(f); } else { // we are not a ScheduledExecutorService so can't directly schedule if (delayTime == 0) { // no delay so put on the thread-pool right now - scheduleTask(r, out); + return scheduleTask(r); } else { // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService // to handle the scheduling and once it's ready then execute on this Executor ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(r, delayTime, unit); // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens - out.compareExchange(before, Subscriptions.from(f)); + return Subscriptions.from(f); } } } - @Override - public void scheduleTask(Runnable r, ForwardSubscription out) { - Subscription before = out.getSubscription(); + public Subscription scheduleTask(Runnable r) { // submit for immediate execution if (executor instanceof ExecutorService) { // we are an ExecutorService so get a Future back that supports unsubscribe Future f = ((ExecutorService) executor).submit(r); // add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens - out.compareExchange(before, Subscriptions.from(f)); + return Subscriptions.from(f); } else { // we are the lowest common denominator so can't unsubscribe once we execute executor.execute(r); - out.compareExchange(before, Subscriptions.empty()); + return Subscriptions.empty(); } } - @Override - public void scheduleTask(Runnable r, ForwardSubscription out, long initialDelay, long period, TimeUnit unit) { - Subscription before = out.getSubscription(); + public Subscription scheduleTask(Runnable r, long initialDelay, long period, TimeUnit unit) { ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(r, initialDelay, period, unit); - out.compareExchange(before, Subscriptions.from(f)); + return Subscriptions.from(f); } + /** The reentrant helper. */ + private final class ESReentrantSchedulerHelper implements ReentrantSchedulerHelper { + + @Override + public Subscription scheduleTask(Runnable r) { + return ExecutorScheduler.this.scheduleTask(r); + } + + @Override + public Subscription scheduleTask(Runnable r, long delayTime, TimeUnit unit) { + return ExecutorScheduler.this.scheduleTask(r, delayTime, unit); + } + + @Override + public Subscription scheduleTask(Runnable r, long initialDelay, long period, TimeUnit unit) { + return ExecutorScheduler.this.scheduleTask(r, initialDelay, period, unit); + } + + } } diff --git a/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java index 950d517585..c51e7d862d 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java @@ -1,18 +1,18 @@ - /** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.schedulers; import java.util.concurrent.TimeUnit; @@ -66,9 +66,11 @@ public void run() { } }; - scheduler.scheduleTask(r, scheduleSub); + Subscription sbefore = scheduleSub.getSubscription(); + Subscription s = scheduler.scheduleTask(r); + scheduleSub.compareExchange(sbefore, s); - return composite; + return s; } @Override @@ -77,7 +79,7 @@ public Subscription schedule(T state, Func2 discardableAction = new DiscardableAction(state, action); actionSub.compareExchange(before, discardableAction); @@ -90,11 +92,13 @@ public void run() { actionSub.compareExchange(sbefore, s); } }; - scheduler.scheduleTask(r, scheduleSub, delayTime, unit); + Subscription sbefore = scheduleSub.getSubscription(); + Subscription s = scheduler.scheduleTask(r, delayTime, unit);; + scheduleSub.compareExchange(sbefore, s); - return composite; + return s; } - + @Override public Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit) { if (composite.isUnsubscribed()) { @@ -114,9 +118,11 @@ public void run() { actionSub.compareExchange(sbefore, s); } }; - scheduler.scheduleTask(r, scheduleSub, initialDelay, period, unit); + Subscription sbefore = scheduleSub.getSubscription(); + Subscription s = scheduler.scheduleTask(r, initialDelay, period, unit); + scheduleSub.compareExchange(sbefore, s); - return composite; + return s; } /** * An action that calls the underlying function in a periodic environment. @@ -126,7 +132,7 @@ private static final class PeriodicAction implements Subscription, Func1 underlying; final SerialSubscription ssub; - + public PeriodicAction(T state, Func2 underlying) { this.state = state; this.underlying = underlying; @@ -142,10 +148,44 @@ public Subscription call(Scheduler scheduler) { } return Subscriptions.empty(); } - + @Override public void unsubscribe() { ssub.unsubscribe(); } } + /** + * Simple scheduler API used by the ReentrantScheduler to + * communicate with the actual scheduler implementation. + */ + public interface ReentrantSchedulerHelper { + /** + * Schedule a task to be run immediately and update the subscription + * describing the schedule. + * @param r the task to run immediately + * @return the subscription to cancel the schedule + */ + Subscription scheduleTask(Runnable r); + + /** + * Schedule a task to be run after the delay time and update the subscription + * describing the schedule. + * @param r the task to schedule + * @param delayTime the time to delay the execution + * @param unit the time unit + * @return the subscription to cancel the schedule + */ + Subscription scheduleTask(Runnable r, long delayTime, TimeUnit unit); + + /** + * Schedule a task to be run after the delay time and after + * each period, then update the subscription describing the schedule. + * @param r the task to schedule + * @param initialDelay the initial delay of the schedule + * @param period the between period of the schedule + * @param unit the time unit + * @return the subscription to cancel the schedule + */ + Subscription scheduleTask(Runnable r, long initialDelay, long period, TimeUnit unit); + } } diff --git a/rxjava-core/src/main/java/rx/schedulers/ReentrantSchedulerHelper.java b/rxjava-core/src/main/java/rx/schedulers/ReentrantSchedulerHelper.java deleted file mode 100644 index 717d60581e..0000000000 --- a/rxjava-core/src/main/java/rx/schedulers/ReentrantSchedulerHelper.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.schedulers; - -import java.util.concurrent.TimeUnit; -import rx.subscriptions.ForwardSubscription; - -/** - * Simple scheduler API used by the ReentrantScheduler to - * communicate with the actual scheduler implementation. - */ -public interface ReentrantSchedulerHelper { - /** - * Schedule a task to be run immediately and update the subscription - * describing the schedule. - * @param r the task to run immediately - * @param out the subscription holding the current schedule subscription - */ - void scheduleTask(Runnable r, ForwardSubscription out); - - /** - * Schedule a task to be run after the delay time and update the subscription - * describing the schedule. - * @param r the task to schedule - * @param out the subscription holding the current schedule subscription - * @param delayTime the time to delay the execution - * @param unit the time unit - */ - void scheduleTask(Runnable r, ForwardSubscription out, long delayTime, TimeUnit unit); - - /** - * Schedule a task to be run after the delay time and after - * each period, then update the subscription describing the schedule. - * @param r the task to schedule - * @param out the subscription holding the current schedule subscription - * @param initialDelay the initial delay of the schedule - * @param period the between period of the schedule - * @param unit the time unit - */ - void scheduleTask(Runnable r, ForwardSubscription out, long initialDelay, long period, TimeUnit unit); -} From 487eeca617c1360a98ca81849f1523a92d792e4a Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 20 Dec 2013 20:02:14 +0100 Subject: [PATCH 3/4] Refactorings. --- rxjava-core/src/main/java/rx/Scheduler.java | 48 ++++++++++ .../java/rx/schedulers/ExecutorScheduler.java | 65 ++++++------- .../rx/schedulers/ReentrantScheduler.java | 91 ++++++------------- 3 files changed, 109 insertions(+), 95 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index b01872f226..5d9bba7b7f 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -255,4 +255,52 @@ public long now() { public int degreeOfParallelism() { return Runtime.getRuntime().availableProcessors(); } + /** + * Schedule a task to be run immediately. + * @param r the task to run immediately + * @return the subscription to cancel the schedule + */ + public Subscription scheduleRunnable(final Runnable r) { + return schedule(new Action0() { + @Override + public void call() { + r.run(); + } + }); + } + + /** + * Schedule a task to be run after the delay time. + * @param r the task to schedule + * @param delayTime the time to delay the execution + * @param unit the time unit + * @return the subscription to cancel the schedule + */ + public Subscription scheduleRunnable(final Runnable r, long delayTime, TimeUnit unit) { + return schedule(new Action0() { + @Override + public void call() { + r.run(); + } + }, delayTime, unit); + } + + /** + * Schedule a task to be run after the delay time and after + * each period. + * @param r the task to schedule + * @param initialDelay the initial delay of the schedule + * @param period the between period of the schedule + * @param unit the time unit + * @return the subscription to cancel the schedule + */ + public Subscription scheduleRunnable(final Runnable r, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(new Action0() { + @Override + public void call() { + r.run(); + } + }, initialDelay, period, unit); + + } } diff --git a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java index 0f0112482f..c37bc0c5c1 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -24,7 +24,6 @@ import rx.Scheduler; import rx.Subscription; -import rx.schedulers.ReentrantScheduler.ReentrantSchedulerHelper; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.ForwardSubscription; import rx.subscriptions.Subscriptions; @@ -37,8 +36,6 @@ */ public class ExecutorScheduler extends Scheduler { private final Executor executor; - /** The reentrant scheduler helper. */ - private final ReentrantSchedulerHelper helper = new ESReentrantSchedulerHelper(); public ExecutorScheduler(Executor executor) { this.executor = executor; @@ -57,7 +54,7 @@ public Subscription schedulePeriodically(final T state, final Func2 Subscription schedule(final T state, final Func2 Subscription schedule(T state, Func2 f = ((ScheduledExecutorService) executor).schedule(r, delayTime, unit); @@ -109,7 +107,7 @@ protected Subscription scheduleTask(Runnable r, long delayTime, TimeUnit unit) { // we are not a ScheduledExecutorService so can't directly schedule if (delayTime == 0) { // no delay so put on the thread-pool right now - return scheduleTask(r); + return scheduleRunnable(r); } else { // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService // to handle the scheduling and once it's ready then execute on this Executor @@ -120,7 +118,8 @@ protected Subscription scheduleTask(Runnable r, long delayTime, TimeUnit unit) { } } - public Subscription scheduleTask(Runnable r) { + @Override + public Subscription scheduleRunnable(Runnable r) { // submit for immediate execution if (executor instanceof ExecutorService) { // we are an ExecutorService so get a Future back that supports unsubscribe @@ -134,29 +133,33 @@ public Subscription scheduleTask(Runnable r) { } } - public Subscription scheduleTask(Runnable r, long initialDelay, long period, TimeUnit unit) { - ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(r, initialDelay, period, unit); - - return Subscriptions.from(f); - } - - /** The reentrant helper. */ - private final class ESReentrantSchedulerHelper implements ReentrantSchedulerHelper { - - @Override - public Subscription scheduleTask(Runnable r) { - return ExecutorScheduler.this.scheduleTask(r); - } - - @Override - public Subscription scheduleTask(Runnable r, long delayTime, TimeUnit unit) { - return ExecutorScheduler.this.scheduleTask(r, delayTime, unit); - } + @Override + public Subscription scheduleRunnable(final Runnable r, long initialDelay, + final long period, final TimeUnit unit) { + if (executor instanceof ScheduledExecutorService) { + ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(r, initialDelay, period, unit); - @Override - public Subscription scheduleTask(Runnable r, long initialDelay, long period, TimeUnit unit) { - return ExecutorScheduler.this.scheduleTask(r, initialDelay, period, unit); + return Subscriptions.from(f); + } else { + final ForwardSubscription fs = new ForwardSubscription(); + Runnable rerun = new Runnable() { + @Override + public void run() { + if (!fs.isUnsubscribed()) { + long time = System.nanoTime(); + r.run(); + long delta = Math.max(0L, System.nanoTime() - time); + long periodNanos = Math.max(0L, unit.toNanos(period) - delta); + + Subscription before = fs.getSubscription(); + Subscription s = scheduleRunnable(this, periodNanos, TimeUnit.NANOSECONDS); + fs.compareExchange(before, s); + } + } + }; + Subscription s = scheduleRunnable(rerun, initialDelay, unit); + fs.compareExchange(null, s); + return fs; } - } } diff --git a/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java index c51e7d862d..2a9cda0916 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java @@ -30,17 +30,17 @@ * unnecessarily chain the subscriptions of every invocation. */ public final class ReentrantScheduler extends Scheduler { - final ReentrantSchedulerHelper scheduler; + final Scheduler parent; final ForwardSubscription scheduleSub; final ForwardSubscription actionSub; final CompositeSubscription composite; public ReentrantScheduler( - ReentrantSchedulerHelper scheduler, + Scheduler parent, ForwardSubscription scheduleSub, ForwardSubscription actionSub, CompositeSubscription composite) { - this.scheduler = scheduler; + this.parent = parent; this.scheduleSub = scheduleSub; this.actionSub = actionSub; this.composite = composite; @@ -57,17 +57,10 @@ public Subscription schedule(T state, Func2 Subscription schedule(T state, Func2 discardableAction = new DiscardableAction(state, action); actionSub.compareExchange(before, discardableAction); - Runnable r = new Runnable() { - @Override - public void run() { - Subscription sbefore = actionSub.getSubscription(); - Subscription s = discardableAction.call(ReentrantScheduler.this); - actionSub.compareExchange(sbefore, s); - } - }; + Runnable r = new RunTask(discardableAction); + Subscription sbefore = scheduleSub.getSubscription(); - Subscription s = scheduler.scheduleTask(r, delayTime, unit);; + Subscription s = parent.scheduleRunnable(r, delayTime, unit); scheduleSub.compareExchange(sbefore, s); return s; @@ -110,20 +97,30 @@ public Subscription schedulePeriodically(T state, Func2 periodicAction = new PeriodicAction(state, action); actionSub.compareExchange(before, periodicAction); - Runnable r = new Runnable() { - @Override - public void run() { - Subscription sbefore = actionSub.getSubscription(); - Subscription s = periodicAction.call(ReentrantScheduler.this); - actionSub.compareExchange(sbefore, s); - } - }; + Runnable r = new RunTask(periodicAction); + Subscription sbefore = scheduleSub.getSubscription(); - Subscription s = scheduler.scheduleTask(r, initialDelay, period, unit); + Subscription s = parent.scheduleRunnable(r, initialDelay, period, unit); scheduleSub.compareExchange(sbefore, s); return s; } + /** The task runner. */ + private final class RunTask implements Runnable { + final Func1 action; + + public RunTask(Func1 action) { + this.action = action; + } + + @Override + public void run() { + Subscription sbefore = actionSub.getSubscription(); + Subscription s = action.call(ReentrantScheduler.this); + actionSub.compareExchange(sbefore, s); + } + + } /** * An action that calls the underlying function in a periodic environment. * @param the state value type @@ -154,38 +151,4 @@ public void unsubscribe() { ssub.unsubscribe(); } } - /** - * Simple scheduler API used by the ReentrantScheduler to - * communicate with the actual scheduler implementation. - */ - public interface ReentrantSchedulerHelper { - /** - * Schedule a task to be run immediately and update the subscription - * describing the schedule. - * @param r the task to run immediately - * @return the subscription to cancel the schedule - */ - Subscription scheduleTask(Runnable r); - - /** - * Schedule a task to be run after the delay time and update the subscription - * describing the schedule. - * @param r the task to schedule - * @param delayTime the time to delay the execution - * @param unit the time unit - * @return the subscription to cancel the schedule - */ - Subscription scheduleTask(Runnable r, long delayTime, TimeUnit unit); - - /** - * Schedule a task to be run after the delay time and after - * each period, then update the subscription describing the schedule. - * @param r the task to schedule - * @param initialDelay the initial delay of the schedule - * @param period the between period of the schedule - * @param unit the time unit - * @return the subscription to cancel the schedule - */ - Subscription scheduleTask(Runnable r, long initialDelay, long period, TimeUnit unit); - } } From 0ea90fe569f2e9205a8764b32563a93742bcd1d8 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 20 Dec 2013 21:20:59 +0100 Subject: [PATCH 4/4] Revised structure and behavior --- .../java/rx/schedulers/ExecutorScheduler.java | 27 +-- .../rx/schedulers/ReentrantScheduler.java | 40 ++-- .../rx/subscriptions/ForwardSubscription.java | 162 ----------------- .../IncrementalSubscription.java | 171 ++++++++++++++++++ .../ForwardSubscriptionTest.java | 120 ------------ .../IncrementalSubscriptionTest.java | 73 ++++++++ 6 files changed, 279 insertions(+), 314 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/subscriptions/ForwardSubscription.java create mode 100644 rxjava-core/src/main/java/rx/subscriptions/IncrementalSubscription.java delete mode 100644 rxjava-core/src/test/java/rx/subscriptions/ForwardSubscriptionTest.java create mode 100644 rxjava-core/src/test/java/rx/subscriptions/IncrementalSubscriptionTest.java diff --git a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java index c37bc0c5c1..a56e90eff5 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -25,7 +25,7 @@ import rx.Scheduler; import rx.Subscription; import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.ForwardSubscription; +import rx.subscriptions.IncrementalSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; @@ -49,8 +49,8 @@ public ExecutorScheduler(ScheduledExecutorService executor) { public Subscription schedulePeriodically(final T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { if (executor instanceof ScheduledExecutorService) { CompositeSubscription subscription = new CompositeSubscription(); - final ForwardSubscription scheduleSub = new ForwardSubscription(); - final ForwardSubscription actionSub = new ForwardSubscription(); + final IncrementalSubscription scheduleSub = new IncrementalSubscription(); + final IncrementalSubscription actionSub = new IncrementalSubscription(); subscription.add(scheduleSub); subscription.add(actionSub); @@ -68,8 +68,8 @@ public Subscription schedulePeriodically(final T state, final Func2 Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { CompositeSubscription subscription = new CompositeSubscription(); - final ForwardSubscription scheduleSub = new ForwardSubscription(); - final ForwardSubscription actionSub = new ForwardSubscription(); + final IncrementalSubscription scheduleSub = new IncrementalSubscription(); + final IncrementalSubscription actionSub = new IncrementalSubscription(); subscription.add(scheduleSub); subscription.add(actionSub); @@ -84,8 +84,8 @@ public Subscription schedule(final T state, final Func2 Subscription schedule(T state, Func2 action) { // all subscriptions that may need to be unsubscribed CompositeSubscription subscription = new CompositeSubscription(); - final ForwardSubscription scheduleSub = new ForwardSubscription(); - final ForwardSubscription actionSub = new ForwardSubscription(); + final IncrementalSubscription scheduleSub = new IncrementalSubscription(); + final IncrementalSubscription actionSub = new IncrementalSubscription(); subscription.add(scheduleSub); subscription.add(actionSub); @@ -141,7 +141,7 @@ public Subscription scheduleRunnable(final Runnable r, long initialDelay, return Subscriptions.from(f); } else { - final ForwardSubscription fs = new ForwardSubscription(); + final IncrementalSubscription fs = new IncrementalSubscription(); Runnable rerun = new Runnable() { @Override public void run() { @@ -150,15 +150,18 @@ public void run() { r.run(); long delta = Math.max(0L, System.nanoTime() - time); long periodNanos = Math.max(0L, unit.toNanos(period) - delta); - - Subscription before = fs.getSubscription(); + + long index = fs.nextIndex(); Subscription s = scheduleRunnable(this, periodNanos, TimeUnit.NANOSECONDS); - fs.compareExchange(before, s); + fs.compareExchange(index, s, false); } } }; + + long index = fs.nextIndex(); + Subscription s = scheduleRunnable(rerun, initialDelay, unit); - fs.compareExchange(null, s); + fs.compareExchange(index, s, false); return fs; } } diff --git a/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java index 2a9cda0916..87680fedb1 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java @@ -19,7 +19,7 @@ import rx.Scheduler; import rx.Subscription; import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.ForwardSubscription; +import rx.subscriptions.IncrementalSubscription; import rx.subscriptions.SerialSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Func1; @@ -31,14 +31,14 @@ */ public final class ReentrantScheduler extends Scheduler { final Scheduler parent; - final ForwardSubscription scheduleSub; - final ForwardSubscription actionSub; + final IncrementalSubscription scheduleSub; + final IncrementalSubscription actionSub; final CompositeSubscription composite; public ReentrantScheduler( Scheduler parent, - ForwardSubscription scheduleSub, - ForwardSubscription actionSub, + IncrementalSubscription scheduleSub, + IncrementalSubscription actionSub, CompositeSubscription composite) { this.parent = parent; this.scheduleSub = scheduleSub; @@ -52,16 +52,16 @@ public Subscription schedule(T state, Func2 discardableAction = new DiscardableAction(state, action); - actionSub.compareExchange(before, discardableAction); + actionSub.compareExchange(index, discardableAction, false); Runnable r = new RunTask(discardableAction); - Subscription sbefore = scheduleSub.getSubscription(); + long sindex = scheduleSub.nextIndex(); Subscription s = parent.scheduleRunnable(r); - scheduleSub.compareExchange(sbefore, s); + scheduleSub.compareExchange(sindex, s, false); return s; } @@ -73,15 +73,15 @@ public Subscription schedule(T state, Func2 discardableAction = new DiscardableAction(state, action); - actionSub.compareExchange(before, discardableAction); + actionSub.compareExchange(index, discardableAction, false); Runnable r = new RunTask(discardableAction); - Subscription sbefore = scheduleSub.getSubscription(); + long sindex = scheduleSub.nextIndex(); Subscription s = parent.scheduleRunnable(r, delayTime, unit); - scheduleSub.compareExchange(sbefore, s); + scheduleSub.compareExchange(sindex, s, false); return s; } @@ -92,16 +92,16 @@ public Subscription schedulePeriodically(T state, Func2 periodicAction = new PeriodicAction(state, action); - actionSub.compareExchange(before, periodicAction); + actionSub.compareExchange(index, periodicAction, false); Runnable r = new RunTask(periodicAction); - Subscription sbefore = scheduleSub.getSubscription(); + long sindex = scheduleSub.nextIndex(); Subscription s = parent.scheduleRunnable(r, initialDelay, period, unit); - scheduleSub.compareExchange(sbefore, s); + scheduleSub.compareExchange(sindex, s, false); return s; } @@ -115,9 +115,9 @@ public RunTask(Func1 action) { @Override public void run() { - Subscription sbefore = actionSub.getSubscription(); + long index = actionSub.nextIndex(); Subscription s = action.call(ReentrantScheduler.this); - actionSub.compareExchange(sbefore, s); + actionSub.compareExchange(index, s, false); } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/ForwardSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/ForwardSubscription.java deleted file mode 100644 index 7a64b260de..0000000000 --- a/rxjava-core/src/main/java/rx/subscriptions/ForwardSubscription.java +++ /dev/null @@ -1,162 +0,0 @@ - /** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.subscriptions; - -import java.util.concurrent.atomic.AtomicReference; -import rx.Subscription; - -/** - * A subscription that holds another subscription and - * allows swapping it in compare-and-swap style and does - * not unsubscribe any replaced values by default. - *

- * Overloads are provided to perform the unsubscription on - * the old value if required. - */ -public class ForwardSubscription implements Subscription { - /** The atomic reference. */ - final AtomicReference reference = new AtomicReference(); - /** The unsubscription sentinel. */ - private static final Subscription UNSUBSCRIBE_SENTINEL = new Subscription() { - @Override - public void unsubscribe() { - } - }; - /** - * Creates an empty ForwardSubscription. - */ - public ForwardSubscription() { - - } - /** - * Creates a ForwardSubscription with the initial subscription. - * @param initial the initial subscription - */ - public ForwardSubscription(Subscription initial) { - reference.set(initial); - } - /** - * Returns true if this subscription has been unsubscribed. - * @return true if this subscription has been unsubscribed - */ - public boolean isUnsubscribed() { - return reference.get() == UNSUBSCRIBE_SENTINEL; - } - /** - * Returns the current maintained subscription. - * @return the current maintained subscription - */ - public Subscription getSubscription() { - Subscription s = reference.get(); - if (s == UNSUBSCRIBE_SENTINEL) { - return Subscriptions.empty(); - } - return s; - } - /** - * Atomically replace the current subscription but - * don't unsubscribe the old value. - * @param newValue the new subscription to set - */ - public void setSubscription(Subscription newValue) { - setSubscription(newValue, false); - } - /** - * Atomically replace the current subscription and - * unsubscribe the old value id required. - * @param newValue the new subscription to set - */ - public void setSubscription(Subscription newValue, boolean unsubscribeOld) { - Subscription s = replace(newValue); - if (unsubscribeOld && s != null) { - s.unsubscribe(); - } - } - /** - * Atomically replace a new subscription and return the old one. - *

- * If this subscription is unsubscribed, the newValue subscription - * is unsubscribed and an empty subscription is returned. - * @param newValue the new subscription - * @return the old subscription or empty if this ForwardSubscription is unsubscribed - */ - public Subscription replace(Subscription newValue) { - do { - Subscription old = reference.get(); - if (old == UNSUBSCRIBE_SENTINEL) { - if (newValue != null) { - newValue.unsubscribe(); - } - return Subscriptions.empty(); - } - if (reference.compareAndSet(old, newValue)) { - return old; - } - } while (true); - } - /** - * Atomically change the subscription only if it is the expected value - * but don't unsubscribe the old value. - * If this subscription is unsubscribed, the newValue is immediately - * unsubscribed. - * @param expected the expected subscription - * @param newValue the new subscription - * @return true if successfully replaced, false if this - * subscription is unsubscribed or it didn't contain - * the expected subscription. - */ - public boolean compareExchange(Subscription expected, Subscription newValue) { - return compareExchange(expected, newValue, false); - } - /** - * Atomically change the subscription only if it is the expected value - * and unsubscribe the old one if required. - * @param expected the expected subscription - * @param newValue the new subscription - * @param unsubscribeOld indicates to unsubscribe the old subscription if the exchange succeeded. - * @return true if successfully replaced, false if this - * subscription is unsubscribed or it didn't contain - * the expected subscription. - */ - public boolean compareExchange(Subscription expected, Subscription newValue, boolean unsubscribeOld) { - do { - Subscription old = reference.get(); - if (old == UNSUBSCRIBE_SENTINEL) { - if (newValue != null) { - newValue.unsubscribe(); - } - return false; - } - if (old != expected) { - return false; - } - if (reference.compareAndSet(old, newValue)) { - if (unsubscribeOld && old != null) { - old.unsubscribe(); - } - return true; - } - } while (true); - } - @Override - public void unsubscribe() { - Subscription s = reference.getAndSet(UNSUBSCRIBE_SENTINEL); - if (s != null) { - s.unsubscribe(); - } - } - -} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/subscriptions/IncrementalSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/IncrementalSubscription.java new file mode 100644 index 0000000000..5ebdf8c1ce --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/IncrementalSubscription.java @@ -0,0 +1,171 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import rx.Subscription; + +/** + * Subscription that swaps the underlying subscription only if + * the associated index is greater than the current. + * + * Usage example: + *

+ * IncrementalSubscription is = ...
+ * 
+ * long index = is.nextIndex();
+ * 
+ * // do some recursive work
+ * Subscription s = ...
+ * 
+ * is.compareExchange(index, s, false);
+ * 
+ * + * This will replace the current subscription only if its index + * is less than the new index. This way, if a recursive call + * has already set a more recent subscription, this won't + * replace it back to an older one. + */ +public class IncrementalSubscription implements Subscription { + /** The current index. */ + protected final AtomicLong index = new AtomicLong(); + /** The current reference. */ + protected final AtomicReference reference = new AtomicReference(); + /** The unsubscription sentinel. */ + private static final IndexedRef UNSUBSCRIBE_SENTINEL + = new IndexedRef(Long.MAX_VALUE, Subscriptions.empty()); + + public boolean isUnsubscribed() { + return reference.get() == UNSUBSCRIBE_SENTINEL; + } + + public IncrementalSubscription() { + this(Subscriptions.empty()); + } + public IncrementalSubscription(Subscription initial) { + reference.set(new IndexedRef(0L, initial)); + } + + public Subscription getSubscription() { + return reference.get().value(); // the sentinel holds empty anyway + } + + /** + * Generate the next index. + * @return the next index + */ + public long nextIndex() { + return index.incrementAndGet(); + } + + /** + * Return the current index. For testing purposes only. + * @return the current index + */ + /* public */protected long index() { + return index.get(); + } + /** + * Sets the given subscription as the latest value. + * @param newValue the new subscription to set + * @param unsubscribeOld unsubscribe the old subscription? + */ + public void setSubscription(Subscription newValue, boolean unsubscribeOld) { + do { + IndexedRef r = reference.get(); + if (r == UNSUBSCRIBE_SENTINEL) { + if (newValue != null) { + newValue.unsubscribe(); + } + return; + } + long newIndex = nextIndex(); + if (r.index() < newIndex) { + IndexedRef newRef = new IndexedRef(newIndex, newValue); + if (reference.compareAndSet(r, newRef)) { + if (unsubscribeOld) { + r.unsubscribe(); + } + return; + } + } + } while (true); + } + /** + * Compare the current index with the new index and if newIndex + * is greater, exchange the subscription with the new value + * and optionally unsubscribe the old one. + * @param newIndex + * @param newValue + * @param unsubscribeOld + * @return true if the exchange succeeded, false if not. + */ + public boolean compareExchange(long newIndex, Subscription newValue, boolean unsubscribeOld) { + do { + IndexedRef r = reference.get(); + if (r == UNSUBSCRIBE_SENTINEL) { + if (newValue != null) { + newValue.unsubscribe(); + } + return false; + } + if (r.index >= newIndex) { + return false; + } + IndexedRef newRef = new IndexedRef(newIndex, newValue); + if (reference.compareAndSet(r, newRef)) { + if (unsubscribeOld) { + r.unsubscribe(); + } + return true; + } + } while (true); + } + + + /** The indexed reference object. */ + protected static final class IndexedRef implements Subscription { + private final long index; + private final Subscription value; + + public IndexedRef(long index, Subscription value) { + this.index = index; + this.value = value; + } + + public long index() { + return index; + } + + public Subscription value() { + return value; + } + + @Override + public void unsubscribe() { + if (value != null) { + value.unsubscribe(); + } + } + } + + @Override + public void unsubscribe() { + reference.getAndSet(UNSUBSCRIBE_SENTINEL).unsubscribe(); + } + +} diff --git a/rxjava-core/src/test/java/rx/subscriptions/ForwardSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/ForwardSubscriptionTest.java deleted file mode 100644 index 56eb1e8745..0000000000 --- a/rxjava-core/src/test/java/rx/subscriptions/ForwardSubscriptionTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.subscriptions; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import org.junit.Test; -import rx.Subscription; - -public class ForwardSubscriptionTest { - @Test - public void testSimple() { - BooleanSubscription b1 = new BooleanSubscription(); - BooleanSubscription b2 = new BooleanSubscription(); - - ForwardSubscription fs = new ForwardSubscription(b1); - - assertFalse(fs.isUnsubscribed()); - - Subscription old = fs.replace(b2); - - assertEquals(old, b1); - assertFalse(b1.isUnsubscribed()); - - fs.unsubscribe(); - - assertTrue(fs.isUnsubscribed()); - assertTrue(b2.isUnsubscribed()); - assertFalse(b1.isUnsubscribed()); - } - @Test - public void testExchange() { - BooleanSubscription b1 = new BooleanSubscription(); - BooleanSubscription b2 = new BooleanSubscription(); - - ForwardSubscription fs = new ForwardSubscription(); - - assertTrue(fs.compareExchange(null, b1)); - - assertFalse(fs.compareExchange(null, b2)); - assertFalse(b2.isUnsubscribed()); - - assertTrue(fs.compareExchange(b1, b2)); - - assertFalse(b1.isUnsubscribed()); - } - @Test - public void testSetAndUnsubscribe() { - BooleanSubscription b1 = new BooleanSubscription(); - BooleanSubscription b2 = new BooleanSubscription(); - - ForwardSubscription fs = new ForwardSubscription(b1); - - fs.setSubscription(b2); - assertFalse(b1.isUnsubscribed()); - - fs.setSubscription(b1, true); - - assertTrue(b2.isUnsubscribed()); - } - @Test - public void testExchangeAndUnsubscribe() { - BooleanSubscription b1 = new BooleanSubscription(); - BooleanSubscription b2 = new BooleanSubscription(); - - ForwardSubscription fs = new ForwardSubscription(); - - assertTrue(fs.compareExchange(null, b1, true)); - - assertFalse(fs.compareExchange(null, b2, true)); - - assertFalse(b1.isUnsubscribed()); - assertFalse(b2.isUnsubscribed()); - - assertTrue(fs.compareExchange(b1, b2, true)); - - assertTrue(b1.isUnsubscribed()); - } - @Test - public void testSetUnsubscribed() { - BooleanSubscription b1 = new BooleanSubscription(); - ForwardSubscription fs = new ForwardSubscription(); - fs.unsubscribe(); - fs.setSubscription(b1); - - assertTrue(b1.isUnsubscribed()); - } - @Test - public void testExchangeUnsubscribed() { - BooleanSubscription b1 = new BooleanSubscription(); - ForwardSubscription fs = new ForwardSubscription(); - fs.unsubscribe(); - fs.compareExchange(null, b1); - - assertTrue(b1.isUnsubscribed()); - } - @Test - public void testExchangeUnsubscribed2() { - BooleanSubscription b1 = new BooleanSubscription(); - ForwardSubscription fs = new ForwardSubscription(); - fs.unsubscribe(); - fs.compareExchange(null, b1, true); - - assertTrue(b1.isUnsubscribed()); - } -} diff --git a/rxjava-core/src/test/java/rx/subscriptions/IncrementalSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/IncrementalSubscriptionTest.java new file mode 100644 index 0000000000..30b778c231 --- /dev/null +++ b/rxjava-core/src/test/java/rx/subscriptions/IncrementalSubscriptionTest.java @@ -0,0 +1,73 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; + +public class IncrementalSubscriptionTest { + IncrementalSubscription is; + @Before + public void before() { + is = new IncrementalSubscription(); + } + @Test + public void testSimple() { + assertFalse(is.isUnsubscribed()); + + BooleanSubscription b1 = new BooleanSubscription(); + + is.setSubscription(b1, false); + + assertEquals(1L, is.index()); + + long index = is.nextIndex(); + + assertTrue(is.compareExchange(index, b1, false)); + + assertFalse(is.compareExchange(index, b1, false)); + + assertEquals(2L, is.index()); + + index = is.nextIndex(); + assertTrue(is.compareExchange(index, b1, false)); + + is.unsubscribe(); + + BooleanSubscription b2 = new BooleanSubscription(); + + assertFalse(is.compareExchange(index, b2, false)); + + assertTrue(is.isUnsubscribed()); + assertTrue(b2.isUnsubscribed()); + + assertEquals(3L, is.index()); + } + @Test + public void testSwapOrder() { + BooleanSubscription b1 = new BooleanSubscription(); + + long idx1 = is.nextIndex(); + + long idx2 = is.nextIndex(); + assertTrue(is.compareExchange(idx2, b1, false)); + + assertFalse(is.compareExchange(idx1, b1, false)); + } +}