Fix NPE in KafkaSupervisor.checkpointTaskGroup #6206

Merged · 4 commits · Aug 27, 2018
Changes from 3 commits
@@ -146,6 +146,8 @@ public class KafkaSupervisor implements Supervisor
*/
private class TaskGroup
{
final int groupId;

// This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
// in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
// this task group has completed successfully, at which point this will be destroyed and a new task group will be
@@ -161,11 +163,13 @@ private class TaskGroup
final String baseSequenceName;

TaskGroup(
int groupId,
ImmutableMap<Integer, Long> partitionOffsets,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime
)
{
this.groupId = groupId;
this.partitionOffsets = partitionOffsets;
this.minimumMessageTime = minimumMessageTime;
this.maximumMessageTime = maximumMessageTime;
@@ -187,9 +191,21 @@ Set<String> taskIds()

private static class TaskData
{
@Nullable
volatile TaskStatus status;
@Nullable
volatile DateTime startTime;
volatile Map<Integer, Long> currentOffsets = new HashMap<>();

@Override
public String toString()
{
return "TaskData{" +
"status=" + status +
", startTime=" + startTime +
", currentOffsets=" + currentOffsets +
'}';
}
}

// Map<{group ID}, {actively reading task group}>; see documentation for TaskGroup class
@@ -718,8 +734,8 @@ public void handle() throws ExecutionException, InterruptedException
log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
return;
}
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroup, false).get();
taskGroup.addNewCheckpoint(newCheckpoint);
log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
}
}
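This hunk is the heart of the NPE fix: handle() now passes the TaskGroup reference it already holds instead of re-fetching it from taskGroups by id. As a rough illustration of why that matters, here is a minimal standalone sketch (illustrative names, not Druid code) of how a repeated map lookup can race with a concurrent removal:

import java.util.concurrent.ConcurrentHashMap;

// A reference captured once stays usable even if the map entry is removed
// concurrently; a second lookup by key may return null and turn the next
// dereference into an NPE.
public class HoldReferenceSketch
{
  public static void main(String[] args)
  {
    ConcurrentHashMap<Integer, String> taskGroups = new ConcurrentHashMap<>();
    taskGroups.put(1, "taskGroup-1");

    String taskGroup = taskGroups.get(1); // capture the reference once

    taskGroups.remove(1); // e.g., another thread cleaned the group up

    System.out.println(taskGroups.get(1));       // null: re-lookup is unsafe
    System.out.println(taskGroup.toUpperCase()); // safe: TASKGROUP-1
  }
}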
@@ -785,10 +801,13 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
: currentMetadata.getKafkaPartitions()
.getPartitionOffsetMap()
.get(resetPartitionOffset.getKey());
final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
if (partitionOffsetInMetadataStore != null ||
(partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
.equals(resetPartitionOffset.getValue()))) {
final TaskGroup partitionTaskGroup = taskGroups.get(
getTaskGroupIdForPartition(resetPartitionOffset.getKey())
);
final boolean isSameOffset = partitionTaskGroup != null
&& partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
.equals(resetPartitionOffset.getValue());
if (partitionOffsetInMetadataStore != null || isSameOffset) {
doReset = true;
break;
}
@@ -1012,7 +1031,7 @@ private void discoverTasks() throws ExecutionException, InterruptedException, Ti
List<String> futureTaskIds = Lists.newArrayList();
List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
List<Task> tasks = taskStorage.getActiveTasks();
final Set<Integer> taskGroupsToVerify = new HashSet<>();
final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>();

for (Task task : tasks) {
if (!(task instanceof KafkaIndexTask) || !dataSource.equals(task.getDataSource())) {
@@ -1119,6 +1138,7 @@ public Boolean apply(KafkaIndexTask.Status status)
k -> {
log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
return new TaskGroup(
taskGroupId,
ImmutableMap.copyOf(
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
),
@@ -1127,8 +1147,15 @@ public Boolean apply(KafkaIndexTask.Status status)
);
}
);
taskGroupsToVerify.add(taskGroupId);
taskGroup.tasks.putIfAbsent(taskId, new TaskData());
taskGroupsToVerify.put(taskGroupId, taskGroup);
final TaskData prevTaskGroup = taskGroup.tasks.putIfAbsent(taskId, new TaskData());
if (prevTaskGroup != null) {
throw new ISE(
    "WTH? a taskGroup[%s] already exists for new task[%s]",
    prevTaskGroup,
    taskId
);
}
}
}
return true;

Contributor: Would it be very surprising if this happened? Enough to stop the supervisor run (i.e., a probable bug)?

Contributor Author: This should never happen, and even if it occurs, the supervisor would kill the task with the corresponding taskId and respawn the same task. Please check https://github.com/apache/incubator-druid/pull/6206/files/c46d4681c334709caa5cddbd0ce0c67a7d22eaad#diff-6eee87b3aa4eb3a516965fe6e93e25a4L1138.

Contributor: Got it, thanks.
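For context on the new duplicate check: ConcurrentMap.putIfAbsent returns the previous mapping, so a non-null result means the taskId was already registered, which is exactly what the ISE above guards against. A minimal sketch of the idiom (illustrative names, not Druid code):

import java.util.concurrent.ConcurrentHashMap;

// putIfAbsent doubles as a duplicate detector: it only inserts when the key
// is absent and reports the existing value otherwise.
public class PutIfAbsentSketch
{
  public static void main(String[] args)
  {
    ConcurrentHashMap<String, String> tasks = new ConcurrentHashMap<>();

    String prev = tasks.putIfAbsent("task-1", "data-A");
    System.out.println(prev); // null: task-1 was newly registered

    prev = tasks.putIfAbsent("task-1", "data-B");
    System.out.println(prev); // data-A: task-1 already existed, nothing inserted
  }
}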
@@ -1156,7 +1183,7 @@ public Boolean apply(KafkaIndexTask.Status status)
log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource);

// make sure the checkpoints are consistent with each other and with the metadata store
taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints);
taskGroupsToVerify.values().forEach(this::verifyAndMergeCheckpoints);
}

/**
@@ -1166,10 +1193,9 @@ public Boolean apply(KafkaIndexTask.Status status)
* 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly
* created tasks for the taskGroup start indexing from after the latest published offsets.
*/
private void verifyAndMergeCheckpoints(final Integer groupId)
private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
{
final TaskGroup taskGroup = taskGroups.get(groupId);

final int groupId = taskGroup.groupId;
// List<TaskId, Map -> {SequenceId, Checkpoints}>
final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> taskSequences = new CopyOnWriteArrayList<>();
final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>();
@@ -1330,6 +1356,7 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups(
// reading the minimumMessageTime & maximumMessageTime from the publishing task and setting it here is not necessary as this task cannot
// change to a state where it will read any more events
TaskGroup newTaskGroup = new TaskGroup(
groupId,
ImmutableMap.copyOf(startingPartitions),
Optional.absent(),
Optional.absent()
@@ -1367,8 +1394,8 @@ public Boolean apply(@Nullable DateTime startTime)
}

taskData.startTime = startTime;
long millisRemaining = ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis()
- taskData.startTime.getMillis());
long millisRemaining = ioConfig.getTaskDuration().getMillis() -
(System.currentTimeMillis() - taskData.startTime.getMillis());
if (millisRemaining > 0) {
scheduledExec.schedule(
buildRunTask(),
@@ -1421,7 +1448,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
// find the longest running task from this group
DateTime earliestTaskStart = DateTimes.nowUtc();
for (TaskData taskData : group.tasks.values()) {
if (earliestTaskStart.isAfter(taskData.startTime)) {
// startTime can be null if kafkaSupervisor is stopped gracefully before processing any runNotice
if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
earliestTaskStart = taskData.startTime;
}
}
@@ -1430,7 +1458,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(groupId, true));
futures.add(checkpointTaskGroup(group, true));
}
}
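The guard added above skips tasks whose startTime is still null (per the new comment, this can happen when the supervisor is stopped gracefully before any runNotice is processed). A minimal sketch of the same scan (illustrative data, not Druid code), using the Joda-Time API that Druid's DateTimes utility wraps:

import java.util.Arrays;
import java.util.List;
import org.joda.time.DateTime;

// Find the earliest non-null start time; null entries are skipped rather
// than dereferenced, mirroring the guard in checkTaskDuration.
public class EarliestStartSketch
{
  public static void main(String[] args)
  {
    List<DateTime> startTimes = Arrays.asList(
        new DateTime(2018, 8, 27, 10, 30),
        null, // task discovered but not yet reporting a start time
        new DateTime(2018, 8, 27, 9, 15)
    );

    DateTime earliest = DateTime.now();
    for (DateTime start : startTimes) {
      if (start != null && earliest.isAfter(start)) {
        earliest = start;
      }
    }
    System.out.println(earliest); // 2018-08-27T09:15:00.000
  }
}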

@@ -1468,10 +1496,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
}
}

private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int groupId, final boolean finalize)
private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final TaskGroup taskGroup, final boolean finalize)
{
final TaskGroup taskGroup = taskGroups.get(groupId);

if (finalize) {
// 1) Check if any task completed (in which case we're done) and kill unassigned tasks
Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
@@ -1480,30 +1506,33 @@ private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int group
String taskId = taskEntry.getKey();
TaskData task = taskEntry.getValue();

if (task.status.isSuccess()) {
// If any task in this group has already completed, stop the rest of the tasks in the group and return.
// This will cause us to create a new set of tasks next cycle that will start from the offsets in
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
// failed and we need to re-ingest)
return Futures.transform(
stopTasksInGroup(taskGroup),
new Function<Object, Map<Integer, Long>>()
{
@Nullable
@Override
public Map<Integer, Long> apply(@Nullable Object input)
// task.status can be null if kafkaSupervisor is stopped gracefully before processing any runNotice.
if (task.status != null) {
if (task.status.isSuccess()) {
// If any task in this group has already completed, stop the rest of the tasks in the group and return.
// This will cause us to create a new set of tasks next cycle that will start from the offsets in
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if
// publishing failed and we need to re-ingest)
return Futures.transform(
stopTasksInGroup(taskGroup),
new Function<Object, Map<Integer, Long>>()
{
return null;
@Nullable
@Override
public Map<Integer, Long> apply(@Nullable Object input)
{
return null;
}
}
}
);
}
);
}

if (task.status.isRunnable()) {
if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
killTask(taskId);
i.remove();
if (task.status.isRunnable()) {
if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
killTask(taskId);
i.remove();
}
}
}
}
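The Futures.transform call above maps the stop-future's result to null so callers of checkpointTaskGroup can tell "a task already finished, so the group was stopped" apart from a real offset map. A hedged sketch of the pattern (not Druid code; note that recent Guava versions require the explicit Executor argument used here, while the diff uses the older two-argument overload):

import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Map;

// Once the stop completes, the transformed future yields null, signalling
// "no checkpoint offsets" to the caller.
public class TransformSketch
{
  public static void main(String[] args)
  {
    ListenableFuture<String> stopFuture = Futures.immediateFuture("stopped");

    ListenableFuture<Map<Integer, Long>> checkpoint = Futures.transform(
        stopFuture,
        (Function<String, Map<Integer, Long>>) input -> null,
        MoreExecutors.directExecutor()
    );

    System.out.println(Futures.getUnchecked(checkpoint)); // null
  }
}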
@@ -1550,7 +1579,7 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds());

if (setEndOffsetTaskIds.isEmpty()) {
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
return null;
}

@@ -1561,11 +1590,15 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
"Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]",
endOffsets,
taskGroup.sequenceOffsets.lastEntry().getValue(),
groupId
taskGroup.groupId
);
}

log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
log.info(
"Setting endOffsets for tasks in taskGroup [%d] to %s and resuming",
taskGroup.groupId,
endOffsets
);
for (final String taskId : setEndOffsetTaskIds) {
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
}
@@ -1587,7 +1620,7 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
}

if (taskGroup.tasks.isEmpty()) {
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
return null;
}

@@ -1627,11 +1660,15 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
continue;
}

Iterator<Map.Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
Iterator<Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
while (iTask.hasNext()) {
Map.Entry<String, TaskData> task = iTask.next();
final Entry<String, TaskData> entry = iTask.next();
final String taskId = entry.getKey();
final TaskData taskData = entry.getValue();

Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);

if (task.getValue().status.isFailure()) {
if (taskData.status.isFailure()) {
iTask.remove(); // remove failed task
if (group.tasks.isEmpty()) {
// if all tasks in the group have failed, just nuke all task groups with this partition set and restart
@@ -1640,10 +1677,10 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
}
}

if (task.getValue().status.isSuccess()) {
if (taskData.status.isSuccess()) {
// If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
// we no longer need them to publish their segment.
log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.taskIds());
log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds());
futures.add(stopTasksInGroup(group));
foundSuccess = true;
toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups
@@ -1714,6 +1751,8 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
continue;
}

Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);

// remove failed tasks
if (taskData.status.isFailure()) {
iTasks.remove();
@@ -1741,7 +1780,7 @@ void createNewTasks() throws JsonProcessingException
taskGroups.entrySet()
.stream()
.filter(taskGroup -> taskGroup.getValue().tasks.size() < ioConfig.getReplicas())
.forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getKey()));
.forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getValue()));

// check that there is a current task group for each group of partitions in [partitionGroups]
for (Integer groupId : partitionGroups.keySet()) {
@@ -1757,6 +1796,7 @@ void createNewTasks() throws JsonProcessingException
) : Optional.absent());

final TaskGroup taskGroup = new TaskGroup(
groupId,
generateStartingOffsetsForPartitionGroup(groupId),
minimumMessageTime,
maximumMessageTime
@@ -1984,8 +2024,12 @@ private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)

final List<ListenableFuture<Void>> futures = Lists.newArrayList();
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
if (!entry.getValue().status.isComplete()) {
futures.add(stopTask(entry.getKey(), false));
final String taskId = entry.getKey();
final TaskData taskData = entry.getValue();
if (taskData.status == null) {
killTask(taskId);
} else if (!taskData.status.isComplete()) {
futures.add(stopTask(taskId, false));
}
}

Contributor: Is killing the task the right idea here? If we don't know its status, is it safe to leave it alone?

Contributor Author: The null taskData.status means that the supervisor hasn't updated it yet, so its actual status could be anything. I think this should kill tasks whose status is unknown, because this method is supposed to stop all tasks in the given taskGroup.

Contributor: Got it, that sounds good.
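To summarize the decision discussed in the thread above (kill on unknown status, stop gracefully on known-incomplete status), here is a self-contained sketch with illustrative types, not Druid's API:

import java.util.HashMap;
import java.util.Map;

// Tearing down a group: a task whose status was never reported is
// force-killed (its real state is unknowable here), a task known to be
// incomplete is stopped gracefully, and completed tasks are left alone.
public class StopTasksSketch
{
  enum Status { RUNNING, SUCCESS }

  public static void main(String[] args)
  {
    Map<String, Status> tasks = new HashMap<>();
    tasks.put("task-a", null);           // status never reported
    tasks.put("task-b", Status.RUNNING); // known to be incomplete
    tasks.put("task-c", Status.SUCCESS); // already complete

    for (Map.Entry<String, Status> e : tasks.entrySet()) {
      if (e.getValue() == null) {
        System.out.println("kill " + e.getKey());
      } else if (e.getValue() != Status.SUCCESS) {
        System.out.println("stop " + e.getKey() + " gracefully");
      }
    }
  }
}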

@@ -2066,7 +2110,7 @@ private SupervisorReport<KafkaSupervisorReportPayload> generateReport(boolean in
for (TaskGroup taskGroup : taskGroups.values()) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
DateTime startTime = entry.getValue().startTime;
@Nullable DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
Long remainingSeconds = null;
if (startTime != null) {
@@ -2093,7 +2137,7 @@ private SupervisorReport<KafkaSupervisorReportPayload> generateReport(boolean in
for (TaskGroup taskGroup : taskGroups) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
DateTime startTime = entry.getValue().startTime;
@Nullable DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
Long remainingSeconds = null;
if (taskGroup.completionTimeout != null) {
@@ -45,7 +45,7 @@ public TaskReportData(
String id,
@Nullable Map<Integer, Long> startingOffsets,
@Nullable Map<Integer, Long> currentOffsets,
DateTime startTime,
@Nullable DateTime startTime,
Long remainingSeconds,
TaskType type,
@Nullable Map<Integer, Long> lag