Skip to content

Commit

Permalink
Add support for intra.broker.goals in anomaly detection / self healing
Browse files Browse the repository at this point in the history
  • Loading branch information
ecojan committed Nov 17, 2021
1 parent 1290039 commit 70ce833
Show file tree
Hide file tree
Showing 26 changed files with 953 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,25 @@ public ClusterModel clusterModel(ModelCompletenessRequirements requirements,
return _loadMonitor.clusterModel(timeMs(), requirements, allowCapacityEstimation, operationProgress);
}

/**
* Get the cluster model cutting off at the current timestamp with replica placement info.
* @param requirements the model completeness requirements.
* @param allowCapacityEstimation whether allow capacity estimation in cluster model if the underlying live broker capacity is unavailable.
* @param operationProgress the progress of the job to report.
* @param populateReplicaPlacementInfo whether populate replica placement information.
* @return The cluster workload model.
* @throws NotEnoughValidWindowsException If there is not enough sample to generate cluster model.
* @throws TimeoutException If broker capacity resolver is unable to resolve broker capacity in time.
* @throws BrokerCapacityResolutionException If broker capacity resolver fails to resolve broker capacity.
*/
public ClusterModel clusterModel(ModelCompletenessRequirements requirements,
boolean allowCapacityEstimation,
OperationProgress operationProgress,
boolean populateReplicaPlacementInfo)
throws NotEnoughValidWindowsException, TimeoutException, BrokerCapacityResolutionException {
return _loadMonitor.clusterModel(timeMs(), requirements, allowCapacityEstimation, populateReplicaPlacementInfo, operationProgress);
}

/**
* Get the cluster model for a given time window.
* @param from the start time of the window
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.kafka.cruisecontrol.model.RawAndDerivedResource;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -168,6 +169,26 @@ public static List<Goal> getGoalsByPriority(KafkaCruiseControlConfig config) {
return config.getConfiguredInstances(AnalyzerConfig.DEFAULT_GOALS_CONFIG, Goal.class);
}

/**
* @param config The configurations for Cruise Control.
* @return The list of intra broker goals sorted by highest to lowest default priority.
*/
public static List<Goal> getIntraBrokerGoalsByPriority(KafkaCruiseControlConfig config) {
return config.getConfiguredInstances(AnalyzerConfig.INTRA_BROKER_GOALS_CONFIG, Goal.class);
}

/**
* @param goals The list of goals to execute the conversion for
* @return Converts a list of Goals to a list of Strings
*/
public static List<String> convertGoalsToString(List<Goal> goals) {
List<String> stringGoals = new ArrayList<>();
for (Goal goal : goals) {
stringGoals.add(goal.name());
}
return stringGoals;
}

/**
* @param config The configurations for Cruise Control.
* @return A goal map with goal name as the keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class GoalOptimizer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(GoalOptimizer.class);
private static final long HALF_MINUTE_IN_MS = TimeUnit.SECONDS.toMillis(30);
private final List<Goal> _goalsByPriority;
private final List<Goal> _intraBrokerGoals;
private final BalancingConstraint _balancingConstraint;
private final Pattern _defaultExcludedTopics;
private final LoadMonitor _loadMonitor;
Expand Down Expand Up @@ -101,6 +102,7 @@ public GoalOptimizer(KafkaCruiseControlConfig config,
Executor executor,
AdminClient adminClient) {
_goalsByPriority = AnalyzerUtils.getGoalsByPriority(config);
_intraBrokerGoals = AnalyzerUtils.getIntraBrokerGoalsByPriority(config);
_defaultModelCompletenessRequirements = MonitorUtils.combineLoadRequirementOptions(_goalsByPriority);
_requirementsWithAvailableValidWindows = new ModelCompletenessRequirements(
1,
Expand Down Expand Up @@ -136,7 +138,7 @@ public GoalOptimizer(KafkaCruiseControlConfig config,

@Override
public void run() {
// We need to get this thread so it can be interrupted if the cached proposal has been invalidated.
// We need to get this thread, so it can be interrupted if the cached proposal has been invalidated.
_proposalPrecomputingSchedulerThread = Thread.currentThread();
LOG.info("Starting proposal candidate computation.");
while (!_shutdown && _numPrecomputingThreads > 0) {
Expand Down Expand Up @@ -523,6 +525,24 @@ private void logProgress(boolean isSelfHeal,
LOG.trace("Proposals for {}{}.{}%n", isSelfHeal ? "self-healing " : "", goalName, proposals);
}

/**
* Checks if the list of Goals includes {@link #_intraBrokerGoals}
* @param goals The list of goals to look in
* @return return true if the list of goals contains an Intra Broker Goal
*/
protected boolean containsIntraBrokerGoal(List<Goal> goals) {
boolean result = false;
List<String> goalNames = AnalyzerUtils.convertGoalsToString(goals);

for (String goal : AnalyzerUtils.convertGoalsToString(_intraBrokerGoals)) {
if (goalNames.contains(goal)) {
result = true;
break;
}
}
return result;
}

private OptimizerResult updateCachedProposals(OptimizerResult result) {
synchronized (_cacheLock) {
_hasOngoingExplicitPrecomputation = false;
Expand Down Expand Up @@ -570,7 +590,15 @@ public void run() {
// We compute the proposal even if there is not enough modeled partitions.
ModelCompletenessRequirements requirements = _loadMonitor.meetCompletenessRequirements(_defaultModelCompletenessRequirements)
? _defaultModelCompletenessRequirements : _requirementsWithAvailableValidWindows;
ClusterModel clusterModel = _loadMonitor.clusterModel(_time.milliseconds(), requirements, _allowCapacityEstimation, operationProgress);

ClusterModel clusterModel = null;
// We check for Intra broker goals among Default goals - if we have intra broker goals, set replicaPlacementInfo to true
if (containsIntraBrokerGoal(_goalsByPriority)) {
clusterModel = _loadMonitor.clusterModel(_time.milliseconds(), requirements, _allowCapacityEstimation, true, operationProgress);
} else {
clusterModel = _loadMonitor.clusterModel(_time.milliseconds(), requirements, _allowCapacityEstimation, operationProgress);
}

if (!clusterModel.topics().isEmpty()) {
OptimizerResult result = optimizations(clusterModel, _goalsByPriority, operationProgress);
LOG.debug("Generated a proposal candidate in {} ms.", _time.milliseconds() - startMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ protected void rebalanceForBroker(Broker broker,
OptimizationOptions optimizationOptions) {
double upperLimit = _balanceUpperThresholdByBroker.get(broker);
double lowerLimit = _balanceLowerThresholdByBroker.get(broker);
LOG.debug("balancing broker {}", broker);
LOG.debug("List of broker disks is {}.", broker.disks());
for (Disk disk : broker.disks()) {
if (!disk.isAlive()) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ private void sanityCheckGoalNames() {
AnomalyDetectorConfig.SELF_HEALING_GOALS_CONFIG, selfHealingGoalNames,
AnalyzerConfig.DEFAULT_GOALS_CONFIG, defaultGoalNames));
}

// Ensure that intra-broker goals used for self-healing are contained in intra broker goals.
List<String> selfHealingIntraBrokerGoalNames = getList(AnomalyDetectorConfig.SELF_HEALING_INTRA_BROKER_GOALS_CONFIG);
if (selfHealingIntraBrokerGoalNames.stream().anyMatch(g -> !intraBrokerGoalNames.contains(g))) {
throw new ConfigException(String.format("Attempt to configure self healing goals with unsupported goals (%s:%s and %s:%s).",
AnomalyDetectorConfig.SELF_HEALING_INTRA_BROKER_GOALS_CONFIG, selfHealingIntraBrokerGoalNames,
AnalyzerConfig.INTRA_BROKER_GOALS_CONFIG, intraBrokerGoalNames));
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
package com.linkedin.kafka.cruisecontrol.config.constants;

import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.MinTopicLeadersPerBrokerGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal;
import com.linkedin.kafka.cruisecontrol.detector.BasicProvisioner;
import com.linkedin.kafka.cruisecontrol.detector.BrokerFailures;
import com.linkedin.kafka.cruisecontrol.detector.DiskFailures;
import com.linkedin.kafka.cruisecontrol.detector.GoalViolations;
import com.linkedin.kafka.cruisecontrol.detector.IntraBrokerGoalViolations;
import com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomaly;
import com.linkedin.kafka.cruisecontrol.detector.MaintenanceEvent;
import com.linkedin.kafka.cruisecontrol.detector.NoopMaintenanceEventReader;
Expand Down Expand Up @@ -65,6 +68,13 @@ public final class AnomalyDetectorConfig {
public static final String DEFAULT_GOAL_VIOLATIONS_CLASS = GoalViolations.class.getName();
public static final String GOAL_VIOLATIONS_CLASS_DOC = "The name of class that extends goal violations.";

/**
* <code>intra.broker.goal.violations.class</code>
*/
public static final String INTRA_BROKER_GOAL_VIOLATIONS_CLASS_CONFIG = "intra.broker.goal.violations.class";
public static final String DEFAULT_INTRA_BROKER_GOAL_VIOLATIONS_CLASS = IntraBrokerGoalViolations.class.getName();
public static final String INTRA_BROKER_GOAL_VIOLATIONS_CLASS_DOC = "The name of class that extends intra broker goal violations.";

/**
* <code>disk.failures.class</code>
*/
Expand All @@ -87,6 +97,14 @@ public final class AnomalyDetectorConfig {
public static final String SELF_HEALING_GOALS_DOC = "The list of goals to be used for self-healing relevant anomalies."
+ " If empty, uses the default.goals for self healing.";

/**
* <code>self.healing.goals</code>
*/
public static final String SELF_HEALING_INTRA_BROKER_GOALS_CONFIG = "self.healing.intra.broker.goals";
public static final List<String> DEFAULT_SELF_HEALING_INTRA_BROKER_GOALS = Collections.emptyList();
public static final String SELF_HEALING_INTRA_BROKER_GOALS_DOC = "The list of intra broker goals to be used for self-healing relevant anomalies."
+ " If empty, uses the default.goals for self healing.";

/**
* <code>anomaly.notifier.class</code>
*/
Expand All @@ -104,6 +122,17 @@ public final class AnomalyDetectorConfig {
.add(DiskCapacityGoal.class.getName()).toString();
public static final String ANOMALY_DETECTION_GOALS_DOC = "The goals that anomaly detector should detect if they are violated.";

/**
* <code>anomaly.detection.goals</code>
*/
public static final String ANOMALY_DETECTION_INTRA_BROKER_GOALS_CONFIG = "anomaly.detection.intra.broker.goals";
public static final String DEFAULT_ANOMALY_DETECTION_INTRA_BROKER_GOALS = new StringJoiner(",")
.add(IntraBrokerDiskUsageDistributionGoal.class.getName())
.add(IntraBrokerDiskCapacityGoal.class.getName())
.toString();
public static final String ANOMALY_DETECTION_INTRA_BROKER_GOALS_DOC = "The intra broker goals that anomaly detector "
+ "should detect if they are violated.";

/**
* <code>self.healing.exclude.recently.demoted.brokers</code>
*/
Expand Down Expand Up @@ -324,6 +353,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_GOAL_VIOLATIONS_CLASS,
ConfigDef.Importance.MEDIUM,
GOAL_VIOLATIONS_CLASS_DOC)
.define(INTRA_BROKER_GOAL_VIOLATIONS_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_INTRA_BROKER_GOAL_VIOLATIONS_CLASS,
ConfigDef.Importance.MEDIUM,
INTRA_BROKER_GOAL_VIOLATIONS_CLASS_DOC)
.define(DISK_FAILURES_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_DISK_FAILURES_CLASS,
Expand All @@ -339,6 +373,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_SELF_HEALING_GOALS,
ConfigDef.Importance.HIGH,
SELF_HEALING_GOALS_DOC)
.define(SELF_HEALING_INTRA_BROKER_GOALS_CONFIG,
ConfigDef.Type.LIST,
DEFAULT_SELF_HEALING_INTRA_BROKER_GOALS,
ConfigDef.Importance.HIGH,
SELF_HEALING_INTRA_BROKER_GOALS_DOC)
.define(ANOMALY_NOTIFIER_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_ANOMALY_NOTIFIER_CLASS,
Expand All @@ -349,6 +388,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_ANOMALY_DETECTION_GOALS,
ConfigDef.Importance.MEDIUM,
ANOMALY_DETECTION_GOALS_DOC)
.define(ANOMALY_DETECTION_INTRA_BROKER_GOALS_CONFIG,
ConfigDef.Type.LIST,
DEFAULT_ANOMALY_DETECTION_INTRA_BROKER_GOALS,
ConfigDef.Importance.MEDIUM,
ANOMALY_DETECTION_INTRA_BROKER_GOALS_DOC)
.define(SELF_HEALING_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG,
ConfigDef.Type.BOOLEAN,
DEFAULT_SELF_HEALING_EXCLUDE_RECENT_BROKERS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import static com.linkedin.cruisecontrol.CruiseControlUtils.utcDateFor;
import static com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType.GOAL_VIOLATION;
import static com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType.INTRA_BROKER_GOAL_VIOLATION;


@JsonResponseClass
public class AnomalyDetails {
Expand Down Expand Up @@ -75,6 +77,15 @@ public Map<String, Object> populateAnomalyDetails() {
anomalyDetails.put(OPTIMIZATION_RESULT, goalViolations.optimizationResult(_isJson));
}
break;
case INTRA_BROKER_GOAL_VIOLATION:
IntraBrokerGoalViolations intraBrokerGoalViolations = (IntraBrokerGoalViolations) _anomalyState.anomaly();
Map<Boolean, List<String>> violatedIntraBrokerGoalsByFixability = intraBrokerGoalViolations.violatedGoalsByFixability();
anomalyDetails.put(FIXABLE_VIOLATED_GOALS, violatedIntraBrokerGoalsByFixability.getOrDefault(true, Collections.emptyList()));
anomalyDetails.put(UNFIXABLE_VIOLATED_GOALS, violatedIntraBrokerGoalsByFixability.getOrDefault(false, Collections.emptyList()));
if (_hasFixStarted) {
anomalyDetails.put(OPTIMIZATION_RESULT, intraBrokerGoalViolations.optimizationResult(_isJson));
}
break;
case BROKER_FAILURE:
BrokerFailures brokerFailures = (BrokerFailures) _anomalyState.anomaly();
anomalyDetails.put(FAILED_BROKERS_BY_TIME_MS, brokerFailures.failedBrokers());
Expand Down
Loading

0 comments on commit 70ce833

Please sign in to comment.