Skip to content

Commit

Permalink
Improve handling of invalid topic configurations (#10517)
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Valeri <[email protected]>
Signed-off-by: Tom Bentley <[email protected]>
Co-authored-by: Tom Bentley <[email protected]>
  • Loading branch information
fvaleri and tombentley authored Sep 2, 2024
1 parent e815d6b commit 5e7a711
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.strimzi.api.kafka.model.topic.KafkaTopicStatusBuilder;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.model.InvalidResourceException;
import io.strimzi.operator.common.model.StatusDiff;
import io.strimzi.operator.common.model.StatusUtils;
import io.strimzi.operator.topic.metrics.TopicOperatorMetricsHolder;
Expand Down Expand Up @@ -292,10 +293,15 @@ private static Either<TopicOperatorException, Boolean> validateUnchangedTopicNam
}

private PartitionedByError<ReconcilableTopic, Void> createTopics(List<ReconcilableTopic> kts) {
Map<ReconcilableTopic, TopicOperatorException> newTopicsErrors = new HashMap<>();
var newTopics = kts.stream().map(reconcilableTopic -> {
// Admin create
return buildNewTopic(reconcilableTopic.kt(), reconcilableTopic.topicName());
}).collect(Collectors.toSet());
try {
return buildNewTopic(reconcilableTopic.kt(), reconcilableTopic.topicName());
} catch (InvalidResourceException e) {
newTopicsErrors.put(reconcilableTopic, new TopicOperatorException.InternalError(e));
return null;
}
}).filter(Objects::nonNull).collect(Collectors.toSet());

LOGGER.debugOp("Admin.createTopics({})", newTopics);
var timerSample = TopicOperatorUtil.startExternalRequestTimer(metrics, enableAdditionalMetrics);
Expand All @@ -310,6 +316,9 @@ private PartitionedByError<ReconcilableTopic, Void> createTopics(List<Reconcilab
});
Map<String, KafkaFuture<Void>> values = ctr.values();
return partitionedByError(kts.stream().map(reconcilableTopic -> {
if (newTopicsErrors.containsKey(reconcilableTopic)) {
return new Pair<>(reconcilableTopic, Either.ofLeft(newTopicsErrors.get(reconcilableTopic)));
}
try {
values.get(reconcilableTopic.topicName()).get();
reconcilableTopic.kt().setStatus(new KafkaTopicStatusBuilder()
Expand Down Expand Up @@ -345,25 +354,27 @@ private static Map<String, String> buildConfigsMap(KafkaTopic kt) {
Map<String, String> configs = new HashMap<>();
if (hasConfig(kt)) {
for (var entry : kt.getSpec().getConfig().entrySet()) {
configs.put(entry.getKey(), configValueAsString(entry.getValue()));
configs.put(entry.getKey(), configValueAsString(entry.getKey(), entry.getValue()));
}
}
return configs;
}

private static String configValueAsString(Object value) {
private static String configValueAsString(String key, Object value) {
String valueStr;
if (value instanceof String
if (value == null) {
valueStr = null;
} else if (value instanceof String
|| value instanceof Boolean) {
valueStr = value.toString();
} else if (value instanceof Number) {
valueStr = value.toString();
} else if (value instanceof List) {
valueStr = ((List<?>) value).stream()
.map(BatchingTopicController::configValueAsString)
.map(v -> BatchingTopicController.configValueAsString(key, v))
.collect(Collectors.joining(","));
} else {
throw new RuntimeException("Cannot convert " + value);
throw new InvalidResourceException("Invalid value for topic config '" + key + "': " + value);
}
return valueStr;
}
Expand Down Expand Up @@ -1061,7 +1072,7 @@ private Collection<AlterConfigOp> buildAlterConfigOps(Reconciliation reconciliat
if (hasConfig(kt)) {
for (var specConfigEntry : kt.getSpec().getConfig().entrySet()) {
String key = specConfigEntry.getKey();
var specValueStr = configValueAsString(specConfigEntry.getValue());
var specValueStr = configValueAsString(specConfigEntry.getKey(), specConfigEntry.getValue());
var kafkaConfigEntry = configs.get(key);
if (kafkaConfigEntry == null
|| !Objects.equals(specValueStr, kafkaConfigEntry.value())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1781,6 +1781,56 @@ public void shouldFailAlterConfigIfNoTopicAuthz(KafkaTopic kt,
assertEquals("KafkaError", condition.getReason());
assertEquals("org.apache.kafka.common.errors.TopicAuthorizationException: not allowed", condition.getMessage());
}

@Test
public void shouldFailTheReconciliationWithNullConfig(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
invalidConfigFailsReconciliation(
kafkaCluster,
null,
"KafkaError",
"org.apache.kafka.common.errors.InvalidConfigurationException: Null value not supported for topic configs: cleanup.policy");
}

@Test
public void shouldFailTheReconciliationWithUnexpectedConfig(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
invalidConfigFailsReconciliation(
kafkaCluster,
Map.of("foo", 12),
"InternalError",
"io.strimzi.operator.common.model.InvalidResourceException: Invalid value for topic config 'cleanup.policy': {foo=12}");
}

private void invalidConfigFailsReconciliation(
KafkaCluster kafkaCluster,
Map<String, Integer> policy,
String expectedReasons,
String expectedMessage
) throws ExecutionException, InterruptedException {
Map<String, Object> configs = new HashMap<>();
configs.put("cleanup.policy", policy);
KafkaTopic kafkaTopic = new KafkaTopicBuilder()
.withNewMetadata()
.withNamespace(NAMESPACE)
.withName("my-topic")
.withLabels(SELECTOR)
.endMetadata()
.withNewSpec()
.withConfig(configs)
.withPartitions(1)
.withReplicas(1)
.endSpec()
.build();
var created = createTopic(kafkaCluster, kafkaTopic);
var condition = assertExactlyOneCondition(created);
assertEquals(expectedReasons, condition.getReason());
assertEquals(expectedMessage, condition.getMessage());
}

private static KafkaTopic setGzipCompression(KafkaTopic kt) {
return setCompression(kt, "gzip");
Expand Down

0 comments on commit 5e7a711

Please sign in to comment.