Add ability to limit the number of segments killed in kill task #14662

Merged Aug 4, 2023 · 26 commits (changes shown from 17 commits)
142d548
* add a `maxSegmentsToKill` config property to `KillUnusedSegmentsTask`
zachjsh Jul 25, 2023
fa48bd0
* pipe `maxSegments` config in autokill to kill task submitted
zachjsh Jul 25, 2023
72e2a8b
* add coordinator dynamic config `killTaskSlotRatio` which allows
zachjsh Jul 25, 2023
4364f63
* fix CoordinatorDynamicConfig equals, toString, and hashCode methods
zachjsh Jul 25, 2023
c71c69f
* minimum of 1 task used for kill tasks with auto kill duty
zachjsh Jul 25, 2023
d9fb724
Merge remote-tracking branch 'apache/master' into auto-kill-improvements
zachjsh Jul 25, 2023
c90e804
* fixes after merge
zachjsh Jul 25, 2023
3f7ddc9
* fix failing tests
zachjsh Jul 26, 2023
be58f1c
* add @Nullable
zachjsh Jul 26, 2023
3a16585
* add more tests
zachjsh Jul 26, 2023
f08bf89
* add docs
zachjsh Jul 26, 2023
5509841
Merge remote-tracking branch 'apache/master' into auto-kill-improvements
zachjsh Jul 27, 2023
b98c30f
* fix docs
zachjsh Jul 27, 2023
e52410a
* add missing javadoc
zachjsh Jul 27, 2023
0347e08
* review comments
zachjsh Jul 27, 2023
3d33819
* add validation for killTaskSlotRatio
zachjsh Jul 27, 2023
ea99715
* fix failing test
zachjsh Jul 27, 2023
e15cfce
Apply suggestions from code review
zachjsh Jul 27, 2023
3cf2bd8
* address review comments
zachjsh Jul 27, 2023
d853a03
* propagate maxSegments config when retrieving unused segments
zachjsh Jul 28, 2023
2eccb33
* revert back to existing kill task with limit
zachjsh Aug 2, 2023
66153db
* revert killTaskSlotRatio
zachjsh Aug 2, 2023
3420936
Merge remote-tracking branch 'apache/master' into auto-kill-improvements
zachjsh Aug 2, 2023
48b4938
* fix failing tests
zachjsh Aug 3, 2023
c95f13b
* address review comments
zachjsh Aug 3, 2023
48e6fdc
* docs update
zachjsh Aug 3, 2023
2 changes: 2 additions & 0 deletions docs/configuration/index.md
@@ -934,6 +934,7 @@ A sample Coordinator dynamic config JSON object is shown below:
"replicantLifetime": 15,
"replicationThrottleLimit": 10,
"killDataSourceWhitelist": ["wikipedia", "testDatasource"],
"killTaskSlotRatio": 0.10,
"decommissioningNodes": ["localhost:8182", "localhost:8282"],
"decommissioningMaxPercentOfMaxSegmentsToMove": 70,
"pauseCoordination": false,
@@ -955,6 +956,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`replicationThrottleLimit`|The maximum number of segment replicas that can be assigned to a historical tier in a single Coordinator run. This property prevents historicals from becoming overwhelmed when loading extra replicas of segments that are already available in the cluster.|500|
|`balancerComputeThreads`|Thread pool size for computing moving cost of segments during segment balancing. Consider increasing this if you have a lot of segments and moving segments begins to stall.|1|
|`killDataSourceWhitelist`|List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array.|none|
|`killTaskSlotRatio`|Ratio of total available task slots, including autoscaling if applicable, that will be allowed for kill tasks. This limit applies only to kill tasks spawned automatically by the Coordinator's auto-kill duty, which is enabled when `druid.coordinator.kill.on` is true.|none (no limit)|
@abhishekrb19 (Contributor) commented on Jul 27, 2023:

Instead of dynamic configuration, should we have a dedicated auto-kill config similar to auto-compaction? I think that will ease config management a bit. We can then move all these kill* properties to the dedicated kill config and deprecate the existing dynamic kill config. What do y'all think?

@zachjsh (Contributor, Author) commented on Jul 27, 2023:

I considered this as well 😆. I thought that since the other kill properties are already here at this level, it made sense to expand and add more. Frankly, I didn't want to deal with resolving properties from multiple possible configs, deprecated and new, so I decided against it. If you feel strongly, let me know, and let's see what others think as well.

|`killPendingSegmentsSkipList`|List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array.|none|
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments allowed in the load queue of any given server. Use this parameter to load segments faster if, for example, the cluster contains slow-loading nodes or if there are too many segments to be replicated to a particular node (when faster loading is preferred to better segments distribution). The optimal value depends on the loading speed of segments, acceptable replication time and number of nodes. |500|
|`useRoundRobinSegmentAssignment`|Boolean flag for whether segments should be assigned to historicals in a round robin fashion. When disabled, segment assignment is done using the chosen balancer strategy. When enabled, this can speed up segment assignments leaving balancing to move the segments to their optimal locations (based on the balancer strategy) lazily. |true|
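As a sketch of what `killTaskSlotRatio` implies for the auto-kill duty, the helper below computes the slot cap. It is illustrative only (not Druid code) and assumes floor rounding with a minimum of one slot, per the "minimum of 1 task used for kill tasks" commit in this PR; Druid's actual rounding may differ.

```java
// Hypothetical helper, not Druid code: bounds the task slots
// available to the Coordinator's auto-kill duty.
final class KillTaskSlots
{
    private KillTaskSlots() {}

    static int availableKillTaskSlots(int totalTaskSlots, Double killTaskSlotRatio)
    {
        if (killTaskSlotRatio == null) {
            return totalTaskSlots; // no limit configured (the default)
        }
        if (killTaskSlotRatio < 0.0 || killTaskSlotRatio > 1.0) {
            throw new IllegalArgumentException("killTaskSlotRatio must be between 0 and 1");
        }
        // Floor, but always grant at least one slot when a ratio is set.
        return Math.max(1, (int) (totalTaskSlots * killTaskSlotRatio));
    }
}
```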
8 changes: 7 additions & 1 deletion docs/data-management/delete.md
@@ -95,9 +95,15 @@ The available grammar is:
"id": <task_id>,
"dataSource": <task_datasource>,
"interval" : <all_unused_segments_in_this_interval_will_die!>,
"context": <task context>
"context": <task context>,
"maxSegmentsToKill": <the maximum number of segments to delete>
}
```

**WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and
deep storage. This operation cannot be undone.

Note: If `maxSegmentsToKill` is not specified, all matched segments are deleted. If `maxSegmentsToKill` is less than
the number of matching segments, only that many segments are deleted, but all matching segments are still marked as
unused if `markAsUnused` is set. In that case, any remaining unused segments can be deleted with a subsequent kill
task, or via [Automated unused segment deletion](../operations/clean-metadata-store.md#segment-records-and-segments-in-deep-storage-kill-task)
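For illustration only, a kill task spec using this property might look like the following (the `id`, datasource, and interval values here are made up):

```json
{
  "type": "kill",
  "id": "kill_wikipedia_2023",
  "dataSource": "wikipedia",
  "interval": "2023-01-01/2023-02-01",
  "markAsUnused": true,
  "maxSegmentsToKill": 100
}
```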
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.indexer.TaskStatus;
@@ -41,6 +42,8 @@
import org.joda.time.Interval;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -58,17 +61,20 @@
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
public static final String TYPE = "kill";
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);

private final boolean markAsUnused;
@Nullable private final Integer maxSegmentsToKill;

@JsonCreator
public KillUnusedSegmentsTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("markAsUnused") Boolean markAsUnused
@JsonProperty("markAsUnused") Boolean markAsUnused,
@JsonProperty("maxSegmentsToKill") @Nullable Integer maxSegmentsToKill
)
{
super(
@@ -77,7 +83,12 @@ public KillUnusedSegmentsTask(
interval,
context
);
if (null != maxSegmentsToKill) {
Preconditions.checkArgument(maxSegmentsToKill > 0, "maxSegmentsToKill must be > 0");
}
this.markAsUnused = markAsUnused != null && markAsUnused;
this.maxSegmentsToKill = maxSegmentsToKill;

}

@JsonProperty
@@ -87,6 +98,13 @@ public boolean isMarkAsUnused()
return markAsUnused;
}

@Nullable
@JsonProperty
public Integer getMaxSegmentsToKill()
{
return maxSegmentsToKill;
}

@Override
public String getType()
{
@@ -114,7 +132,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
}

// List unused segments
final List<DataSegment> unusedSegments = toolbox
List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval()));

@@ -128,6 +146,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
}

// Kill segments
unusedSegments = maxSegmentsToKill == null
? unusedSegments
A Contributor commented:

Instead of pruning out the unused segments up here, have we considered pushing down the limit to the metadata query so we ask only what we want? I think that should considerably reduce the load on the metadata store, reduce the lock duration, etc, for unused segments that aren't killed in a run anyway. What do you think?

@zachjsh (Contributor, Author) commented on Jul 27, 2023:

I thought it was best to mark all segments in the specified interval as unused. That seemed less expensive than actually deleting the segments from deep storage and the other work involved in killing a segment. I did consider this, though. But with the limit also applied to marking unused, a user might need multiple requests to mark all segments in the interval as unused, whereas if the limit is applied only to the kill step, all segments in the interval are marked unused, and if we hit the limit on kill, auto-kill will clean up eventually, if enabled. Let me know what you think.

A Contributor replied:

I think the comment was referring to something different, about adjusting RetrieveUnusedSegmentsAction(getDataSource(), getInterval())) so that it can accept a limit and return a smaller set of unused segments

@zachjsh (Contributor, Author) replied:

yes! sorry, I misunderstood, fixing now

@zachjsh (Contributor, Author) replied:

fixed

@gianm (Contributor) commented on Jul 28, 2023:

IMO, kill tasks shouldn't mark stuff unused at all. It's best if there is a time delay between markings things unused and permanently deleting them. It's possible for people to make mistakes with what they delete, and we want the mistakes to be undoable for some period of time.

I think the best thing to do in general is actually deprecate the markUnused parameter, and focus on workflows that are like:

  • Users delete stuff by marking it unused
  • Later, the system deletes it permanently by way of autokill (which runs these kill tasks)

Btw, an important part of making this work is including a buffer period between when a segment is marked unused, and when autokill would remove it. This was discussed in #12526 although I am not sure what the current status is.

For this patch, IMO the best approach is to have the Coordinator send a list of segments to the kill task, rather than interval + maxSegmentsToKill. This will make it easier to implement the buffer period we need for autokill later; the Coordinator would be in charge of determining which specific segments are killable, and it would need the kill task to respect that specific list. This also sidesteps the issue with markAsUnused, since the maxSegmentsToKill parameter would no longer exist.

@zachjsh (Contributor, Author) commented on Jul 28, 2023:

Thanks for your input @gianm. This makes sense to me. I guess kill tasks will have another config property added, like `segmentIds`, and if that is set, the kill task will ignore the other config properties like `interval` and `dataSource`?

One downside of this is that it makes the kill task harder for end users to use manually, as they will now need to know the IDs of the segments they want to kill. But I guess we will keep the delete-by-interval fallback that exists now, if a user still wishes to delete segments that way?

@zachjsh (Contributor, Author) replied:

Added a new task type to kill unused segments by id.

: unusedSegments.subList(0, Math.min(maxSegmentsToKill, unusedSegments.size()));
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
for (DataSegment segment : unusedSegments) {
toolbox.getDataSegmentKiller().kill(segment);
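The limit semantics in the hunk above can be restated in isolation. The sketch below is illustrative only (not Druid code): a null limit kills every retrieved unused segment, otherwise only the first `min(limit, size)` segments are killed and the rest remain unused for a later kill task or auto-kill run.

```java
import java.util.List;

// Illustrative sketch of the kill-limit truncation in KillUnusedSegmentsTask.
final class KillLimit
{
    private KillLimit() {}

    static <T> List<T> segmentsToKill(List<T> unusedSegments, Integer maxSegmentsToKill)
    {
        if (maxSegmentsToKill == null) {
            return unusedSegments; // no limit: kill everything retrieved
        }
        // Kill at most maxSegmentsToKill of the retrieved segments.
        return unusedSegments.subList(0, Math.min(maxSegmentsToKill, unusedSegments.size()));
    }
}
```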
@@ -61,7 +61,7 @@
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = "kill", value = KillUnusedSegmentsTask.class),
@Type(name = KillUnusedSegmentsTask.TYPE, value = KillUnusedSegmentsTask.class),
@Type(name = "move", value = MoveTask.class),
@Type(name = "archive", value = ArchiveTask.class),
@Type(name = "restore", value = RestoreTask.class),
@@ -51,14 +51,16 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() throws
"killTaskId",
"datasource",
Intervals.of("2020-01-01/P1D"),
true
true,
5
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
Assert.assertEquals(taskQuery.getId(), fromJson.getId());
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getMaxSegmentsToKill(), fromJson.getMaxSegmentsToKill());
}

@Test
@@ -69,7 +71,8 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() throws
"datasource",
Intervals.of("2020-01-01/P1D"),
null,
true
true,
null
);
final byte[] json = objectMapper.writeValueAsBytes(task);
final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
@@ -80,5 +83,6 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() throws
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertNull(task.getMaxSegmentsToKill());
}
}
@@ -79,7 +79,8 @@ public void testKill() throws Exception
DATA_SOURCE,
Intervals.of("2019-03-01/2019-04-01"),
null,
false
false,
null
);

Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
@@ -124,7 +125,8 @@ public void testKillWithMarkUnused() throws Exception
DATA_SOURCE,
Intervals.of("2019-03-01/2019-04-01"),
null,
true
true,
null
);

Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
@@ -151,7 +153,8 @@ public void testGetInputSourceResources()
DATA_SOURCE,
Intervals.of("2019-03-01/2019-04-01"),
null,
true
true,
null
);
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}
@@ -947,7 +947,8 @@ public DataSegment apply(String input)
"test_kill_task",
Intervals.of("2011-04-01/P4D"),
null,
false
false,
null
);

final TaskStatus status = runTask(killUnusedSegmentsTask);
@@ -967,6 +968,103 @@ public DataSegment apply(String input)
}
}

@Test
public void testKillUnusedSegmentsTaskWithMaxSegmentsToKill() throws Exception
{
final File tmpSegmentDir = temporaryFolder.newFolder();

List<DataSegment> expectedUnusedSegments = Lists.transform(
ImmutableList.of(
"2011-04-01/2011-04-02",
"2011-04-02/2011-04-03",
"2011-04-04/2011-04-05"
), new Function<String, DataSegment>()
{
@Override
public DataSegment apply(String input)
{
final Interval interval = Intervals.of(input);
try {
return DataSegment.builder()
.dataSource("test_kill_task")
.interval(interval)
.loadSpec(
ImmutableMap.of(
"type",
"local",
"path",
tmpSegmentDir.getCanonicalPath()
+ "/druid/localStorage/wikipedia/"
+ interval.getStart()
+ "-"
+ interval.getEnd()
+ "/"
+ "2011-04-6T16:52:46.119-05:00"
+ "/0/index.zip"
)
)
.version("2011-04-6T16:52:46.119-05:00")
.dimensions(ImmutableList.of())
.metrics(ImmutableList.of())
.shardSpec(NoneShardSpec.instance())
.binaryVersion(9)
.size(0)
.build();
}
catch (IOException e) {
throw new ISE(e, "Error creating segments");
}
}
}
);

mdc.setUnusedSegments(expectedUnusedSegments);

// manually create local segments files
List<File> segmentFiles = new ArrayList<>();
for (DataSegment segment : mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"))) {
File file = new File((String) segment.getLoadSpec().get("path"));
FileUtils.mkdirp(file.getParentFile());
Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY);
segmentFiles.add(file);
}

final int maxSegmentsToKill = 2;
final Task killUnusedSegmentsTask =
new KillUnusedSegmentsTask(
null,
"test_kill_task",
Intervals.of("2011-04-01/P4D"),
null,
false,
maxSegmentsToKill
);

final TaskStatus status = runTask(killUnusedSegmentsTask);
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", maxSegmentsToKill, mdc.getNuked().size());
Assert.assertTrue(
"expected unused segments get killed",
expectedUnusedSegments.containsAll(mdc.getNuked())
);

int expectedNumOfSegmentsRemaining = segmentFiles.size() - maxSegmentsToKill;
int actualNumOfSegmentsRemaining = 0;
for (File file : segmentFiles) {
if (file.exists()) {
actualNumOfSegmentsRemaining++;
}
}

Assert.assertEquals(
"Expected number of segments remaining did not match expectations",
expectedNumOfSegmentsRemaining,
actualNumOfSegmentsRemaining
);
}

@Test
public void testRealtimeishTask() throws Exception
{
@@ -24,6 +24,8 @@
import com.google.common.base.Preconditions;
import org.joda.time.Interval;

import javax.annotation.Nullable;

import java.util.Objects;

/**
@@ -39,19 +41,25 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
private final String dataSource;
private final Interval interval;
private final Boolean markAsUnused;
@Nullable private final Integer maxSegmentsToKill;

@JsonCreator
public ClientKillUnusedSegmentsTaskQuery(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("markAsUnused") Boolean markAsUnused
@JsonProperty("markAsUnused") Boolean markAsUnused,
@JsonProperty("maxSegmentsToKill") Integer maxSegmentsToKill
)
{
this.id = Preconditions.checkNotNull(id, "id");
this.dataSource = dataSource;
this.interval = interval;
this.markAsUnused = markAsUnused;
if (null != maxSegmentsToKill) {
Preconditions.checkArgument(maxSegmentsToKill > 0, "maxSegmentsToKill must be > 0");
}
this.maxSegmentsToKill = maxSegmentsToKill;
}

@JsonProperty
@@ -87,6 +95,14 @@ public Boolean getMarkAsUnused()
return markAsUnused;
}

@JsonProperty
@Nullable
public Integer getMaxSegmentsToKill()
{
return maxSegmentsToKill;
}


@Override
public boolean equals(Object o)
{
@@ -100,12 +116,13 @@ public boolean equals(Object o)
return Objects.equals(id, that.id)
&& Objects.equals(dataSource, that.dataSource)
&& Objects.equals(interval, that.interval)
&& Objects.equals(markAsUnused, that.markAsUnused);
&& Objects.equals(markAsUnused, that.markAsUnused)
&& Objects.equals(maxSegmentsToKill, that.maxSegmentsToKill);
}

@Override
public int hashCode()
{
return Objects.hash(id, dataSource, interval, markAsUnused);
return Objects.hash(id, dataSource, interval, markAsUnused, maxSegmentsToKill);
}
}