Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix topicId status update #10491

Merged
merged 2 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ public class BatchingTopicController {

// Key: topic name, Value: The KafkaTopics known to manage that topic
/* test */ final Map<String, List<KubeRef>> topics = new HashMap<>();

// topic name id map, which is updated on every reconciliation
private Map<String, String> topicNameIdMap;

private final TopicOperatorMetricsHolder metrics;
private final String namespace;
Expand Down Expand Up @@ -134,6 +137,7 @@ public class BatchingTopicController {
this.namespace = config.namespace();
this.enableAdditionalMetrics = config.enableAdditionalMetrics();
this.replicasChangeHandler = replicasChangeHandler;
this.topicNameIdMap = new HashMap<>();
}

/**
Expand Down Expand Up @@ -312,8 +316,7 @@ private PartitionedByError<ReconcilableTopic, Void> createTopics(List<Reconcilab
return partitionedByError(kts.stream().map(reconcilableTopic -> {
try {
values.get(reconcilableTopic.topicName()).get();
reconcilableTopic.kt().setStatus(new KafkaTopicStatusBuilder()
.withTopicId(ctr.topicId(reconcilableTopic.topicName()).get().toString()).build());
topicNameIdMap.put(reconcilableTopic.topicName(), ctr.topicId(reconcilableTopic.topicName()).get().toString());
return new Pair<>(reconcilableTopic, Either.ofRight((null)));
} catch (ExecutionException e) {
if (e.getCause() != null && e.getCause() instanceof TopicExistsException) {
Expand Down Expand Up @@ -388,6 +391,9 @@ void onUpdate(List<ReconcilableTopic> topics) throws InterruptedException {

private void updateInternal(List<ReconcilableTopic> batch) {
LOGGER.debugOp("Reconciling batch {}", batch);
Map<ReconcilableTopic, Either<TopicOperatorException, Object>> results = new HashMap<>();
topicNameIdMap.clear();

// process deletions
var partitionedByDeletion = batch.stream().filter(reconcilableTopic -> {
var kt = reconcilableTopic.kt();
Expand All @@ -412,7 +418,6 @@ private void updateInternal(List<ReconcilableTopic> batch) {
}

// process remaining
Map<ReconcilableTopic, Either<TopicOperatorException, Object>> results = new HashMap<>();
var remainingAfterDeletions = partitionedByDeletion.get(false);
var timerSamples = remainingAfterDeletions.stream().collect(
Collectors.toMap(identity(), rt -> startReconciliationTimer(metrics)));
Expand Down Expand Up @@ -878,6 +883,7 @@ private PartitionedByError<ReconcilableTopic, TopicState> describeTopic(List<Rec
ExecutionException exception = null;
try {
description = cs1.get(reconcilableTopic.topicName()).get();
topicNameIdMap.put(reconcilableTopic.topicName(), description.topicId().toString());
} catch (ExecutionException e) {
exception = e;
} catch (InterruptedException e) {
Expand Down Expand Up @@ -1214,33 +1220,39 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
// the observedGeneration is a marker that shows that the operator works and that it saw the last update to the resource
reconcilableTopic.kt().getStatus().setObservedGeneration(reconcilableTopic.kt().getMetadata().getGeneration());

// set or reset the topicName
// add or remove topicName
reconcilableTopic.kt().getStatus().setTopicName(
!TopicOperatorUtil.isManaged(reconcilableTopic.kt())
? null
: oldStatus != null && oldStatus.getTopicName() != null
? oldStatus.getTopicName()
: TopicOperatorUtil.topicName(reconcilableTopic.kt())
? oldStatus.getTopicName()
: TopicOperatorUtil.topicName(reconcilableTopic.kt())
);

// add or remove topicId
reconcilableTopic.kt().getStatus().setTopicId(
(!TopicOperatorUtil.isManaged(reconcilableTopic.kt()) || TopicOperatorUtil.isPaused(reconcilableTopic.kt()))
? null : topicNameIdMap.get(reconcilableTopic.topicName())
);

StatusDiff statusDiff = new StatusDiff(oldStatus, reconcilableTopic.kt().getStatus());
if (!statusDiff.isEmpty()) {
var updatedTopic = new KafkaTopicBuilder(reconcilableTopic.kt())
.editOrNewMetadata()
.withResourceVersion(null)
.endMetadata()
.withStatus(reconcilableTopic.kt().getStatus())
.build();
LOGGER.debugCr(reconcilableTopic.reconciliation(), "Updating status with {}", updatedTopic.getStatus());
var timerSample = TopicOperatorUtil.startExternalRequestTimer(metrics, enableAdditionalMetrics);
try {
var updatedTopic = new KafkaTopicBuilder(reconcilableTopic.kt())
.editOrNewMetadata()
.withResourceVersion(null)
.endMetadata()
.withStatus(reconcilableTopic.kt().getStatus())
.build();
LOGGER.debugCr(reconcilableTopic.reconciliation(), "Updating status with {}", updatedTopic.getStatus());
var timerSample = TopicOperatorUtil.startExternalRequestTimer(metrics, enableAdditionalMetrics);
var got = Crds.topicOperation(kubeClient).resource(updatedTopic).updateStatus();
TopicOperatorUtil.stopExternalRequestTimer(timerSample, metrics::updateStatusTimer, enableAdditionalMetrics, namespace);
LOGGER.traceCr(reconcilableTopic.reconciliation(), "Updated status to observedGeneration {}, resourceVersion {}",
got.getStatus().getObservedGeneration(), got.getMetadata().getResourceVersion());
} catch (Throwable e) {
LOGGER.errorOp("Status update failed: {}", e.getMessage());
}
TopicOperatorUtil.stopExternalRequestTimer(timerSample, metrics::updateStatusTimer, enableAdditionalMetrics, namespace);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -548,14 +548,50 @@ private List<KafkaTopic> createTopicsConcurrently(KafkaCluster kc, KafkaTopic...

private KafkaTopic pauseTopic(String namespace, String topicName) {
var current = Crds.topicOperation(kubernetesClient).inNamespace(namespace).withName(topicName).get();
var paused = Crds.topicOperation(kubernetesClient).resource(new KafkaTopicBuilder(current)
var kafkaTopic = Crds.topicOperation(kubernetesClient).resource(new KafkaTopicBuilder(current)
.editMetadata()
.withAnnotations(Map.of(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, "true"))
.withAnnotations(Map.of(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, "true"))
.endMetadata()
.build()).update();
LOGGER.info("Test paused KafkaTopic {} with resourceVersion {}",
paused.getMetadata().getName(), TopicOperatorUtil.resourceVersion(paused));
return waitUntil(paused, pausedIsTrue());
kafkaTopic.getMetadata().getName(), TopicOperatorUtil.resourceVersion(kafkaTopic));
return waitUntil(kafkaTopic, pausedIsTrue());
}

private KafkaTopic unpauseTopic(String namespace, String topicName) {
var current = Crds.topicOperation(kubernetesClient).inNamespace(namespace).withName(topicName).get();
var kafkaTopic = Crds.topicOperation(kubernetesClient).resource(new KafkaTopicBuilder(current)
.editMetadata()
.withAnnotations(Map.of(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, "false"))
.endMetadata()
.build()).update();
LOGGER.info("Test unpaused KafkaTopic {} with resourceVersion {}",
kafkaTopic.getMetadata().getName(), TopicOperatorUtil.resourceVersion(kafkaTopic));
return waitUntil(kafkaTopic, readyIsTrue());
}

private KafkaTopic unmanageTopic(String namespace, String topicName) {
var current = Crds.topicOperation(kubernetesClient).inNamespace(namespace).withName(topicName).get();
var kafkaTopic = Crds.topicOperation(kubernetesClient).resource(new KafkaTopicBuilder(current)
.editMetadata()
.withAnnotations(Map.of(TopicOperatorUtil.MANAGED, "false"))
.endMetadata()
.build()).update();
LOGGER.info("Test unmanaged KafkaTopic {} with resourceVersion {}",
kafkaTopic.getMetadata().getName(), TopicOperatorUtil.resourceVersion(kafkaTopic));
return waitUntil(kafkaTopic, unmanagedIsTrue());
}

private KafkaTopic manageTopic(String namespace, String topicName) {
var current = Crds.topicOperation(kubernetesClient).inNamespace(namespace).withName(topicName).get();
var kafkaTopic = Crds.topicOperation(kubernetesClient).resource(new KafkaTopicBuilder(current)
.editMetadata()
.withAnnotations(Map.of(TopicOperatorUtil.MANAGED, "true"))
.endMetadata()
.build()).update();
LOGGER.info("Test managed KafkaTopic {} with resourceVersion {}",
kafkaTopic.getMetadata().getName(), TopicOperatorUtil.resourceVersion(kafkaTopic));
return waitUntil(kafkaTopic, readyIsTrue());
}

private TopicDescription awaitTopicDescription(String expectedTopicName) throws InterruptedException, ExecutionException, TimeoutException {
Expand Down Expand Up @@ -671,17 +707,12 @@ public void shouldCreateTopicInKafkaWhenKafkaTopicHasOnlyConfigs(
@ParameterizedTest
@MethodSource("unmanagedKafkaTopics")
public void shouldNotCreateTopicInKafkaWhenUnmanagedTopicCreatedInKube(
KafkaTopic kt,
KafkaTopic kafkaTopic,
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
// given

// when
var reconciled = createTopic(kafkaCluster, kt, unmanagedStatusTrue());

// then
assertNull(reconciled.getStatus().getTopicName());
assertNotExistsInKafka(TopicOperatorUtil.topicName(kt));
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
createTopic(kafkaCluster, kafkaTopic);
assertNotExistsInKafka(TopicOperatorUtil.topicName(kafkaTopic));
}

@ParameterizedTest
Expand Down Expand Up @@ -1470,11 +1501,12 @@ public void shouldFailCreationIfIllegalTopicName(
@ParameterizedTest
@MethodSource("managedKafkaTopics")
public void shouldFailChangeToSpecTopicName(
KafkaTopic kt,
KafkaTopic kafkaTopic,
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException, TimeoutException {
var expectedTopicName = TopicOperatorUtil.topicName(kt);
shouldFailOnModification(kafkaCluster, kt,
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException, TimeoutException {
var expectedTopicName = TopicOperatorUtil.topicName(kafkaTopic);
shouldFailOnModification(kafkaCluster, kafkaTopic,
theKt -> {
theKt.getSpec().setTopicName("CHANGED-" + expectedTopicName);
return theKt;
Expand Down Expand Up @@ -2030,67 +2062,108 @@ public void shouldTerminateIfQueueFull(
@Test
public void shouldNotReconcilePausedKafkaTopicOnAdd(
@BrokerConfig(name = BatchingTopicController.AUTO_CREATE_TOPICS_ENABLE, value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
var topicName = "my-topic";

// generation: 1, observedGeneration: 0
KafkaTopic kt = createTopic(
var kafkaTopic = createTopic(
kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR,
Map.of(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, "true"),
true, topicName, 1, 1, Map.of()),
pausedIsTrue()
);

assertEquals(1, kt.getStatus().getObservedGeneration());
assertEquals(1, kafkaTopic.getStatus().getObservedGeneration());
assertNotExistsInKafka(topicName);
}

@Test
public void shouldNotReconcilePausedKafkaTopicOnUpdate(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
var topicName = "my-topic";

// generation: 1, observedGeneration: 1
createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR, null, true, topicName, 1, 1, Map.of()));

// generation: 2, observedGeneration: 1
KafkaTopic kt = pauseTopic(NAMESPACE, topicName);

// generation: 3, observedGeneration: 1
TopicOperatorTestUtil.changeTopic(kubernetesClient, kt, theKt -> {

var kafkaTopic = pauseTopic(NAMESPACE, topicName);

TopicOperatorTestUtil.changeTopic(kubernetesClient, kafkaTopic, theKt -> {
theKt.getSpec().setConfig(Map.of(TopicConfig.FLUSH_MS_CONFIG, "1000"));
return theKt;
});

assertEquals(1, kt.getStatus().getObservedGeneration());
assertEquals(1, kafkaTopic.getStatus().getObservedGeneration());
assertEquals(Map.of(), topicConfigMap(topicName));
}

@Test
public void shouldReconcilePausedKafkaTopicOnDelete(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
var topicName = "my-topic";

// generation: 1, observedGeneration: 1
createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR, null, true, topicName, 1, 1, Map.of()));

var kafkaTopic = pauseTopic(NAMESPACE, topicName);

// generation: 2, observedGeneration: 1
KafkaTopic kt = pauseTopic(NAMESPACE, topicName);

Crds.topicOperation(kubernetesClient).resource(kt).delete();
Crds.topicOperation(kubernetesClient).resource(kafkaTopic).delete();
LOGGER.info("Test deleted KafkaTopic {} with resourceVersion {}",
kt.getMetadata().getName(), TopicOperatorUtil.resourceVersion(kt));
Resource<KafkaTopic> resource = Crds.topicOperation(kubernetesClient).resource(kt);
kafkaTopic.getMetadata().getName(), TopicOperatorUtil.resourceVersion(kafkaTopic));
Resource<KafkaTopic> resource = Crds.topicOperation(kubernetesClient).resource(kafkaTopic);
TopicOperatorTestUtil.waitUntilCondition(resource, Objects::isNull);

assertNotExistsInKafka(topicName);
}

@Test
public void topicIdShouldBeEmptyOnPausedKafkaTopic(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
var topicName = "my-topic";
var kafkaTopic = createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR, null, true, topicName, 1, 1, Map.of()));

assertNotNull(kafkaTopic.getStatus().getTopicName());
assertNotNull(kafkaTopic.getStatus().getTopicId());

kafkaTopic = pauseTopic(NAMESPACE, topicName);

assertNotNull(kafkaTopic.getStatus().getTopicName());
assertNull(kafkaTopic.getStatus().getTopicId());

kafkaTopic = unpauseTopic(NAMESPACE, topicName);

assertNotNull(kafkaTopic.getStatus().getTopicName());
assertNotNull(kafkaTopic.getStatus().getTopicId());
}

@Test
public void topicNameAndIdShouldBeEmptyOnUnmanagedKafkaTopic(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
var topicName = "my-topic";

var kafkaTopic = createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR, null, true, topicName, 1, 1, Map.of()));

assertNotNull(kafkaTopic.getStatus().getTopicName());
assertNotNull(kafkaTopic.getStatus().getTopicId());

kafkaTopic = unmanageTopic(NAMESPACE, topicName);

assertNull(kafkaTopic.getStatus().getTopicName());
assertNull(kafkaTopic.getStatus().getTopicId());

kafkaTopic = manageTopic(NAMESPACE, topicName);

assertNotNull(kafkaTopic.getStatus().getTopicName());
assertNotNull(kafkaTopic.getStatus().getTopicId());
}

@Test
public void shouldReconcileKafkaTopicWithoutPartitions(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
Expand Down Expand Up @@ -2141,7 +2214,7 @@ public void shouldNotReconcileKafkaTopicWithMissingSpec(
var topicName = "my-topic";
maybeStartOperator(topicOperatorConfig(NAMESPACE, kafkaCluster));

var created = Crds.topicOperation(kubernetesClient)
Crds.topicOperation(kubernetesClient)
.resource(kafkaTopicWithNoSpec(topicName, false))
.create();

Expand Down Expand Up @@ -2223,5 +2296,4 @@ public void shouldUpdateAnUnmanagedTopic(
resourceVersionAfterUpdate.equals(kt.getMetadata().getResourceVersion())
);
}

}
Loading