Skip to content

Commit

Permalink
fix: configure topic retention based on retention clause for windowed…
Browse files Browse the repository at this point in the history
… tables (#5835)
  • Loading branch information
vcrfxia authored Jul 16, 2020
1 parent 44a3027 commit b509c99
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package io.confluent.ksql.topic;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.SqlFormatter;
Expand All @@ -34,6 +33,8 @@
import io.confluent.ksql.topic.TopicProperties.Builder;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -160,15 +161,26 @@ private <T extends CreateAsSelect> ConfiguredStatement<?> injectForCreateAsSelec
properties.getReplicas());

final String topicCleanUpPolicy;
final Map<String, Object> additionalTopicConfigs = new HashMap<>();
if (createAsSelect instanceof CreateStreamAsSelect) {
topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_DELETE;
} else {
topicCleanUpPolicy = createAsSelect.getQuery().getWindow().isPresent()
? TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE
: TopicConfig.CLEANUP_POLICY_COMPACT;
if (createAsSelect.getQuery().getWindow().isPresent()) {
topicCleanUpPolicy =
TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE;

createAsSelect.getQuery().getWindow().get().getKsqlWindowExpression().getRetention()
.ifPresent(retention -> additionalTopicConfigs.put(
TopicConfig.RETENTION_MS_CONFIG,
retention.toDuration().toMillis()
));
} else {
topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_COMPACT;
}
}

final TopicProperties info = createTopic(topicPropertiesBuilder, topicCleanUpPolicy);
final TopicProperties info
= createTopic(topicPropertiesBuilder, topicCleanUpPolicy, additionalTopicConfigs);

final T withTopic = (T) createAsSelect.copyWith(properties.withTopic(
info.getTopicName(),
Expand All @@ -184,11 +196,20 @@ private <T extends CreateAsSelect> ConfiguredStatement<?> injectForCreateAsSelec
private TopicProperties createTopic(
final Builder topicPropertiesBuilder,
final String topicCleanUpPolicy
) {
return createTopic(topicPropertiesBuilder, topicCleanUpPolicy, Collections.emptyMap());
}

private TopicProperties createTopic(
final Builder topicPropertiesBuilder,
final String topicCleanUpPolicy,
final Map<String, Object> additionalTopicConfigs
) {
final TopicProperties info = topicPropertiesBuilder.build();

final Map<String, ?> config =
ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy);
final Map<String, Object> config = new HashMap<>();
config.put(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy);
config.putAll(additionalTopicConfigs);

topicClient.createTopic(info.getTopicName(), info.getPartitions(), info.getReplicas(), config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -419,6 +420,29 @@ public void shouldCreateMissingTopicWithCompactAndDeleteCleanupPolicyForWindowed
TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE));
}

@Test
public void shouldCreateMissingTopicWithSpecifiedRetentionForWindowedTables() {
// Given:
givenStatement("CREATE TABLE x WITH (kafka_topic='topic') "
+ "AS SELECT * FROM SOURCE WINDOW TUMBLING (SIZE 10 SECONDS, RETENTION 4 DAYS);");
when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10));

// When:
injector.inject(statement, builder);

// Then:
verify(topicClient).createTopic(
"expectedName",
10,
(short) 10,
ImmutableMap.of(
TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE,
TopicConfig.RETENTION_MS_CONFIG,
Duration.ofDays(4).toMillis()
));
}

@Test
public void shouldHaveSuperUsefulErrorMessageIfCreateWithNoPartitions() {
// Given:
Expand Down

0 comments on commit b509c99

Please sign in to comment.