Enable auto kill segments by default #12187

Merged · 11 commits · Feb 7, 2022
7 changes: 3 additions & 4 deletions docs/configuration/index.md
@@ -796,10 +796,10 @@ These Coordinator static configurations can be defined in the `coordinator/runtime.properties` file.
 |`druid.coordinator.startDelay`|The Coordinator operates on the assumption that it has an up-to-date view of the state of the world when it runs; however, the current ZK interaction code is written in a way that doesn't allow the Coordinator to know for a fact that it's done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|PT300S|
 |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical process.|PT15M|
 |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator should clean up old entries in the `pendingSegments` table of the metadata store. If set to true, the Coordinator checks the created time of the most recently completed task; if none exists, it finds the created time of the earliest running/pending/waiting task. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), the Coordinator asks the Overlord to clean up entries in the `pendingSegments` table that are 1 day or more older than the found created time. This is done periodically, based on the specified `druid.coordinator.period.indexingPeriod`.|true|
-|`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill tasks for unused segments, that is, hard delete them from the metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), the Coordinator submits tasks periodically based on the specified `period`. These kill tasks delete all unused segments except for the last `durationToRetain` period. The whitelist, or "all", can be set via the dynamic configurations `killAllDataSources` and `killDataSourceWhitelist` described later.|false|
+|`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill tasks for unused segments, that is, hard delete them from the metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), the Coordinator submits tasks periodically based on the specified `period`. These kill tasks delete all unused segments except for the last `durationToRetain` period. A whitelist can be set via the dynamic configuration `killDataSourceWhitelist` described later.|true|
 |`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 day)|
-|`druid.coordinator.kill.durationToRetain`|Do not kill unused segments in the last `durationToRetain`; must be greater than or equal to 0. Only applies, and MUST be specified, if kill is turned on. Note that the default value is invalid.|PT-1S (-1 seconds)|
-|`druid.coordinator.kill.maxSegments`|Kill at most n unused segments per kill task submission; must be greater than 0. Only applies, and MUST be specified, if kill is turned on. Note that the default value is invalid.|0|
+|`druid.coordinator.kill.durationToRetain`|Do not kill unused segments in the last `durationToRetain`; must be greater than or equal to 0. Only applies if kill is turned on.|`P90D`|
+|`druid.coordinator.kill.maxSegments`|Kill at most n unused segments per kill task submission; must be greater than 0. Only applies if kill is turned on.|100|
 |`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy for the Coordinator to use to distribute segments among the Historicals. `cachingCost` is logically equivalent to `cost` but is more CPU-efficient on large clusters. `diskNormalized` weights the costs according to the servers' disk usage ratios; there are known issues with this strategy distributing segments unevenly across the cluster. `random` distributes segments among services randomly.|`cost`|
 |`druid.coordinator.balancer.cachingCost.awaitInitialization`|Whether to wait for segment view initialization before creating the `cachingCost` balancing strategy. This property applies only when `druid.coordinator.balancer.strategy` is `cachingCost`. If set to `true`, the Coordinator will not start to assign segments until the segment view is initialized. If set to `false`, the Coordinator will fall back to the `cost` balancing strategy while the segment view is not yet initialized. Note that initialization may take a long time, since the `cachingCost` strategy involves substantial computation to build itself.|false|
 |`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon, which manages the load and drop of segments.|PT0.050S (50 ms)|
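The net effect of this hunk is that auto-kill now runs out of the box with valid defaults. As a minimal sketch, here is a `coordinator/runtime.properties` fragment that simply restates the new defaults from the table above, plus the opt-out an operator would use to keep the old behavior:

```properties
# New out-of-the-box defaults, restated explicitly for clarity.
druid.coordinator.kill.on=true
druid.coordinator.kill.period=P1D
druid.coordinator.kill.durationToRetain=P90D
druid.coordinator.kill.maxSegments=100

# Opt-out: disable automatic hard deletion of unused segments.
# druid.coordinator.kill.on=false
```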
@@ -902,7 +902,6 @@ Issuing a GET request at the same URL will return the spec that is currently in place.
 |`balancerComputeThreads`|Thread pool size for computing the moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1|
 |`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false|
 |`killDataSourceWhitelist`|List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array.|none|
-|`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or must be an empty list.|false|
 |`killPendingSegmentsSkipList`|List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array.|none|
 |`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that can be queued for loading to any given server. This parameter can be used to speed up the segment loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too many segments are scheduled to be replicated to some particular node (faster loading may be preferred to better segment distribution). The desired value depends on segment loading speed, acceptable replication time, and the number of nodes. A value of 1000 can be a starting point for a rather big cluster.|100|
 |`decommissioningNodes`|List of Historical servers to 'decommission'. The Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers, at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
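For instance, a dynamic-configuration payload that limits kill tasks to two datasources could look like the sketch below; `wikipedia` and `logs` are hypothetical datasource names, and the rest of the dynamic configuration is omitted for brevity. With `killAllDataSources` removed, leaving `killDataSourceWhitelist` unset or empty is what now opts a cluster into killing across all datasources:

```json
{
  "killDataSourceWhitelist": ["wikipedia", "logs"]
}
```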
8 changes: 4 additions & 4 deletions docs/operations/clean-metadata-store.md
@@ -75,13 +75,13 @@ If you want to skip the details, check out the [example](#example) for configuring automated metadata cleanup.
 
 Segment records and segments in deep storage become eligible for deletion when both of the following conditions hold:
 
-- When they meet the eligibility requirement of the kill task datasource configuration, according to `killDataSourceWhitelist` and `killAllDataSources` set in the Coordinator dynamic configuration. See [Dynamic configuration](../configuration/index.md#dynamic-configuration).
+- When they meet the eligibility requirement of the kill task datasource configuration, according to `killDataSourceWhitelist` set in the Coordinator dynamic configuration. See [Dynamic configuration](../configuration/index.md#dynamic-configuration).
 - When the `durationToRetain` time has passed since their creation.
 
 Kill tasks use the following configuration:
 - `druid.coordinator.kill.on`: When `true`, enables the Coordinator to submit a kill task for unused segments, which deletes them completely from the metadata store and from deep storage.
 Only applies to the datasources specified in the dynamic configuration parameter `killDataSourceWhitelist`.
-If `killDataSourceWhitelist` is not set or empty, `killAllDataSources` defaults to true so that kill tasks can be submitted for all datasources.
+If `killDataSourceWhitelist` is not set or is empty, then kill tasks can be submitted for all datasources.
 - `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`.
 - `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation before segments become eligible for deletion; see the sketch after this list.
 - `druid.coordinator.kill.maxSegments`: Defines the maximum number of segments to delete per kill task.
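A minimal sketch of the `durationToRetain` cutoff arithmetic, using plain `java.time` for illustration (Druid itself parses these values with Joda-Time; the class and variable names below are hypothetical):

```java
import java.time.Duration;
import java.time.Instant;

public class RetentionCutoffExample
{
  public static void main(String[] args)
  {
    // With the new default of P90D, an unused segment is eligible for a
    // kill task only once 90 days have passed since its creation.
    Duration durationToRetain = Duration.parse("P90D");
    Instant cutoff = Instant.now().minus(durationToRetain);
    System.out.println("Unused segments created before " + cutoff + " are eligible for deletion");
  }
}
```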
@@ -189,8 +189,8 @@ druid.coordinator.period.metadataStoreManagementPeriod=P1H
 
 # Set a kill task to poll every day to delete segment records and segments
 # in deep storage > 4 days old. When druid.coordinator.kill.on is set to true,
-# you must set either killAllDataSources or killDataSourceWhitelist in the dynamic
-# configuration. For this example, assume killAllDataSources is set to true.
+# you can set killDataSourceWhitelist in the dynamic configuration to limit
+# the datasources that can be killed.
 # Also required for automated cleanup of rules and compaction configuration.
 
 druid.coordinator.kill.on=true
2 changes: 2 additions & 0 deletions integration-tests/docker/environment-configs/coordinator
@@ -40,4 +40,6 @@ druid_auth_basic_common_cacheDirectory=/tmp/authCache/coordinator
 druid_auth_unsecuredPaths=["/druid/coordinator/v1/loadqueue"]
 druid_server_https_crlPath=/tls/revocations.crl
 druid_coordinator_period_indexingPeriod=PT180000S
+# 2x indexing period so that kill period is valid
+druid_coordinator_kill_period=PT360000S
 druid_coordinator_period=PT1S
CoordinatorDynamicConfig.java
@@ -20,11 +20,11 @@
 package org.apache.druid.server.coordinator;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.common.config.JacksonConfigManager;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
 
@@ -62,11 +62,6 @@ public class CoordinatorDynamicConfig
   private final int balancerComputeThreads;
   private final boolean emitBalancingStats;
 
-  /**
-   * If true {@link KillUnusedSegments} sends kill tasks for unused segments in all data sources.
-   */
-  private final boolean killUnusedSegmentsInAllDataSources;
-
   /**
    * List of specific data sources for which kill tasks are sent in {@link KillUnusedSegments}.
    */
Expand Down Expand Up @@ -129,9 +124,6 @@ public CoordinatorDynamicConfig(
// Keeping the legacy 'killDataSourceWhitelist' property name for backward compatibility. When the project is
// updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152
@JsonProperty("killDataSourceWhitelist") Object specificDataSourcesToKillUnusedSegmentsIn,
// Keeping the legacy 'killAllDataSources' property name for backward compatibility. When the project is
// updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152
@JsonProperty("killAllDataSources") boolean killUnusedSegmentsInAllDataSources,
// Type is Object here so that we can support both string and list as Coordinator console can not send array of
// strings in the update request, as well as for specificDataSourcesToKillUnusedSegmentsIn.
// Keeping the legacy 'killPendingSegmentsSkipList' property name for backward compatibility. When the project is
@@ -172,7 +164,6 @@ public CoordinatorDynamicConfig(
     this.replicationThrottleLimit = replicationThrottleLimit;
     this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
     this.emitBalancingStats = emitBalancingStats;
-    this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources;
     this.specificDataSourcesToKillUnusedSegmentsIn = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn);
     this.dataSourcesToNotKillStalePendingSegmentsIn =
         parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn);
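As the constructor comment above notes, `killDataSourceWhitelist` (and the skip list) are accepted as `Object` so that both a comma-separated string and a JSON array deserialize cleanly. The real `parseJsonStringOrArray` lies outside this diff; the following is a hedged sketch of the shape such a helper takes, with all names assumed:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Illustrative only; not the actual Druid implementation.
final class ConfigParsing
{
  static Set<String> parseJsonStringOrArray(Object stringOrArray)
  {
    if (stringOrArray instanceof String) {
      // Comma-separated form, e.g. "wikipedia,logs".
      Set<String> result = new TreeSet<>();
      for (String value : ((String) stringOrArray).split(",")) {
        if (!value.trim().isEmpty()) {
          result.add(value.trim());
        }
      }
      return result;
    } else if (stringOrArray instanceof Collection) {
      // JSON-array form; Jackson binds JSON arrays to a Collection.
      Set<String> result = new TreeSet<>();
      for (Object value : (Collection<?>) stringOrArray) {
        result.add(String.valueOf(value));
      }
      return result;
    } else {
      return Collections.emptySet();
    }
  }
}
```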
@@ -186,11 +177,6 @@
     );
     this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove;
 
-    if (this.killUnusedSegmentsInAllDataSources && !this.specificDataSourcesToKillUnusedSegmentsIn.isEmpty()) {
-      throw new IAE(
-          "can't have killUnusedSegmentsInAllDataSources and non-empty specificDataSourcesToKillUnusedSegmentsIn"
-      );
-    }
     this.pauseCoordination = pauseCoordination;
     this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
 
@@ -312,10 +298,10 @@ public Set<String> getSpecificDataSourcesToKillUnusedSegmentsIn()
     return specificDataSourcesToKillUnusedSegmentsIn;
   }
 
-  @JsonProperty("killAllDataSources")
+  @JsonIgnore
   public boolean isKillUnusedSegmentsInAllDataSources()
   {
-    return killUnusedSegmentsInAllDataSources;
+    return specificDataSourcesToKillUnusedSegmentsIn.isEmpty();
   }
 
   @JsonProperty("killPendingSegmentsSkipList")
@@ -398,7 +384,6 @@ public String toString()
            ", replicationThrottleLimit=" + replicationThrottleLimit +
            ", balancerComputeThreads=" + balancerComputeThreads +
            ", emitBalancingStats=" + emitBalancingStats +
-           ", killUnusedSegmentsInAllDataSources=" + killUnusedSegmentsInAllDataSources +
            ", specificDataSourcesToKillUnusedSegmentsIn=" + specificDataSourcesToKillUnusedSegmentsIn +
            ", dataSourcesToNotKillStalePendingSegmentsIn=" + dataSourcesToNotKillStalePendingSegmentsIn +
            ", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue +
@@ -453,9 +438,6 @@ public boolean equals(Object o)
     if (emitBalancingStats != that.emitBalancingStats) {
       return false;
     }
-    if (killUnusedSegmentsInAllDataSources != that.killUnusedSegmentsInAllDataSources) {
-      return false;
-    }
     if (maxSegmentsInNodeLoadingQueue != that.maxSegmentsInNodeLoadingQueue) {
       return false;
     }
@@ -494,7 +476,6 @@ public int hashCode()
         replicationThrottleLimit,
         balancerComputeThreads,
         emitBalancingStats,
-        killUnusedSegmentsInAllDataSources,
         maxSegmentsInNodeLoadingQueue,
         specificDataSourcesToKillUnusedSegmentsIn,
         dataSourcesToNotKillStalePendingSegmentsIn,
@@ -523,7 +504,6 @@ public static class Builder
     private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1;
     private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
     private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false;
-    private static final boolean DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false;
     private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
     private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
     private static final boolean DEFAULT_PAUSE_COORDINATION = false;
@@ -541,7 +521,6 @@ public static class Builder
     private Boolean emitBalancingStats;
     private Integer balancerComputeThreads;
     private Object specificDataSourcesToKillUnusedSegmentsIn;
-    private Boolean killUnusedSegmentsInAllDataSources;
     private Object dataSourcesToNotKillStalePendingSegmentsIn;
     private Integer maxSegmentsInNodeLoadingQueue;
     private Object decommissioningNodes;
@@ -568,7 +547,6 @@ public Builder(
         @JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads,
         @JsonProperty("emitBalancingStats") @Nullable Boolean emitBalancingStats,
         @JsonProperty("killDataSourceWhitelist") @Nullable Object specificDataSourcesToKillUnusedSegmentsIn,
-        @JsonProperty("killAllDataSources") @Nullable Boolean killUnusedSegmentsInAllDataSources,
         @JsonProperty("killPendingSegmentsSkipList") @Nullable Object dataSourcesToNotKillStalePendingSegmentsIn,
         @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue,
         @JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes,
@@ -591,7 +569,6 @@ public Builder(
       this.balancerComputeThreads = balancerComputeThreads;
       this.emitBalancingStats = emitBalancingStats;
       this.specificDataSourcesToKillUnusedSegmentsIn = specificDataSourcesToKillUnusedSegmentsIn;
-      this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources;
       this.dataSourcesToNotKillStalePendingSegmentsIn = dataSourcesToNotKillStalePendingSegmentsIn;
       this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
       this.decommissioningNodes = decommissioningNodes;
@@ -668,12 +645,6 @@ public Builder withSpecificDataSourcesToKillUnusedSegmentsIn(Set<String> dataSources)
       return this;
     }
 
-    public Builder withKillUnusedSegmentsInAllDataSources(boolean killUnusedSegmentsInAllDataSources)
-    {
-      this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources;
-      return this;
-    }
-
     public Builder withMaxSegmentsInNodeLoadingQueue(int maxSegmentsInNodeLoadingQueue)
     {
       this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
@@ -727,9 +698,6 @@ public CoordinatorDynamicConfig build()
           balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads,
           emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS : emitBalancingStats,
           specificDataSourcesToKillUnusedSegmentsIn,
-          killUnusedSegmentsInAllDataSources == null
-              ? DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES
-              : killUnusedSegmentsInAllDataSources,
           dataSourcesToNotKillStalePendingSegmentsIn,
           maxSegmentsInNodeLoadingQueue == null
               ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
@@ -765,9 +733,6 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
           specificDataSourcesToKillUnusedSegmentsIn == null
              ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()
              : specificDataSourcesToKillUnusedSegmentsIn,
-          killUnusedSegmentsInAllDataSources == null
-              ? defaults.isKillUnusedSegmentsInAllDataSources()
-              : killUnusedSegmentsInAllDataSources,
           dataSourcesToNotKillStalePendingSegmentsIn == null
              ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn()
              : dataSourcesToNotKillStalePendingSegmentsIn,
DruidCoordinatorConfig.java
@@ -48,11 +48,11 @@ public abstract class DruidCoordinatorConfig
   public abstract Duration getCoordinatorKillPeriod();
 
   @Config("druid.coordinator.kill.durationToRetain")
-  @Default("PT-1s")
+  @Default("P90D")
   public abstract Duration getCoordinatorKillDurationToRetain();
 
   @Config("druid.coordinator.kill.maxSegments")
-  @Default("0")
+  @Default("100")
   public abstract int getCoordinatorKillMaxSegments();
 
   @Config("druid.coordinator.kill.supervisor.period")