Skip to content

Commit

Permalink
Fix topicId status update (#10491)
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Valeri <[email protected]>
  • Loading branch information
fvaleri authored Sep 2, 2024
1 parent cf53b49 commit e7e8b11
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ public class BatchingTopicController {

// Key: topic name, Value: The KafkaTopics known to manage that topic
/* test */ final Map<String, List<KubeRef>> topics = new HashMap<>();

// topic name id map, which is updated on every reconciliation
private Map<String, String> topicNameIdMap;

private final TopicOperatorMetricsHolder metrics;
private final String namespace;
Expand Down Expand Up @@ -135,6 +138,7 @@ public class BatchingTopicController {
this.namespace = config.namespace();
this.enableAdditionalMetrics = config.enableAdditionalMetrics();
this.replicasChangeHandler = replicasChangeHandler;
this.topicNameIdMap = new HashMap<>();
}

/**
Expand Down Expand Up @@ -321,8 +325,7 @@ private PartitionedByError<ReconcilableTopic, Void> createTopics(List<Reconcilab
}
try {
values.get(reconcilableTopic.topicName()).get();
reconcilableTopic.kt().setStatus(new KafkaTopicStatusBuilder()
.withTopicId(ctr.topicId(reconcilableTopic.topicName()).get().toString()).build());
topicNameIdMap.put(reconcilableTopic.topicName(), ctr.topicId(reconcilableTopic.topicName()).get().toString());
return new Pair<>(reconcilableTopic, Either.ofRight((null)));
} catch (ExecutionException e) {
if (e.getCause() != null && e.getCause() instanceof TopicExistsException) {
Expand Down Expand Up @@ -399,6 +402,9 @@ void onUpdate(List<ReconcilableTopic> topics) throws InterruptedException {

private void updateInternal(List<ReconcilableTopic> batch) {
LOGGER.debugOp("Reconciling batch {}", batch);
Map<ReconcilableTopic, Either<TopicOperatorException, Object>> results = new HashMap<>();
topicNameIdMap.clear();

// process deletions
var partitionedByDeletion = batch.stream().filter(reconcilableTopic -> {
var kt = reconcilableTopic.kt();
Expand All @@ -423,7 +429,6 @@ private void updateInternal(List<ReconcilableTopic> batch) {
}

// process remaining
Map<ReconcilableTopic, Either<TopicOperatorException, Object>> results = new HashMap<>();
var remainingAfterDeletions = partitionedByDeletion.get(false);
var timerSamples = remainingAfterDeletions.stream().collect(
Collectors.toMap(identity(), rt -> startReconciliationTimer(metrics)));
Expand Down Expand Up @@ -889,6 +894,7 @@ private PartitionedByError<ReconcilableTopic, TopicState> describeTopic(List<Rec
ExecutionException exception = null;
try {
description = cs1.get(reconcilableTopic.topicName()).get();
topicNameIdMap.put(reconcilableTopic.topicName(), description.topicId().toString());
} catch (ExecutionException e) {
exception = e;
} catch (InterruptedException e) {
Expand Down Expand Up @@ -1225,33 +1231,39 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
// the observedGeneration is a marker that shows that the operator works and that it saw the last update to the resource
reconcilableTopic.kt().getStatus().setObservedGeneration(reconcilableTopic.kt().getMetadata().getGeneration());

// set or reset the topicName
// add or remove topicName
reconcilableTopic.kt().getStatus().setTopicName(
!TopicOperatorUtil.isManaged(reconcilableTopic.kt())
? null
: oldStatus != null && oldStatus.getTopicName() != null
? oldStatus.getTopicName()
: TopicOperatorUtil.topicName(reconcilableTopic.kt())
? oldStatus.getTopicName()
: TopicOperatorUtil.topicName(reconcilableTopic.kt())
);

// add or remove topicId
reconcilableTopic.kt().getStatus().setTopicId(
(!TopicOperatorUtil.isManaged(reconcilableTopic.kt()) || TopicOperatorUtil.isPaused(reconcilableTopic.kt()))
? null : topicNameIdMap.get(reconcilableTopic.topicName())
);

StatusDiff statusDiff = new StatusDiff(oldStatus, reconcilableTopic.kt().getStatus());
if (!statusDiff.isEmpty()) {
var updatedTopic = new KafkaTopicBuilder(reconcilableTopic.kt())
.editOrNewMetadata()
.withResourceVersion(null)
.endMetadata()
.withStatus(reconcilableTopic.kt().getStatus())
.build();
LOGGER.debugCr(reconcilableTopic.reconciliation(), "Updating status with {}", updatedTopic.getStatus());
var timerSample = TopicOperatorUtil.startExternalRequestTimer(metrics, enableAdditionalMetrics);
try {
var updatedTopic = new KafkaTopicBuilder(reconcilableTopic.kt())
.editOrNewMetadata()
.withResourceVersion(null)
.endMetadata()
.withStatus(reconcilableTopic.kt().getStatus())
.build();
LOGGER.debugCr(reconcilableTopic.reconciliation(), "Updating status with {}", updatedTopic.getStatus());
var timerSample = TopicOperatorUtil.startExternalRequestTimer(metrics, enableAdditionalMetrics);
var got = Crds.topicOperation(kubeClient).resource(updatedTopic).updateStatus();
TopicOperatorUtil.stopExternalRequestTimer(timerSample, metrics::updateStatusTimer, enableAdditionalMetrics, namespace);
LOGGER.traceCr(reconcilableTopic.reconciliation(), "Updated status to observedGeneration {}, resourceVersion {}",
got.getStatus().getObservedGeneration(), got.getMetadata().getResourceVersion());
} catch (Throwable e) {
LOGGER.errorOp("Status update failed: {}", e.getMessage());
}
TopicOperatorUtil.stopExternalRequestTimer(timerSample, metrics::updateStatusTimer, enableAdditionalMetrics, namespace);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -548,14 +548,50 @@ private List<KafkaTopic> createTopicsConcurrently(KafkaCluster kc, KafkaTopic...

private KafkaTopic pauseTopic(String namespace, String topicName) {
var current = Crds.topicOperation(kubernetesClient).inNamespace(namespace).withName(topicName).get();
var paused = Crds.topicOperation(kubernetesClient).resource(new KafkaTopicBuilder(current)
var kafkaTopic = Crds.topicOperation(kubernetesClient).resource(new KafkaTopicBuilder(current)
.editMetadata()
.withAnnotations(Map.of(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, "true"))
.withAnnotations(Map.of(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, "true"))
.endMetadata()
.build()).update();
LOGGER.info("Test paused KafkaTopic {} with resourceVersion {}",
paused.getMetadata().getName(), TopicOperatorUtil.resourceVersion(paused));
return waitUntil(paused, pausedIsTrue());
kafkaTopic.getMetadata().getName(), TopicOperatorUtil.resourceVersion(kafkaTopic));
return waitUntil(kafkaTopic, pausedIsTrue());
}

private KafkaTopic unpauseTopic(String namespace, String topicName) {
var current = Crds.topicOperation(kubernetesClient).inNamespace(namespace).withName(topicName).get();
var kafkaTopic = Crds.topicOperation(kubernetesClient).resource(new KafkaTopicBuilder(current)
.editMetadata()
.withAnnotations(Map.of(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, "false"))
.endMetadata()
.build()).update();
LOGGER.info("Test unpaused KafkaTopic {} with resourceVersion {}",
kafkaTopic.getMetadata().getName(), TopicOperatorUtil.resourceVersion(kafkaTopic));
return waitUntil(kafkaTopic, readyIsTrue());
}

private KafkaTopic unmanageTopic(String namespace, String topicName) {
var current = Crds.topicOperation(kubernetesClient).inNamespace(namespace).withName(topicName).get();
var kafkaTopic = Crds.topicOperation(kubernetesClient).resource(new KafkaTopicBuilder(current)
.editMetadata()
.withAnnotations(Map.of(TopicOperatorUtil.MANAGED, "false"))
.endMetadata()
.build()).update();
LOGGER.info("Test unmanaged KafkaTopic {} with resourceVersion {}",
kafkaTopic.getMetadata().getName(), TopicOperatorUtil.resourceVersion(kafkaTopic));
return waitUntil(kafkaTopic, unmanagedIsTrue());
}

private KafkaTopic manageTopic(String namespace, String topicName) {
var current = Crds.topicOperation(kubernetesClient).inNamespace(namespace).withName(topicName).get();
var kafkaTopic = Crds.topicOperation(kubernetesClient).resource(new KafkaTopicBuilder(current)
.editMetadata()
.withAnnotations(Map.of(TopicOperatorUtil.MANAGED, "true"))
.endMetadata()
.build()).update();
LOGGER.info("Test managed KafkaTopic {} with resourceVersion {}",
kafkaTopic.getMetadata().getName(), TopicOperatorUtil.resourceVersion(kafkaTopic));
return waitUntil(kafkaTopic, readyIsTrue());
}

private TopicDescription awaitTopicDescription(String expectedTopicName) throws InterruptedException, ExecutionException, TimeoutException {
Expand Down Expand Up @@ -671,17 +707,12 @@ public void shouldCreateTopicInKafkaWhenKafkaTopicHasOnlyConfigs(
@ParameterizedTest
@MethodSource("unmanagedKafkaTopics")
public void shouldNotCreateTopicInKafkaWhenUnmanagedTopicCreatedInKube(
KafkaTopic kt,
KafkaTopic kafkaTopic,
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
// given

// when
var reconciled = createTopic(kafkaCluster, kt, unmanagedStatusTrue());

// then
assertNull(reconciled.getStatus().getTopicName());
assertNotExistsInKafka(TopicOperatorUtil.topicName(kt));
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
createTopic(kafkaCluster, kafkaTopic);
assertNotExistsInKafka(TopicOperatorUtil.topicName(kafkaTopic));
}

@ParameterizedTest
Expand Down Expand Up @@ -1470,11 +1501,12 @@ public void shouldFailCreationIfIllegalTopicName(
@ParameterizedTest
@MethodSource("managedKafkaTopics")
public void shouldFailChangeToSpecTopicName(
KafkaTopic kt,
KafkaTopic kafkaTopic,
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException, TimeoutException {
var expectedTopicName = TopicOperatorUtil.topicName(kt);
shouldFailOnModification(kafkaCluster, kt,
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException, TimeoutException {
var expectedTopicName = TopicOperatorUtil.topicName(kafkaTopic);
shouldFailOnModification(kafkaCluster, kafkaTopic,
theKt -> {
theKt.getSpec().setTopicName("CHANGED-" + expectedTopicName);
return theKt;
Expand Down Expand Up @@ -2080,67 +2112,108 @@ public void shouldTerminateIfQueueFull(
@Test
public void shouldNotReconcilePausedKafkaTopicOnAdd(
@BrokerConfig(name = BatchingTopicController.AUTO_CREATE_TOPICS_ENABLE, value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
var topicName = "my-topic";

// generation: 1, observedGeneration: 0
KafkaTopic kt = createTopic(
var kafkaTopic = createTopic(
kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR,
Map.of(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, "true"),
true, topicName, 1, 1, Map.of()),
pausedIsTrue()
);

assertEquals(1, kt.getStatus().getObservedGeneration());
assertEquals(1, kafkaTopic.getStatus().getObservedGeneration());
assertNotExistsInKafka(topicName);
}

@Test
public void shouldNotReconcilePausedKafkaTopicOnUpdate(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
var topicName = "my-topic";

// generation: 1, observedGeneration: 1
createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR, null, true, topicName, 1, 1, Map.of()));

// generation: 2, observedGeneration: 1
KafkaTopic kt = pauseTopic(NAMESPACE, topicName);

// generation: 3, observedGeneration: 1
TopicOperatorTestUtil.changeTopic(kubernetesClient, kt, theKt -> {

var kafkaTopic = pauseTopic(NAMESPACE, topicName);

TopicOperatorTestUtil.changeTopic(kubernetesClient, kafkaTopic, theKt -> {
theKt.getSpec().setConfig(Map.of(TopicConfig.FLUSH_MS_CONFIG, "1000"));
return theKt;
});

assertEquals(1, kt.getStatus().getObservedGeneration());
assertEquals(1, kafkaTopic.getStatus().getObservedGeneration());
assertEquals(Map.of(), topicConfigMap(topicName));
}

@Test
public void shouldReconcilePausedKafkaTopicOnDelete(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
var topicName = "my-topic";

// generation: 1, observedGeneration: 1
createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR, null, true, topicName, 1, 1, Map.of()));

var kafkaTopic = pauseTopic(NAMESPACE, topicName);

// generation: 2, observedGeneration: 1
KafkaTopic kt = pauseTopic(NAMESPACE, topicName);

Crds.topicOperation(kubernetesClient).resource(kt).delete();
Crds.topicOperation(kubernetesClient).resource(kafkaTopic).delete();
LOGGER.info("Test deleted KafkaTopic {} with resourceVersion {}",
kt.getMetadata().getName(), TopicOperatorUtil.resourceVersion(kt));
Resource<KafkaTopic> resource = Crds.topicOperation(kubernetesClient).resource(kt);
kafkaTopic.getMetadata().getName(), TopicOperatorUtil.resourceVersion(kafkaTopic));
Resource<KafkaTopic> resource = Crds.topicOperation(kubernetesClient).resource(kafkaTopic);
TopicOperatorTestUtil.waitUntilCondition(resource, Objects::isNull);

assertNotExistsInKafka(topicName);
}

@Test
public void topicIdShouldBeEmptyOnPausedKafkaTopic(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
var topicName = "my-topic";
var kafkaTopic = createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR, null, true, topicName, 1, 1, Map.of()));

assertNotNull(kafkaTopic.getStatus().getTopicName());
assertNotNull(kafkaTopic.getStatus().getTopicId());

kafkaTopic = pauseTopic(NAMESPACE, topicName);

assertNotNull(kafkaTopic.getStatus().getTopicName());
assertNull(kafkaTopic.getStatus().getTopicId());

kafkaTopic = unpauseTopic(NAMESPACE, topicName);

assertNotNull(kafkaTopic.getStatus().getTopicName());
assertNotNull(kafkaTopic.getStatus().getTopicId());
}

@Test
public void topicNameAndIdShouldBeEmptyOnUnmanagedKafkaTopic(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
var topicName = "my-topic";

var kafkaTopic = createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR, null, true, topicName, 1, 1, Map.of()));

assertNotNull(kafkaTopic.getStatus().getTopicName());
assertNotNull(kafkaTopic.getStatus().getTopicId());

kafkaTopic = unmanageTopic(NAMESPACE, topicName);

assertNull(kafkaTopic.getStatus().getTopicName());
assertNull(kafkaTopic.getStatus().getTopicId());

kafkaTopic = manageTopic(NAMESPACE, topicName);

assertNotNull(kafkaTopic.getStatus().getTopicName());
assertNotNull(kafkaTopic.getStatus().getTopicId());
}

@Test
public void shouldReconcileKafkaTopicWithoutPartitions(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
Expand Down Expand Up @@ -2191,7 +2264,7 @@ public void shouldNotReconcileKafkaTopicWithMissingSpec(
var topicName = "my-topic";
maybeStartOperator(topicOperatorConfig(NAMESPACE, kafkaCluster));

var created = Crds.topicOperation(kubernetesClient)
Crds.topicOperation(kubernetesClient)
.resource(kafkaTopicWithNoSpec(topicName, false))
.create();

Expand Down Expand Up @@ -2273,5 +2346,4 @@ public void shouldUpdateAnUnmanagedTopic(
resourceVersionAfterUpdate.equals(kt.getMetadata().getResourceVersion())
);
}

}

0 comments on commit e7e8b11

Please sign in to comment.