-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix NPE in KafkaSupervisor.checkpointTaskGroup #6206
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -146,6 +146,8 @@ public class KafkaSupervisor implements Supervisor | |
*/ | ||
private class TaskGroup | ||
{ | ||
final int groupId; | ||
|
||
// This specifies the partitions and starting offsets for this task group. It is set on group creation from the data | ||
// in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in | ||
// this task group has completed successfully, at which point this will be destroyed and a new task group will be | ||
|
@@ -161,11 +163,13 @@ private class TaskGroup | |
final String baseSequenceName; | ||
|
||
TaskGroup( | ||
int groupId, | ||
ImmutableMap<Integer, Long> partitionOffsets, | ||
Optional<DateTime> minimumMessageTime, | ||
Optional<DateTime> maximumMessageTime | ||
) | ||
{ | ||
this.groupId = groupId; | ||
this.partitionOffsets = partitionOffsets; | ||
this.minimumMessageTime = minimumMessageTime; | ||
this.maximumMessageTime = maximumMessageTime; | ||
|
@@ -187,9 +191,21 @@ Set<String> taskIds() | |
|
||
private static class TaskData | ||
{ | ||
@Nullable | ||
volatile TaskStatus status; | ||
@Nullable | ||
volatile DateTime startTime; | ||
volatile Map<Integer, Long> currentOffsets = new HashMap<>(); | ||
|
||
@Override | ||
public String toString() | ||
{ | ||
return "TaskData{" + | ||
"status=" + status + | ||
", startTime=" + startTime + | ||
", currentOffsets=" + currentOffsets + | ||
'}'; | ||
} | ||
} | ||
|
||
// Map<{group ID}, {actively reading task group}>; see documentation for TaskGroup class | ||
|
@@ -718,8 +734,8 @@ public void handle() throws ExecutionException, InterruptedException | |
log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue()); | ||
return; | ||
} | ||
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get(); | ||
taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint); | ||
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroup, false).get(); | ||
taskGroup.addNewCheckpoint(newCheckpoint); | ||
log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId); | ||
} | ||
} | ||
|
@@ -785,10 +801,13 @@ void resetInternal(DataSourceMetadata dataSourceMetadata) | |
: currentMetadata.getKafkaPartitions() | ||
.getPartitionOffsetMap() | ||
.get(resetPartitionOffset.getKey()); | ||
final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey())); | ||
if (partitionOffsetInMetadataStore != null || | ||
(partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey()) | ||
.equals(resetPartitionOffset.getValue()))) { | ||
final TaskGroup partitionTaskGroup = taskGroups.get( | ||
getTaskGroupIdForPartition(resetPartitionOffset.getKey()) | ||
); | ||
final boolean isSameOffset = partitionTaskGroup != null | ||
&& partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey()) | ||
.equals(resetPartitionOffset.getValue()); | ||
if (partitionOffsetInMetadataStore != null || isSameOffset) { | ||
doReset = true; | ||
break; | ||
} | ||
|
@@ -1012,7 +1031,7 @@ private void discoverTasks() throws ExecutionException, InterruptedException, Ti | |
List<String> futureTaskIds = Lists.newArrayList(); | ||
List<ListenableFuture<Boolean>> futures = Lists.newArrayList(); | ||
List<Task> tasks = taskStorage.getActiveTasks(); | ||
final Set<Integer> taskGroupsToVerify = new HashSet<>(); | ||
final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>(); | ||
|
||
for (Task task : tasks) { | ||
if (!(task instanceof KafkaIndexTask) || !dataSource.equals(task.getDataSource())) { | ||
|
@@ -1119,6 +1138,7 @@ public Boolean apply(KafkaIndexTask.Status status) | |
k -> { | ||
log.info("Creating a new task group for taskGroupId[%d]", taskGroupId); | ||
return new TaskGroup( | ||
taskGroupId, | ||
ImmutableMap.copyOf( | ||
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap() | ||
), | ||
|
@@ -1127,8 +1147,15 @@ public Boolean apply(KafkaIndexTask.Status status) | |
); | ||
} | ||
); | ||
taskGroupsToVerify.add(taskGroupId); | ||
taskGroup.tasks.putIfAbsent(taskId, new TaskData()); | ||
taskGroupsToVerify.put(taskGroupId, taskGroup); | ||
final TaskData prevTaskGroup = taskGroup.tasks.putIfAbsent(taskId, new TaskData()); | ||
if (prevTaskGroup != null) { | ||
throw new ISE( | ||
"WTH? a taskGroup[%s] already exists for new task[%s]", | ||
prevTaskGroup, | ||
taskId | ||
); | ||
} | ||
} | ||
} | ||
return true; | ||
|
@@ -1156,7 +1183,7 @@ public Boolean apply(KafkaIndexTask.Status status) | |
log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource); | ||
|
||
// make sure the checkpoints are consistent with each other and with the metadata store | ||
taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints); | ||
taskGroupsToVerify.values().forEach(this::verifyAndMergeCheckpoints); | ||
} | ||
|
||
/** | ||
|
@@ -1166,10 +1193,9 @@ public Boolean apply(KafkaIndexTask.Status status) | |
* 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly | ||
* created tasks for the taskGroup start indexing from after the latest published offsets. | ||
*/ | ||
private void verifyAndMergeCheckpoints(final Integer groupId) | ||
private void verifyAndMergeCheckpoints(final TaskGroup taskGroup) | ||
{ | ||
final TaskGroup taskGroup = taskGroups.get(groupId); | ||
|
||
final int groupId = taskGroup.groupId; | ||
// List<TaskId, Map -> {SequenceId, Checkpoints}> | ||
final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> taskSequences = new CopyOnWriteArrayList<>(); | ||
final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>(); | ||
|
@@ -1330,6 +1356,7 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups( | |
// reading the minimumMessageTime & maximumMessageTime from the publishing task and setting it here is not necessary as this task cannot | ||
// change to a state where it will read any more events | ||
TaskGroup newTaskGroup = new TaskGroup( | ||
groupId, | ||
ImmutableMap.copyOf(startingPartitions), | ||
Optional.absent(), | ||
Optional.absent() | ||
|
@@ -1367,8 +1394,8 @@ public Boolean apply(@Nullable DateTime startTime) | |
} | ||
|
||
taskData.startTime = startTime; | ||
long millisRemaining = ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis() | ||
- taskData.startTime.getMillis()); | ||
long millisRemaining = ioConfig.getTaskDuration().getMillis() - | ||
(System.currentTimeMillis() - taskData.startTime.getMillis()); | ||
if (millisRemaining > 0) { | ||
scheduledExec.schedule( | ||
buildRunTask(), | ||
|
@@ -1421,7 +1448,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException | |
// find the longest running task from this group | ||
DateTime earliestTaskStart = DateTimes.nowUtc(); | ||
for (TaskData taskData : group.tasks.values()) { | ||
if (earliestTaskStart.isAfter(taskData.startTime)) { | ||
// startTime can be null if kafkaSupervisor is stopped gracefully before processing any runNotice | ||
if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) { | ||
earliestTaskStart = taskData.startTime; | ||
} | ||
} | ||
|
@@ -1430,7 +1458,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException | |
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) { | ||
log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration()); | ||
futureGroupIds.add(groupId); | ||
futures.add(checkpointTaskGroup(groupId, true)); | ||
futures.add(checkpointTaskGroup(group, true)); | ||
} | ||
} | ||
|
||
|
@@ -1468,10 +1496,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException | |
} | ||
} | ||
|
||
private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int groupId, final boolean finalize) | ||
private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final TaskGroup taskGroup, final boolean finalize) | ||
{ | ||
final TaskGroup taskGroup = taskGroups.get(groupId); | ||
|
||
if (finalize) { | ||
// 1) Check if any task completed (in which case we're done) and kill unassigned tasks | ||
Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator(); | ||
|
@@ -1480,31 +1506,38 @@ private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int group | |
String taskId = taskEntry.getKey(); | ||
TaskData task = taskEntry.getValue(); | ||
|
||
if (task.status.isSuccess()) { | ||
// If any task in this group has already completed, stop the rest of the tasks in the group and return. | ||
// This will cause us to create a new set of tasks next cycle that will start from the offsets in | ||
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing | ||
// failed and we need to re-ingest) | ||
return Futures.transform( | ||
stopTasksInGroup(taskGroup), | ||
new Function<Object, Map<Integer, Long>>() | ||
{ | ||
@Nullable | ||
@Override | ||
public Map<Integer, Long> apply(@Nullable Object input) | ||
// task.status can be null if any runNotice is processed before kafkaSupervisor is stopped gracefully. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this comment right? It sounds like it would probably be the other way around (stop gracefully happens before run notices are processed) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. Fixed. |
||
if (task.status != null) { | ||
if (task.status.isSuccess()) { | ||
// If any task in this group has already completed, stop the rest of the tasks in the group and return. | ||
// This will cause us to create a new set of tasks next cycle that will start from the offsets in | ||
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if | ||
// publishing failed and we need to re-ingest) | ||
return Futures.transform( | ||
stopTasksInGroup(taskGroup), | ||
new Function<Object, Map<Integer, Long>>() | ||
{ | ||
return null; | ||
@Nullable | ||
@Override | ||
public Map<Integer, Long> apply(@Nullable Object input) | ||
{ | ||
return null; | ||
} | ||
} | ||
} | ||
); | ||
} | ||
); | ||
} | ||
|
||
if (task.status.isRunnable()) { | ||
if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) { | ||
log.info("Killing task [%s] which hasn't been assigned to a worker", taskId); | ||
killTask(taskId); | ||
i.remove(); | ||
if (task.status.isRunnable()) { | ||
if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) { | ||
log.info("Killing task [%s] which hasn't been assigned to a worker", taskId); | ||
killTask(taskId); | ||
i.remove(); | ||
} | ||
} | ||
} else { | ||
log.info("Killing task [%s] of unknown status", taskId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really want to kill the task in this case -- I thought it could only happen for a supervisor that is stopping gracefully? Maybe we should just ignore the task, and log a warning, rather than killing it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From the perspective of checkpointing, if I understand this code correctly, the supervisor is checkpointing because one of tasks in a taskGroup has processed all assigned events, so all tasks in the taskGroup can be stopped or killed. I'm not sure why this code is called when stopping the supervisor though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably we shouldn't checkpoint while stopping the supervisor, and this would be a different issue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm maybe it makes sense to checkpoint because the supervisor should wait for tasks to finish their jobs and they should be able to checkpoint in the middle of indexing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that we don't want to stop/kill all tasks in the taskGroup just because one of them has processed all assigned events. It could be a checkpoint for an incremental handoff, and we want all tasks to continue running even after the checkpoint. Am I understanding this right? 
In other words, it sounds to me like we want to stop/kill all other tasks if any of them has finished (status = success) but we don't want to stop/kill them if it was an incremental handoff. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I forgot to say one more thing. This code is called only when tasks are running more than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I read the code more closely and now I see that the idea is that at With your patch, checkpointTaskGroup, when finalize is true, will now kill any task that has null status. I don't see why this is a good thing. After the Am I wrong -- is there a reason it's a good idea to kill tasks with unknown status in this case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, good point. I thought it makes sense to kill them because the supervisor is currently killing running tasks if they are not allocated to middleManagers yet. Maybe it makes more sense to keep them because the unknown task status indicates that the supervisor hasn't updated it yet. I'll fix this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think killing unassigned running tasks does make sense, since if a task hasn't even started running yet, it has no hope of catching up so we should just cancel it. However, if there is some risk that the task actually is running but the supervisor just doesn't know where yet, this killing might be over-eager. If that's the case I think it'd be an issue for a separate PR though. |
||
killTask(taskId); | ||
i.remove(); | ||
} | ||
} | ||
} | ||
|
@@ -1550,7 +1583,7 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input) | |
final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds()); | ||
|
||
if (setEndOffsetTaskIds.isEmpty()) { | ||
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId); | ||
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId); | ||
return null; | ||
} | ||
|
||
|
@@ -1561,11 +1594,15 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input) | |
"Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]", | ||
endOffsets, | ||
taskGroup.sequenceOffsets.lastEntry().getValue(), | ||
groupId | ||
taskGroup.groupId | ||
); | ||
} | ||
|
||
log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets); | ||
log.info( | ||
"Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", | ||
taskGroup.groupId, | ||
endOffsets | ||
); | ||
for (final String taskId : setEndOffsetTaskIds) { | ||
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize)); | ||
} | ||
|
@@ -1587,7 +1624,7 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input) | |
} | ||
|
||
if (taskGroup.tasks.isEmpty()) { | ||
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId); | ||
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId); | ||
return null; | ||
} | ||
|
||
|
@@ -1627,11 +1664,15 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte | |
continue; | ||
} | ||
|
||
Iterator<Map.Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator(); | ||
Iterator<Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator(); | ||
while (iTask.hasNext()) { | ||
Map.Entry<String, TaskData> task = iTask.next(); | ||
final Entry<String, TaskData> entry = iTask.next(); | ||
final String taskId = entry.getKey(); | ||
final TaskData taskData = entry.getValue(); | ||
|
||
Preconditions.checkNotNull(taskData.status, "task[%s] has a null status", taskId); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When will this get thrown and what will happen when it gets thrown? I'm wondering what the user experience is going to be like. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should never happen. An NPE would be thrown below (at `taskData.status.isFailure()`) if `taskData.status` were null. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks. |
||
|
||
if (task.getValue().status.isFailure()) { | ||
if (taskData.status.isFailure()) { | ||
iTask.remove(); // remove failed task | ||
if (group.tasks.isEmpty()) { | ||
// if all tasks in the group have failed, just nuke all task groups with this partition set and restart | ||
|
@@ -1640,10 +1681,10 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte | |
} | ||
} | ||
|
||
if (task.getValue().status.isSuccess()) { | ||
if (taskData.status.isSuccess()) { | ||
// If one of the pending completion tasks was successful, stop the rest of the tasks in the group as | ||
// we no longer need them to publish their segment. | ||
log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.taskIds()); | ||
log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds()); | ||
futures.add(stopTasksInGroup(group)); | ||
foundSuccess = true; | ||
toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups | ||
|
@@ -1714,6 +1755,8 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep | |
continue; | ||
} | ||
|
||
Preconditions.checkNotNull(taskData.status, "task[%s] has a null status", taskId); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Similar question here -- when will this get thrown and what will happen when it gets thrown? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same here. This should never happen. An NPE would be thrown below (at `taskData.status.isFailure()`) if `taskData.status` were null. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks. |
||
|
||
// remove failed tasks | ||
if (taskData.status.isFailure()) { | ||
iTasks.remove(); | ||
|
@@ -1741,7 +1784,7 @@ void createNewTasks() throws JsonProcessingException | |
taskGroups.entrySet() | ||
.stream() | ||
.filter(taskGroup -> taskGroup.getValue().tasks.size() < ioConfig.getReplicas()) | ||
.forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getKey())); | ||
.forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getValue())); | ||
|
||
// check that there is a current task group for each group of partitions in [partitionGroups] | ||
for (Integer groupId : partitionGroups.keySet()) { | ||
|
@@ -1757,6 +1800,7 @@ void createNewTasks() throws JsonProcessingException | |
) : Optional.absent()); | ||
|
||
final TaskGroup taskGroup = new TaskGroup( | ||
groupId, | ||
generateStartingOffsetsForPartitionGroup(groupId), | ||
minimumMessageTime, | ||
maximumMessageTime | ||
|
@@ -1984,8 +2028,12 @@ private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup) | |
|
||
final List<ListenableFuture<Void>> futures = Lists.newArrayList(); | ||
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) { | ||
if (!entry.getValue().status.isComplete()) { | ||
futures.add(stopTask(entry.getKey(), false)); | ||
final String taskId = entry.getKey(); | ||
final TaskData taskData = entry.getValue(); | ||
if (taskData.status == null) { | ||
killTask(taskId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is killing the task the right idea here? If we don't know its status is it safe to leave it alone? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The null There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, that sounds good. |
||
} else if (!taskData.status.isComplete()) { | ||
futures.add(stopTask(taskId, false)); | ||
} | ||
} | ||
|
||
|
@@ -2066,7 +2114,7 @@ private SupervisorReport<KafkaSupervisorReportPayload> generateReport(boolean in | |
for (TaskGroup taskGroup : taskGroups.values()) { | ||
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) { | ||
String taskId = entry.getKey(); | ||
DateTime startTime = entry.getValue().startTime; | ||
@Nullable DateTime startTime = entry.getValue().startTime; | ||
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets; | ||
Long remainingSeconds = null; | ||
if (startTime != null) { | ||
|
@@ -2093,7 +2141,7 @@ private SupervisorReport<KafkaSupervisorReportPayload> generateReport(boolean in | |
for (TaskGroup taskGroup : taskGroups) { | ||
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) { | ||
String taskId = entry.getKey(); | ||
DateTime startTime = entry.getValue().startTime; | ||
@Nullable DateTime startTime = entry.getValue().startTime; | ||
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets; | ||
Long remainingSeconds = null; | ||
if (taskGroup.completionTimeout != null) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be very surprising if this happened? Enough to stop the supervisor run (i.e.: probable bug)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should never happen, and even if it occurs, the supervisor would kill the task with the corresponding
taskId
and respawn the same task. Please check https://github.com/apache/incubator-druid/pull/6206/files/c46d4681c334709caa5cddbd0ce0c67a7d22eaad#diff-6eee87b3aa4eb3a516965fe6e93e25a4L1138. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, thanks.