Skip to content

Commit

Permalink
scheduleAtFixedRate would hang (#42993)
Browse files Browse the repository at this point in the history
Though not in use in elasticsearch currently, it seems surprising that
ThreadPool.scheduler().scheduleAtFixedRate would hang. A recurring
scheduled task is never completed (except on failure) and we test for
exceptions using RunnableFuture.get(), which hangs for periodic tasks.
Fixed by checking that task is done before calling .get().
  • Loading branch information
henningandersen committed Jun 11, 2019
1 parent 29615b3 commit 66f7740
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int
*/
public static Throwable rethrowErrors(Runnable runnable) {
if (runnable instanceof RunnableFuture) {
assert ((RunnableFuture) runnable).isDone();
try {
((RunnableFuture) runnable).get();
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.Delayed;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -298,7 +299,11 @@ protected void afterExecute(Runnable r, Throwable t) {
if (t != null) return;
// Scheduler only allows Runnable's so we expect no checked exceptions here. If anyone uses submit directly on `this`, we
// accept the wrapped exception in the output.
ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r));
if (r instanceof RunnableFuture && ((RunnableFuture<?>) r).isDone()) {
// only check this if task is done, which it always is except for periodic tasks. Periodic tasks will hang on
// RunnableFuture.get()
ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,16 @@ public void testScheduledOnScheduler() throws InterruptedException {
Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
}
}

public void testScheduleAtFixedRate() throws InterruptedException {
ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY);
try {
CountDownLatch missingExecutions = new CountDownLatch(randomIntBetween(1, 10));
executor.scheduleAtFixedRate(missingExecutions::countDown,
randomIntBetween(1, 10), randomIntBetween(1, 10), TimeUnit.MILLISECONDS);
assertTrue(missingExecutions.await(30, TimeUnit.SECONDS));
} finally {
Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
}
}
}

0 comments on commit 66f7740

Please sign in to comment.