Fix NPE in KafkaSupervisor.checkpointTaskGroup (#6206) (#6245)
* Fix NPE in KafkaSupervisor.checkpointTaskGroup

* address comments

* address comment
gianm authored and fjy committed Aug 27, 2018
1 parent 3a63540 commit e8966a7
Showing 2 changed files with 100 additions and 56 deletions.
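Context for the diff below: `TaskData.status` and `TaskData.startTime` may legitimately still be null when the supervisor is stopped gracefully before it processes any run notice, and the NPE came from dereferencing them, or a re-looked-up task group, without a guard. A minimal sketch of the failure mode and the guarded read, using stub types rather than the project's real classes:

import javax.annotation.Nullable; // JSR-305, the same annotation the patch uses

class NpeSketch
{
  interface TaskStatus
  {
    boolean isSuccess();
  }

  // Mirrors the shape of KafkaSupervisor.TaskData: status starts out null and
  // is only assigned once the supervisor processes a run notice for the task.
  static class TaskData
  {
    @Nullable
    volatile TaskStatus status;
  }

  public static void main(String[] args)
  {
    TaskData task = new TaskData(); // stopped before any runNotice: status == null

    // Pre-fix pattern, shown for illustration only:
    // boolean ok = task.status.isSuccess(); // throws NullPointerException

    // Post-fix pattern: treat a null status as "nothing to report yet".
    TaskStatus status = task.status;
    boolean ok = status != null && status.isSuccess();
    System.out.println("success: " + ok); // prints "success: false"
  }
}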
KafkaSupervisor.java
@@ -146,6 +146,8 @@ public class KafkaSupervisor implements Supervisor
*/
private class TaskGroup
{
+ final int groupId;
+
// This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
// in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
// this task group has completed successfully, at which point this will be destroyed and a new task group will be
@@ -161,11 +163,13 @@ private class TaskGroup
final String baseSequenceName;

TaskGroup(
+ int groupId,
ImmutableMap<Integer, Long> partitionOffsets,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime
)
{
+ this.groupId = groupId;
this.partitionOffsets = partitionOffsets;
this.minimumMessageTime = minimumMessageTime;
this.maximumMessageTime = maximumMessageTime;
@@ -187,9 +191,21 @@ Set<String> taskIds()

private static class TaskData
{
+ @Nullable
volatile TaskStatus status;
+ @Nullable
volatile DateTime startTime;
volatile Map<Integer, Long> currentOffsets = new HashMap<>();
+
+ @Override
+ public String toString()
+ {
+ return "TaskData{" +
+ "status=" + status +
+ ", startTime=" + startTime +
+ ", currentOffsets=" + currentOffsets +
+ '}';
+ }
}

// Map<{group ID}, {actively reading task group}>; see documentation for TaskGroup class
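An aside on the `@Nullable volatile` fields added above (general background, not a claim about this commit, in which `status` only ever moves from null to non-null): when such a field can be rewritten concurrently, the defensive idiom is to read it once into a local, so the null check and the subsequent use observe the same write. A sketch:

import javax.annotation.Nullable;

class SnapshotIdiom
{
  interface TaskStatus
  {
    boolean isSuccess();
  }

  @Nullable
  volatile TaskStatus status;

  // One volatile read into a local; checking the field twice instead would
  // let another thread change it between the null check and the call.
  boolean completedSuccessfully()
  {
    final TaskStatus snapshot = status;
    return snapshot != null && snapshot.isSuccess();
  }
}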
@@ -697,8 +713,8 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutExc
log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
return;
}
- final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
- taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
+ final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroup, false).get();
+ taskGroup.addNewCheckpoint(newCheckpoint);
log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
}
}
@@ -764,10 +780,13 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
: currentMetadata.getKafkaPartitions()
.getPartitionOffsetMap()
.get(resetPartitionOffset.getKey());
- final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
- if (partitionOffsetInMetadataStore != null ||
- (partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
- .equals(resetPartitionOffset.getValue()))) {
+ final TaskGroup partitionTaskGroup = taskGroups.get(
+ getTaskGroupIdForPartition(resetPartitionOffset.getKey())
+ );
+ final boolean isSameOffset = partitionTaskGroup != null
+ && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
+ .equals(resetPartitionOffset.getValue());
+ if (partitionOffsetInMetadataStore != null || isSameOffset) {
doReset = true;
break;
}
@@ -991,7 +1010,7 @@ private void discoverTasks() throws ExecutionException, InterruptedException, Ti
List<String> futureTaskIds = Lists.newArrayList();
List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
List<Task> tasks = taskStorage.getActiveTasks();
- final Set<Integer> taskGroupsToVerify = new HashSet<>();
+ final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>();

for (Task task : tasks) {
if (!(task instanceof KafkaIndexTask) || !dataSource.equals(task.getDataSource())) {
@@ -1098,6 +1117,7 @@ public Boolean apply(KafkaIndexTask.Status status)
k -> {
log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
return new TaskGroup(
+ taskGroupId,
ImmutableMap.copyOf(
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
),
@@ -1106,8 +1126,15 @@ public Boolean apply(KafkaIndexTask.Status status)
);
}
);
- taskGroupsToVerify.add(taskGroupId);
- taskGroup.tasks.putIfAbsent(taskId, new TaskData());
+ taskGroupsToVerify.put(taskGroupId, taskGroup);
+ final TaskData prevTaskGroup = taskGroup.tasks.putIfAbsent(taskId, new TaskData());
+ if (prevTaskGroup != null) {
+ throw new ISE(
+ "WTH? a taskGroup[%s] already exists for new task[%s]",
+ prevTaskGroup,
+ taskId
+ );
+ }
}
}
return true;
@@ -1135,7 +1162,7 @@ public Boolean apply(KafkaIndexTask.Status status)
log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource);

// make sure the checkpoints are consistent with each other and with the metadata store
- taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints);
+ taskGroupsToVerify.values().forEach(this::verifyAndMergeCheckpoints);
}

/**
@@ -1145,10 +1172,9 @@ public Boolean apply(KafkaIndexTask.Status status)
* 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly
* created tasks for the taskGroup start indexing from after the latest published offsets.
*/
- private void verifyAndMergeCheckpoints(final Integer groupId)
+ private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
{
- final TaskGroup taskGroup = taskGroups.get(groupId);
-
+ final int groupId = taskGroup.groupId;
// List<TaskId, Map -> {SequenceId, Checkpoints}>
final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> taskSequences = new CopyOnWriteArrayList<>();
final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>();
@@ -1302,6 +1328,7 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups(
// reading the minimumMessageTime & maximumMessageTime from the publishing task and setting it here is not necessary as this task cannot
// change to a state where it will read any more events
TaskGroup newTaskGroup = new TaskGroup(
+ groupId,
ImmutableMap.copyOf(startingPartitions),
Optional.<DateTime>absent(),
Optional.<DateTime>absent()
@@ -1339,8 +1366,8 @@ public Boolean apply(@Nullable DateTime startTime)
}

taskData.startTime = startTime;
- long millisRemaining = ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis()
- - taskData.startTime.getMillis());
+ long millisRemaining = ioConfig.getTaskDuration().getMillis() -
+ (System.currentTimeMillis() - taskData.startTime.getMillis());
if (millisRemaining > 0) {
scheduledExec.schedule(
buildRunTask(),
@@ -1393,7 +1420,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
// find the longest running task from this group
DateTime earliestTaskStart = DateTimes.nowUtc();
for (TaskData taskData : group.tasks.values()) {
- if (earliestTaskStart.isAfter(taskData.startTime)) {
+ // startTime can be null if kafkaSupervisor is stopped gracefully before processing any runNotice
+ if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
earliestTaskStart = taskData.startTime;
}
}
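The added guard skips tasks whose `startTime` was never assigned. For comparison, the same null-safe minimum written as a stream; this is a sketch using plain Joda-Time (`DateTime.now()`), where the supervisor itself uses Druid's `DateTimes.nowUtc()` helper:

import java.util.Collection;
import java.util.Comparator;
import java.util.Objects;
import org.joda.time.DateTime;

class EarliestStart
{
  static class TaskData
  {
    volatile DateTime startTime; // null until a run notice is processed
  }

  // Null startTimes are filtered out before taking the minimum; when no task
  // has started yet, fall back to "now", matching the loop's initial value.
  static DateTime earliestTaskStart(Collection<TaskData> tasks)
  {
    return tasks.stream()
        .map(t -> t.startTime)
        .filter(Objects::nonNull)
        .min(Comparator.comparingLong(DateTime::getMillis))
        .orElseGet(DateTime::now);
  }
}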
@@ -1402,7 +1430,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
futureGroupIds.add(groupId);
- futures.add(checkpointTaskGroup(groupId, true));
+ futures.add(checkpointTaskGroup(group, true));
}
}

@@ -1440,10 +1468,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
}
}

- private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int groupId, final boolean finalize)
+ private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final TaskGroup taskGroup, final boolean finalize)
{
- final TaskGroup taskGroup = taskGroups.get(groupId);
-
if (finalize) {
// 1) Check if any task completed (in which case we're done) and kill unassigned tasks
Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
@@ -1452,30 +1478,33 @@ private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int group
String taskId = taskEntry.getKey();
TaskData task = taskEntry.getValue();

- if (task.status.isSuccess()) {
-   // If any task in this group has already completed, stop the rest of the tasks in the group and return.
-   // This will cause us to create a new set of tasks next cycle that will start from the offsets in
-   // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
-   // failed and we need to re-ingest)
-   return Futures.transform(
-       stopTasksInGroup(taskGroup),
-       new Function<Object, Map<Integer, Long>>()
-       {
-         @Nullable
-         @Override
-         public Map<Integer, Long> apply(@Nullable Object input)
-         {
-           return null;
-         }
-       }
-   );
- }
-
- if (task.status.isRunnable()) {
-   if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
-     log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
-     killTask(taskId);
-     i.remove();
-   }
- }
+ // task.status can be null if kafkaSupervisor is stopped gracefully before processing any runNotice.
+ if (task.status != null) {
+   if (task.status.isSuccess()) {
+     // If any task in this group has already completed, stop the rest of the tasks in the group and return.
+     // This will cause us to create a new set of tasks next cycle that will start from the offsets in
+     // metadata store (which will have advanced if we succeeded in publishing and will remain the same if
+     // publishing failed and we need to re-ingest)
+     return Futures.transform(
+         stopTasksInGroup(taskGroup),
+         new Function<Object, Map<Integer, Long>>()
+         {
+           @Nullable
+           @Override
+           public Map<Integer, Long> apply(@Nullable Object input)
+           {
+             return null;
+           }
+         }
+     );
+   }
+
+   if (task.status.isRunnable()) {
+     if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
+       log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
+       killTask(taskId);
+       i.remove();
+     }
+   }
+ }
}
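The signature change in this hunk is the core of the fix: `checkpointTaskGroup` previously re-fetched the group with `taskGroups.get(groupId)`, which returns null once another code path has removed the group; callers now hand over the `TaskGroup` they already hold, and its id travels with it as `taskGroup.groupId`. A reduced sketch of the hazard, with stand-in types rather than the project's code:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LookupHazard
{
  static class TaskGroup
  {
    final int groupId;

    TaskGroup(int groupId)
    {
      this.groupId = groupId;
    }
  }

  private final Map<Integer, TaskGroup> taskGroups = new ConcurrentHashMap<>();

  // Before: a second lookup inside the callee; if another thread removed the
  // group after the caller's own check, `group` is null and the next line throws.
  void checkpointById(int groupId)
  {
    TaskGroup group = taskGroups.get(groupId); // may be null by now
    System.out.println("checkpointing group " + group.groupId); // potential NPE
  }

  // After: the caller passes the reference it already validated, so there is
  // no window in which the group can disappear between lookup and use.
  void checkpointByReference(TaskGroup group)
  {
    System.out.println("checkpointing group " + group.groupId);
  }
}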
@@ -1522,7 +1551,7 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds());

if (setEndOffsetTaskIds.isEmpty()) {
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
return null;
}

@@ -1533,11 +1562,15 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
"Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]",
endOffsets,
taskGroup.sequenceOffsets.lastEntry().getValue(),
- groupId
+ taskGroup.groupId
);
}

log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
log.info(
"Setting endOffsets for tasks in taskGroup [%d] to %s and resuming",
taskGroup.groupId,
endOffsets
);
for (final String taskId : setEndOffsetTaskIds) {
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
}
@@ -1559,7 +1592,7 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
}

if (taskGroup.tasks.isEmpty()) {
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
return null;
}

@@ -1599,11 +1632,15 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
continue;
}

- Iterator<Map.Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
+ Iterator<Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
while (iTask.hasNext()) {
- Map.Entry<String, TaskData> task = iTask.next();
+ final Entry<String, TaskData> entry = iTask.next();
+ final String taskId = entry.getKey();
+ final TaskData taskData = entry.getValue();
+
+ Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);
+
- if (task.getValue().status.isFailure()) {
+ if (taskData.status.isFailure()) {
iTask.remove(); // remove failed task
if (group.tasks.isEmpty()) {
// if all tasks in the group have failed, just nuke all task groups with this partition set and restart
@@ -1612,10 +1649,10 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
}
}

- if (task.getValue().status.isSuccess()) {
+ if (taskData.status.isSuccess()) {
// If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
// we no longer need them to publish their segment.
log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.taskIds());
log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds());
futures.add(stopTasksInGroup(group));
foundSuccess = true;
toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups
@@ -1686,6 +1723,8 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
continue;
}

+ Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);
+
// remove failed tasks
if (taskData.status.isFailure()) {
iTasks.remove();
@@ -1713,7 +1752,7 @@ void createNewTasks() throws JsonProcessingException
taskGroups.entrySet()
.stream()
.filter(taskGroup -> taskGroup.getValue().tasks.size() < ioConfig.getReplicas())
- .forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getKey()));
+ .forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getValue()));

// check that there is a current task group for each group of partitions in [partitionGroups]
for (Integer groupId : partitionGroups.keySet()) {
@@ -1729,6 +1768,7 @@ void createNewTasks() throws JsonProcessingException
) : Optional.<DateTime>absent());

final TaskGroup taskGroup = new TaskGroup(
+ groupId,
generateStartingOffsetsForPartitionGroup(groupId),
minimumMessageTime,
maximumMessageTime
@@ -1955,8 +1995,12 @@ private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)

final List<ListenableFuture<Void>> futures = Lists.newArrayList();
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
- if (!entry.getValue().status.isComplete()) {
- futures.add(stopTask(entry.getKey(), false));
+ final String taskId = entry.getKey();
+ final TaskData taskData = entry.getValue();
+ if (taskData.status == null) {
+ killTask(taskId);
+ } else if (!taskData.status.isComplete()) {
+ futures.add(stopTask(taskId, false));
}
}

@@ -2033,7 +2077,7 @@ private KafkaSupervisorReport generateReport(boolean includeOffsets)
for (TaskGroup taskGroup : taskGroups.values()) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
- DateTime startTime = entry.getValue().startTime;
+ @Nullable DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
Long remainingSeconds = null;
if (startTime != null) {
@@ -2060,7 +2104,7 @@ private KafkaSupervisorReport generateReport(boolean includeOffsets)
for (TaskGroup taskGroup : taskGroups) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
- DateTime startTime = entry.getValue().startTime;
+ @Nullable DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
Long remainingSeconds = null;
if (taskGroup.completionTimeout != null) {
TaskReportData.java
@@ -45,7 +45,7 @@ public TaskReportData(
String id,
@Nullable Map<Integer, Long> startingOffsets,
@Nullable Map<Integer, Long> currentOffsets,
- DateTime startTime,
+ @Nullable DateTime startTime,
Long remainingSeconds,
TaskType type,
@Nullable Map<Integer, Long> lag
