Skip to content

Commit

Permalink
Added the suggestion by Paolo, Jakub and Paul
Browse files Browse the repository at this point in the history
Signed-off-by: ShubhamRwt <[email protected]>
  • Loading branch information
ShubhamRwt committed Oct 16, 2024
1 parent 8a60248 commit f46d8f5
Show file tree
Hide file tree
Showing 18 changed files with 186 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.strimzi.api.kafka.model.common.Constants;
import io.strimzi.api.kafka.model.common.UnknownPropertyPreserving;
import io.strimzi.crdgenerator.annotations.Description;
import io.strimzi.crdgenerator.annotations.MinimumItems;
import io.sundr.builder.annotations.Buildable;
import lombok.EqualsAndHashCode;
import lombok.ToString;
Expand All @@ -19,7 +20,7 @@
import java.util.Map;

/**
* Configures the brokerId and VolumeId for the remove-disks endpoint for Cruise Control
* Configures the broker and volume IDs for the remove-disks endpoint for Cruise Control
*/
@Buildable(
editableEnabled = false,
Expand All @@ -35,7 +36,7 @@ public class BrokerAndVolumeIds implements UnknownPropertyPreserving {
private List<Integer> volumeIds;
private Map<String, Object> additionalProperties;

@Description("Id of the broker which contains the disk from which you want to move the the partition replicas from")
@Description("ID of the broker that contains the disk from which you want to move the partition replicas.")
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("brokerId")
public Integer getBrokerId() {
Expand All @@ -46,9 +47,10 @@ public void setBrokerId(Integer brokerId) {
this.brokerId = brokerId;
}

@Description("Ids of the disk from which the partition replicas need to be moved")
@Description("IDs of the disks from which the partition replicas need to be moved.")
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("volumeIds")
@MinimumItems(1)
public List<Integer> getVolumeIds() {
return volumeIds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.strimzi.api.kafka.model.common.Spec;
import io.strimzi.crdgenerator.annotations.Description;
import io.strimzi.crdgenerator.annotations.Minimum;
import io.strimzi.crdgenerator.annotations.MinimumItems;
import io.sundr.builder.annotations.Buildable;
import lombok.EqualsAndHashCode;
import lombok.ToString;
Expand All @@ -22,26 +23,15 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({ "mode", "brokers", "moveReplicasOffVolumes", "goals", "skipHardGoalCheck", "rebalanceDisk", "excludedTopics", "concurrentPartitionMovementsPerBroker",
"concurrentIntraBrokerPartitionMovements", "concurrentLeaderMovements", "replicationThrottle", "replicaMovementStrategies" })
@JsonPropertyOrder({ "mode", "brokers", "goals", "skipHardGoalCheck", "rebalanceDisk", "excludedTopics", "concurrentPartitionMovementsPerBroker",
"concurrentIntraBrokerPartitionMovements", "concurrentLeaderMovements", "replicationThrottle", "replicaMovementStrategies", "moveReplicasOffVolumes" })
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaRebalanceSpec extends Spec {
// rebalancing modes
private KafkaRebalanceMode mode = KafkaRebalanceMode.FULL;
private List<Integer> brokers;

@Description("List of the brokers and the volumes corresponding to them whose replicas needs to be moved")
public List<BrokerAndVolumeIds> getMoveReplicasOffVolumes() {
return moveReplicasOffVolumes;
}

public void setMoveReplicasOffVolumes(List<BrokerAndVolumeIds> moveReplicasOffVolumes) {
this.moveReplicasOffVolumes = moveReplicasOffVolumes;
}

private List<BrokerAndVolumeIds> moveReplicasOffVolumes;

// Optimization goal configurations
private List<String> goals;
private boolean skipHardGoalCheck;
Expand All @@ -56,13 +46,15 @@ public void setMoveReplicasOffVolumes(List<BrokerAndVolumeIds> moveReplicasOffVo
private int concurrentLeaderMovements;
private long replicationThrottle;
private List<String> replicaMovementStrategies;
private List<BrokerAndVolumeIds> moveReplicasOffVolumes;

@Description("Mode to run the rebalancing. " +
"The supported modes are `full`, `add-brokers`, `remove-brokers`.\n" +
"If not specified, the `full` mode is used by default. \n\n" +
"* `full` mode runs the rebalancing across all the brokers in the cluster.\n" +
"* `add-brokers` mode can be used after scaling up the cluster to move some replicas to the newly added brokers.\n" +
"* `remove-brokers` mode can be used before scaling down the cluster to move replicas out of the brokers to be removed.\n")
"* `remove-brokers` mode can be used before scaling down the cluster to move replicas out of the brokers to be removed.\n" +
"* `remove-disks` mode can be used to move data across the volumes within the same broker\n")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public KafkaRebalanceMode getMode() {
return mode;
Expand Down Expand Up @@ -177,4 +169,14 @@ public List<String> getReplicaMovementStrategies() {
public void setReplicaMovementStrategies(List<String> replicaMovementStrategies) {
this.replicaMovementStrategies = replicaMovementStrategies;
}

@Description("List of brokers and their corresponding volumes from which replicas need to be moved.")
@MinimumItems(1)
public List<BrokerAndVolumeIds> getMoveReplicasOffVolumes() {
return moveReplicasOffVolumes;
}

public void setMoveReplicasOffVolumes(List<BrokerAndVolumeIds> moveReplicasOffVolumes) {
this.moveReplicasOffVolumes = moveReplicasOffVolumes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ void testKafkaRebalanceRemoveDisks() {
createDeleteCustomResource("KafkaRebalance-remove-disks.yaml");
}

@Test
void testKafkaRebalanceRemoveDisksWithEmptyVolumes() {
    // A remove-disks KafkaRebalance whose volumeIds list is empty must be rejected
    // by the CRD schema validation (volumeIds requires at least one item), so the
    // create call is expected to fail with a KubernetesClientException.
    Throwable thrown = assertThrows(
            KubernetesClientException.class,
            () -> createDeleteCustomResource("KafkaRebalance-remove-disks-with-empty-volumes.yaml"));

    String expectedFragment = "spec.moveReplicasOffVolumes[0].volumeIds: Invalid value: 0: spec.moveReplicasOffVolumes[0].volumeIds in body should have at least 1 items.";
    assertThat(thrown.getMessage(), containsString(expectedFragment));
}

@Test
void testKafkaRebalanceRemoveDisksWithEmptyBrokerAndVolumes() {
    // A remove-disks KafkaRebalance with an empty moveReplicasOffVolumes list must
    // be rejected by the CRD schema validation (the list requires at least one
    // entry), so the create call is expected to fail with a KubernetesClientException.
    Throwable thrown = assertThrows(
            KubernetesClientException.class,
            () -> createDeleteCustomResource("KafkaRebalance-remove-disks-with-empty-broker-and-volumes.yaml"));

    String expectedFragment = "spec.moveReplicasOffVolumes: Invalid value: 0: spec.moveReplicasOffVolumes in body should have at least 1 items.";
    assertThat(thrown.getMessage(), containsString(expectedFragment));
}

@Test
void testKafkaRebalanceWrongMode() {
Throwable exception = assertThrows(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Test fixture: a remove-disks KafkaRebalance with an empty moveReplicasOffVolumes
# list; expected to be rejected by CRD validation (the list must have at least 1 item).
# NOTE(review): the target filename is not visible in this diff hunk — presumably
# KafkaRebalance-remove-disks-with-empty-broker-and-volumes.yaml; verify against the tests.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
name: my-rebalance
spec:
mode: remove-disks
moveReplicasOffVolumes: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Test fixture: a remove-disks KafkaRebalance whose entry has a broker ID but an
# empty volumeIds list; expected to be rejected by CRD validation (volumeIds must
# have at least 1 item).
# NOTE(review): the target filename is not visible in this diff hunk — presumably
# KafkaRebalance-remove-disks-with-empty-volumes.yaml; verify against the tests.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
name: my-rebalance
spec:
mode: remove-disks
moveReplicasOffVolumes:
- brokerId: 0
volumeIds: []
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private KafkaRebalanceStatus updateStatus(KafkaRebalance kafkaRebalance,
if (brokerAndVolumeIds != null && !brokerAndVolumeIds.isEmpty()) {
((RemoveDisksOptions.RemoveDisksOptionsBuilder) rebalanceOptionsBuilder).withBrokersandVolumeIds(brokerAndVolumeIds);
} else {
throw new IllegalArgumentException("The brokers and volume id list is mandatory when using the " + mode.toValue() + " rebalancing mode");
throw new IllegalArgumentException("The brokers and volume ID's list is mandatory when using the " + mode.toValue() + " rebalancing mode");
}
break;
default:
Expand Down Expand Up @@ -494,66 +494,10 @@ protected static Map<Integer, Map<String, Object>> extractLoadParameters(JsonArr

/**
* Converts the supplied before and after broker load arrays into a map linking from broker ID integer to a map linking
* from load parameter to an array of [after]. The load parameters included in the map are dictated
* by the values in he {@link CruiseControlLoadParameters} enum.
*
* @param brokerLoadAfterArray The JSONArray of broker load JSONObjects, for after the optimization proposal is applied,
* returned by the Cruise Control rebalance endpoint.
* @return A JsonObject linking from broker ID integer to a map of load parameter to `after` array.
*/
protected static JsonObject parseLoadStats(JsonArray brokerLoadAfterArray) {

// There is no guarantee that the brokers are in the same order in both the before and after arrays.
// Therefore we need to convert them into maps indexed by broker ID so we can align them later for the comparison.
Map<Integer, Map<String, Object>> loadAfterMap = extractLoadParameters(brokerLoadAfterArray);

JsonObject brokersStats = new JsonObject();

for (Map.Entry<Integer, Map<String, Object>> loadAfterEntry : loadAfterMap.entrySet()) {
Map<String, Object> brokerAfter = loadAfterMap.get(loadAfterEntry.getKey());

JsonObject brokerStats = new JsonObject();

for (CruiseControlLoadParameters intLoadParameter : CruiseControlLoadParameters.getIntegerParameters()) {
if (brokerAfter.containsKey(intLoadParameter.getKafkaRebalanceStatusKey())) {

int intAfterStat = (int) brokerAfter.get(intLoadParameter.getKafkaRebalanceStatusKey());
JsonObject intStats = new JsonObject();
intStats.put("after", intAfterStat);

brokerStats.put(intLoadParameter.getKafkaRebalanceStatusKey(), intStats);
} else {
LOGGER.warnOp("{} information was missing from the broker after load information",
intLoadParameter.getKafkaRebalanceStatusKey());
}
}

for (CruiseControlLoadParameters doubleLoadParameter : CruiseControlLoadParameters.getDoubleParameters()) {

if (brokerAfter.containsKey(doubleLoadParameter.getKafkaRebalanceStatusKey())) {

double doubleAfterStat = (double) brokerAfter.get(doubleLoadParameter.getKafkaRebalanceStatusKey());
JsonObject doubleStats = new JsonObject();
doubleStats.put("after", doubleAfterStat);

brokerStats.put(doubleLoadParameter.getKafkaRebalanceStatusKey(), doubleStats);
} else {
LOGGER.warnOp("{} information was missing from the broker after load information",
doubleLoadParameter.getKafkaRebalanceStatusKey());
}

}

brokersStats.put(String.valueOf(loadAfterEntry.getKey()), brokerStats);
}

return brokersStats;
}

/**
* Converts the supplied before and after broker load arrays into a map linking from broker ID integer to a map linking
* from load parameter to an array of [before, after, difference]. The load parameters included in the map are dictated
* by the values in he {@link CruiseControlLoadParameters} enum.
* from load parameter to an array of [before, after, difference] if verbose was enabled with the request.
* In case verbose was disabled, we only convert the after load array into a map linking from broker ID integer to a map linking
* from load parameter to an array of [after].
* The load parameters included in the map are dictated by the values in the {@link CruiseControlLoadParameters} enum.
*
* @param brokerLoadBeforeArray The JSONArray of broker load JSONObjects, for before the optimization proposal is applied,
* returned by the Cruise Control rebalance endpoint.
Expand All @@ -565,24 +509,36 @@ protected static JsonObject parseLoadStats(JsonArray brokerLoadBeforeArray, Json

// There is no guarantee that the brokers are in the same order in both the before and after arrays.
// Therefore we need to convert them into maps indexed by broker ID so we can align them later for the comparison.
Map<Integer, Map<String, Object>> loadBeforeMap = extractLoadParameters(brokerLoadBeforeArray);

Map<Integer, Map<String, Object>> loadBeforeMap = new HashMap<>();

if (brokerLoadBeforeArray != null) {
loadBeforeMap = extractLoadParameters(brokerLoadBeforeArray);
}

Map<Integer, Map<String, Object>> loadAfterMap = extractLoadParameters(brokerLoadAfterArray);

if (loadBeforeMap.size() != loadAfterMap.size()) {
if (!loadBeforeMap.isEmpty() && loadBeforeMap.size() != loadAfterMap.size()) {
throw new IllegalArgumentException("Broker data was missing from the load before/after information");
}

JsonObject brokersStats = new JsonObject();

for (Map.Entry<Integer, Map<String, Object>> loadBeforeEntry : loadBeforeMap.entrySet()) {
Map<String, Object> brokerBefore = loadBeforeEntry.getValue();
Map<String, Object> brokerAfter = loadAfterMap.get(loadBeforeEntry.getKey());
for (Map.Entry<Integer, Map<String, Object>> loadAfterEntry : loadAfterMap.entrySet()) {

Map<String, Object> brokerBefore = new HashMap<>();

if (!loadBeforeMap.isEmpty()) {
brokerBefore = loadBeforeMap.get(loadAfterEntry.getKey());
}

Map<String, Object> brokerAfter = loadAfterEntry.getValue();

JsonObject brokerStats = new JsonObject();

for (CruiseControlLoadParameters intLoadParameter : CruiseControlLoadParameters.getIntegerParameters()) {

if (brokerBefore.containsKey(intLoadParameter.getKafkaRebalanceStatusKey()) &&
if (!brokerBefore.isEmpty() && brokerBefore.containsKey(intLoadParameter.getKafkaRebalanceStatusKey()) &&
brokerAfter.containsKey(intLoadParameter.getKafkaRebalanceStatusKey())) {

int intBeforeStat = (int) brokerBefore.get(intLoadParameter.getKafkaRebalanceStatusKey());
Expand All @@ -595,6 +551,13 @@ protected static JsonObject parseLoadStats(JsonArray brokerLoadBeforeArray, Json
intStats.put("after", intAfterStat);
intStats.put("diff", intDiff);

brokerStats.put(intLoadParameter.getKafkaRebalanceStatusKey(), intStats);
} else if (brokerBefore.isEmpty() &&
brokerAfter.containsKey(intLoadParameter.getKafkaRebalanceStatusKey())) {
int intAfterStat = (int) brokerAfter.get(intLoadParameter.getKafkaRebalanceStatusKey());
JsonObject intStats = new JsonObject();
intStats.put("after", intAfterStat);

brokerStats.put(intLoadParameter.getKafkaRebalanceStatusKey(), intStats);
} else {
LOGGER.warnOp("{} information was missing from the broker before/after load information",
Expand All @@ -616,15 +579,22 @@ protected static JsonObject parseLoadStats(JsonArray brokerLoadBeforeArray, Json
doubleStats.put("after", doubleAfterStat);
doubleStats.put("diff", doubleDiff);

brokerStats.put(doubleLoadParameter.getKafkaRebalanceStatusKey(), doubleStats);
} else if (brokerBefore.isEmpty() &&
brokerAfter.containsKey(doubleLoadParameter.getKafkaRebalanceStatusKey())) {
double doubleAfterStat = (double) brokerAfter.get(doubleLoadParameter.getKafkaRebalanceStatusKey());
JsonObject doubleStats = new JsonObject();
doubleStats.put("after", doubleAfterStat);

brokerStats.put(doubleLoadParameter.getKafkaRebalanceStatusKey(), doubleStats);
} else {
LOGGER.warnOp("{} information was missing from the broker before/after load information",
LOGGER.warnOp("{} information was missing from the broker after load information",
doubleLoadParameter.getKafkaRebalanceStatusKey());
}

}

brokersStats.put(String.valueOf(loadBeforeEntry.getKey()), brokerStats);
brokersStats.put(String.valueOf(loadAfterEntry.getKey()), brokerStats);
}

return brokersStats;
Expand Down Expand Up @@ -669,7 +639,7 @@ protected static MapAndStatus<ConfigMap, Map<String, Object>> processOptimizatio
JsonObject beforeAndAfterBrokerLoad;
JsonArray brokerLoadBeforeOptimization;
JsonArray brokerLoadAfterOptimization;
if (!kafkaRebalance.getSpec().getMode().equals(KafkaRebalanceMode.REMOVE_DISKS) && proposalJson.containsKey(CruiseControlRebalanceKeys.LOAD_BEFORE_OPTIMIZATION.getKey()) &&
if (proposalJson.containsKey(CruiseControlRebalanceKeys.LOAD_BEFORE_OPTIMIZATION.getKey()) &&
proposalJson.containsKey(CruiseControlRebalanceKeys.LOAD_AFTER_OPTIMIZATION.getKey())) {
brokerLoadBeforeOptimization = proposalJson
.getJsonObject(CruiseControlRebalanceKeys.LOAD_BEFORE_OPTIMIZATION.getKey())
Expand All @@ -678,12 +648,12 @@ protected static MapAndStatus<ConfigMap, Map<String, Object>> processOptimizatio
.getJsonObject(CruiseControlRebalanceKeys.LOAD_AFTER_OPTIMIZATION.getKey())
.getJsonArray(CruiseControlRebalanceKeys.BROKERS.getKey());
beforeAndAfterBrokerLoad = parseLoadStats(brokerLoadBeforeOptimization, brokerLoadAfterOptimization);
} else if (kafkaRebalance.getSpec().getMode().equals(KafkaRebalanceMode.REMOVE_DISKS) &&
} else if (!proposalJson.containsKey(CruiseControlRebalanceKeys.LOAD_BEFORE_OPTIMIZATION.getKey()) &&
proposalJson.containsKey(CruiseControlRebalanceKeys.LOAD_AFTER_OPTIMIZATION.getKey())) {
brokerLoadAfterOptimization = proposalJson
.getJsonObject(CruiseControlRebalanceKeys.LOAD_AFTER_OPTIMIZATION.getKey())
.getJsonArray(CruiseControlRebalanceKeys.BROKERS.getKey());
beforeAndAfterBrokerLoad = parseLoadStats(brokerLoadAfterOptimization);
beforeAndAfterBrokerLoad = parseLoadStats(null, brokerLoadAfterOptimization);
} else {
throw new IllegalArgumentException("The rebalance optimization proposal returned by Cruise Control did not contain broker load information");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public interface CruiseControlApi {
Future<CruiseControlRebalanceResponse> removeBroker(Reconciliation reconciliation, String host, int port, RemoveBrokerOptions options, String userTaskId);

/**
* Send a request to the Cruise Control server to perform a cluster rebalance when moving replicas off volumes of a broker.
* This method allows to move replicas from the volumes of a broker and move to the other volumes in the same broker.
* Send a request to the Cruise Control server to perform a cluster rebalance when moving replicas off a broker's volumes.
* This method allows moving replicas from a broker's volumes to other volumes within the same broker.
*
* @param reconciliation The reconciliation marker
* @param host The address of the Cruise Control server.
Expand Down
Loading

0 comments on commit f46d8f5

Please sign in to comment.