-
Notifications
You must be signed in to change notification settings - Fork 603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for intra.broker.goals in anomaly detection / self healing #1721
base: migrate_to_kafka_2_4
Are you sure you want to change the base?
Add support for intra.broker.goals in anomaly detection / self healing #1721
Conversation
*/ | ||
public GoalBasedOperationRunnable(KafkaCruiseControl kafkaCruiseControl, | ||
OperationFuture future, | ||
List<String> goals, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see dryRun
boolean in other version of constructor. Do we need to support dryRun here too ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not, that one is used for a user request (ex. request coming from UI for example)
/** | ||
* Constructor to be used for creating a runnable for self-healing. | ||
*/ | ||
public GoalBasedOperationRunnable(KafkaCruiseControl kafkaCruiseControl, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be great to have same order of parameters as in other constructor.
@@ -252,6 +257,20 @@ private void alertGoalViolation(AnomalyType anomalyType, final String localHostn | |||
} | |||
} | |||
|
|||
private void alertIntraBrokerGoalViolation(AnomalyType anomalyType, final String localHostname, | |||
List<AlertaMessage> alertaMessages, IntraBrokerGoalViolations goalViolations) { | |||
Map<Boolean, List<String>> violations = goalViolations.violatedGoalsByFixability(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is some duplication in logic for this and method above. WDYT about extracting logic into private method and calling with required params ?
* This class will be scheduled to run periodically to check if the given goals are violated or not. An alert will be | ||
* triggered if one of the goals is not met. | ||
*/ | ||
public class IntraBrokerGoalViolationDetector extends AbstractAnomalyDetector implements Runnable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a lot of logic in common with GoalViolationDetector
class. wdyt about extracting into a common(abstract) class and then override only necessary pieces ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extracted this into a BaseGoalViolationDetector class, please have a look at let me know what you think.
70ce833
to
0278353
Compare
cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/GoalOptimizer.java
Outdated
Show resolved
Hide resolved
e47231b
to
6516586
Compare
Hi @efeg can you do a review of this PR please? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution, @ecojan! I haven't finished reviewing all the PR, but do let me know what you think about refactoring AbstractGoalViolations
.
* @throws TimeoutException If broker capacity resolver is unable to resolve broker capacity in time. | ||
* @throws BrokerCapacityResolutionException If broker capacity resolver fails to resolve broker capacity. | ||
*/ | ||
public ClusterModel clusterModel(long now, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public ClusterModel clusterModel(long now, | |
public ClusterModel clusterModel(long nowMs, |
@@ -443,6 +443,37 @@ public ClusterModel clusterModel(long nowMs, | |||
return clusterModel; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return clusterModel; | |
return clusterModel(nowMs, requirements, allowCapacityEstimation, false, operationProgress); |
Would something go wrong if we dedup this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not 100% sure on this one,
We have already generated an initial cluster model in this Constructor. (see line 435)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I wasn't clear. Github doesn't allow me to drag L435-442.
How about we delete L435-442, and replace L443 with my suggested change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it! Updated this bit
boolean allowCapacityEstimation, | ||
boolean populateReplicaPlacementInfo, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep the order align with line 540.
boolean allowCapacityEstimation, | |
boolean populateReplicaPlacementInfo, | |
boolean populateReplicaPlacementInfo, | |
boolean allowCapacityEstimation, |
* @param allowCapacityEstimation whether allow capacity estimation in cluster model if the underlying live broker capacity is unavailable. | ||
* @param populateReplicaPlacementInfo whether populate replica placement information. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep the order align with line 540.
boolean allowCapacityEstimation, | ||
OperationProgress operationProgress, | ||
boolean populateReplicaPlacementInfo) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep the order align with line 540.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ended up removing this Constructor since we have one already with the same signature.
_ignoreProposalCache = ignoreProposalCache; | ||
_destinationBrokerIds = SELF_HEALING_DESTINATION_BROKER_IDS; | ||
_isRebalanceDiskMode = isRebalanceDiskMode; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: extra whitespace
boolean isRebalanceDiskMode, | ||
boolean ignoreProposalCache, | ||
boolean skipHardGoalCheck, | ||
boolean stopOngoingExecution) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
boolean isRebalanceDiskMode, | |
boolean ignoreProposalCache, | |
boolean skipHardGoalCheck, | |
boolean stopOngoingExecution) { | |
boolean stopOngoingExecution, | |
boolean skipHardGoalCheck, | |
boolean ignoreProposalCache, | |
boolean isRebalanceDiskMode) { |
Good to keep same parameter order as GoalBasedOperationRunnable.java line 103.
Also adding new parameters to the end, according to existing assigning order.
|
||
|
||
/** | ||
* A class that holds all the goal violations. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* A class that holds all the goal violations. | |
* A class that holds all the intra broker goal violations. |
/** | ||
* A class that holds all the goal violations. | ||
*/ | ||
public class IntraBrokerGoalViolations extends KafkaAnomaly { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have the same comment as there a lot of dup between this class and GoalViolations
. How about making another abstract class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, will create an abstract class.
case INTRA_BROKER_GOAL_VIOLATION: | ||
IntraBrokerGoalViolations intraBrokerGoalViolations = (IntraBrokerGoalViolations) _anomalyState.anomaly(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can combine both cases once we have the abstract class?
case INTRA_BROKER_GOAL_VIOLATION: | |
IntraBrokerGoalViolations intraBrokerGoalViolations = (IntraBrokerGoalViolations) _anomalyState.anomaly(); | |
case GOAL_VIOLATION: | |
case INTRA_BROKER_GOAL_VIOLATION: | |
IntraBrokerGoalViolations intraBrokerGoalViolations = (AbstractGoalViolations) _anomalyState.anomaly(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @zornhsu , returned a bit to this.
My small concern on this is that it's used in other parts of the application and I would avoid obfuscating the difference between the two, maybe reuse the logic but keep them separate. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @ecojan, I think what zorn suggested makes more sense. Do you have an example that merging them would cause confusions?
Hey @zornhsu thanks for the review, will try and get around to it the following days |
6516586
to
37d010b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -443,6 +443,37 @@ public ClusterModel clusterModel(long nowMs, | |||
return clusterModel; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I wasn't clear. Github doesn't allow me to drag L435-442.
How about we delete L435-442, and replace L443 with my suggested change?
Hey @zornhsu sorry for not replying to the initial question, yes I makes perfect sense to refactor AbstractGoalViolations. Have a few changes locally already. |
Just stumbled across this and saw that this would be really useful with some of the JBOD Kafka clusters our team is currently managing. Is there anything else (besides cruise-control UI stuff) preventing this from being merged in (vs ppl who want to use it having to maintain an ongoing fork)? |
@chen-anders all that's left is perhaps some small modifications and cleanup + another round of reviews |
…r intra broker goals
…rokerGoalViolations to extend it
27e433b
to
05ff397
Compare
Did a rebase with the latest changes from the default branch. |
@CCisGG I see you are an active maintainer lately - is this something you wanna help us review please? thanks! |
@@ -553,7 +574,11 @@ public void run() { | |||
// We compute the proposal even if there is not enough modeled partitions. | |||
ModelCompletenessRequirements requirements = _loadMonitor.meetCompletenessRequirements(_defaultModelCompletenessRequirements) | |||
? _defaultModelCompletenessRequirements : _requirementsWithAvailableValidWindows; | |||
ClusterModel clusterModel = _loadMonitor.clusterModel(_time.milliseconds(), requirements, _allowCapacityEstimation, operationProgress); | |||
|
|||
// We check for Intra broker goals among Default goals - if we have intra broker goals, set replicaPlacementInfo to true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "set replicaPlacementInfo to true" -> "set populateReplicaPlacementInfo to true".
@@ -15,6 +15,8 @@ | |||
|
|||
import static com.linkedin.cruisecontrol.CruiseControlUtils.utcDateFor; | |||
import static com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType.GOAL_VIOLATION; | |||
import static com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType.INTRA_BROKER_GOAL_VIOLATION; | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: redundant new lines. Just curious did you enable code style check? The code style should force to avoid 2 consecutive blank lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did, checkstyle passed (running the lastest version that exists in the default branch as well). Also, build process has a checkStyle
task in it as well and I noticed this is a recurring thing in other java files as well (just following the status quo).
Let me know if it should be different.
case INTRA_BROKER_GOAL_VIOLATION: | ||
IntraBrokerGoalViolations intraBrokerGoalViolations = (IntraBrokerGoalViolations) _anomalyState.anomaly(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @ecojan, I think what zorn suggested makes more sense. Do you have an example that merging them would cause confusions?
* @param goalViolations Goal violations to check whether there are unfixable goals. | ||
* @return True if the given goal violations contain unfixable goals, false otherwise. | ||
*/ | ||
public static boolean hasUnfixableGoals(IntraBrokerGoalViolations goalViolations) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that better to change the input param to type AbstractGoalViolations? That way this method can help to check both interBrokerGoals and intraBrokerGoals
@@ -177,10 +64,10 @@ public void run() { | |||
} | |||
|
|||
Set<Integer> excludedBrokersForLeadership = _excludeRecentlyDemotedBrokers ? executorState.recentlyDemotedBrokers() | |||
: Collections.emptySet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unnecessary change
@@ -209,7 +96,7 @@ public void run() { | |||
_lastCheckedModelGeneration = clusterModel.generation(); | |||
} | |||
newModelNeeded = optimizeForGoal(clusterModel, goal, goalViolations, excludedBrokersForLeadership, excludedBrokersForReplicaMove, | |||
checkPartitionsWithRFGreaterThanNumRacks); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unnecessary change.
} | ||
|
||
@Override | ||
public void run() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method looks like highly duplicated with the GoalViolationDetector. I think we should avoid it. E.g. maybe extract the common logic into the parent class.
boolean checkPartitionsWithRFGreaterThanNumRacks) | ||
throws KafkaCruiseControlException { | ||
if (clusterModel.topics().isEmpty()) { | ||
LOG.info("Skipping goal violation detection because the cluster model does not have any topic."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: skip intra broker goal violation detector.
} | ||
} | ||
|
||
protected boolean optimizeForGoal(ClusterModel clusterModel, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we not able to use the method in BaseGoalViolationDetector?
|
||
protected boolean optimizeForGoal(ClusterModel clusterModel, | ||
Goal goal, | ||
GoalViolations goalViolations, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we change the type to AbstractGoalViolations, maybe we can reuse this method for both GoalViolations and IntraBrokerGoalViolations?
Took a first pass and left some comments. Besides, I have some general comments for the commits so far:
|
Thank you for the review @CCisGG , will go through the proposed changes and come back this week. |
Hi @ecojan, thanks for your work on this very useful feature. Do you think that you'll have the opportunity to come back to this PR in the future? |
Hi @tkornai |
This PR resolves #1264 by enabling the self-healing for intra-broker goals. This however does not account for race conditions between Cluster level Goals and will only trigger after said goals are finished.
The scope of this PR is to allow self-healing on IntraBroker goals for now (as future work might make this obsolete as intra-broker goals and regular goals might be made to work together).
New configuration:
New Anomaly Type: INTRA_BROKER_GOAL_VIOLATION
A new type of Anomaly that needs to be treated differently from the regular GOAL_VIOLATIONS.
When configuring for the rebalance parameters for this violation, we are skipping hard goals check and ignoring proposalsCache.
We are skipping hard goals check because intra-broker goals may not be hard goals (at the current given time).
We are ignoring proposalsCache because the proposals model takes into account all given default goals. If we add the intraBrokersGoals this will not work properly as currently, disk-granularity goals do not work with broker-granularity goals.
New Anomaly detectors: IntraBrokerGoalViolationDetector
This anomaly detector is only looking for the anomaly.detection.intra.broker.goal and unlike the GoalViolationDetector, it's using a cluster model that is generating replica placement on disks as well.
This anomaly detector mainly functions for INTRA_BROKER_GOAL_VIOLATIONS.
What is missing from this feature