Enable Continuous auto kill #14831

Merged · 17 commits · Aug 23, 2023
docs/configuration/index.md (2 changes: 1 addition & 1 deletion)
@@ -855,7 +855,7 @@ These Coordinator static configurations can be defined in the `coordinator/runtime.properties`
|`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical process.|PT15M|
|`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator should clean up old entries in the `pendingSegments` table of the metadata store. If set to true, the Coordinator will check the created time of the most recently completed task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting task. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), the Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This is done periodically based on the specified `druid.coordinator.period.indexingPeriod`.|true|
|`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill tasks for unused segments, that is, permanently delete them from the metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), the Coordinator will submit tasks periodically based on the `period` specified. A whitelist can be set via the dynamic configuration `killDataSourceWhitelist` described later.<br /><br />When `druid.coordinator.kill.on` is true, segments become eligible for permanent deletion once their data intervals are older than `druid.coordinator.kill.durationToRetain` relative to the current time. If a segment's data interval is already older than this threshold when it is marked unused, it is eligible for permanent deletion immediately after being marked unused.|false|
|`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)|
|`druid.coordinator.kill.period`| The frequency of sending kill tasks to the indexing service. The value must be greater than or equal to `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 day)|
|`druid.coordinator.kill.durationToRetain`|Only applies if you set `druid.coordinator.kill.on` to `true`. This value is ignored if `druid.coordinator.kill.ignoreDurationToRetain` is `true`. Valid configurations must be an ISO8601 period. Druid will not kill unused segments whose interval end date is beyond `now - durationToRetain`. `durationToRetain` can be a negative ISO8601 period, which would place `now - durationToRetain` in the future.<br /><br />Note that the `durationToRetain` parameter applies to the segment interval, not the time the segment was last marked unused. For example, if `durationToRetain` is set to `P90D`, then a segment for a time chunk 90 days in the past is eligible for permanent deletion immediately after being marked unused.|`P90D`|
|`druid.coordinator.kill.ignoreDurationToRetain`|A way to override `druid.coordinator.kill.durationToRetain` and tell the coordinator that you do not care about the end date of unused segment intervals when it comes to killing them. If true, the coordinator considers all unused segments as eligible to be killed.|false|
|`druid.coordinator.kill.bufferPeriod`|The amount of time that a segment must be unused before it is able to be permanently removed from metadata and deep storage. This can serve as a buffer period to prevent data loss if data ends up being needed after being marked unused.|`P30D`|
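For illustration, a minimal coordinator `runtime.properties` fragment that enables continuous auto-kill under the relaxed constraint above might look like this (the property names come from the table; the specific values are assumptions for the example, not recommendations):

```properties
# Enable kill tasks for unused segments (off by default).
druid.coordinator.kill.on=true

# With this change the kill period may equal the indexing period,
# so the kill duty can run on every indexing cycle.
druid.coordinator.period.indexingPeriod=PT1800S
druid.coordinator.kill.period=PT1800S

# Only consider unused segments whose interval end is older than now - P90D...
druid.coordinator.kill.durationToRetain=P90D

# ...and that were marked unused at least 30 days ago.
druid.coordinator.kill.bufferPeriod=P30D
```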
@@ -139,16 +139,17 @@ Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasour
Set<String> retrieveAllDataSourceNames();

/**
* Returns top N unused segment intervals with the end time no later than the specified maxEndTime and
* used_status_last_updated time no later than maxLastUsedTime when ordered by segment start time, end time. Any segment having no
* used_status_last_updated time due to upgrade from legacy Druid means maxUsedFlagLastUpdatedTime is ignored for that segment.
* Returns top N unused segment intervals with the start time no earlier than the specified start time (if not null)
* and with the end time no later than the specified maxEndTime and with used_status_last_updated time no later than
* maxUsedFlagLastUpdatedTime when ordered by segment start time, end time. For any segment with no
* used_status_last_updated time (due to an upgrade from legacy Druid), maxUsedFlagLastUpdatedTime is ignored.
*/
List<Interval> getUnusedSegmentIntervals(
String dataSource,
DateTime minStartTime,
DateTime maxEndTime,
int limit,
DateTime maxUsedFlagLastUpdatedTime
);
DateTime maxUsedFlagLastUpdatedTime);
Contributor:

Nit: style
Suggested change
DateTime maxUsedFlagLastUpdatedTime);
DateTime maxUsedFlagLastUpdatedTime
);


@VisibleForTesting
void poll();
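For orientation, here is a hedged sketch of calling the revised five-argument `getUnusedSegmentIntervals` (the `manager` instance, the datasource name, and the cutoff values are assumptions for the example; only the method signature comes from this change):

```java
import java.util.List;
import org.joda.time.DateTime;
import org.joda.time.Interval;

// Assumes a SegmentsMetadataManager instance named "manager" is available.
// First cycle: no lower bound, scan from the earliest unused interval.
List<Interval> firstBatch = manager.getUnusedSegmentIntervals(
    "wikipedia",                   // dataSource
    null,                          // minStartTime: null means no lower bound
    DateTime.now().minusDays(90),  // maxEndTime: only intervals ending before this
    100,                           // limit: top N intervals ordered by (start, end)
    DateTime.now().minusDays(30)   // maxUsedFlagLastUpdatedTime: buffer-period cutoff
);

// Subsequent cycles can resume from the end of the last killed interval,
// mirroring what KillUnusedSegments now tracks in datasourceToLastKillIntervalEnd.
// (Guarding against an empty first batch is omitted in this sketch.)
DateTime lastEnd = firstBatch.get(firstBatch.size() - 1).getEnd();
List<Interval> nextBatch = manager.getUnusedSegmentIntervals(
    "wikipedia", lastEnd, DateTime.now().minusDays(90), 100, DateTime.now().minusDays(30)
);
```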
@@ -58,6 +58,7 @@
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
@@ -1088,6 +1089,7 @@ private String getSegmentsTable()
@Override
public List<Interval> getUnusedSegmentIntervals(
final String dataSource,
@Nullable final DateTime minStartTime,
final DateTime maxEndTime,
final int limit,
DateTime maxUsedFlagLastUpdatedTime
@@ -1100,13 +1102,14 @@ public List<Interval> getUnusedSegmentIntervals(
@Override
public List<Interval> inTransaction(Handle handle, TransactionStatus status)
{
Iterator<Interval> iter = handle
final Query<Interval> sql = handle
.createQuery(
StringUtils.format(
"SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource AND "
+ "%2$send%2$s <= :end AND used = false AND used_status_last_updated IS NOT NULL AND used_status_last_updated <= :used_status_last_updated ORDER BY start, %2$send%2$s",
+ "%2$send%2$s <= :end AND used = false AND used_status_last_updated IS NOT NULL AND used_status_last_updated <= :used_status_last_updated %3$s ORDER BY start, %2$send%2$s",
getSegmentsTable(),
connector.getQuoteString()
connector.getQuoteString(),
null != minStartTime ? "AND start >= :start" : ""
)
)
.setFetchSize(connector.getStreamingFetchSize())
@@ -1126,8 +1129,12 @@ protected Interval mapInternal(int index, Map<String, Object> row)
);
}
}
)
.iterator();
);
if (null != minStartTime) {
sql.bind("start", minStartTime.toString());
}

Iterator<Interval> iter = sql.iterator();


List<Interval> result = Lists.newArrayListWithCapacity(limit);
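A note on the pattern above: the `%3$s` placeholder adds the `AND start >= :start` predicate only when `minStartTime` is non-null, and the `bind("start", ...)` call is guarded by the same null check, so the SQL text and its bound parameters always stay in sync. A standalone sketch of the same pattern, assuming an H2 in-memory database and a hypothetical `segments` table (everything here except the org.skife.jdbi.v2 API is illustrative):

```java
import java.util.Map;

import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;

public class OptionalPredicateSketch
{
  public static void main(String[] args)
  {
    final DBI dbi = new DBI("jdbc:h2:mem:test");
    final String minStart = args.length > 0 ? args[0] : null;

    try (Handle handle = dbi.open()) {
      // Only include the ":start" placeholder when we intend to bind it,
      // mirroring the null check on minStartTime in the diff above.
      final Query<Map<String, Object>> query = handle.createQuery(
          "SELECT start, \"end\" FROM segments WHERE used = false "
          + (minStart != null ? "AND start >= :start " : "")
          + "ORDER BY start, \"end\""
      );
      if (minStart != null) {
        query.bind("start", minStart);
      }
      query.list().forEach(System.out::println);
    }
  }
}
```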
@@ -44,6 +44,8 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Completely removes information about unused segments who have an interval end that comes before
@@ -67,6 +69,12 @@ public class KillUnusedSegments implements CoordinatorDuty
private final long retainDuration;
private final boolean ignoreRetainDuration;
private final int maxSegmentsToKill;

/**
* Used to keep track of the last interval end time that was killed for each
* datasource.
*/
private final Map<String, DateTime> datasourceToLastKillIntervalEnd;
Comment on lines +72 to +77 (Contributor):

I'm not sure if this approach is the best way. If the leader changes, the in-memory map is essentially wiped out when a new leader comes online, so it is possible to re-schedule an interval that is already in the process of being killed. So, two questions on this:

1. What is the impact if the same interval is scheduled to be killed twice? Can we test / validate that nothing bad happens? If the task just waits on a lock and then succeeds as there is no data to delete, I think that is reasonable.
2. Is it possible to get the list of intervals that are being killed from the kill task payloads? Would that put a lot of stress on the master nodes if we polled the task payloads?

Contributor Author:

Testing revealed that (1) is what happens, so I thought it was not the end of the world that we still have the potential of submitting duplicate tasks.

private long lastKillTime = 0;
private final long bufferPeriod;

@@ -82,8 +90,8 @@ public KillUnusedSegments(
{
this.period = config.getCoordinatorKillPeriod().getMillis();
Preconditions.checkArgument(
this.period > config.getCoordinatorIndexingPeriod().getMillis(),
"coordinator kill period must be greater than druid.coordinator.period.indexingPeriod"
this.period >= config.getCoordinatorIndexingPeriod().getMillis(),
"coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod"
);

this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain();
Expand All @@ -100,6 +108,8 @@ public KillUnusedSegments(
this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments();
Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");

datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();

log.info(
"Kill Task scheduling enabled with period [%s], retainDuration [%s], bufferPeriod [%s], maxSegmentsToKill [%s]",
this.period,
@@ -115,12 +125,18 @@ public KillUnusedSegments(
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{

final long currentTimeMillis = System.currentTimeMillis();
if (lastKillTime + period > currentTimeMillis) {
log.debug("Skipping kill of unused segments as kill period has not elapsed yet.");
return params;
}

return runInternal(params);
}

@VisibleForTesting
DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params)
{
TaskStats taskStats = new TaskStats();
Collection<String> dataSourcesToKill =
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
Expand All @@ -147,11 +163,14 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
}

log.debug("Killing unused segments in datasources: %s", dataSourcesToKill);
lastKillTime = currentTimeMillis;
lastKillTime = System.currentTimeMillis();
taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);

}

// Any datasources that are no longer being considered for kill should have their
// last kill interval removed from the map.
datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
addStats(taskStats, stats);
return params;
}
@@ -175,13 +194,14 @@ private int killUnusedSegments(
if (0 < availableKillTaskSlots && !CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
for (String dataSource : dataSourcesToKill) {
if (submittedTasks >= availableKillTaskSlots) {
log.info(StringUtils.format(
log.debug(StringUtils.format(
"Submitted [%d] kill tasks and reached kill task slot limit [%d]. Will resume "
+ "on the next coordinator cycle.", submittedTasks, availableKillTaskSlots));
break;
}
final Interval intervalToKill = findIntervalForKill(dataSource);
if (intervalToKill == null) {
datasourceToLastKillIntervalEnd.remove(dataSource);
continue;
}

@@ -193,6 +213,7 @@ private int killUnusedSegments(
maxSegmentsToKill
), true);
++submittedTasks;
datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
}
catch (Exception ex) {
log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource);
@@ -233,7 +254,7 @@ private Interval findIntervalForKill(String dataSource)
: DateTimes.nowUtc().minus(retainDuration);

List<Interval> unusedSegmentIntervals = segmentsMetadataManager
.getUnusedSegmentIntervals(dataSource, maxEndTime, maxSegmentsToKill, DateTimes.nowUtc().minus(bufferPeriod));
.getUnusedSegmentIntervals(dataSource, datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill, DateTimes.nowUtc().minus(bufferPeriod));

if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
return null;
@@ -258,6 +279,13 @@ static int getKillTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio
return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 1.0)), maxKillTaskSlots);
}

@VisibleForTesting
Map<String, DateTime> getDatasourceToLastKillIntervalEnd()
{
return datasourceToLastKillIntervalEnd;
}


static class TaskStats
{
int availableTaskSlots;
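To make the slot arithmetic in `getKillTaskCapacity` concrete, a small worked example (the input values are illustrative assumptions, not defaults introduced by this PR):

```java
// killTaskCapacity = min((int) (totalWorkerCapacity * min(killTaskSlotRatio, 1.0)), maxKillTaskSlots)
//
// With totalWorkerCapacity = 50, killTaskSlotRatio = 0.1, maxKillTaskSlots = 3:
//   50 * min(0.1, 1.0) = 5.0  ->  (int) 5.0 = 5  ->  min(5, 3) = 3 kill task slots
//
// A ratio above 1.0 is clamped to 1.0, so at most the whole worker capacity
// is offered, still capped by maxKillTaskSlots.
int capacity = Math.min((int) (50 * Math.min(0.1, 1.0)), 3);  // == 3
```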
@@ -425,34 +425,42 @@ public void testGetUnusedSegmentIntervals() throws IOException

Assert.assertEquals(
ImmutableList.of(segment2.getInterval()),
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
);

// Test the DateTime maxEndTime argument of getUnusedSegmentIntervals
Assert.assertEquals(
ImmutableList.of(segment2.getInterval()),
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
);
Assert.assertEquals(
ImmutableList.of(segment1.getInterval()),
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
);
Assert.assertEquals(
ImmutableList.of(),
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
);

Assert.assertEquals(
ImmutableList.of(segment2.getInterval(), segment1.getInterval()),
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX)
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX)
);

// Test a buffer period that should exclude some segments

// The wikipedia datasource has segments generated with a last used time roughly equal to the time of the test run. None of these segments should be selected with a buffer period of 1 day
Assert.assertEquals(
ImmutableList.of(),
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
);

// One of the 3 segments in newDs has a null used_status_last_updated, which means getUnusedSegmentIntervals never returns it
// One of the 3 segments in newDs has a used_status_last_updated older than 1 day, which means it should be returned
// The last of the 3 segments in newDs has a used_status_last_updated newer than 1 day and should not be returned
Assert.assertEquals(
ImmutableList.of(newSegment2.getInterval()),
sqlSegmentsMetadataManager.getUnusedSegmentIntervals(newDs, DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
sqlSegmentsMetadataManager.getUnusedSegmentIntervals(newDs, DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
);
}

Expand Down
Loading