Skip to content

Commit

Permalink
Merge pull request apache#1772 from gianm/fix-overlord-startup
Browse files Browse the repository at this point in the history
RemoteTaskRunner: Fix for starting an overlord before any workers ever existed.
  • Loading branch information
fjy committed Sep 25, 2015
2 parents 9dfc74b + 63bf021 commit aa9d903
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,14 @@ public void onFailure(Throwable throwable)
}
}
// Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
for (String worker : cf.getChildren().forPath(indexerZkConfig.getStatusPath())) {
List<String> workers;
try {
workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
} catch (KeeperException.NoNodeException e) {
// statusPath doesn't exist yet; can occur if no middleManagers have started.
workers = ImmutableList.of();
}
for (String worker : workers) {
if (!zkWorkers.containsKey(worker)
&& cf.checkExists().forPath(JOINER.join(indexerZkConfig.getAnnouncementsPath(), worker)) == null) {
scheduleTasksCleanupForWorker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.DSuppliers;
Expand Down Expand Up @@ -63,7 +61,6 @@
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -84,7 +81,6 @@ public class RemoteTaskRunnerTest
private TestMergeTask task;

private Worker worker;
private RemoteTaskRunnerConfig config;

@Before
public void setUp() throws Exception
Expand All @@ -100,8 +96,6 @@ public void setUp() throws Exception
cf.start();
cf.create().creatingParentsIfNeeded().forPath(basePath);
cf.create().creatingParentsIfNeeded().forPath(tasksPath);
cf.create().creatingParentsIfNeeded().forPath(statusPath);


task = TestMergeTask.createDummyTask("task");
}
Expand Down Expand Up @@ -131,6 +125,12 @@ public void testRun() throws Exception
Assert.assertEquals(TaskStatus.Status.SUCCESS, result.get().getStatusCode());
}

@Test
public void testStartWithNoWorker() throws Exception
{
makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT1S")));
}

@Test
public void testRunExistingTaskThatHasntStartedRunning() throws Exception
{
Expand Down Expand Up @@ -191,15 +191,33 @@ public void testRunSameAvailabilityGroup() throws Exception
{
doSetup();

TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), jsonMapper);
TestRealtimeTask task1 = new TestRealtimeTask(
"rt1",
new TaskResource("rt1", 1),
"foo",
TaskStatus.running("rt1"),
jsonMapper
);
remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);

TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"), jsonMapper);
TestRealtimeTask task2 = new TestRealtimeTask(
"rt2",
new TaskResource("rt1", 1),
"foo",
TaskStatus.running("rt2"),
jsonMapper
);
remoteTaskRunner.run(task2);

TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"), jsonMapper);
TestRealtimeTask task3 = new TestRealtimeTask(
"rt3",
new TaskResource("rt2", 1),
"foo",
TaskStatus.running("rt3"),
jsonMapper
);
remoteTaskRunner.run(task3);

Assert.assertTrue(
Expand Down Expand Up @@ -236,15 +254,33 @@ public void testRunWithCapacity() throws Exception
{
doSetup();

TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), jsonMapper);
TestRealtimeTask task1 = new TestRealtimeTask(
"rt1",
new TaskResource("rt1", 1),
"foo",
TaskStatus.running("rt1"),
jsonMapper
);
remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);

TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"), jsonMapper);
TestRealtimeTask task2 = new TestRealtimeTask(
"rt2",
new TaskResource("rt2", 3),
"foo",
TaskStatus.running("rt2"),
jsonMapper
);
remoteTaskRunner.run(task2);

TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"), jsonMapper);
TestRealtimeTask task3 = new TestRealtimeTask(
"rt3",
new TaskResource("rt3", 2),
"foo",
TaskStatus.running("rt3"),
jsonMapper
);
remoteTaskRunner.run(task3);
Assert.assertTrue(taskAnnounced(task3.getId()));
mockWorkerRunningTask(task3);
Expand Down Expand Up @@ -507,7 +543,9 @@ private void mockWorkerRunningTask(final Task task) throws Exception
cf.delete().forPath(joiner.join(tasksPath, task.getId()));

TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId()));
cf.create().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
cf.create()
.creatingParentsIfNeeded()
.forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
}

private void mockWorkerCompleteSuccessfulTask(final Task task) throws Exception
Expand Down

0 comments on commit aa9d903

Please sign in to comment.