Add ability to limit the number of segments killed in kill task #14662

Merged · 26 commits · Aug 4, 2023

Changes from all commits:
142d548
* add a `maxSegmentsToKill` config property to `KillUnusedSegmentsTask`
zachjsh Jul 25, 2023
fa48bd0
* pipe `maxSegments` config in autokill to kill task submitted
zachjsh Jul 25, 2023
72e2a8b
* add coordinator dynamic config `killTaskSlotRatio` which allows
zachjsh Jul 25, 2023
4364f63
* fix CoordinatorDynamicConfig equals, toString, and hashCode methods
zachjsh Jul 25, 2023
c71c69f
* minimum of 1 task used for kill tasks with auto kill duty
zachjsh Jul 25, 2023
d9fb724
Merge remote-tracking branch 'apache/master' into auto-kill-improvements
zachjsh Jul 25, 2023
c90e804
* fixes after merge
zachjsh Jul 25, 2023
3f7ddc9
* fix failing tests
zachjsh Jul 26, 2023
be58f1c
* add @Nullable
zachjsh Jul 26, 2023
3a16585
* add more tests
zachjsh Jul 26, 2023
f08bf89
* add docs
zachjsh Jul 26, 2023
5509841
Merge remote-tracking branch 'apache/master' into auto-kill-improvements
zachjsh Jul 27, 2023
b98c30f
* fix docs
zachjsh Jul 27, 2023
e52410a
* add missing javadoc
zachjsh Jul 27, 2023
0347e08
* review comments
zachjsh Jul 27, 2023
3d33819
* add validation for killTaskSlotRatio
zachjsh Jul 27, 2023
ea99715
* fix failing test
zachjsh Jul 27, 2023
e15cfce
Apply suggestions from code review
zachjsh Jul 27, 2023
3cf2bd8
* address review comments
zachjsh Jul 27, 2023
d853a03
* propagate maxSegments config when retrieving unused segments
zachjsh Jul 28, 2023
2eccb33
* revert back to existing kill task with limit
zachjsh Aug 2, 2023
66153db
* revert killTaskSlotRatio
zachjsh Aug 2, 2023
3420936
Merge remote-tracking branch 'apache/master' into auto-kill-improvements
zachjsh Aug 2, 2023
48b4938
* fix failing tests
zachjsh Aug 3, 2023
c95f13b
* address review comments
zachjsh Aug 3, 2023
48e6fdc
* docs update
zachjsh Aug 3, 2023
docs/data-management/delete.md (9 changes: 6 additions & 3 deletions)
@@ -96,15 +96,18 @@ The available grammar is:
"dataSource": <task_datasource>,
"interval" : <all_unused_segments_in_this_interval_will_die!>,
"context": <task context>,
"batchSize": <optional_batch size>
"batchSize": <optional_batch size>,
"limit": <the maximum number of segments to delete>
}
```

Some of the parameters used in the task payload are further explained below:

| Parameter   | Default         | Explanation |
|-------------|-----------------|-------------|
| `batchSize` | 100             | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations. |
| `limit`     | null (no limit) | Maximum number of segments for the kill task to delete. |


**WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and
deep storage. This operation cannot be undone.
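
As an illustration, the following payload (the task `id`, datasource, and interval here are hypothetical) would delete at most 1,000 unused segments from the given interval, 100 per batch:

```
{
  "type": "kill",
  "id": "kill_wikipedia_2023-08-04",
  "dataSource": "wikipedia",
  "interval": "2020-01-01/2021-01-01",
  "batchSize": 100,
  "limit": 1000
}
```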
RetrieveUnusedSegmentsAction.java
@@ -27,6 +27,8 @@
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

import javax.annotation.Nullable;

import java.util.List;

public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment>>
@@ -37,14 +39,19 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment>>
@JsonIgnore
private final Interval interval;

@JsonIgnore
private final Integer limit;

@JsonCreator
public RetrieveUnusedSegmentsAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
@JsonProperty("interval") Interval interval,
@JsonProperty("limit") @Nullable Integer limit
)
{
this.dataSource = dataSource;
this.interval = interval;
this.limit = limit;
}

@JsonProperty
@@ -59,6 +66,13 @@ public Interval getInterval()
return interval;
}

@Nullable
@JsonProperty
public Integer getLimit()
{
return limit;
}

@Override
public TypeReference<List<DataSegment>> getReturnTypeReference()
{
@@ -68,7 +82,8 @@ public TypeReference<List<DataSegment>> getReturnTypeReference()
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUnusedSegmentsForInterval(dataSource, interval, limit);
}

@Override
@@ -83,6 +98,7 @@ public String toString()
return getClass().getSimpleName() + "{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", limit=" + limit +
'}';
}
}
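
For reference, a minimal usage sketch (not part of this diff; the datasource and interval values are illustrative) of how a task could submit this action with a limit through the `TaskToolbox` it receives in `runTask`:

```
// Illustrative sketch: retrieve at most 100 unused segments for the interval.
// Passing null as the limit keeps the old retrieve-everything behavior, which
// is what the call sites updated elsewhere in this PR do.
final List<DataSegment> unusedSegments = toolbox
    .getTaskActionClient()
    .submit(new RetrieveUnusedSegmentsAction("wikipedia", Intervals.of("2020/2021"), 100));
```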
(next file)
@@ -79,7 +79,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
KillUnusedSegmentsTask.java
@@ -26,8 +26,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -37,13 +37,16 @@
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -61,6 +64,7 @@
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
public static final String TYPE = "kill";
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);

/**
@@ -74,14 +78,16 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;

private final boolean markAsUnused;

/**
* Split processing to try and keep each nuke operation relatively short, in the case that either
* the database or the storage layer is particularly slow.
*/
private final int batchSize;
@Nullable private final Integer limit;


// counters included primarily for testing
private int numSegmentsKilled = 0;
private long numBatchesProcessed = 0;

@JsonCreator
@@ -90,8 +96,9 @@ public KillUnusedSegmentsTask(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("markAsUnused") Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize
@JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
> Contributor: why is this flag deprecated? Is the reason captured anywhere?
@JsonProperty("batchSize") Integer batchSize,
@JsonProperty("limit") @Nullable Integer limit
)
{
super(
@@ -103,6 +110,19 @@
this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
Preconditions.checkArgument(this.batchSize > 0, "batchSize should be greater than zero");
if (null != limit && limit <= 0) {
> Contributor: nit: can use Preconditions.checkArgument
throw InvalidInput.exception(
"limit [%d] is invalid. It must be a positive integer.",
limit
);
}
if (limit != null && markAsUnused != null && markAsUnused) {
> Contributor: nit: can use Preconditions.checkArgument
throw InvalidInput.exception(
"limit cannot be provided with markAsUnused.",
limit
);
}
this.limit = limit;
}

@JsonProperty
Expand All @@ -119,10 +139,17 @@ public int getBatchSize()
return batchSize;
}

@Nullable
@JsonProperty
public Integer getLimit()
{
return limit;
}

@Override
public String getType()
{
return "kill";
return TYPE;
}

@Nonnull
@@ -140,6 +167,13 @@ long getNumBatchesProcessed()
return numBatchesProcessed;
}

@JsonIgnore
@VisibleForTesting
long getNumSegmentsKilled()
{
return numSegmentsKilled;
}

@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
@@ -153,27 +187,29 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
}

// List unused segments
int nextBatchSize = computeNextBatchSize(numSegmentsKilled);
@Nullable Integer numTotalBatches = getNumTotalBatches();
List<DataSegment> unusedSegments;
LOG.info(
"Starting kill with batchSize[%d], up to limit[%d] segments will be deleted%s",
batchSize,
limit,
numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
);
do {
if (nextBatchSize <= 0) {
break;
}
unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));

if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
);
}

@@ -186,19 +222,40 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);
numBatchesProcessed++;
numSegmentsKilled += unusedSegments.size();

LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());

LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%,d] unused segments in [%d] batches.",
getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size());
nextBatchSize = computeNextBatchSize(numSegmentsKilled);
} while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));

LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%d] unused segments "
+ "in [%d] batches.",
getId(),
getDataSource(),
getInterval(),
numSegmentsKilled,
numBatchesProcessed
);

return TaskStatus.success(getId());
}

@JsonIgnore
@VisibleForTesting
@Nullable
Integer getNumTotalBatches()
{
return null != limit ? (int) Math.ceil((double) limit / batchSize) : null;
}

@JsonIgnore
@VisibleForTesting
int computeNextBatchSize(int numSegmentsKilled)
{
return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;

> CodeQL check failure — User-controlled data in arithmetic expression: this arithmetic expression depends on a user-provided value, potentially causing an underflow. (Conversation marked as resolved by jasonk000.)
}

private NavigableMap<DateTime, List<TaskLock>> getTaskLockMap(TaskActionClient client) throws IOException
{
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
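
The batching arithmetic above is easier to see with concrete numbers. The following is a standalone sketch (not part of this diff) that mirrors `getNumTotalBatches` and `computeNextBatchSize`, worked through for `limit = 5` and `batchSize = 3`:

```
// Standalone sketch mirroring the PR's batching arithmetic.
public class KillBatchMathExample
{
  static Integer numTotalBatches(Integer limit, int batchSize)
  {
    return null != limit ? (int) Math.ceil((double) limit / batchSize) : null;
  }

  static int nextBatchSize(Integer limit, int batchSize, int numSegmentsKilled)
  {
    return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;
  }

  public static void main(String[] args)
  {
    System.out.println(numTotalBatches(5, 3));     // ceil(5 / 3) = 2 batches in total
    System.out.println(nextBatchSize(5, 3, 0));    // 1st batch: min(5 - 0, 3) = 3 segments
    System.out.println(nextBatchSize(5, 3, 3));    // 2nd batch: min(5 - 3, 3) = 2 segments
    System.out.println(nextBatchSize(null, 3, 0)); // no limit: always batchSize = 3
  }
}
```

The `numBatchesProcessed < numTotalBatches` guard in the do-while loop then stops the task after the second batch even if more unused segments remain.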
(next file)
@@ -87,7 +87,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
(next file)
@@ -80,7 +80,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
(next file)
@@ -61,7 +61,7 @@
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = "kill", value = KillUnusedSegmentsTask.class),
@Type(name = KillUnusedSegmentsTask.TYPE, value = KillUnusedSegmentsTask.class),
> (Conversation marked as resolved by jasonk000.)
@Type(name = "move", value = MoveTask.class),
@Type(name = "archive", value = ArchiveTask.class),
@Type(name = "restore", value = RestoreTask.class),
(next file)
@@ -104,7 +104,7 @@ public void testRetrieveUsedSegmentsAction()
@Test
public void testRetrieveUnusedSegmentsAction()
{
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null);
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, resultSegments);
}
(next file)
@@ -51,8 +51,9 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() throws
"killTaskId",
"datasource",
Intervals.of("2020-01-01/P1D"),
false,
99,
5
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
@@ -61,6 +62,8 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() throws
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());

}

@Test
@@ -71,6 +74,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault
"datasource",
Intervals.of("2020-01-01/P1D"),
true,
null,
null
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
@@ -80,6 +84,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(100, fromJson.getBatchSize());
Assert.assertNull(taskQuery.getLimit());
}

@Test
@@ -91,7 +96,8 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() throws
Intervals.of("2020-01-01/P1D"),
null,
true,
99,
null
);
final byte[] json = objectMapper.writeValueAsBytes(task);
final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
@@ -103,5 +109,6 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() throws
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertNull(task.getLimit());
}
}