Skip to content

Commit

Permalink
Add BrokerSetAwareGoal
Browse files Browse the repository at this point in the history
  • Loading branch information
Mohit Paliwal committed May 17, 2022
1 parent a8a190f commit 0c5b037
Show file tree
Hide file tree
Showing 24 changed files with 1,970 additions and 103 deletions.
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
<suppressions>
<suppress checks="RegexpHeader" files="com[\\/]linkedin[\\/]kafka[\\/]cruisecontrol[\\/]testutils"/>
<suppress checks="MemberName" files="BrokerCapacityConfigFileResolver.java"/>
<suppress checks="MemberName" files="BrokerSetFileResolver.java"/>
</suppressions>
9 changes: 9 additions & 0 deletions config/brokerSets.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"brokerSets":[
{
"brokerSetId": "brokerSet0",
"brokerIds": [0],
"doc": "This block contains broker ids that belong to BrokerSet 0"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package com.linkedin.kafka.cruisecontrol;

import com.linkedin.cruisecontrol.common.utils.Utils;
import com.linkedin.cruisecontrol.exception.CruiseControlException;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal;
Expand Down Expand Up @@ -165,6 +167,28 @@ public static String getRequiredConfig(Map<String, ?> configs, String configName
return value;
}

/**
* Get a configuration class's instance and throw exception if the configuration was not provided
* or if the class could not be constructed.
*
* @param configs the config map.
* @param configName the config to get.
* @param t super class of the class to be instantiated
* @param <T> The instance type.
* @return The configuration class instance.
*/
public static <T> T getRequiredConfigInstance(Map<String, ?> configs, String configName, Class<T> t) {
String value = (String) configs.get(configName);
if (value == null || value.isEmpty()) {
throw new ConfigException(String.format("Configuration %s must be provided.", configName));
}
try {
return Utils.newInstance(value, t);
} catch (ClassNotFoundException | CruiseControlException e) {
throw new ConfigException(String.format("Configuration class %s could not be loaded.", configName), e);
}
}

/**
* Creates the given topic if it does not exist.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.linkedin.kafka.cruisecontrol.analyzer;

import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.config.BrokerSetResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig;
import java.util.HashMap;
Expand Down Expand Up @@ -34,6 +35,7 @@ public class BalancingConstraint {
private final Pattern _topicsWithMinLeadersPerBrokerPattern;
private final int _minTopicLeadersPerBroker;
private final long _fastModePerBrokerMoveTimeoutMs;
private final BrokerSetResolver _brokerSetResolver;

/**
* Constructor for Balancing Constraint.
Expand Down Expand Up @@ -79,6 +81,8 @@ public BalancingConstraint(KafkaCruiseControlConfig config) {
_minTopicLeadersPerBroker = config.getInt(AnalyzerConfig.MIN_TOPIC_LEADERS_PER_BROKER_CONFIG);
// Set default value for the per broker move timeout in fast mode in milliseconds
_fastModePerBrokerMoveTimeoutMs = config.getLong(AnalyzerConfig.FAST_MODE_PER_BROKER_MOVE_TIMEOUT_MS_CONFIG);
// BrokerSet Data store class
_brokerSetResolver = config.getConfiguredInstance(AnalyzerConfig.BROKER_SET_RESOLVER_CLASS_CONFIG, BrokerSetResolver.class);
}

Properties setProps(Properties props) {
Expand Down Expand Up @@ -110,6 +114,7 @@ Properties setProps(Properties props) {
props.put(AnalyzerConfig.TOPICS_WITH_MIN_LEADERS_PER_BROKER_CONFIG, _topicsWithMinLeadersPerBrokerPattern.pattern());
props.put(AnalyzerConfig.MIN_TOPIC_LEADERS_PER_BROKER_CONFIG, Integer.toString(_minTopicLeadersPerBroker));
props.put(AnalyzerConfig.FAST_MODE_PER_BROKER_MOVE_TIMEOUT_MS_CONFIG, Long.toString(_fastModePerBrokerMoveTimeoutMs));
props.put(AnalyzerConfig.DEFAULT_BROKER_SET_RESOLVER_CLASS, _brokerSetResolver.getClass().getName());
return props;
}

Expand Down Expand Up @@ -242,6 +247,13 @@ public long fastModePerBrokerMoveTimeoutMs() {
return _fastModePerBrokerMoveTimeoutMs;
}

/**
* @return The Broker Set Resolver.
*/
public BrokerSetResolver brokerSetResolver() {
return _brokerSetResolver;
}

/**
* Set resource balance percentage for the given resource.
*
Expand Down Expand Up @@ -302,14 +314,15 @@ public String toString() {
+ "topicReplicaBalancePercentage=%.4f,topicReplicaBalanceGap=[%d,%d],"
+ "goalViolationDistributionThresholdMultiplier=%.4f,"
+ "topicsWithMinLeadersPerBrokerPattern=%s,"
+ "minTopicLeadersPerBroker=%d,fastModePerBrokerMoveTimeoutMs=%d]",
+ "minTopicLeadersPerBroker=%d,fastModePerBrokerMoveTimeoutMs=%d,"
+ "brokerSetDataStore=%s]",
_resourceBalancePercentage.get(Resource.CPU), _resourceBalancePercentage.get(Resource.DISK),
_resourceBalancePercentage.get(Resource.NW_IN), _resourceBalancePercentage.get(Resource.NW_OUT),
_capacityThreshold.get(Resource.CPU), _capacityThreshold.get(Resource.DISK),
_capacityThreshold.get(Resource.NW_IN), _capacityThreshold.get(Resource.NW_OUT),
_maxReplicasPerBroker, _replicaBalancePercentage, _leaderReplicaBalancePercentage,
_topicReplicaBalancePercentage, _topicReplicaBalanceMinGap, _topicReplicaBalanceMaxGap,
_goalViolationDistributionThresholdMultiplier, _topicsWithMinLeadersPerBrokerPattern.pattern(),
_minTopicLeadersPerBroker, _fastModePerBrokerMoveTimeoutMs);
_minTopicLeadersPerBroker, _fastModePerBrokerMoveTimeoutMs, _brokerSetResolver.getClass().getName());
}
}
Loading

0 comments on commit 0c5b037

Please sign in to comment.