diff --git a/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java b/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java index 2b22e78068..9abb52b7ec 100644 --- a/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java +++ b/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java @@ -16,18 +16,13 @@ package rx.schedulers; -import java.lang.management.*; -import java.util.concurrent.*; - -import junit.framework.Assert; +import static org.junit.Assert.assertTrue; import org.junit.Test; -import rx.Observable; -import rx.Scheduler; +import rx.*; +import rx.Scheduler.Worker; import rx.functions.*; -import rx.internal.schedulers.NewThreadWorker; -import static org.junit.Assert.assertTrue; public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { @@ -74,49 +69,17 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru @Test(timeout = 30000) public void testCancelledTaskRetention() throws InterruptedException { - System.out.println("Wait before GC"); - Thread.sleep(1000); - - System.out.println("GC"); - System.gc(); - - Thread.sleep(1000); - - - MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); - MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); - long initial = memHeap.getUsed(); - - System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); - - Scheduler.Worker w = Schedulers.io().createWorker(); - for (int i = 0; i < 750000; i++) { - if (i % 50000 == 0) { - System.out.println(" -> still scheduling: " + i); - } - w.schedule(Actions.empty(), 1, TimeUnit.DAYS); + Worker w = Schedulers.io().createWorker(); + try { + ExecutorSchedulerTest.testCancelledRetention(w, false); + } finally { + w.unsubscribe(); } - - memHeap = memoryMXBean.getHeapMemoryUsage(); - long after = memHeap.getUsed(); - System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0); - - w.unsubscribe(); - - System.out.println("Wait before second GC"); - Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000); - - System.out.println("Second GC"); - System.gc(); - - Thread.sleep(1000); - - memHeap = memoryMXBean.getHeapMemoryUsage(); - long finish = memHeap.getUsed(); - System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); - - if (finish > initial * 5) { - Assert.fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); + w = Schedulers.io().createWorker(); + try { + ExecutorSchedulerTest.testCancelledRetention(w, true); + } finally { + w.unsubscribe(); } } diff --git a/src/test/java/rx/schedulers/ComputationSchedulerTests.java b/src/test/java/rx/schedulers/ComputationSchedulerTests.java index 881224cfac..7191f60015 100644 --- a/src/test/java/rx/schedulers/ComputationSchedulerTests.java +++ b/src/test/java/rx/schedulers/ComputationSchedulerTests.java @@ -26,6 +26,7 @@ import rx.Observable; import rx.Scheduler; +import rx.Scheduler.Worker; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; @@ -151,4 +152,20 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException { SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler()); } + + @Test(timeout = 30000) + public void testCancelledTaskRetention() throws InterruptedException { + Worker w = Schedulers.computation().createWorker(); + try { + ExecutorSchedulerTest.testCancelledRetention(w, false); + } finally { + w.unsubscribe(); + } + w = Schedulers.computation().createWorker(); + try { + ExecutorSchedulerTest.testCancelledRetention(w, true); + } finally { + w.unsubscribe(); + } + } } diff --git a/src/test/java/rx/schedulers/ExecutorSchedulerTest.java b/src/test/java/rx/schedulers/ExecutorSchedulerTest.java index cdefabc757..ed4e03213d 100644 --- a/src/test/java/rx/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/rx/schedulers/ExecutorSchedulerTest.java @@ -48,8 +48,8 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException { SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler()); } - @Test(timeout = 30000) - public void testCancelledTaskRetention() throws InterruptedException { + + public static void testCancelledRetention(Scheduler.Worker w, boolean periodic) throws InterruptedException { System.out.println("Wait before GC"); Thread.sleep(1000); @@ -64,13 +64,32 @@ public void testCancelledTaskRetention() throws InterruptedException { long initial = memHeap.getUsed(); System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); - - Scheduler.Worker w = Schedulers.io().createWorker(); - for (int i = 0; i < 500000; i++) { - if (i % 50000 == 0) { - System.out.println(" -> still scheduling: " + i); + + int n = 500 * 1000; + if (periodic) { + final CountDownLatch cdl = new CountDownLatch(n); + final Action0 action = new Action0() { + @Override + public void call() { + cdl.countDown(); + } + }; + for (int i = 0; i < n; i++) { + if (i % 50000 == 0) { + System.out.println(" -> still scheduling: " + i); + } + w.schedulePeriodically(action, 0, 1, TimeUnit.DAYS); + } + + System.out.println("Waiting for the first round to finish..."); + cdl.await(); + } else { + for (int i = 0; i < n; i++) { + if (i % 50000 == 0) { + System.out.println(" -> still scheduling: " + i); + } + w.schedule(Actions.empty(), 1, TimeUnit.DAYS); } - w.schedule(Actions.empty(), 1, TimeUnit.DAYS); } memHeap = memoryMXBean.getHeapMemoryUsage(); @@ -95,7 +114,30 @@ public void testCancelledTaskRetention() throws InterruptedException { fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); } } - + + @Test(timeout = 30000) + public void testCancelledTaskRetention() throws InterruptedException { + ExecutorService exec = Executors.newSingleThreadExecutor(); + Scheduler s = Schedulers.from(exec); + try { + Scheduler.Worker w = s.createWorker(); + try { + testCancelledRetention(w, false); + } finally { + w.unsubscribe(); + } + + w = s.createWorker(); + try { + testCancelledRetention(w, true); + } finally { + w.unsubscribe(); + } + } finally { + exec.shutdownNow(); + } + } + /** A simple executor which queues tasks and executes them one-by-one if executeOne() is called. */ static final class TestExecutor implements Executor { final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); @@ -204,4 +246,33 @@ public void execute(Runnable command) { assertFalse(w.tasks.hasSubscriptions()); } + + @Test + public void testNoPeriodicTimedTaskPartRetention() throws InterruptedException { + Executor e = new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }; + ExecutorSchedulerWorker w = (ExecutorSchedulerWorker)Schedulers.from(e).createWorker(); + + final CountDownLatch cdl = new CountDownLatch(1); + final Action0 action = new Action0() { + @Override + public void call() { + cdl.countDown(); + } + }; + + Subscription s = w.schedulePeriodically(action, 0, 1, TimeUnit.DAYS); + + assertTrue(w.tasks.hasSubscriptions()); + + cdl.await(); + + s.unsubscribe(); + + assertFalse(w.tasks.hasSubscriptions()); + } }