diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 51ed6a5a12ea..a54d9abb8cf9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -249,7 +249,14 @@ public void onFailure(Throwable throwable) } } // Schedule cleanup for task status of the workers that might have disconnected while overlord was not running - for (String worker : cf.getChildren().forPath(indexerZkConfig.getStatusPath())) { + List workers; + try { + workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath()); + } catch (KeeperException.NoNodeException e) { + // statusPath doesn't exist yet; can occur if no middleManagers have started. + workers = ImmutableList.of(); + } + for (String worker : workers) { if (!zkWorkers.containsKey(worker) && cf.checkExists().forPath(JOINER.join(indexerZkConfig.getAnnouncementsPath(), worker)) == null) { scheduleTasksCleanupForWorker( diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 4f2fd718e7f5..974c1d7da503 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -26,9 +26,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; -import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; -import com.metamx.common.lifecycle.Lifecycle; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import io.druid.common.guava.DSuppliers; @@ -63,7 +61,6 @@ import java.util.Collection; import java.util.Set; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -84,7 +81,6 @@ public class RemoteTaskRunnerTest private TestMergeTask task; private Worker worker; - private RemoteTaskRunnerConfig config; @Before public void setUp() throws Exception @@ -100,8 +96,6 @@ public void setUp() throws Exception cf.start(); cf.create().creatingParentsIfNeeded().forPath(basePath); cf.create().creatingParentsIfNeeded().forPath(tasksPath); - cf.create().creatingParentsIfNeeded().forPath(statusPath); - task = TestMergeTask.createDummyTask("task"); } @@ -131,6 +125,12 @@ public void testRun() throws Exception Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode()); } + @Test + public void testStartWithNoWorker() throws Exception + { + makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT1S"))); + } + @Test public void testRunExistingTaskThatHasntStartedRunning() throws Exception { @@ -191,15 +191,33 @@ public void testRunSameAvailabilityGroup() throws Exception { doSetup(); - TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), jsonMapper); + TestRealtimeTask task1 = new TestRealtimeTask( + "rt1", + new TaskResource("rt1", 1), + "foo", + TaskStatus.running("rt1"), + jsonMapper + ); remoteTaskRunner.run(task1); Assert.assertTrue(taskAnnounced(task1.getId())); mockWorkerRunningTask(task1); - TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"), jsonMapper); + TestRealtimeTask task2 = new TestRealtimeTask( + "rt2", + new TaskResource("rt1", 1), + "foo", + TaskStatus.running("rt2"), + jsonMapper + ); remoteTaskRunner.run(task2); - TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"), jsonMapper); + TestRealtimeTask task3 = new TestRealtimeTask( + "rt3", + new TaskResource("rt2", 1), + "foo", + TaskStatus.running("rt3"), + jsonMapper + ); remoteTaskRunner.run(task3); Assert.assertTrue( @@ -236,15 +254,33 @@ public void testRunWithCapacity() throws Exception { doSetup(); - TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), jsonMapper); + TestRealtimeTask task1 = new TestRealtimeTask( + "rt1", + new TaskResource("rt1", 1), + "foo", + TaskStatus.running("rt1"), + jsonMapper + ); remoteTaskRunner.run(task1); Assert.assertTrue(taskAnnounced(task1.getId())); mockWorkerRunningTask(task1); - TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"), jsonMapper); + TestRealtimeTask task2 = new TestRealtimeTask( + "rt2", + new TaskResource("rt2", 3), + "foo", + TaskStatus.running("rt2"), + jsonMapper + ); remoteTaskRunner.run(task2); - TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"), jsonMapper); + TestRealtimeTask task3 = new TestRealtimeTask( + "rt3", + new TaskResource("rt3", 2), + "foo", + TaskStatus.running("rt3"), + jsonMapper + ); remoteTaskRunner.run(task3); Assert.assertTrue(taskAnnounced(task3.getId())); mockWorkerRunningTask(task3); @@ -507,7 +543,9 @@ private void mockWorkerRunningTask(final Task task) throws Exception cf.delete().forPath(joiner.join(tasksPath, task.getId())); TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId())); - cf.create().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement)); + cf.create() + .creatingParentsIfNeeded() + .forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement)); } private void mockWorkerCompleteSuccessfulTask(final Task task) throws Exception