Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve rolling Supervisor restarts at taskDuration #15859

Merged
merged 1 commit into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertNull(config.getTopicPattern());
Assert.assertEquals(1, (int) config.getReplicas());
Assert.assertEquals(1, (int) config.getTaskCount());
Assert.assertNull(config.getStopTaskCount());
Assert.assertEquals((int) config.getTaskCount(), config.getMaxAllowedStops());
Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
Assert.assertEquals(100, config.getPollTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertEquals(KinesisRegion.US_EAST_1.getEndpoint(), config.getEndpoint());
Assert.assertEquals(1, (int) config.getReplicas());
Assert.assertEquals(1, (int) config.getTaskCount());
Assert.assertNull(config.getStopTaskCount());
Assert.assertEquals((int) config.getTaskCount(), config.getMaxAllowedStops());
Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration());
Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3105,45 +3105,59 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> futures = new ArrayList<>();
final List<Integer> futureGroupIds = new ArrayList<>();

boolean stopTasksEarly = false;
boolean stopTasksEarly;
if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || earlyStopTime.isEqualNow())) {
log.info("Early stop requested - signalling tasks to complete");

earlyStopTime = null;
stopTasksEarly = true;
}

int stoppedTasks = 0;
for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
Integer groupId = entry.getKey();
TaskGroup group = entry.getValue();

if (stopTasksEarly) {
log.info("Stopping task group [%d] early. It has run for [%s]", groupId, ioConfig.getTaskDuration());
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
} else {
// find the longest running task from this group
DateTime earliestTaskStart = DateTimes.nowUtc();
for (TaskData taskData : group.tasks.values()) {
if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
earliestTaskStart = taskData.startTime;
}
}

if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
// if this task has run longer than the configured duration
// as long as the pending task groups are less than the configured stop task count.
if (pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum() + stoppedTasks
< ioConfig.getStopTaskCount()) {
log.info("Task group [%d] has run for [%s]. Stopping.", groupId, ioConfig.getTaskDuration());
} else {
stopTasksEarly = false;
}

AtomicInteger stoppedTasks = new AtomicInteger();
// Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
activelyReadingTaskGroups
.entrySet().stream().sorted(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the Concurrency check: the stream change provide the same level of Concurrency guaranteed as the previous for loop.

Comparator.comparingLong(
(Entry<Integer, TaskGroup> entry) ->
computeEarliestTaskStartTime(entry.getValue())
.getMillis()))
.forEach(entry -> {
Integer groupId = entry.getKey();
TaskGroup group = entry.getValue();

if (stopTasksEarly) {
log.info(
"Stopping task group [%d] early. It has run for [%s]",
groupId,
ioConfig.getTaskDuration()
);
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
stoppedTasks++;
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);

if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
// if this task has run longer than the configured duration
// as long as the pending task groups are less than the configured stop task count.
if (pendingCompletionTaskGroups.values()
.stream()
.mapToInt(CopyOnWriteArrayList::size)
.sum() + stoppedTasks.get()
< ioConfig.getMaxAllowedStops()) {
log.info(
"Task group [%d] has run for [%s]. Stopping.",
groupId,
ioConfig.getTaskDuration()
);
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
stoppedTasks.getAndIncrement();
}
}
}
}
}
}
});

List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>> results = coalesceAndAwait(futures);
for (int j = 0; j < results.size(); j++) {
Expand Down Expand Up @@ -3200,6 +3214,15 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
}
}

private DateTime computeEarliestTaskStartTime(TaskGroup group)
{
return group.tasks.values().stream()
.filter(taskData -> taskData.startTime != null)
.map(taskData -> taskData.startTime)
.min(DateTime::compareTo)
.orElse(DateTimes.nowUtc());
}

private ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> checkpointTaskGroup(
final TaskGroup taskGroup,
final boolean finalize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ public abstract class SeekableStreamSupervisorIOConfig
private final Optional<DateTime> lateMessageRejectionStartDateTime;
@Nullable private final AutoScalerConfig autoScalerConfig;
@Nullable private final IdleConfig idleConfig;

private final int stopTaskCount;
@Nullable private final Integer stopTaskCount;

public SeekableStreamSupervisorIOConfig(
String stream,
Expand Down Expand Up @@ -81,8 +80,9 @@ public SeekableStreamSupervisorIOConfig(
} else {
this.taskCount = taskCount != null ? taskCount : 1;
}
this.stopTaskCount = stopTaskCount == null ? this.taskCount : stopTaskCount;
Preconditions.checkArgument(this.stopTaskCount > 0, "stopTaskCount must be greater than 0");
Preconditions.checkArgument(stopTaskCount == null || stopTaskCount > 0,
"stopTaskCount must be greater than 0");
this.stopTaskCount = stopTaskCount;
this.taskDuration = defaultDuration(taskDuration, "PT1H");
this.startDelay = defaultDuration(startDelay, "PT5S");
this.period = defaultDuration(period, "PT30S");
Expand Down Expand Up @@ -205,9 +205,15 @@ public IdleConfig getIdleConfig()
return idleConfig;
}

@Nullable
@JsonProperty
public int getStopTaskCount()
public Integer getStopTaskCount()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Would be nice to add a javadoc here telling devs to use #getMaxAllowedStops instead.

{
return stopTaskCount;
}

public int getMaxAllowedStops()
{
return stopTaskCount == null ? taskCount : stopTaskCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,228 @@
Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
}

@Test(timeout = 60_000L)
public void testEarlyStoppingOfTaskGroupBasedOnStopTaskCount() throws InterruptedException, JsonProcessingException
{
// Assuming tasks have surpassed their duration limit at test execution
DateTime startTime = DateTimes.nowUtc().minusHours(2);
// Configure supervisor to stop only one task at a time
int stopTaskCount = 1;
SeekableStreamSupervisorIOConfig ioConfig = new SeekableStreamSupervisorIOConfig(
STREAM,
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
3,
new Period("PT1H"),
new Period("PT1S"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
null,
new IdleConfig(true, 200L),
stopTaskCount
)
{
};

EasyMock.reset(spec);
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note test

Invoking
SeekableStreamSupervisorSpec.getDataSchema
should be avoided because it has been deprecated.
EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()
{
@Override
public Duration getEmissionDuration()
{
return new Period("PT2S").toStandardDuration();
}
}).anyTimes();
EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig();

TreeMap<Integer, Map<String, Long>> sequenceOffsets = new TreeMap<>();
sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));

Map<String, Object> context = new HashMap<>();
context.put("checkpoints", new ObjectMapper().writeValueAsString(sequenceOffsets));

TestSeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
"id1",
null,
getDataSchema(),
taskTuningConfig,
createTaskIoConfigExt(
0,
Collections.singletonMap("0", "10"),
Collections.singletonMap("0", "20"),
"test",
startTime,
null,
Collections.emptySet(),
ioConfig
),
context,
"0"
);

TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
"id2",
null,
getDataSchema(),
taskTuningConfig,
createTaskIoConfigExt(
1,
Collections.singletonMap("1", "10"),
Collections.singletonMap("1", "20"),
"test",
startTime,
null,
Collections.emptySet(),
ioConfig
),
context,
"1"
);

TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask(
"id3",
null,
getDataSchema(),
taskTuningConfig,
createTaskIoConfigExt(
2,
Collections.singletonMap("2", "10"),
Collections.singletonMap("2", "20"),
"test",
startTime,
null,
Collections.emptySet(),
ioConfig
),
context,
"2"
);

final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final TaskLocation location3 = TaskLocation.create("testHost3", 145, -1);

Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
workItems.add(new TestTaskRunnerWorkItem(id3, null, location3));

EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
.andReturn(ImmutableList.of(id1, id2, id3))
.anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id2)).anyTimes();

EasyMock.reset(indexerMetadataStorageCoordinator);
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
.andReturn(new TestSeekableStreamDataSourceMetadata(null)).anyTimes();
EasyMock.expect(indexTaskClient.getStatusAsync("id1"))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();
EasyMock.expect(indexTaskClient.getStatusAsync("id2"))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();
EasyMock.expect(indexTaskClient.getStatusAsync("id3"))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();

EasyMock.expect(indexTaskClient.getStartTimeAsync("id1"))
.andReturn(Futures.immediateFuture(startTime.plusSeconds(1)))
.anyTimes();
// Mocking to return the earliest start time for task id2, indicating it's the first group to start
EasyMock.expect(indexTaskClient.getStartTimeAsync("id2"))
.andReturn(Futures.immediateFuture(startTime)).anyTimes();
EasyMock.expect(indexTaskClient.getStartTimeAsync("id3"))
.andReturn(Futures.immediateFuture(startTime.plusSeconds(2)))
.anyTimes();

ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
checkpoints.put(0, partitionOffset);

EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.anyTimes();
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.anyTimes();
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.anyTimes();
EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id1", partitionOffset, false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id2", partitionOffset, false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id3", partitionOffset, false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.resumeAsync("id1"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.resumeAsync("id2"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.resumeAsync("id3"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.pauseAsync("id1"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.pauseAsync("id3"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();

// Expect the earliest-started task (id2) to transition to publishing first
taskQueue.shutdown("id2", "All tasks in group[%s] failed to transition to publishing state", 1);

replayAll();

SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

supervisor.start();
supervisor.runInternal();

supervisor.checkpoint(
0,
new TestSeekableStreamDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), ImmutableSet.of())
)
);

while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}

verifyAll();

Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
}

@Test
public void testEmitBothLag() throws Exception
{
Expand Down
Loading