diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index fc02c6b00762..f45425a84f18 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -145,7 +145,6 @@ public void close() { Thread.currentThread().interrupt(); } commandRunnerStatusMetric.close(); - commandStore.close(); } /** @@ -284,6 +283,8 @@ public void run() { if (!closed) { throw wue; } + } finally { + commandStore.close(); } } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java index c50da1676442..d27833a6ac8f 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandRunnerTest.java @@ -49,6 +49,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -85,6 +87,8 @@ public class CommandRunnerTest { private QueuedCommand queuedCommand3; @Mock private ExecutorService executor; + @Captor + private ArgumentCaptor threadTaskCaptor; private CommandRunner commandRunner; @Before @@ -141,9 +145,11 @@ public void shouldRunThePriorCommandsWithTerminateCorrectly() { commandRunner.processPriorCommands(); // Then: - verify(serverState).setTerminating(); - verify(commandStore).close(); - verify(clusterTerminator).terminateCluster(anyList()); + final InOrder inOrder = inOrder(serverState, commandStore, clusterTerminator, statementExecutor); + inOrder.verify(serverState).setTerminating(); + inOrder.verify(commandStore).wakeup(); + inOrder.verify(clusterTerminator).terminateCluster(anyList()); + verify(statementExecutor, never()).handleRestore(any()); } @@ -295,6 +301,9 @@ public void shouldSubmitTaskOnStart() { @Test public void shouldCloseTheCommandRunnerCorrectly() throws Exception { + // Given: + commandRunner.start(); + // When: commandRunner.close(); @@ -302,16 +311,17 @@ public void shouldCloseTheCommandRunnerCorrectly() throws Exception { final InOrder inOrder = inOrder(executor, commandStore); inOrder.verify(commandStore).wakeup(); inOrder.verify(executor).awaitTermination(anyLong(), any()); - inOrder.verify(commandStore).close(); - } + inOrder.verify(commandStore, never()).close(); // commandStore must be closed by runner thread - @Test(expected = RuntimeException.class) - public void shouldThrowExceptionIfCannotCloseCommandStore() { - // Given: - doThrow(RuntimeException.class).when(commandStore).close(); + final Runnable threadTask = getThreadTask(); + threadTask.run(); - // When: - commandRunner.close(); + verify(commandStore).close(); + } + + private Runnable getThreadTask() { + verify(executor).execute(threadTaskCaptor.capture()); + return threadTaskCaptor.getValue(); } private void givenQueuedCommands(final QueuedCommand... cmds) {