From e8966a7224076b63aaa12c6e258e299838d8502f Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Mon, 27 Aug 2018 08:01:59 -0700
Subject: [PATCH] Fix NPE in KafkaSupervisor.checkpointTaskGroup (#6206) (#6245)

* Fix NPE in KafkaSupervisor.checkpointTaskGroup

* address comments

* address comment
---
 .../kafka/supervisor/KafkaSupervisor.java     | 154 +++++++++++-------
 .../kafka/supervisor/TaskReportData.java      |   2 +-
 2 files changed, 100 insertions(+), 56 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 63868230c3dd..431fe0f4d1ad 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -146,6 +146,8 @@ public class KafkaSupervisor implements Supervisor
    */
   private class TaskGroup
   {
+    final int groupId;
+
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
     // this task group has completed successfully, at which point this will be destroyed and a new task group will be
@@ -161,11 +163,13 @@ private class TaskGroup
     final String baseSequenceName;

     TaskGroup(
+        int groupId,
         ImmutableMap<Integer, Long> partitionOffsets,
         Optional<DateTime> minimumMessageTime,
         Optional<DateTime> maximumMessageTime
     )
     {
+      this.groupId = groupId;
       this.partitionOffsets = partitionOffsets;
       this.minimumMessageTime = minimumMessageTime;
       this.maximumMessageTime = maximumMessageTime;
@@ -187,9 +191,21 @@ Set<String> taskIds()

   private static class TaskData
   {
+    @Nullable
     volatile TaskStatus status;
+    @Nullable
     volatile DateTime startTime;
     volatile Map<Integer, Long> currentOffsets = new HashMap<>();
+
+    @Override
+    public String toString()
+    {
+      return "TaskData{" +
+             "status=" + status +
+             ", startTime=" + startTime +
+             ", currentOffsets=" + currentOffsets +
+             '}';
+    }
   }

   // Map<{group ID}, {actively reading task group}>; see documentation for TaskGroup class
@@ -697,8 +713,8 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutExc
         log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
         return;
       }
-      final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
-      taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
+      final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroup, false).get();
+      taskGroup.addNewCheckpoint(newCheckpoint);
       log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
     }
   }
@@ -764,10 +780,13 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
               : currentMetadata.getKafkaPartitions()
                                .getPartitionOffsetMap()
                                .get(resetPartitionOffset.getKey());
-          final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
-          if (partitionOffsetInMetadataStore != null ||
-              (partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
-                                                                                .equals(resetPartitionOffset.getValue()))) {
+          final TaskGroup partitionTaskGroup = taskGroups.get(
+              getTaskGroupIdForPartition(resetPartitionOffset.getKey())
+          );
+          final boolean isSameOffset = partitionTaskGroup != null
+                                       && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
+                                                                             .equals(resetPartitionOffset.getValue());
+          if (partitionOffsetInMetadataStore != null || isSameOffset) {
             doReset = true;
             break;
           }
@@ -991,7 +1010,7 @@ private void discoverTasks() throws ExecutionException, InterruptedException, Ti
     List<String> futureTaskIds = Lists.newArrayList();
     List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
     List<Task> tasks = taskStorage.getActiveTasks();
-    final Set<Integer> taskGroupsToVerify = new HashSet<>();
+    final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>();

     for (Task task : tasks) {
       if (!(task instanceof KafkaIndexTask) || !dataSource.equals(task.getDataSource())) {
@@ -1098,6 +1117,7 @@ public Boolean apply(KafkaIndexTask.Status status)
                     k -> {
                       log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
                       return new TaskGroup(
+                          taskGroupId,
                           ImmutableMap.copyOf(
                               kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
                           ),
@@ -1106,8 +1126,15 @@ public Boolean apply(KafkaIndexTask.Status status)
                       );
                     }
                 );
-                taskGroupsToVerify.add(taskGroupId);
-                taskGroup.tasks.putIfAbsent(taskId, new TaskData());
+                taskGroupsToVerify.put(taskGroupId, taskGroup);
+                final TaskData prevTaskGroup = taskGroup.tasks.putIfAbsent(taskId, new TaskData());
+                if (prevTaskGroup != null) {
+                  throw new ISE(
+                      "WTH? a taskGroup[%s] already exists for new task[%s]",
+                      prevTaskGroup,
+                      taskId
+                  );
+                }
               }
             }
             return true;
@@ -1135,7 +1162,7 @@ public Boolean apply(KafkaIndexTask.Status status)
     log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource);

     // make sure the checkpoints are consistent with each other and with the metadata store
-    taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints);
+    taskGroupsToVerify.values().forEach(this::verifyAndMergeCheckpoints);
   }

   /**
@@ -1145,10 +1172,9 @@
    * 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly
    * created tasks for the taskGroup start indexing from after the latest published offsets.
    */
-  private void verifyAndMergeCheckpoints(final Integer groupId)
+  private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
   {
-    final TaskGroup taskGroup = taskGroups.get(groupId);
-
+    final int groupId = taskGroup.groupId;
     // List<Pair<TaskId, Map<{SequenceId, Checkpoints}>>>
     final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> taskSequences = new CopyOnWriteArrayList<>();
     final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>();
@@ -1302,6 +1328,7 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups(
     // reading the minimumMessageTime & maximumMessageTime from the publishing task and setting it here is not necessary as this task cannot
     // change to a state where it will read any more events
     TaskGroup newTaskGroup = new TaskGroup(
+        groupId,
         ImmutableMap.copyOf(startingPartitions),
         Optional.<DateTime>absent(),
         Optional.<DateTime>absent()
@@ -1339,8 +1366,8 @@ public Boolean apply(@Nullable DateTime startTime)
           }

           taskData.startTime = startTime;
-          long millisRemaining = ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis() -
-                                                                           taskData.startTime.getMillis());
+          long millisRemaining = ioConfig.getTaskDuration().getMillis() -
+                                 (System.currentTimeMillis() - taskData.startTime.getMillis());
           if (millisRemaining > 0) {
             scheduledExec.schedule(
                 buildRunTask(),
@@ -1393,7 +1420,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
       // find the longest running task from this group
       DateTime earliestTaskStart = DateTimes.nowUtc();
       for (TaskData taskData : group.tasks.values()) {
-        if (earliestTaskStart.isAfter(taskData.startTime)) {
+        // startTime can be null if kafkaSupervisor is stopped gracefully before processing any runNotice
+        if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
           earliestTaskStart = taskData.startTime;
         }
       }
@@ -1402,7 +1430,7 @@
       if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
         log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
         futureGroupIds.add(groupId);
-        futures.add(checkpointTaskGroup(groupId, true));
+        futures.add(checkpointTaskGroup(group, true));
       }
     }
@@ -1440,10 +1468,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
     }
   }

-  private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int groupId, final boolean finalize)
+  private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final TaskGroup taskGroup, final boolean finalize)
   {
-    final TaskGroup taskGroup = taskGroups.get(groupId);
-
     if (finalize) {
       // 1) Check if any task completed (in which case we're done) and kill unassigned tasks
       Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
@@ -1452,30 +1478,33 @@ private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int group
         String taskId = taskEntry.getKey();
         TaskData task = taskEntry.getValue();

-        if (task.status.isSuccess()) {
-          // If any task in this group has already completed, stop the rest of the tasks in the group and return.
-          // This will cause us to create a new set of tasks next cycle that will start from the offsets in
-          // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
-          // failed and we need to re-ingest)
-          return Futures.transform(
-              stopTasksInGroup(taskGroup),
-              new Function<Object, Map<Integer, Long>>()
-              {
-                @Nullable
-                @Override
-                public Map<Integer, Long> apply(@Nullable Object input)
+        // task.status can be null if kafkaSupervisor is stopped gracefully before processing any runNotice.
+        if (task.status != null) {
+          if (task.status.isSuccess()) {
+            // If any task in this group has already completed, stop the rest of the tasks in the group and return.
+            // This will cause us to create a new set of tasks next cycle that will start from the offsets in
+            // metadata store (which will have advanced if we succeeded in publishing and will remain the same if
+            // publishing failed and we need to re-ingest)
+            return Futures.transform(
+                stopTasksInGroup(taskGroup),
+                new Function<Object, Map<Integer, Long>>()
                 {
-                  return null;
+                  @Nullable
+                  @Override
+                  public Map<Integer, Long> apply(@Nullable Object input)
+                  {
+                    return null;
+                  }
                 }
-              }
-          );
-        }
+            );
+          }

-        if (task.status.isRunnable()) {
-          if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
-            log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
-            killTask(taskId);
-            i.remove();
+          if (task.status.isRunnable()) {
+            if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
+              log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
+              killTask(taskId);
+              i.remove();
+            }
           }
         }
       }
@@ -1522,7 +1551,7 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
       final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds());

       if (setEndOffsetTaskIds.isEmpty()) {
-        log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
+        log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
         return null;
       }
@@ -1533,11 +1562,15 @@
             "Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]",
             endOffsets,
             taskGroup.sequenceOffsets.lastEntry().getValue(),
-            groupId
+            taskGroup.groupId
         );
       }

-      log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
+      log.info(
+          "Setting endOffsets for tasks in taskGroup [%d] to %s and resuming",
+          taskGroup.groupId,
+          endOffsets
+      );
       for (final String taskId : setEndOffsetTaskIds) {
         setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
       }
@@ -1559,7 +1592,7 @@
       }

       if (taskGroup.tasks.isEmpty()) {
-        log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
+        log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
         return null;
       }
@@ -1599,11 +1632,15 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
           continue;
         }

-        Iterator<Map.Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
+        Iterator<Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
         while (iTask.hasNext()) {
-          Map.Entry<String, TaskData> task = iTask.next();
+          final Entry<String, TaskData> entry = iTask.next();
+          final String taskId = entry.getKey();
+          final TaskData taskData = entry.getValue();
+
+          Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);

-          if (task.getValue().status.isFailure()) {
+          if (taskData.status.isFailure()) {
            iTask.remove(); // remove failed task
            if (group.tasks.isEmpty()) {
              // if all tasks in the group have failed, just nuke all task groups with this partition set and restart
@@ -1612,10 +1649,10 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
             }
           }

-          if (task.getValue().status.isSuccess()) {
+          if (taskData.status.isSuccess()) {
             // If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
             // we no longer need them to publish their segment.
- log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.taskIds()); + log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds()); futures.add(stopTasksInGroup(group)); foundSuccess = true; toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups @@ -1686,6 +1723,8 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep continue; } + Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId); + // remove failed tasks if (taskData.status.isFailure()) { iTasks.remove(); @@ -1713,7 +1752,7 @@ void createNewTasks() throws JsonProcessingException taskGroups.entrySet() .stream() .filter(taskGroup -> taskGroup.getValue().tasks.size() < ioConfig.getReplicas()) - .forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getKey())); + .forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getValue())); // check that there is a current task group for each group of partitions in [partitionGroups] for (Integer groupId : partitionGroups.keySet()) { @@ -1729,6 +1768,7 @@ void createNewTasks() throws JsonProcessingException ) : Optional.absent()); final TaskGroup taskGroup = new TaskGroup( + groupId, generateStartingOffsetsForPartitionGroup(groupId), minimumMessageTime, maximumMessageTime @@ -1955,8 +1995,12 @@ private ListenableFuture stopTasksInGroup(@Nullable TaskGroup taskGroup) final List> futures = Lists.newArrayList(); for (Map.Entry entry : taskGroup.tasks.entrySet()) { - if (!entry.getValue().status.isComplete()) { - futures.add(stopTask(entry.getKey(), false)); + final String taskId = entry.getKey(); + final TaskData taskData = entry.getValue(); + if (taskData.status == null) { + killTask(taskId); + } else if (!taskData.status.isComplete()) { + futures.add(stopTask(taskId, false)); } } @@ -2033,7 +2077,7 @@ private KafkaSupervisorReport generateReport(boolean includeOffsets) for (TaskGroup taskGroup : taskGroups.values()) { for (Map.Entry entry : taskGroup.tasks.entrySet()) { String taskId = entry.getKey(); - DateTime startTime = entry.getValue().startTime; + @Nullable DateTime startTime = entry.getValue().startTime; Map currentOffsets = entry.getValue().currentOffsets; Long remainingSeconds = null; if (startTime != null) { @@ -2060,7 +2104,7 @@ private KafkaSupervisorReport generateReport(boolean includeOffsets) for (TaskGroup taskGroup : taskGroups) { for (Map.Entry entry : taskGroup.tasks.entrySet()) { String taskId = entry.getKey(); - DateTime startTime = entry.getValue().startTime; + @Nullable DateTime startTime = entry.getValue().startTime; Map currentOffsets = entry.getValue().currentOffsets; Long remainingSeconds = null; if (taskGroup.completionTimeout != null) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java index 923f69ec401c..bc127e173381 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java @@ -45,7 +45,7 @@ public TaskReportData( String id, @Nullable Map startingOffsets, @Nullable Map currentOffsets, - DateTime startTime, + @Nullable DateTime startTime, Long remainingSeconds, TaskType type, @Nullable Map lag