diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java index c1a1504c825e..40d98dd1f19b 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java @@ -15,7 +15,6 @@ package io.confluent.ksql.topic; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.SqlFormatter; @@ -34,6 +33,8 @@ import io.confluent.ksql.topic.TopicProperties.Builder; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -160,15 +161,26 @@ private ConfiguredStatement injectForCreateAsSelec properties.getReplicas()); final String topicCleanUpPolicy; + final Map additionalTopicConfigs = new HashMap<>(); if (createAsSelect instanceof CreateStreamAsSelect) { topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_DELETE; } else { - topicCleanUpPolicy = createAsSelect.getQuery().getWindow().isPresent() - ? TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE - : TopicConfig.CLEANUP_POLICY_COMPACT; + if (createAsSelect.getQuery().getWindow().isPresent()) { + topicCleanUpPolicy = + TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE; + + createAsSelect.getQuery().getWindow().get().getKsqlWindowExpression().getRetention() + .ifPresent(retention -> additionalTopicConfigs.put( + TopicConfig.RETENTION_MS_CONFIG, + retention.toDuration().toMillis() + )); + } else { + topicCleanUpPolicy = TopicConfig.CLEANUP_POLICY_COMPACT; + } } - final TopicProperties info = createTopic(topicPropertiesBuilder, topicCleanUpPolicy); + final TopicProperties info + = createTopic(topicPropertiesBuilder, topicCleanUpPolicy, additionalTopicConfigs); final T withTopic = (T) createAsSelect.copyWith(properties.withTopic( info.getTopicName(), @@ -184,11 +196,20 @@ private ConfiguredStatement injectForCreateAsSelec private TopicProperties createTopic( final Builder topicPropertiesBuilder, final String topicCleanUpPolicy + ) { + return createTopic(topicPropertiesBuilder, topicCleanUpPolicy, Collections.emptyMap()); + } + + private TopicProperties createTopic( + final Builder topicPropertiesBuilder, + final String topicCleanUpPolicy, + final Map additionalTopicConfigs ) { final TopicProperties info = topicPropertiesBuilder.build(); - final Map config = - ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy); + final Map config = new HashMap<>(); + config.put(TopicConfig.CLEANUP_POLICY_CONFIG, topicCleanUpPolicy); + config.putAll(additionalTopicConfigs); topicClient.createTopic(info.getTopicName(), info.getPartitions(), info.getReplicas(), config); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java index b5304ff3205d..d10622fd0cb8 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java @@ -54,6 +54,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -419,6 +420,29 @@ public void shouldCreateMissingTopicWithCompactAndDeleteCleanupPolicyForWindowed TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)); } + @Test + public void shouldCreateMissingTopicWithSpecifiedRetentionForWindowedTables() { + // Given: + givenStatement("CREATE TABLE x WITH (kafka_topic='topic') " + + "AS SELECT * FROM SOURCE WINDOW TUMBLING (SIZE 10 SECONDS, RETENTION 4 DAYS);"); + when(builder.build()).thenReturn(new TopicProperties("expectedName", 10, (short) 10)); + + // When: + injector.inject(statement, builder); + + // Then: + verify(topicClient).createTopic( + "expectedName", + 10, + (short) 10, + ImmutableMap.of( + TopicConfig.CLEANUP_POLICY_CONFIG, + TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, + TopicConfig.RETENTION_MS_CONFIG, + Duration.ofDays(4).toMillis() + )); + } + @Test public void shouldHaveSuperUsefulErrorMessageIfCreateWithNoPartitions() { // Given: