diff --git a/src/main/java/io/vertx/core/impl/TaskQueue.java b/src/main/java/io/vertx/core/impl/TaskQueue.java index e1b5352dacc..7898a66d78f 100644 --- a/src/main/java/io/vertx/core/impl/TaskQueue.java +++ b/src/main/java/io/vertx/core/impl/TaskQueue.java @@ -16,6 +16,7 @@ import java.util.LinkedList; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import java.util.function.Consumer; /** @@ -124,11 +125,27 @@ public Consumer unschedule() { */ public void execute(Runnable task, Executor executor) { synchronized (tasks) { - tasks.add(new ExecuteTask(task, executor)); - if (this.currentExecutor == null) { - this.currentExecutor = executor; - executor.execute(runner); + if (currentExecutor == null) { + currentExecutor = executor; + try { + executor.execute(runner); + } catch (RejectedExecutionException e) { + currentExecutor = null; + throw e; + } } + // Add the task after the runner has been accepted to the executor + // to cover the case of a rejected execution exception. + tasks.add(new ExecuteTask(task, executor)); + } + } + + /** + * Test if the task queue is empty and no current executor is running anymore. + */ + public boolean isEmpty() { + synchronized (tasks) { + return tasks.isEmpty() && currentExecutor == null; } } diff --git a/src/test/java/io/vertx/core/impl/TaskQueueTest.java b/src/test/java/io/vertx/core/impl/TaskQueueTest.java new file mode 100644 index 00000000000..36b2092f1df --- /dev/null +++ b/src/test/java/io/vertx/core/impl/TaskQueueTest.java @@ -0,0 +1,34 @@ +package io.vertx.core.impl; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * @author Alexander Schwartz + */ +public class TaskQueueTest { + + Executor executorThatAlwaysThrowsRejectedExceptions = new Executor() { + @Override + public void execute(Runnable command) { + throw new RejectedExecutionException(); + } + }; + + TaskQueue taskQueue = new TaskQueue(); + + @Test + public void shouldNotHaveTaskInQueueWhenTaskHasBeenRejected() { + assertThatThrownBy( + () -> taskQueue.execute(new Thread(), executorThatAlwaysThrowsRejectedExceptions) + ).isInstanceOf(RejectedExecutionException.class); + + Assertions.assertThat(taskQueue.isEmpty()).isTrue(); + } + +}