-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability to limit the number of segments killed in kill task #14662
Changes from all commits
142d548
fa48bd0
72e2a8b
4364f63
c71c69f
d9fb724
c90e804
3f7ddc9
be58f1c
3a16585
f08bf89
5509841
b98c30f
e52410a
0347e08
3d33819
ea99715
e15cfce
3cf2bd8
d853a03
2eccb33
66153db
3420936
48b4938
c95f13b
48e6fdc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,8 +26,8 @@ | |
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.collect.ImmutableSet; | ||
import com.google.common.collect.Lists; | ||
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery; | ||
import org.apache.druid.error.InvalidInput; | ||
import org.apache.druid.indexer.TaskStatus; | ||
import org.apache.druid.indexing.common.TaskLock; | ||
import org.apache.druid.indexing.common.TaskToolbox; | ||
|
@@ -37,13 +37,16 @@ | |
import org.apache.druid.indexing.common.actions.TaskActionClient; | ||
import org.apache.druid.indexing.common.actions.TaskLocks; | ||
import org.apache.druid.java.util.common.ISE; | ||
import org.apache.druid.java.util.common.StringUtils; | ||
import org.apache.druid.java.util.common.logger.Logger; | ||
import org.apache.druid.server.security.ResourceAction; | ||
import org.apache.druid.timeline.DataSegment; | ||
import org.joda.time.DateTime; | ||
import org.joda.time.Interval; | ||
|
||
import javax.annotation.Nonnull; | ||
import javax.annotation.Nullable; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.HashSet; | ||
|
@@ -61,6 +64,7 @@ | |
*/ | ||
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask | ||
{ | ||
public static final String TYPE = "kill"; | ||
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class); | ||
|
||
/** | ||
|
@@ -74,14 +78,16 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask | |
private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100; | ||
|
||
private final boolean markAsUnused; | ||
|
||
/** | ||
* Split processing to try and keep each nuke operation relatively short, in the case that either | ||
* the database or the storage layer is particularly slow. | ||
*/ | ||
private final int batchSize; | ||
@Nullable private final Integer limit; | ||
|
||
|
||
// counter included primarily for testing | ||
// counters included primarily for testing | ||
private int numSegmentsKilled = 0; | ||
private long numBatchesProcessed = 0; | ||
|
||
@JsonCreator | ||
|
@@ -90,8 +96,9 @@ public KillUnusedSegmentsTask( | |
@JsonProperty("dataSource") String dataSource, | ||
@JsonProperty("interval") Interval interval, | ||
@JsonProperty("context") Map<String, Object> context, | ||
@JsonProperty("markAsUnused") Boolean markAsUnused, | ||
@JsonProperty("batchSize") Integer batchSize | ||
@JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused, | ||
@JsonProperty("batchSize") Integer batchSize, | ||
@JsonProperty("limit") @Nullable Integer limit | ||
) | ||
{ | ||
super( | ||
|
@@ -103,6 +110,19 @@ public KillUnusedSegmentsTask( | |
this.markAsUnused = markAsUnused != null && markAsUnused; | ||
this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE; | ||
Preconditions.checkArgument(this.batchSize > 0, "batchSize should be greater than zero"); | ||
if (null != limit && limit <= 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: this check can use Preconditions.checkArgument. |
||
throw InvalidInput.exception( | ||
"limit [%d] is invalid. It must be a positive integer.", | ||
limit | ||
); | ||
} | ||
if (limit != null && markAsUnused != null && markAsUnused) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: this check can use Preconditions.checkArgument. |
||
throw InvalidInput.exception( | ||
"limit cannot be provided with markAsUnused.", | ||
limit | ||
); | ||
} | ||
this.limit = limit; | ||
} | ||
|
||
@JsonProperty | ||
|
@@ -119,10 +139,17 @@ public int getBatchSize() | |
return batchSize; | ||
} | ||
|
||
@Nullable | ||
@JsonProperty | ||
public Integer getLimit() | ||
{ | ||
return limit; | ||
} | ||
|
||
@Override | ||
public String getType() | ||
{ | ||
return "kill"; | ||
return TYPE; | ||
} | ||
|
||
@Nonnull | ||
|
@@ -140,6 +167,13 @@ long getNumBatchesProcessed() | |
return numBatchesProcessed; | ||
} | ||
|
||
@JsonIgnore | ||
@VisibleForTesting | ||
long getNumSegmentsKilled() | ||
{ | ||
return numSegmentsKilled; | ||
} | ||
|
||
@Override | ||
public TaskStatus runTask(TaskToolbox toolbox) throws Exception | ||
{ | ||
|
@@ -153,27 +187,29 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception | |
} | ||
|
||
// List unused segments | ||
final List<DataSegment> allUnusedSegments = toolbox | ||
.getTaskActionClient() | ||
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval())); | ||
|
||
final List<List<DataSegment>> unusedSegmentBatches = Lists.partition(allUnusedSegments, batchSize); | ||
|
||
// The individual activities here on the toolbox have possibility to run for a longer period of time, | ||
// since they involve calls to metadata storage and archival object storage. And, the tasks take hold of the | ||
// task lockbox to run. By splitting the segment list into smaller batches, we have an opportunity to yield the | ||
// lock to other activity that might need to happen using the overlord tasklockbox. | ||
|
||
LOG.info("Running kill task[%s] for dataSource[%s] and interval[%s]. Killing total [%,d] unused segments in [%d] batches(batchSize = [%d]).", | ||
getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size(), batchSize); | ||
int nextBatchSize = computeNextBatchSize(numSegmentsKilled); | ||
@Nullable Integer numTotalBatches = getNumTotalBatches(); | ||
List<DataSegment> unusedSegments; | ||
LOG.info( | ||
"Starting kill with batchSize[%d], up to limit[%d] segments will be deleted%s", | ||
batchSize, | ||
limit, | ||
numTotalBatches != null ? StringUtils.format(" in([%d] batches]).", numTotalBatches) : "." | ||
); | ||
do { | ||
if (nextBatchSize <= 0) { | ||
break; | ||
} | ||
unusedSegments = toolbox | ||
.getTaskActionClient() | ||
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize)); | ||
|
||
for (final List<DataSegment> unusedSegments : unusedSegmentBatches) { | ||
if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) { | ||
throw new ISE( | ||
"Locks[%s] for task[%s] can't cover segments[%s]", | ||
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()), | ||
getId(), | ||
unusedSegments | ||
"Locks[%s] for task[%s] can't cover segments[%s]", | ||
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()), | ||
getId(), | ||
unusedSegments | ||
); | ||
} | ||
|
||
|
@@ -186,19 +222,40 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception | |
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments))); | ||
toolbox.getDataSegmentKiller().kill(unusedSegments); | ||
numBatchesProcessed++; | ||
numSegmentsKilled += unusedSegments.size(); | ||
|
||
if (numBatchesProcessed % 10 == 0) { | ||
LOG.info("Processed [%d/%d] batches for kill task[%s].", | ||
numBatchesProcessed, unusedSegmentBatches.size(), getId()); | ||
} | ||
} | ||
LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId()); | ||
|
||
LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%,d] unused segments in [%d] batches.", | ||
getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size()); | ||
nextBatchSize = computeNextBatchSize(numSegmentsKilled); | ||
} while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches)); | ||
|
||
LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%d] unused segments " | ||
+ "in [%d] batches.", | ||
getId(), | ||
getDataSource(), | ||
getInterval(), | ||
numSegmentsKilled, | ||
numBatchesProcessed | ||
); | ||
|
||
return TaskStatus.success(getId()); | ||
} | ||
|
||
@JsonIgnore | ||
@VisibleForTesting | ||
@Nullable | ||
Integer getNumTotalBatches() | ||
{ | ||
return null != limit ? (int) Math.ceil((double) limit / batchSize) : null; | ||
} | ||
|
||
@JsonIgnore | ||
@VisibleForTesting | ||
int computeNextBatchSize(int numSegmentsKilled) | ||
{ | ||
return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize; | ||
Check failure Code scanning / CodeQL User-controlled data in arithmetic expression
This arithmetic expression depends on a [user-provided value](1), potentially causing an underflow.
jasonk000 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
private NavigableMap<DateTime, List<TaskLock>> getTaskLockMap(TaskActionClient client) throws IOException | ||
{ | ||
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this flag deprecated? Is the reason captured anywhere?