Skip to content

Commit

Permalink
Add refactoring around replicas change handling (#9821)
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Valeri <[email protected]>
  • Loading branch information
fvaleri authored Mar 15, 2024
1 parent 3f9fe6f commit 2100985
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -476,54 +476,57 @@ private void updateInternal(List<ReconcilableTopic> topics) {
var mayNeedUpdate = partitionedByPaused.get(false);
metrics.reconciliationsCounter(namespace).increment(mayNeedUpdate.size());
var addedFinalizer = addOrRemoveFinalizer(useFinalizer, mayNeedUpdate);

var currentStatesOrError = describeTopic(addedFinalizer);

createMissingTopics(results, currentStatesOrError);


// figure out necessary updates
createMissingTopics(results, currentStatesOrError);
List<Pair<ReconcilableTopic, Collection<AlterConfigOp>>> someAlterConfigs = configChanges(results, currentStatesOrError);
List<Pair<ReconcilableTopic, NewPartitions>> someCreatePartitions = partitionChanges(results, currentStatesOrError);

// execute those updates
var alterConfigsResults = alterConfigs(someAlterConfigs);
var createPartitionsResults = createPartitions(someCreatePartitions);
var findDifferentRfResults = findDifferentRf(currentStatesOrError);

// Cruise Control integration
maybeCheckReplicasChanges(topics, findDifferentRfResults, results);
var checkReplicasChangesResults = checkReplicasChanges(topics, currentStatesOrError);

accumulateResults(topics, results, alterConfigsResults, createPartitionsResults, findDifferentRfResults);
// update statuses
accumulateResults(results, alterConfigsResults, createPartitionsResults, checkReplicasChangesResults);
updateStatuses(results);
remainingAfterDeletions.forEach(rt -> stopReconciliationTimer(rt, metrics, namespace));

LOGGER.traceOp("Reconciled batch of {} KafkaTopics", results.size());
}

/**
* Check replicas changes when CruiseControl integration is enabled.
* <br/><br/>
* Check topic replicas changes.
*
* It runs the following operations in order:
* <p>If Cruise Control integration is disabled, it simply returns an error for each change.</p>
*
* <p>
* If Cruise Control integration is enabled, it runs the following operations in order:
* <ol>
* <li>Request new and pending changes</li>
* <li>Check the state of ongoing changes</li>
* <li>Complete pending but completed changes (*)</li>
* </ol>
*
* (*) A pending change needs to be completed by the reconciliation when it is unknown to Cruise Control
* due to a restart, but the task actually completed successfully (i.e. the new replication factor
* (*) A pending change needs to be completed by the reconciliation when it is unknown to Cruise Control
* due to a restart, but the task actually completed successfully (i.e. the new replication factor
* was applied), or when the user reverts an invalid replicas change to the previous value.
*
* @param reconcilableTopics Reconcilable topics
* @param differentRfResults Topics with different RF
* @param results Topics with updated status
* <p>
*
* @param reconcilableTopics Reconcilable topics from Kube
* @param currentStatesOrError Current topic state or error from Kafka
* @return Reconcilable topics partitioned by error
*/
/* test */ void maybeCheckReplicasChanges(List<ReconcilableTopic> reconcilableTopics,
PartitionedByError<ReconcilableTopic, CurrentState> differentRfResults,
Map<ReconcilableTopic, Either<TopicOperatorException, Object>> results) {
/* test */ PartitionedByError<ReconcilableTopic, Void> checkReplicasChanges(List<ReconcilableTopic> reconcilableTopics,
PartitionedByError<ReconcilableTopic, CurrentState> currentStatesOrError) {
var differentRfResults = findDifferentRf(currentStatesOrError);
Stream<Pair<ReconcilableTopic, Either<TopicOperatorException, Void>>> errorStream = differentRfResults.errors()
.map(pair -> pair(pair.getKey(), Either.ofLeft(pair.getValue())));

Stream<Pair<ReconcilableTopic, Either<TopicOperatorException, Void>>> okStream;

if (config.cruiseControlEnabled()) {
var results = new ArrayList<ReconcilableTopic>();
var differentRfMap = differentRfResults.ok().map(Pair::getKey).collect(Collectors.toList())
.stream().collect(Collectors.toMap(ReconcilableTopic::topicName, Function.identity()));

Expand All @@ -532,10 +535,11 @@ private void updateInternal(List<ReconcilableTopic> topics) {
.filter(rt -> !isPendingReplicasChange(rt.kt()) && !isOngoingReplicasChange(rt.kt()))
.collect(Collectors.toList());
pending.addAll(brandNew);
requestPendingReplicasChanges(pending, results);
warnTooLargeMinIsr(pending);
results.addAll(replicasChangeHandler.requestPendingChanges(pending));

var ongoing = topicsMatching(reconcilableTopics, this::isOngoingReplicasChange);
checkOngoingReplicasChanges(ongoing, results);
results.addAll(replicasChangeHandler.requestOngoingChanges(ongoing));

var completed = pending.stream()
.filter(rt -> !differentRfMap.containsKey(rt.topicName()) && !isFailedReplicasChange(rt.kt()))
Expand All @@ -545,26 +549,35 @@ private void updateInternal(List<ReconcilableTopic> topics) {
.collect(Collectors.toList());
completed.addAll(reverted);
LOGGER.debugOp("Pending but completed replicas changes, Topics: {}", topicNames(completed));
completeReplicasChanges(completed, results);
completed.forEach(reconcilableTopic -> {
reconcilableTopic.kt().getStatus().setReplicasChange(null);
});
results.addAll(completed);

okStream = results.stream().map(reconcilableTopic -> pair(reconcilableTopic, Either.ofRight(null)));

} else {
okStream = differentRfResults.ok().map(pair -> {
var reconcilableTopic = pair.getKey();
var specPartitions = partitions(reconcilableTopic.kt());
var partitions = pair.getValue().partitionsWithDifferentRfThan(specPartitions);
return pair(reconcilableTopic, Either.ofLeft(new TopicOperatorException.NotSupported(
"Replication factor change not supported, but required for partitions " + partitions)));
});
}

return partitionedByError(Stream.concat(okStream, errorStream));
}

private void requestPendingReplicasChanges(List<ReconcilableTopic> pending, Map<ReconcilableTopic, Either<TopicOperatorException, Object>> results) {
warnTooLargeMinIsr(pending);
var pendingAndOngoing = replicasChangeHandler.requestPendingChanges(pending);
pendingAndOngoing.forEach(rt -> putResult(results, rt, Either.ofRight(null)));
}

private void checkOngoingReplicasChanges(List<ReconcilableTopic> ongoing, Map<ReconcilableTopic, Either<TopicOperatorException, Object>> results) {
var completedAndFailed = replicasChangeHandler.requestOngoingChanges(ongoing);
completedAndFailed.forEach(rt -> putResult(results, rt, Either.ofRight(null)));
}
private PartitionedByError<ReconcilableTopic, CurrentState> findDifferentRf(PartitionedByError<ReconcilableTopic, CurrentState> currentStatesOrError) {
var apparentlyDifferentRf = currentStatesOrError.ok().filter(pair -> {
var reconcilableTopic = pair.getKey();
var currentState = pair.getValue();
return reconcilableTopic.kt().getSpec().getReplicas() != null
&& currentState.uniqueReplicationFactor() != reconcilableTopic.kt().getSpec().getReplicas();
}).toList();

private void completeReplicasChanges(List<ReconcilableTopic> toReset, Map<ReconcilableTopic, Either<TopicOperatorException, Object>> results) {
toReset.forEach(reconcilableTopic -> {
reconcilableTopic.kt().getStatus().setReplicasChange(null);
putResult(results, reconcilableTopic, Either.ofRight(null));
});
return partitionedByError(filterByReassignmentTargetReplicas(apparentlyDifferentRf).stream());
}

/**
Expand Down Expand Up @@ -689,31 +702,19 @@ private void updateStatuses(Map<ReconcilableTopic, Either<TopicOperatorException
LOGGER.traceOp("Updated status of {} KafkaTopics", results.size());
}

private void accumulateResults(List<ReconcilableTopic> topics,
Map<ReconcilableTopic, Either<TopicOperatorException, Object>> results,
private void accumulateResults(Map<ReconcilableTopic, Either<TopicOperatorException, Object>> results,
PartitionedByError<ReconcilableTopic, Void> alterConfigsResults,
PartitionedByError<ReconcilableTopic, Void> createPartitionsResults,
PartitionedByError<ReconcilableTopic, CurrentState> differentRfResults) {
PartitionedByError<ReconcilableTopic, Void> replicasChangeResults) {
// add the successes to the results
alterConfigsResults.ok().forEach(pair -> putResult(results, pair.getKey(), Either.ofRight(null)));
createPartitionsResults.ok().forEach(pair -> putResult(results, pair.getKey(), Either.ofRight(null)));
replicasChangeResults.ok().forEach(pair -> putResult(results, pair.getKey(), Either.ofRight(null)));

// add to errors (potentially overwriting some successes, e.g. if configs succeeded but partitions failed)
alterConfigsResults.errors().forEach(pair -> putResult(results, pair.getKey(), Either.ofLeft(pair.getValue())));
createPartitionsResults.errors().forEach(pair -> putResult(results, pair.getKey(), Either.ofLeft(pair.getValue())));

// handle topics with different replication factor value
LOGGER.traceOp("Reconciling replicas changes");
differentRfResults.errors().forEach(pair -> putResult(results, pair.getKey(), Either.ofLeft(pair.getValue())));
if (!config.cruiseControlEnabled()) {
differentRfResults.ok().forEach(pair -> {
var reconcilableTopic = pair.getKey();
var specPartitions = partitions(reconcilableTopic.kt());
var partitions = pair.getValue().partitionsWithDifferentRfThan(specPartitions);
putResult(results, reconcilableTopic, Either.ofLeft(new TopicOperatorException.NotSupported(
"Replication factor change not supported, but required for partitions " + partitions)));
});
}
replicasChangeResults.errors().forEach(pair -> putResult(results, pair.getKey(), Either.ofLeft(pair.getValue())));
}

private static List<Pair<ReconcilableTopic, Collection<AlterConfigOp>>> configChanges(Map<ReconcilableTopic, Either<TopicOperatorException, Object>> results, PartitionedByError<ReconcilableTopic, CurrentState> currentStatesOrError) {
Expand Down Expand Up @@ -754,15 +755,14 @@ private List<Pair<ReconcilableTopic, Either<TopicOperatorException, CurrentState
if (apparentlyDifferentRfTopics.isEmpty()) {
return List.of();
}
Set<TopicPartition> apparentDifferentRfPartitions = apparentlyDifferentRfTopics.stream().flatMap(pair -> {
return pair.getValue().topicDescription.partitions().stream()
.filter(pi -> {
// includes only the partitions of the topic with a RF that mismatches the desired RF
var desiredRf = pair.getKey().kt().getSpec().getReplicas();
return desiredRf != pi.replicas().size();
})
.map(pi -> new TopicPartition(pair.getKey().topicName(), pi.partition()));
}).collect(Collectors.toSet());
Set<TopicPartition> apparentDifferentRfPartitions = apparentlyDifferentRfTopics.stream()
.flatMap(pair -> pair.getValue().topicDescription().partitions().stream()
.filter(pi -> {
// includes only the partitions of the topic with a RF that mismatches the desired RF
var desiredRf = pair.getKey().kt().getSpec().getReplicas();
return desiredRf != pi.replicas().size();
})
.map(pi -> new TopicPartition(pair.getKey().topicName(), pi.partition()))).collect(Collectors.toSet());

Map<TopicPartition, PartitionReassignment> reassignments;
LOGGER.traceOp("Admin.listPartitionReassignments({})", apparentDifferentRfPartitions);
Expand Down Expand Up @@ -793,7 +793,7 @@ private List<Pair<ReconcilableTopic, Either<TopicOperatorException, CurrentState
}));

return apparentlyDifferentRfTopics.stream().filter(pair -> {
boolean b = pair.getValue.topicDescription.partitions().stream().anyMatch(pi -> {
boolean b = pair.getValue.topicDescription().partitions().stream().anyMatch(pi -> {
TopicPartition tp = new TopicPartition(pair.getKey.topicName(), pi.partition());
Integer targetRf = partitionToTargetRf.get(tp);
Integer desiredRf = pair.getKey.kt().getSpec().getReplicas();
Expand Down Expand Up @@ -870,17 +870,6 @@ private PartitionedByError<ReconcilableTopic, Void> createPartitions(List<Pair<R
return partitionedByError(entryStream);
}

private PartitionedByError<ReconcilableTopic, CurrentState> findDifferentRf(PartitionedByError<ReconcilableTopic, CurrentState> currentStatesOrError) {
var apparentlyDifferentRf = currentStatesOrError.ok().filter(pair -> {
var reconcilableTopic = pair.getKey();
var currentState = pair.getValue();
return reconcilableTopic.kt().getSpec().getReplicas() != null
&& currentState.uniqueReplicationFactor() != reconcilableTopic.kt().getSpec().getReplicas();
}).toList();

return partitionedByError(filterByReassignmentTargetReplicas(apparentlyDifferentRf).stream());
}

private static ConfigResource topicConfigResource(String tn) {
return new ConfigResource(ConfigResource.Type.TOPIC, tn);
}
Expand Down
Loading

0 comments on commit 2100985

Please sign in to comment.