Skip to content

Commit

Permalink
Merge pull request ReactiveX#3141 from akarnokd/SchedulerLeakCheck
Browse files Browse the repository at this point in the history
Improved Scheduler.Worker memory leak detection
  • Loading branch information
akarnokd committed Aug 9, 2015
2 parents ad1fbc2 + 54bb588 commit c302c03
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 59 deletions.
63 changes: 13 additions & 50 deletions src/test/java/rx/schedulers/CachedThreadSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,13 @@

package rx.schedulers;

import java.lang.management.*;
import java.util.concurrent.*;

import junit.framework.Assert;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

import rx.Observable;
import rx.Scheduler;
import rx.*;
import rx.Scheduler.Worker;
import rx.functions.*;
import rx.internal.schedulers.NewThreadWorker;
import static org.junit.Assert.assertTrue;

public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {

Expand Down Expand Up @@ -74,49 +69,17 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru

@Test(timeout = 30000)
public void testCancelledTaskRetention() throws InterruptedException {
System.out.println("Wait before GC");
Thread.sleep(1000);

System.out.println("GC");
System.gc();

Thread.sleep(1000);


MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
long initial = memHeap.getUsed();

System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);

Scheduler.Worker w = Schedulers.io().createWorker();
for (int i = 0; i < 750000; i++) {
if (i % 50000 == 0) {
System.out.println(" -> still scheduling: " + i);
}
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
Worker w = Schedulers.io().createWorker();
try {
ExecutorSchedulerTest.testCancelledRetention(w, false);
} finally {
w.unsubscribe();
}

memHeap = memoryMXBean.getHeapMemoryUsage();
long after = memHeap.getUsed();
System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0);

w.unsubscribe();

System.out.println("Wait before second GC");
Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000);

System.out.println("Second GC");
System.gc();

Thread.sleep(1000);

memHeap = memoryMXBean.getHeapMemoryUsage();
long finish = memHeap.getUsed();
System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);

if (finish > initial * 5) {
Assert.fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
w = Schedulers.io().createWorker();
try {
ExecutorSchedulerTest.testCancelledRetention(w, true);
} finally {
w.unsubscribe();
}
}

Expand Down
17 changes: 17 additions & 0 deletions src/test/java/rx/schedulers/ComputationSchedulerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import rx.Observable;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
Expand Down Expand Up @@ -151,4 +152,20 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
}

@Test(timeout = 30000)
public void testCancelledTaskRetention() throws InterruptedException {
Worker w = Schedulers.computation().createWorker();
try {
ExecutorSchedulerTest.testCancelledRetention(w, false);
} finally {
w.unsubscribe();
}
w = Schedulers.computation().createWorker();
try {
ExecutorSchedulerTest.testCancelledRetention(w, true);
} finally {
w.unsubscribe();
}
}
}
89 changes: 80 additions & 9 deletions src/test/java/rx/schedulers/ExecutorSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
}
@Test(timeout = 30000)
public void testCancelledTaskRetention() throws InterruptedException {

public static void testCancelledRetention(Scheduler.Worker w, boolean periodic) throws InterruptedException {
System.out.println("Wait before GC");
Thread.sleep(1000);

Expand All @@ -64,13 +64,32 @@ public void testCancelledTaskRetention() throws InterruptedException {
long initial = memHeap.getUsed();

System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);

Scheduler.Worker w = Schedulers.io().createWorker();
for (int i = 0; i < 500000; i++) {
if (i % 50000 == 0) {
System.out.println(" -> still scheduling: " + i);

int n = 500 * 1000;
if (periodic) {
final CountDownLatch cdl = new CountDownLatch(n);
final Action0 action = new Action0() {
@Override
public void call() {
cdl.countDown();
}
};
for (int i = 0; i < n; i++) {
if (i % 50000 == 0) {
System.out.println(" -> still scheduling: " + i);
}
w.schedulePeriodically(action, 0, 1, TimeUnit.DAYS);
}

System.out.println("Waiting for the first round to finish...");
cdl.await();
} else {
for (int i = 0; i < n; i++) {
if (i % 50000 == 0) {
System.out.println(" -> still scheduling: " + i);
}
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
}
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
}

memHeap = memoryMXBean.getHeapMemoryUsage();
Expand All @@ -95,7 +114,30 @@ public void testCancelledTaskRetention() throws InterruptedException {
fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
}
}


@Test(timeout = 30000)
public void testCancelledTaskRetention() throws InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
Scheduler s = Schedulers.from(exec);
try {
Scheduler.Worker w = s.createWorker();
try {
testCancelledRetention(w, false);
} finally {
w.unsubscribe();
}

w = s.createWorker();
try {
testCancelledRetention(w, true);
} finally {
w.unsubscribe();
}
} finally {
exec.shutdownNow();
}
}

/** A simple executor which queues tasks and executes them one-by-one if executeOne() is called. */
static final class TestExecutor implements Executor {
final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<Runnable>();
Expand Down Expand Up @@ -204,4 +246,33 @@ public void execute(Runnable command) {

assertFalse(w.tasks.hasSubscriptions());
}

@Test
public void testNoPeriodicTimedTaskPartRetention() throws InterruptedException {
Executor e = new Executor() {
@Override
public void execute(Runnable command) {
command.run();
}
};
ExecutorSchedulerWorker w = (ExecutorSchedulerWorker)Schedulers.from(e).createWorker();

final CountDownLatch cdl = new CountDownLatch(1);
final Action0 action = new Action0() {
@Override
public void call() {
cdl.countDown();
}
};

Subscription s = w.schedulePeriodically(action, 0, 1, TimeUnit.DAYS);

assertTrue(w.tasks.hasSubscriptions());

cdl.await();

s.unsubscribe();

assertFalse(w.tasks.hasSubscriptions());
}
}

0 comments on commit c302c03

Please sign in to comment.