-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable Continuous auto kill #14831
Enable Continuous auto kill #14831
Changes from all commits
40da93e
55ae7bb
55f53ca
b598810
0e785e2
8696df1
0f6d795
96f1554
d444b71
ed0f11a
d0bbaa7
f20b77f
384b6d4
b50069b
3f9c050
9691435
e170d46
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,8 @@ | |
|
||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
/** | ||
* Completely removes information about unused segments who have an interval end that comes before | ||
|
@@ -67,6 +69,12 @@ public class KillUnusedSegments implements CoordinatorDuty | |
private final long retainDuration; | ||
private final boolean ignoreRetainDuration; | ||
private final int maxSegmentsToKill; | ||
|
||
/** | ||
* Used to keep track of the last interval end time that was killed for each | ||
* datasource. | ||
*/ | ||
private final Map<String, DateTime> datasourceToLastKillIntervalEnd; | ||
Comment on lines
+72
to
+77
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if this approach is the best way. If the leader changes, the in-memory map is essentially wiped out when a new leader comes online, so it is possible to re-schedule an interval that is already in the process of being killed. So 2 questions on this
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. testing revealed that 1. is what happens, so thought it was no the end of the world that we still have potential of submitting duplicate tasks. |
||
private long lastKillTime = 0; | ||
private final long bufferPeriod; | ||
|
||
|
@@ -82,8 +90,8 @@ public KillUnusedSegments( | |
{ | ||
this.period = config.getCoordinatorKillPeriod().getMillis(); | ||
Preconditions.checkArgument( | ||
this.period > config.getCoordinatorIndexingPeriod().getMillis(), | ||
"coordinator kill period must be greater than druid.coordinator.period.indexingPeriod" | ||
this.period >= config.getCoordinatorIndexingPeriod().getMillis(), | ||
"coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod" | ||
); | ||
|
||
this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain(); | ||
|
@@ -100,6 +108,8 @@ public KillUnusedSegments( | |
this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments(); | ||
Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0"); | ||
|
||
datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>(); | ||
|
||
log.info( | ||
"Kill Task scheduling enabled with period [%s], retainDuration [%s], bufferPeriod [%s], maxSegmentsToKill [%s]", | ||
this.period, | ||
|
@@ -115,12 +125,18 @@ public KillUnusedSegments( | |
@Override | ||
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) | ||
{ | ||
|
||
final long currentTimeMillis = System.currentTimeMillis(); | ||
if (lastKillTime + period > currentTimeMillis) { | ||
log.debug("Skipping kill of unused segments as kill period has not elapsed yet."); | ||
return params; | ||
} | ||
|
||
return runInternal(params); | ||
} | ||
|
||
@VisibleForTesting | ||
DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params) | ||
{ | ||
TaskStats taskStats = new TaskStats(); | ||
Collection<String> dataSourcesToKill = | ||
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn(); | ||
|
@@ -147,11 +163,14 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) | |
} | ||
|
||
log.debug("Killing unused segments in datasources: %s", dataSourcesToKill); | ||
lastKillTime = currentTimeMillis; | ||
lastKillTime = System.currentTimeMillis(); | ||
taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, availableKillTaskSlots); | ||
|
||
} | ||
|
||
// any datasources that are no longer being considered for kill should have their | ||
// last kill interval removed from map. | ||
datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill); | ||
addStats(taskStats, stats); | ||
return params; | ||
} | ||
|
@@ -175,13 +194,14 @@ private int killUnusedSegments( | |
if (0 < availableKillTaskSlots && !CollectionUtils.isNullOrEmpty(dataSourcesToKill)) { | ||
for (String dataSource : dataSourcesToKill) { | ||
if (submittedTasks >= availableKillTaskSlots) { | ||
log.info(StringUtils.format( | ||
log.debug(StringUtils.format( | ||
"Submitted [%d] kill tasks and reached kill task slot limit [%d]. Will resume " | ||
+ "on the next coordinator cycle.", submittedTasks, availableKillTaskSlots)); | ||
break; | ||
} | ||
final Interval intervalToKill = findIntervalForKill(dataSource); | ||
if (intervalToKill == null) { | ||
datasourceToLastKillIntervalEnd.remove(dataSource); | ||
continue; | ||
} | ||
|
||
|
@@ -193,6 +213,7 @@ private int killUnusedSegments( | |
maxSegmentsToKill | ||
), true); | ||
++submittedTasks; | ||
datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd()); | ||
} | ||
catch (Exception ex) { | ||
log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource); | ||
|
@@ -233,7 +254,7 @@ private Interval findIntervalForKill(String dataSource) | |
: DateTimes.nowUtc().minus(retainDuration); | ||
|
||
List<Interval> unusedSegmentIntervals = segmentsMetadataManager | ||
.getUnusedSegmentIntervals(dataSource, maxEndTime, maxSegmentsToKill, DateTimes.nowUtc().minus(bufferPeriod)); | ||
.getUnusedSegmentIntervals(dataSource, datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill, DateTimes.nowUtc().minus(bufferPeriod)); | ||
|
||
if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) { | ||
return null; | ||
|
@@ -258,6 +279,13 @@ static int getKillTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio | |
return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 1.0)), maxKillTaskSlots); | ||
} | ||
|
||
@VisibleForTesting | ||
Map<String, DateTime> getDatasourceToLastKillIntervalEnd() | ||
{ | ||
return datasourceToLastKillIntervalEnd; | ||
} | ||
|
||
|
||
static class TaskStats | ||
{ | ||
int availableTaskSlots; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: style