Add BrokerSetAwareGoal #1809

Merged · 7 commits · Jul 11, 2022
@@ -5,14 +5,17 @@

package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.common.Utils;
import com.linkedin.kafka.cruisecontrol.config.ReplicaToBrokerSetMappingPolicy;
import com.linkedin.kafka.cruisecontrol.exception.BrokerSetResolutionException;
import com.linkedin.kafka.cruisecontrol.exception.ReplicaToBrokerSetMappingException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
@@ -79,6 +82,8 @@ public class BrokerSetAwareGoal extends AbstractGoal {
private BrokerSetResolutionHelper _brokerSetResolutionHelper;
private Map<String, Set<Broker>> _brokersByBrokerSet;
private ReplicaToBrokerSetMappingPolicy _replicaToBrokerSetMappingPolicy;
private Set<String> _excludedTopics;
private Set<String> _mustHaveTopicLeadersPerBroker;

/**
* Constructor for Broker Set Aware Goal.
@@ -133,7 +138,11 @@ protected void initGoalState(ClusterModel clusterModel, OptimizationOptions opti
// Whether the {@link com.linkedin.kafka.cruisecontrol.model.SortedReplicas} tracks only leader replicas or all replicas.
boolean tracksOnlyLeaderReplicas = false;

Set<String> excludedTopics = optimizationOptions.excludedTopics();
_mustHaveTopicLeadersPerBroker = Collections.unmodifiableSet(
Utils.getTopicNamesMatchedWithPattern(_balancingConstraint.topicsWithMinLeadersPerBrokerPattern(), clusterModel::topics));
_excludedTopics = Collections.unmodifiableSet(
Stream.of(_mustHaveTopicLeadersPerBroker, optimizationOptions.excludedTopics()).flatMap(Set::stream).collect(Collectors.toSet()));

/*
Replicas can be filtered/ordered/scored using selection/priority/score functions to be picked for movement during the balancing act.
If the optimization options require movement of only immigrant replicas, then we pick only immigrant replicas.
@@ -143,8 +152,8 @@ protected void initGoalState(ClusterModel clusterModel, OptimizationOptions opti
*/
new SortedReplicasHelper().maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrants(),
optimizationOptions.onlyMoveImmigrantReplicas())
.maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectReplicasBasedOnExcludedTopics(excludedTopics),
!excludedTopics.isEmpty())
.maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectReplicasBasedOnExcludedTopics(_excludedTopics),
!_excludedTopics.isEmpty())
.trackSortedReplicasFor(replicaSortName(this, tracksReplicasInReverseOrder, tracksOnlyLeaderReplicas),
clusterModel);
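
The excluded-topic handling above unions the topics matched by the min-leaders-per-broker pattern with the topics already excluded by the optimization options, so that neither group is confined to a single broker set. A minimal, self-contained sketch of that derivation (the class, method, and sample topic names below are illustrative, not part of Cruise Control):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class ExcludedTopicsSketch {

  // Union of the pattern-matched "must have leaders on every broker" topics and the explicitly excluded topics.
  static Set<String> excludedTopics(Pattern minLeadersPerBrokerPattern,
                                    Set<String> clusterTopics,
                                    Set<String> optionExcludedTopics) {
    Set<String> mustHaveTopicLeadersPerBroker = clusterTopics.stream()
        .filter(topic -> minLeadersPerBrokerPattern.matcher(topic).matches())
        .collect(Collectors.toSet());
    return Collections.unmodifiableSet(
        Stream.of(mustHaveTopicLeadersPerBroker, optionExcludedTopics)
            .flatMap(Set::stream)
            .collect(Collectors.toSet()));
  }

  public static void main(String[] args) {
    Set<String> topics = new HashSet<>(Set.of("__consumer_offsets", "orders", "payments"));
    // Hypothetical pattern: internal topics must keep leaders on every broker.
    Set<String> excluded = excludedTopics(Pattern.compile("__.*"), topics, Set.of("payments"));
    System.out.println(excluded); // e.g. [__consumer_offsets, payments]
  }
}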

@@ -179,10 +188,9 @@ protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions op
private void ensureBrokerSetAware(ClusterModel clusterModel, OptimizationOptions optimizationOptions)
throws OptimizationFailureException {
// Sanity check to confirm that the final distribution is brokerSet aware.
Set<String> excludedTopics = optimizationOptions.excludedTopics();

for (Map.Entry<String, List<Partition>> partitionsByTopic : clusterModel.getPartitionsByTopic().entrySet()) {
if (!excludedTopics.contains(partitionsByTopic.getKey())) {
String topicName = partitionsByTopic.getKey();
if (!_excludedTopics.contains(topicName)) {
List<Partition> partitions = partitionsByTopic.getValue();
Set<Broker> allBrokersForTopic = partitions.stream()
.map(partition -> partition.partitionBrokers())
@@ -191,7 +199,7 @@ private void ensureBrokerSetAware(ClusterModel clusterModel, OptimizationOptions
// Checks if a topic's brokers do not all live in a single brokerSet
if (_brokersByBrokerSet.values().stream().noneMatch(brokerSetBrokers -> brokerSetBrokers.containsAll(allBrokersForTopic))) {
throw new OptimizationFailureException(
String.format("[%s] Topic %s is not brokerSet-aware. brokers (%s).", name(), partitionsByTopic.getKey(), allBrokersForTopic));
String.format("[%s] Topic %s is not brokerSet-aware. brokers (%s).", name(), topicName, allBrokersForTopic));
}
}
}
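
The sanity check above boils down to a containment test: a topic passes only if at least one broker set contains every broker that hosts one of its replicas. A stripped-down sketch using plain broker ids (the class and types below are illustrative, not the model classes the goal actually uses):

import java.util.Map;
import java.util.Set;

public final class BrokerSetContainmentSketch {

  // True if every broker hosting the topic falls inside a single broker set.
  static boolean isBrokerSetAware(Map<String, Set<Integer>> brokersByBrokerSet, Set<Integer> topicBrokers) {
    return brokersByBrokerSet.values().stream().anyMatch(brokerSet -> brokerSet.containsAll(topicBrokers));
  }

  public static void main(String[] args) {
    Map<String, Set<Integer>> brokersByBrokerSet = Map.of("Blue", Set.of(0, 1, 2), "Green", Set.of(3, 4, 5));
    System.out.println(isBrokerSetAware(brokersByBrokerSet, Set.of(0, 2))); // true: contained in Blue
    System.out.println(isBrokerSetAware(brokersByBrokerSet, Set.of(2, 3))); // false: spans Blue and Green
  }
}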
@@ -217,7 +225,9 @@ protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<
_replicaToBrokerSetMappingPolicy.brokerSetIdForReplica(replica, clusterModel, _brokerSetResolutionHelper);

// If the re-balancing action is performed within the broker set boundary, we are good
if (broker.isAlive() && eligibleBrokerSetIdForReplica.equals(currentBrokerSetId)) {
// Topics that are configured to have leaders on each broker within the cluster are also skipped and won't be rebalanced by this goal
if ((broker.isAlive() && eligibleBrokerSetIdForReplica.equals(currentBrokerSetId)) || _mustHaveTopicLeadersPerBroker.contains(
replica.topicPartition().topic())) {
continue;
}
// If the brokerSet awareness condition is violated, move the replica to an eligible broker
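
The widened skip condition has two independent legs: the replica already sits in its eligible broker set on a live broker, or its topic must keep leaders on every broker and is therefore left alone by this goal. A small illustrative predicate, with hypothetical names rather than the goal's actual fields:

import java.util.Set;

public final class RebalanceSkipSketch {

  // Skip the replica if it is already placed in its eligible broker set on a live broker,
  // or if its topic must keep leaders on every broker and is therefore not broker-set-confined.
  static boolean shouldSkip(boolean brokerAlive,
                            String currentBrokerSetId,
                            String eligibleBrokerSetId,
                            String topic,
                            Set<String> mustHaveTopicLeadersPerBroker) {
    return (brokerAlive && eligibleBrokerSetId.equals(currentBrokerSetId))
        || mustHaveTopicLeadersPerBroker.contains(topic);
  }

  public static void main(String[] args) {
    // A replica of "__consumer_offsets" is skipped even though it sits outside its eligible broker set.
    System.out.println(shouldSkip(true, "Blue", "Green", "__consumer_offsets", Set.of("__consumer_offsets"))); // true
    System.out.println(shouldSkip(true, "Blue", "Green", "orders", Set.of("__consumer_offsets")));             // false
  }
}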
@@ -5,6 +5,7 @@
package com.linkedin.kafka.cruisecontrol.analyzer;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.BrokerSetAwareGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal;
@@ -37,6 +38,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import org.junit.Test;
@@ -97,7 +99,8 @@ public static Collection<Object[]> data() {
Collection<Object[]> p = new ArrayList<>();

// Sorted by priority.
List<String> goalNameByPriority = Arrays.asList(RackAwareGoal.class.getName(),
List<String> goalNameByPriority = Arrays.asList(BrokerSetAwareGoal.class.getName(),
RackAwareGoal.class.getName(),
RackAwareDistributionGoal.class.getName(),
MinTopicLeadersPerBrokerGoal.class.getName(),
ReplicaCapacityGoal.class.getName(),
@@ -119,6 +122,9 @@

Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
props.setProperty(AnalyzerConfig.MAX_REPLICAS_PER_BROKER_CONFIG, Long.toString(2000L));
String brokerSetsDataFile = Objects.requireNonNull(KafkaCruiseControlUnitTestUtils.class.getClassLoader().getResource(
TestConstants.BROKER_SET_RESOLVER_FILE_3)).getFile();
props.setProperty(AnalyzerConfig.BROKER_SET_CONFIG_FILE_CONFIG, brokerSetsDataFile);
BalancingConstraint constraint = new BalancingConstraint(new KafkaCruiseControlConfig(props));
constraint.setResourceBalancePercentage(TestConstants.LOW_BALANCE_PERCENTAGE);
constraint.setCapacityThreshold(TestConstants.MEDIUM_CAPACITY_THRESHOLD);
@@ -116,6 +116,7 @@ public enum Distribution {
public static final String DEFAULT_BROKER_SET_RESOLVER_FILE = "testBrokerSets.json";
public static final String BROKER_SET_RESOLVER_FILE_1 = "testBrokerSets1.json";
public static final String BROKER_SET_RESOLVER_FILE_2 = "testBrokerSets2.json";
public static final String BROKER_SET_RESOLVER_FILE_3 = "testBrokerSets3.json";

// Topic replication factor anomaly test.
public static final TopicReplicationFactorAnomalyEntry TOPIC_REPLICATION_FACTOR_ANOMALY_ENTRY =
cruise-control/src/test/resources/testBrokerSets3.json (new file: 62 additions, 0 deletions)
@@ -0,0 +1,62 @@
{
"brokerSets": [
{
"brokerSetId": "Blue",
"brokerIds": [
0,
1,
2,
3,
4,
5,
6,
7,
8,
9,
10,
11,
12,
13,
14,
15,
16,
17,
18,
19
],
"doc": "This block contains broker ids that belong to the Blue BrokerSet"
},
{
"brokerSetId": "Green",
"brokerIds": [
20,
21,
22,
23,
24,
25,
26,
27,
28,
29
],
"doc": "This block contains broker ids that belong to the Green BrokerSet"
},
{
"brokerSetId": "Red",
"brokerIds": [
30,
31,
32,
33,
34,
35,
36,
37,
38,
39
],
"doc": "This block contains broker ids that belong to the Red BrokerSet"
}
]
}
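
For reference, a broker set file of this shape could be loaded with Jackson into a broker-set-id to broker-ids mapping along the following lines. This is only an illustrative reader, not the resolver Cruise Control ships; the field names simply follow the JSON above:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public final class BrokerSetFileSketch {

  // Parses {"brokerSets": [{"brokerSetId": "...", "brokerIds": [...]}, ...]} into a map keyed by broker set id.
  static Map<String, Set<Integer>> readBrokerSets(File brokerSetFile) throws IOException {
    JsonNode root = new ObjectMapper().readTree(brokerSetFile);
    Map<String, Set<Integer>> brokerIdsByBrokerSet = new HashMap<>();
    for (JsonNode brokerSet : root.get("brokerSets")) {
      Set<Integer> brokerIds = new HashSet<>();
      brokerSet.get("brokerIds").forEach(id -> brokerIds.add(id.asInt()));
      brokerIdsByBrokerSet.put(brokerSet.get("brokerSetId").asText(), brokerIds);
    }
    return brokerIdsByBrokerSet;
  }

  public static void main(String[] args) throws IOException {
    // e.g. prints {Blue=[0..19], Green=[20..29], Red=[30..39]} for the file added in this PR.
    System.out.println(readBrokerSets(new File("cruise-control/src/test/resources/testBrokerSets3.json")));
  }
}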