From 8cc913a0ae81212323016916c4be258d613bb27c Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Thu, 21 Nov 2019 10:03:07 -0800 Subject: [PATCH] chore: remove all CompatibilityBreakingConfigs (#3933) --- .../java/io/confluent/ksql/cli/CliTest.java | 6 +- .../io/confluent/ksql/util/KsqlConfig.java | 170 +---- .../ksql/config/KsqlConfigResolverTest.java | 12 +- .../confluent/ksql/util/KsqlConfigTest.java | 96 +-- .../ksql/function/udf/string/Substring.java | 114 +-- .../ksql/planner/plan/AggregateNode.java | 10 +- .../confluent/ksql/query/QueryExecutor.java | 4 +- .../ksql/structured/SchemaKGroupedStream.java | 12 - .../ksql/topic/TopicCreateInjector.java | 10 +- .../confluent/ksql/topic/TopicProperties.java | 56 -- .../function/udf/string/SubstringTest.java | 170 ----- .../ksql/planner/plan/AggregateNodeTest.java | 44 -- .../ksql/streams/GroupedFactoryTest.java | 36 +- .../ksql/streams/JoinedFactoryTest.java | 32 +- .../structured/SchemaKGroupedStreamTest.java | 1 - .../ksql/topic/TopicCreateInjectorTest.java | 50 -- .../ksql/topic/TopicPropertiesTest.java | 665 +++++------------- .../ksql/test/tools/TestExecutor.java | 13 +- .../io/confluent/ksql/test/tools/Topic.java | 21 +- .../ksql/test/rest/RestTestExecutor.java | 2 +- .../session-windows.json | 35 - .../query-validation-tests/substring.json | 49 +- .../InteractiveStatementExecutor.java | 5 - .../InteractiveStatementExecutorTest.java | 52 +- .../ksql/serde/avro/AvroDataTranslator.java | 5 +- .../ksql/serde/avro/AvroSchemas.java | 19 +- .../ksql/serde/avro/KsqlAvroSerdeFactory.java | 4 +- .../serde/avro/AvroDataTranslatorTest.java | 26 +- .../serde/avro/KsqlAvroSerializerTest.java | 12 - .../execution/streams/GroupedFactory.java | 20 +- .../ksql/execution/streams/JoinedFactory.java | 21 +- .../streams/MaterializedFactory.java | 25 +- .../streams/StreamJoinedFactory.java | 20 +- .../streams/StreamSourceBuilder.java | 10 - .../execution/streams/StreamsFactories.java | 8 +- .../ksql/execution/streams/StreamsUtil.java | 9 - .../streams/MaterializedFactoryTest.java | 34 +- .../streams/StreamSourceBuilderTest.java | 49 -- 38 files changed, 278 insertions(+), 1649 deletions(-) diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index c52047239774..95e5856032a6 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -849,11 +849,11 @@ public void shouldDescribeOverloadedScalarFunction() { assertThat(output, containsString( "\tVariation : SUBSTRING(str VARCHAR, pos INT)\n" + "\tReturns : VARCHAR\n" - + "\tDescription : Returns a substring of str that starts at pos and continues to the end" + + "\tDescription : Returns a substring of str from pos to the end of str" )); assertThat(output, containsString( - "\tstr : The source string. If null, then function returns null.\n" - + "\tpos : The base-one position the substring starts from." + "\tstr : The source string.\n" + + "\tpos : The base-one position to start from." )); } diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 1549061aacd1..316a851abe84 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -59,10 +59,6 @@ public class KsqlConfig extends AbstractConfig { public static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC; - public static final String SINK_NUMBER_OF_PARTITIONS_PROPERTY = "ksql.sink.partitions"; - - public static final String SINK_NUMBER_OF_REPLICAS_PROPERTY = "ksql.sink.replicas"; - public static final String KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY = "ksql.internal.topic.replicas"; public static final String KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY = @@ -97,6 +93,8 @@ public class KsqlConfig extends AbstractConfig { KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG = "ksql.persistent.prefix"; public static final String KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT = "query_"; + public static final String + KSQL_PERSISTENT_QUERY_NAME_PREFIX_DOC = "Prefixes persistent queries with this value."; public static final String KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG = "ksql.transient.prefix"; @@ -111,28 +109,6 @@ public class KsqlConfig extends AbstractConfig { + "'CREATE STREAM S AS ...' will create a topic 'thing-S', where as the statement " + "'CREATE STREAM S WITH(KAFKA_TOPIC = 'foo') AS ...' will create a topic 'foo'."; - public static final String KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG = - KSQL_FUNCTIONS_PROPERTY_PREFIX + "substring.legacy.args"; - private static final String - KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_DOCS = "Switch the SUBSTRING function into legacy mode," - + " i.e. back to how it was in version 5.0 and earlier of KSQL." - + " Up to version 5.0.x substring took different args:" - + " VARCHAR SUBSTRING(str VARCHAR, startIndex INT, endIndex INT), where startIndex and" - + " endIndex were both base-zero indexed, e.g. a startIndex of '0' selected the start of the" - + " string, and the last argument is a character index, rather than the length of the" - + " substring to extract. Later versions of KSQL use:" - + " VARCHAR SUBSTRING(str VARCHAR, pos INT, length INT), where pos is base-one indexed," - + " and the last argument is the length of the substring to extract."; - - public static final String KSQL_WINDOWED_SESSION_KEY_LEGACY_CONFIG = - KSQL_CONFIG_PROPERTY_PREFIX + "windowed.session.key.legacy"; - - private static final String KSQL_WINDOWED_SESSION_KEY_LEGACY_DOC = "" - + "Version 5.1 of KSQL and earlier incorrectly excluded the end time in the record key in " - + "Kafka for session windowed data. Setting this value to true will make KSQL expect and " - + "continue to store session keys without the end time. With the default value of false " - + "new queries will now correctly store the session end time as part of the key"; - public static final String KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG = "ksql.query.persistent.active.limit"; private static final int KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DEFAULT = Integer.MAX_VALUE; @@ -141,22 +117,6 @@ public class KsqlConfig extends AbstractConfig { + "in interactive mode. Once this limit is reached, any further persistent queries will not " + "be accepted."; - public static final String KSQL_USE_NAMED_INTERNAL_TOPICS = "ksql.named.internal.topics"; - private static final String KSQL_USE_NAMED_INTERNAL_TOPICS_DOC = ""; - public static final String KSQL_USE_NAMED_INTERNAL_TOPICS_ON = "on"; - public static final String KSQL_USE_NAMED_INTERNAL_TOPICS_OFF = "off"; - private static final Validator KSQL_USE_NAMED_INTERNAL_TOPICS_VALIDATOR = ValidString.in( - KSQL_USE_NAMED_INTERNAL_TOPICS_ON, KSQL_USE_NAMED_INTERNAL_TOPICS_OFF - ); - - public static final String KSQL_USE_NAMED_AVRO_MAPS = "ksql.avro.maps.named"; - private static final String KSQL_USE_NAMED_AVRO_MAPS_DOC = ""; - - public static final String KSQL_LEGACY_REPARTITION_ON_GROUP_BY_ROWKEY = - "ksql.query.stream.groupby.rowkey.repartition"; - public static final String KSQL_INJECT_LEGACY_MAP_VALUES_NODE = - "ksql.query.inject.legacy.map.values.node"; - public static final String KSQL_WRAP_SINGLE_VALUES = "ksql.persistence.wrap.single.values"; @@ -224,99 +184,7 @@ public class KsqlConfig extends AbstractConfig { + "milliseconds when waiting for rebalancing of the stream store during a pull query"; public static final Collection COMPATIBLY_BREAKING_CONFIG_DEFS - = ImmutableList.of( - new CompatibilityBreakingConfigDef( - KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, - ConfigDef.Type.STRING, - KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT, - KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT, - ConfigDef.Importance.MEDIUM, - Optional.empty(), - "Second part of the prefix for persistent queries. For instance if " - + "the prefix is query_ the query name will be ksql_query_1."), - new CompatibilityBreakingConfigDef( - KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG, - ConfigDef.Type.BOOLEAN, - false, - false, - ConfigDef.Importance.LOW, - Optional.empty(), - KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_DOCS), - new CompatibilityBreakingConfigDef( - KSQL_WINDOWED_SESSION_KEY_LEGACY_CONFIG, - ConfigDef.Type.BOOLEAN, - false, - false, - ConfigDef.Importance.LOW, - Optional.empty(), - KSQL_WINDOWED_SESSION_KEY_LEGACY_DOC), - new CompatibilityBreakingConfigDef( - KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG, - ConfigDef.Type.INT, - KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DEFAULT, - KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DEFAULT, - ConfigDef.Importance.LOW, - Optional.empty(), - KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DOC), - new CompatibilityBreakingConfigDef( - KSQL_USE_NAMED_INTERNAL_TOPICS, - ConfigDef.Type.STRING, - KSQL_USE_NAMED_INTERNAL_TOPICS_ON, - KSQL_USE_NAMED_INTERNAL_TOPICS_ON, - ConfigDef.Importance.LOW, - KSQL_USE_NAMED_INTERNAL_TOPICS_DOC, - Optional.empty(), - KSQL_USE_NAMED_INTERNAL_TOPICS_VALIDATOR), - new CompatibilityBreakingConfigDef( - SINK_NUMBER_OF_PARTITIONS_PROPERTY, - Type.INT, - null, - null, - Importance.LOW, - Optional.empty(), - "The legacy default number of partitions for the topics created by KSQL" - + "in 5.2 and earlier versions." - + "This property should not be set for 5.3 and later versions."), - new CompatibilityBreakingConfigDef( - SINK_NUMBER_OF_REPLICAS_PROPERTY, - ConfigDef.Type.SHORT, - null, - null, - ConfigDef.Importance.LOW, - Optional.empty(), - "The default number of replicas for the topics created by KSQL " - + "in 5.2 and earlier versions." - + "This property should not be set for 5.3 and later versions." - ), - new CompatibilityBreakingConfigDef( - KSQL_USE_NAMED_AVRO_MAPS, - ConfigDef.Type.BOOLEAN, - true, - true, - ConfigDef.Importance.LOW, - Optional.empty(), - KSQL_USE_NAMED_AVRO_MAPS_DOC - ), - new CompatibilityBreakingConfigDef( - KSQL_LEGACY_REPARTITION_ON_GROUP_BY_ROWKEY, - ConfigDef.Type.BOOLEAN, - false, - false, - ConfigDef.Importance.LOW, - Optional.empty(), - "Ensures legacy queries that perform a 'GROUP BY ROWKEY' continue to " - + "perform an unnecessary repartition step" - ), - new CompatibilityBreakingConfigDef( - KSQL_INJECT_LEGACY_MAP_VALUES_NODE, - ConfigDef.Type.BOOLEAN, - false, - false, - ConfigDef.Importance.LOW, - Optional.empty(), - "Ensures legacy queries maintian the same topology" - ) - ); + = ImmutableList.of(); private enum ConfigGeneration { LEGACY, @@ -399,12 +267,7 @@ void defineCurrent(final ConfigDef configDef) { } private static final Collection - COMPATIBILITY_BREAKING_STREAMS_CONFIGS = ImmutableList.of( - new CompatibilityBreakingStreamsConfig( - StreamsConfig.TOPOLOGY_OPTIMIZATION, - StreamsConfig.OPTIMIZE, - StreamsConfig.OPTIMIZE) - ); + COMPATIBILITY_BREAKING_STREAMS_CONFIGS = ImmutableList.of(); private static final class CompatibilityBreakingStreamsConfig { final String name; @@ -617,8 +480,21 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DEFAULT, Importance.LOW, KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC + ).define( + KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, + Type.STRING, + KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT, + Importance.LOW, + KSQL_PERSISTENT_QUERY_NAME_PREFIX_DOC + ).define( + KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG, + Type.INT, + KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DEFAULT, + Importance.MEDIUM, + KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DOC ) .withClientSslSupport(); + for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef : COMPATIBLY_BREAKING_CONFIG_DEFS) { if (generation == ConfigGeneration.CURRENT) { @@ -722,7 +598,6 @@ private KsqlConfig(final ConfigGeneration generation, final Map props) { generation == ConfigGeneration.CURRENT ? config.defaultValueCurrent : config.defaultValueLegacy)); this.ksqlStreamConfigProps = buildStreamingConfig(streamsConfigDefaults, originals()); - validate(); } private boolean getBooleanConfig(final String config, final boolean defaultValue) { @@ -740,17 +615,6 @@ private KsqlConfig(final ConfigGeneration generation, this.ksqlStreamConfigProps = ksqlStreamConfigProps; } - private void validate() { - final Object optimizationsConfig = getKsqlStreamConfigProps().get( - StreamsConfig.TOPOLOGY_OPTIMIZATION); - final Object useInternalNamesConfig = get(KSQL_USE_NAMED_INTERNAL_TOPICS); - if (Objects.equals(optimizationsConfig, StreamsConfig.OPTIMIZE) - && useInternalNamesConfig.equals(KSQL_USE_NAMED_INTERNAL_TOPICS_OFF)) { - throw new RuntimeException( - "Internal topic naming must be enabled if streams optimizations enabled"); - } - } - public Map getKsqlStreamConfigProps() { final Map props = new HashMap<>(); for (final ConfigValue config : ksqlStreamConfigProps.values()) { diff --git a/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java b/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java index 8ca01d892a48..d20582535735 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java @@ -52,14 +52,14 @@ public void setUp() { @Test public void shouldResolveKsqlProperty() { - assertThat(resolver.resolve(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, true), - is(resolvedItem(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, KSQL_CONFIG_DEF))); + assertThat(resolver.resolve(KsqlConfig.CONNECT_URL_PROPERTY, true), + is(resolvedItem(KsqlConfig.CONNECT_URL_PROPERTY, KSQL_CONFIG_DEF))); } @Test public void shouldNotFindPrefixedKsqlProperty() { assertNotFound( - KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX + KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY); + KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX + KsqlConfig.CONNECT_URL_PROPERTY); } @Test @@ -67,12 +67,6 @@ public void shouldNotFindUnknownKsqlProperty() { assertNotFound(KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX + "you.won't.find.me...right"); } - @Test - public void shouldResolveKnownKsqlFunctionProperty() { - assertThat(resolver.resolve(KsqlConfig.KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG, true), - is(resolvedItem(KsqlConfig.KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG, KSQL_CONFIG_DEF))); - } - @Test public void shouldReturnUnresolvedForOtherKsqlFunctionProperty() { assertThat( diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java index 8244fa4ce64c..48f898f3898a 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java @@ -36,10 +36,10 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.streams.StreamsConfig; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -51,15 +51,10 @@ public class KsqlConfigTest { @Test public void shouldSetInitialValuesCorrectly() { final Map initialProps = new HashMap<>(); - initialProps.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 10); - initialProps.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short) 3); initialProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 800); - initialProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5); final KsqlConfig ksqlConfig = new KsqlConfig(initialProps); - - assertThat(ksqlConfig.getInt(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY), equalTo(10)); - assertThat(ksqlConfig.getShort(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY), equalTo((short) 3)); + assertThat(ksqlConfig.getKsqlStreamConfigProps().get(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), is(800L)); } @Test @@ -308,7 +303,7 @@ public void shouldHaveCorrectOriginalsAfterCloneWithOverwrite() { // Given: final KsqlConfig initial = new KsqlConfig(ImmutableMap.of( KsqlConfig.KSQL_SERVICE_ID_CONFIG, "original-id", - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, "on" + KsqlConfig.KSQL_WRAP_SINGLE_VALUES, "true" )); // When: @@ -320,7 +315,7 @@ public void shouldHaveCorrectOriginalsAfterCloneWithOverwrite() { // Then: assertThat(cloned.originals(), is(ImmutableMap.of( KsqlConfig.KSQL_SERVICE_ID_CONFIG, "overridden-id", - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, "on", + KsqlConfig.KSQL_WRAP_SINGLE_VALUES, "true", KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, "bob" ))); } @@ -381,6 +376,7 @@ public void shouldCloneWithPrefixedStreamPropertyOverwrite() { } @Test + @Ignore // we don't have any compatibility sensitive configs! public void shouldPreserveOriginalCompatibilitySensitiveConfigs() { final Map originalProperties = ImmutableMap.of( KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, "not_the_default"); @@ -508,20 +504,6 @@ public void shouldListKnownKsqlConfig() { assertThat(result.get(KsqlConfig.KSQL_SERVICE_ID_CONFIG), is("not sensitive")); } - @Test - public void shouldListKnownKsqlFunctionConfig() { - // Given: - final KsqlConfig config = new KsqlConfig(ImmutableMap.of( - KsqlConfig.KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG, "true" - )); - - // When: - final Map result = config.getAllConfigPropsWithSecretsObfuscated(); - - // Then: - assertThat(result.get(KsqlConfig.KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG), is("true")); - } - @Test public void shouldListUnknownKsqlFunctionConfigObfuscated() { // Given: @@ -575,49 +557,6 @@ public void shouldNotListUnresolvedServerConfig() { assertThat(result.get("some.random.property"), is(nullValue())); } - @Test - public void shouldDefaultOptimizationsToOn() { - // When: - final KsqlConfig config = new KsqlConfig(Collections.emptyMap()); - - // Then: - assertThat( - config.getKsqlStreamConfigProps().get(StreamsConfig.TOPOLOGY_OPTIMIZATION), - equalTo(StreamsConfig.OPTIMIZE)); - } - - @Test - public void shouldDefaultOptimizationsToOnForOldConfigs() { - // When: - final KsqlConfig config = new KsqlConfig(Collections.emptyMap()) - .overrideBreakingConfigsWithOriginalValues(Collections.emptyMap()); - - // Then: - assertThat( - config.getKsqlStreamConfigProps().get(StreamsConfig.TOPOLOGY_OPTIMIZATION), - equalTo(StreamsConfig.OPTIMIZE)); - } - - @Test - public void shouldPreserveOriginalOptimizationConfig() { - // Given: - final KsqlConfig config = new KsqlConfig( - Collections.singletonMap( - StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE)); - final KsqlConfig saved = new KsqlConfig( - Collections.singletonMap( - StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION)); - - // When: - final KsqlConfig merged = config.overrideBreakingConfigsWithOriginalValues( - saved.getAllConfigPropsWithSecretsObfuscated()); - - // Then: - assertThat( - merged.getKsqlStreamConfigProps().get(StreamsConfig.TOPOLOGY_OPTIMIZATION), - equalTo(StreamsConfig.NO_OPTIMIZATION)); - } - @Test public void shouldFilterProducerConfigs() { // Given: @@ -633,29 +572,4 @@ public void shouldFilterProducerConfigs() { assertThat(ksqlConfig.getProducerClientConfigProps(), hasEntry(ProducerConfig.CLIENT_ID_CONFIG, null)); assertThat(ksqlConfig.getProducerClientConfigProps(), not(hasKey("not.a.config"))); } - - @Test - public void shouldRaiseIfInternalTopicNamingOffAndStreamsOptimizationsOn() { - expectedException.expect(RuntimeException.class); - expectedException.expectMessage( - "Internal topic naming must be enabled if streams optimizations enabled"); - new KsqlConfig( - ImmutableMap.of( - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS_OFF, - StreamsConfig.TOPOLOGY_OPTIMIZATION, - StreamsConfig.OPTIMIZE) - ); - } - - @Test - public void shouldRaiseOnInvalidInternalTopicNamingValue() { - expectedException.expect(ConfigException.class); - new KsqlConfig( - Collections.singletonMap( - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, - "foobar" - ) - ); - } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java index e67d6fdf0e68..3311a0f67496 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java @@ -18,107 +18,47 @@ import io.confluent.ksql.function.udf.Udf; import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.function.udf.UdfParameter; -import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlConstants; -import java.util.Map; -import java.util.Objects; -import org.apache.kafka.common.Configurable; @SuppressWarnings("unused") // Invoked via reflection. @UdfDescription(name = "substring", author = KsqlConstants.CONFLUENT_AUTHOR, - description = "Returns a substring of the passed in value.\n" - + "The behaviour of this function changed in release 5.1. " - + "It is possible to switch the function back to pre-v5.1 functionality via the setting:\n" - + "\t" + KsqlConfig.KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG + "\n" - + "This can be set globally, through the server configuration file, " - + "or per sessions or query via the set command.") -public class Substring implements Configurable { + description = "Returns a substring of the passed in value.") +public class Substring { - private Impl impl = new CurrentImpl(); - - @Override - public void configure(final Map props) { - final KsqlConfig config = new KsqlConfig(props); - final boolean legacyArgs = - config.getBoolean(KsqlConfig.KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG); - - impl = legacyArgs ? new LegacyImpl() : new CurrentImpl(); - } - - @Udf(description = "Returns a substring of str that starts at pos" - + " and continues to the end of the string") + @Udf(description = "Returns a substring of str from pos to the end of str") public String substring( - @UdfParameter( - description = "The source string. If null, then function returns null.") final String str, - @UdfParameter( - description = "The base-one position the substring starts from." - + " If null, then function returns null." - + " (If in legacy mode, this argument is base-zero)") final Integer pos) { - return impl.substring(str, pos); + @UdfParameter(description = "The source string.") final String str, + @UdfParameter(description = "The base-one position to start from.") final Integer pos + ) { + if (str == null || pos == null) { + return null; + } + final int start = getStartIndex(str, pos); + return str.substring(start); } @Udf(description = "Returns a substring of str that starts at pos and is of length len") public String substring( - @UdfParameter( - description = "The source string. If null, then function returns null.") final String str, - @UdfParameter( - description = "The base-one position the substring starts from." - + " If null, then function returns null." - + " (If in legacy mode, this argument is base-zero)") final Integer pos, - @UdfParameter( - description = "The length of the substring to extract." - + " If null, then function returns null." - + " (If in legacy mode, this argument is the endIndex (exclusive)," - + " rather than the length") final Integer length) { - return impl.substring(str, pos, length); - } - - private interface Impl { - String substring(String value, Integer pos); - - String substring(String value, Integer pos, Integer length); - } - - private static final class LegacyImpl implements Impl { - public String substring(final String value, final Integer startIndex) { - Objects.requireNonNull(startIndex, "startIndex"); - return value.substring(startIndex); - } - - public String substring(final String value, final Integer startIndex, final Integer endIndex) { - Objects.requireNonNull(startIndex, "startIndex"); - Objects.requireNonNull(endIndex, "endIndex"); - return value.substring(startIndex, endIndex); + @UdfParameter(description = "The source string.") final String str, + @UdfParameter(description = "The base-one position to start from.") final Integer pos, + @UdfParameter(description = "The length to extract.") final Integer length + ) { + if (str == null || pos == null || length == null) { + return null; } + final int start = getStartIndex(str, pos); + final int end = getEndIndex(str, start, length); + return str.substring(start, end); } - private static final class CurrentImpl implements Impl { - public String substring(final String str, final Integer pos) { - if (str == null || pos == null) { - return null; - } - final int start = getStartIndex(str, pos); - return str.substring(start); - } - - public String substring(final String str, final Integer pos, final Integer length) { - if (str == null || pos == null || length == null) { - return null; - } - final int start = getStartIndex(str, pos); - final int end = getEndIndex(str, start, length); - return str.substring(start, end); - } - - private static int getStartIndex(final String value, final Integer pos) { - return pos < 0 - ? Math.max(value.length() + pos, 0) - : Math.max(Math.min(pos - 1, value.length()), 0); - } + private static int getStartIndex(final String value, final Integer pos) { + return pos < 0 + ? Math.max(value.length() + pos, 0) + : Math.max(Math.min(pos - 1, value.length()), 0); + } - private static int getEndIndex(final String value, final int start, final int length) { - return Math.max(Math.min(start + length, value.length()), start); - } + private static int getEndIndex(final String value, final int start, final int length) { + return Math.max(Math.min(start + length, value.length()), start); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java index fd5024221fd5..e2f44f241bac 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java @@ -39,7 +39,6 @@ import io.confluent.ksql.structured.SchemaKGroupedStream; import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.structured.SchemaKTable; -import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.SchemaUtil; import java.util.ArrayList; @@ -205,8 +204,7 @@ public SchemaKStream buildStream(final KsqlQueryBuilder builder) { final List internalGroupByColumns = internalSchema.resolveGroupByExpressions( getGroupByExpressions(), - aggregateArgExpanded, - builder.getKsqlConfig() + aggregateArgExpanded ); final SchemaKGroupedStream schemaKGroupedStream = aggregateArgExpanded.groupBy( @@ -290,11 +288,9 @@ private void collectAggregateArgExpressions( List resolveGroupByExpressions( final List expressionList, - final SchemaKStream aggregateArgExpanded, - final KsqlConfig ksqlConfig + final SchemaKStream aggregateArgExpanded ) { - final boolean specialRowTimeHandling = !(aggregateArgExpanded instanceof SchemaKTable) - && !ksqlConfig.getBoolean(KsqlConfig.KSQL_LEGACY_REPARTITION_ON_GROUP_BY_ROWKEY); + final boolean specialRowTimeHandling = !(aggregateArgExpanded instanceof SchemaKTable); final Function mapper = e -> { final boolean rowKey = e instanceof ColumnReferenceExp diff --git a/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index 9eb4d14acc90..3ea3d145d81c 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -201,11 +201,11 @@ public PersistentQueryMetadata buildQuery( final KsqlQueryBuilder ksqlQueryBuilder = queryBuilder(queryId); final PlanBuilder planBuilder = new KSPlanBuilder(ksqlQueryBuilder); final Object result = physicalPlan.build(planBuilder); - final String persistanceQueryPrefix = + final String persistenceQueryPrefix = ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG); final String applicationId = getQueryApplicationId( getServiceId(), - persistanceQueryPrefix, + persistenceQueryPrefix, queryId ); final Map streamsProperties = buildStreamsProperties(applicationId, queryId); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java index 5a06ba2fb432..1c936bffad1a 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKGroupedStream.java @@ -24,16 +24,13 @@ import io.confluent.ksql.execution.streams.ExecutionStepFactory; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.metastore.model.KeyField; -import io.confluent.ksql.model.WindowType; import io.confluent.ksql.parser.tree.WindowExpression; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; -import io.confluent.ksql.serde.WindowInfo; import io.confluent.ksql.util.KsqlConfig; -import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -117,15 +114,6 @@ public SchemaKTable aggregate( } private KeyFormat getKeyFormat(final WindowExpression windowExpression) { - if (ksqlConfig.getBoolean(KsqlConfig.KSQL_WINDOWED_SESSION_KEY_LEGACY_CONFIG)) { - return KeyFormat.windowed( - FormatInfo.of(Format.KAFKA), - WindowInfo.of( - WindowType.TUMBLING, - Optional.of(Duration.ofMillis(Long.MAX_VALUE)) - ) - ); - } return KeyFormat.windowed( FormatInfo.of(Format.KAFKA), windowExpression.getKsqlWindowExpression().getWindowInfo() diff --git a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java index dd35158adec5..3f6b19d245bd 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicCreateInjector.java @@ -126,7 +126,7 @@ private ConfiguredStatement injectForCreateSource( properties.getPartitions(), properties.getReplicas()); - createTopic(topicPropertiesBuilder, statement, createSource instanceof CreateTable); + createTopic(topicPropertiesBuilder, createSource instanceof CreateTable); return statement; } @@ -160,7 +160,7 @@ private ConfiguredStatement injectForCreateAsSelec final boolean shouldCompactTopic = createAsSelect instanceof CreateTableAsSelect && !createAsSelect.getQuery().getWindow().isPresent(); - final TopicProperties info = createTopic(topicPropertiesBuilder, statement, shouldCompactTopic); + final TopicProperties info = createTopic(topicPropertiesBuilder, shouldCompactTopic); final T withTopic = (T) createAsSelect.copyWith(properties.withTopic( info.getTopicName(), @@ -175,13 +175,9 @@ private ConfiguredStatement injectForCreateAsSelec private TopicProperties createTopic( final Builder topicPropertiesBuilder, - final ConfiguredStatement statement, final boolean shouldCompactTopic ) { - final TopicProperties info = topicPropertiesBuilder - .withOverrides(statement.getOverrides()) - .withKsqlConfig(statement.getConfig()) - .build(); + final TopicProperties info = topicPropertiesBuilder.build(); final Map config = shouldCompactTopic ? ImmutableMap.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicProperties.java b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicProperties.java index 33be4c764ba2..a137930b0a1d 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicProperties.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicProperties.java @@ -17,9 +17,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Suppliers; -import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.function.Supplier; @@ -117,34 +115,6 @@ Builder withWithClause( return this; } - Builder withOverrides(final Map overrides) { - final Integer partitions = parsePartitionsOverride( - overrides.get(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY)); - - final Short replicas = parseReplicasOverride( - overrides.get(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY)); - - fromOverrides = new TopicProperties(null, partitions, replicas); - return this; - } - - Builder withKsqlConfig(final KsqlConfig config) { - // requires check for containsKey because `getInt` will return 0 otherwise - Integer partitions = null; - if (config.values().containsKey(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY)) { - partitions = config.getInt(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY); - } - - // requires check for containsKey because `getShort` will return 0 otherwise - Short replicas = null; - if (config.values().containsKey(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY)) { - replicas = config.getShort(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY); - } - - fromKsqlConfig = new TopicProperties(null, partitions, replicas); - return this; - } - Builder withSource(final Supplier descriptionSupplier) { fromSource = Suppliers.memoize(() -> { final TopicDescription description = descriptionSupplier.get(); @@ -187,30 +157,4 @@ public TopicProperties build() { return new TopicProperties(name, partitions, replicas); } } - - private static Integer parsePartitionsOverride(final Object value) { - if (value instanceof Integer || value == null) { - return (Integer) value; - } - - try { - return Integer.parseInt(value.toString()); - } catch (final Exception e) { - throw new KsqlException("Failed to parse property override '" - + KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY + "': " + e.getMessage(), e); - } - } - - private static Short parseReplicasOverride(final Object value) { - if (value instanceof Short || value == null) { - return (Short) value; - } - - try { - return Short.parseShort(value.toString()); - } catch (final Exception e) { - throw new KsqlException("Failed to parse property override '" - + KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY + "': " + e.getMessage(), e); - } - } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java index ac3f24dd058b..ed8d7bd6b994 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java @@ -19,9 +19,6 @@ import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; -import com.google.common.collect.ImmutableMap; -import io.confluent.ksql.util.KsqlConfig; -import org.apache.kafka.common.config.ConfigException; import org.junit.Before; import org.junit.Test; @@ -69,171 +66,4 @@ public void shouldTruncateOutOfBoundIndexes() { assertThat(udf.substring("a test string", 3, -100), is("")); } - @Test(expected = NullPointerException.class) - public void shouldThrowNPEOnNullValueWithTwoArgs() { - // Given: - givenInLegacyMode(); - - // Then: - udf.substring(null, 0); - } - - @Test(expected = NullPointerException.class) - public void shouldThrowNPEOnNullValueWithThreeArgs() { - // Given: - givenInLegacyMode(); - - // Then: - udf.substring(null, 0, 0); - } - - @Test(expected = NullPointerException.class) - public void shouldThrowNPEOnNullStartIndexWithTwoArgs() { - // Given: - givenInLegacyMode(); - - // Then: - udf.substring("some-string", null); - } - - @Test(expected = NullPointerException.class) - public void shouldThrowNPEOnNullStartIndexWithThreeArgs() { - // Given: - givenInLegacyMode(); - - // Then: - udf.substring("some-string", null, 0); - } - - @Test(expected = NullPointerException.class) - public void shouldThrowNPEOnNullEndIndex() { - // Given: - givenInLegacyMode(); - - // Then: - udf.substring("some-string", 1, null); - } - - @Test - public void shouldUseZeroBasedIndexingIfInLegacyMode() { - // Given: - givenInLegacyMode(); - - // Then: - assertThat(udf.substring("a test string", 0, 1), is("a")); - } - - @Test(expected = StringIndexOutOfBoundsException.class) - public void shouldThrowInLegacyModeIfStartIndexIsNegative() { - // Given: - givenInLegacyMode(); - - // Then: - udf.substring("a test string", -1, 1); - } - - @Test - public void shouldExtractFromStartInLegacyMode() { - // Given: - givenInLegacyMode(); - - // Then: - assertThat(udf.substring("a test string", 2), is("test string")); - assertThat(udf.substring("a test string", 2, 6), is("test")); - } - - @Test(expected = StringIndexOutOfBoundsException.class) - public void shouldThrowInLegacyModeIfEndIndexIsLessThanStartIndex() { - // Given: - givenInLegacyMode(); - - // Then: - assertThat(udf.substring("a test string", 4, 2), is("st")); - } - - @Test(expected = StringIndexOutOfBoundsException.class) - public void shouldThrowInLegacyModeIfStartIndexOutOfBounds() { - // Given: - givenInLegacyMode(); - - // Then: - udf.substring("a test string", 100); - } - - @Test(expected = StringIndexOutOfBoundsException.class) - public void shouldThrowInLegacyModeIfEndIndexOutOfBounds() { - // Given: - givenInLegacyMode(); - - // Then: - udf.substring("a test string", 3, 100); - } - - @Test - public void shouldNotEnterLegacyModeIfConfigMissing() { - // When: - udf.configure(ImmutableMap.of()); - - // Then: - assertThat(udfIsInLegacyMode(), is(false)); - } - - @Test - public void shouldEnterLegacyModeWithTrueStringConfig() { - // When: - configure("true"); - - // Then: - assertThat(udfIsInLegacyMode(), is(true)); - } - - @Test - public void shouldEnterLegacyModeWithTrueBooleanConfig() { - // When: - configure(true); - - // Then: - assertThat(udfIsInLegacyMode(), is(true)); - } - - @Test - public void shouldNotEnterLegacyModeWithFalseStringConfig() { - // When: - configure("false"); - - // Then: - assertThat(udfIsInLegacyMode(), is(false)); - } - - @Test - public void shouldNotEnterLegacyModeWithFalseBooleanConfig() { - // When: - configure(false); - - // Then: - assertThat(udfIsInLegacyMode(), is(false)); - } - - @Test(expected = ConfigException.class) - public void shouldThrowOnInvalidLegacyModeValueType() { - configure(1.0); - } - - private boolean udfIsInLegacyMode() { - // In legacy mode an NPE is thrown on null args: - try { - udf.substring(null, null); - return false; - } catch (final NullPointerException e) { - return true; - } - } - - private void givenInLegacyMode() { - configure(true); - } - - private void configure(final Object legacyMode) { - udf.configure(ImmutableMap.of(KsqlConfig.KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG, legacyMode)); - } } \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java index 37eef8d08abf..9ff131dc4570 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java @@ -19,7 +19,6 @@ import static io.confluent.ksql.planner.plan.PlanTestUtil.TRANSFORM_NODE; import static io.confluent.ksql.planner.plan.PlanTestUtil.getNodeByName; import static io.confluent.ksql.util.LimitedProxyBuilder.methodParams; -import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; @@ -35,7 +34,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Streams; import io.confluent.ksql.GenericRow; @@ -68,7 +66,6 @@ import java.util.stream.Stream; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Grouped; @@ -215,47 +212,6 @@ public void shouldHaveSourceNodeForSecondSubtopolgyWithKsqlNameForRepartition() assertThat(node.topicSet(), containsInAnyOrder("Aggregate-groupby-repartition")); } - @Test - public void shouldHaveSourceNodeForSecondSubtopolgyWithDefaultNameForRepartition() { - buildRequireRekey( - new KsqlConfig( - ImmutableMap.of( - StreamsConfig.TOPOLOGY_OPTIMIZATION, - StreamsConfig.NO_OPTIMIZATION, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS_OFF) - ) - ); - final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName( - builder.build(), - "KSTREAM-SOURCE-0000000009"); - final List successors = node.successors().stream() - .map(TopologyDescription.Node::name) - .collect(Collectors.toList()); - assertThat(node.predecessors(), equalTo(Collections.emptySet())); - assertThat(successors, equalTo(Collections.singletonList("KSTREAM-AGGREGATE-0000000006"))); - assertThat( - node.topicSet(), - hasItem(containsString("KSTREAM-AGGREGATE-STATE-STORE-0000000005"))); - assertThat(node.topicSet(), hasItem(containsString("-repartition"))); - } - - @Test - public void shouldHaveDefaultNameForAggregationStateStoreIfInternalTopicNamingOff() { - build( - new KsqlConfig( - ImmutableMap.of( - StreamsConfig.TOPOLOGY_OPTIMIZATION, - StreamsConfig.NO_OPTIMIZATION, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS_OFF) - ) - ); - final TopologyDescription.Processor node = (TopologyDescription.Processor) getNodeByName( - builder.build(), "KSTREAM-AGGREGATE-0000000005"); - assertThat(node.stores(), hasItem(equalTo("KSTREAM-AGGREGATE-STATE-STORE-0000000004"))); - } - @Test public void shouldHaveKsqlNameForAggregationStateStore() { build(); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/streams/GroupedFactoryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/streams/GroupedFactoryTest.java index e72bf4183615..e551e6b74df5 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/streams/GroupedFactoryTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/streams/GroupedFactoryTest.java @@ -20,12 +20,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableMap; import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.streams.GroupedFactory; -import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Grouped; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,41 +44,12 @@ public class GroupedFactoryTest { private Grouped grouped; @Test - public void shouldCreateGroupedCorrectlyWhenOptimizationsDisabled() { + public void shouldCreateGroupedCorrectlyWhenOptimizationsEnabled() { // Given: - final KsqlConfig ksqlConfig = new KsqlConfig( - ImmutableMap.of( - StreamsConfig.TOPOLOGY_OPTIMIZATION, - StreamsConfig.NO_OPTIMIZATION, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS_OFF) - ); - when(grouper.groupedWith(null, keySerde, rowSerde)).thenReturn(grouped); - - // When: - final Grouped returned = GroupedFactory.create(ksqlConfig, grouper).create( - OP_NAME, - keySerde, - rowSerde - ); - - // Then: - assertThat(returned, is(grouped)); - verify(grouper).groupedWith(null, keySerde, rowSerde); - } - - @Test - public void shouldCreateGroupedCorrectlyWhenOptimationsEnabled() { - // Given: - final KsqlConfig ksqlConfig = new KsqlConfig( - ImmutableMap.of( - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS_ON) - ); when(grouper.groupedWith(OP_NAME, keySerde, rowSerde)).thenReturn(grouped); // When: - final Grouped returned = GroupedFactory.create(ksqlConfig, grouper).create( + final Grouped returned = GroupedFactory.create(grouper).create( OP_NAME, keySerde, rowSerde diff --git a/ksql-engine/src/test/java/io/confluent/ksql/streams/JoinedFactoryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/streams/JoinedFactoryTest.java index 61c77ae94ee6..89a54dab1973 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/streams/JoinedFactoryTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/streams/JoinedFactoryTest.java @@ -20,12 +20,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableMap; import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.streams.JoinedFactory; -import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Joined; import org.junit.Test; import org.junit.runner.RunWith; @@ -48,41 +45,14 @@ public class JoinedFactoryTest { @Mock private Joined joined; - @Test - public void shouldCreateJoinedCorrectlyWhenOptimizationsDisabled() { - // Given: - final KsqlConfig ksqlConfig = new KsqlConfig( - ImmutableMap.of( - StreamsConfig.TOPOLOGY_OPTIMIZATION, - StreamsConfig.NO_OPTIMIZATION, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS_OFF) - ); - when(joiner.joinedWith(keySerde, leftSerde, rightSerde, null)).thenReturn(joined); - - // When: - final Joined returned - = JoinedFactory.create(ksqlConfig, joiner).create( - keySerde, leftSerde, rightSerde, OP_NAME); - - // Then: - assertThat(returned, is(joined)); - verify(joiner).joinedWith(keySerde, leftSerde, rightSerde, null); - } - @Test public void shouldCreateJoinedCorrectlyWhenOptimizationsEnabled() { // Given: - final KsqlConfig ksqlConfig = new KsqlConfig( - ImmutableMap.of( - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS_ON) - ); when(joiner.joinedWith(keySerde, leftSerde, rightSerde, OP_NAME)).thenReturn(joined); // When: final Joined returned - = JoinedFactory.create(ksqlConfig, joiner).create( + = JoinedFactory.create(joiner).create( keySerde, leftSerde, rightSerde, OP_NAME); // Then: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java index d7ff35900ab2..4ebc3df36646 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java @@ -107,7 +107,6 @@ public void setUp() { functionRegistry ); when(windowExp.getKsqlWindowExpression()).thenReturn(KSQL_WINDOW_EXP); - when(config.getBoolean(KsqlConfig.KSQL_WINDOWED_SESSION_KEY_LEGACY_CONFIG)).thenReturn(false); } @Test diff --git a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java index 71d4b69be611..9dde5bddd3f0 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicCreateInjectorTest.java @@ -142,8 +142,6 @@ public void setUp() { when(topicClient.isTopicExists("source")).thenReturn(true); when(builder.withName(any())).thenReturn(builder); when(builder.withWithClause(any(), any(), any())).thenReturn(builder); - when(builder.withOverrides(any())).thenReturn(builder); - when(builder.withKsqlConfig(any())).thenReturn(builder); when(builder.withSource(any())).thenReturn(builder); when(builder.build()).thenReturn(new TopicProperties("name", 1, (short) 1)); } @@ -254,54 +252,6 @@ public void shouldPassThroughWithClauseToBuilderForCreate() { ); } - @Test - public void shouldPassThroughOverridesToBuilder() { - // Given: - givenStatement("CREATE STREAM x WITH (kafka_topic='topic') AS SELECT * FROM SOURCE;"); - - // When: - injector.inject(statement, builder); - - // Then: - verify(builder).withOverrides(overrides); - } - - @Test - public void shouldPassThroughOverridesToBuilderForCreate() { - // Given: - givenStatement("CREATE STREAM x (FOO VARCHAR) WITH(value_format='avro', kafka_topic='topic', partitions=2);"); - - // When: - injector.inject(statement, builder); - - // Then: - verify(builder).withOverrides(overrides); - } - - @Test - public void shouldPassThroughConfigToBuilder() { - // Given: - givenStatement("CREATE STREAM x WITH (kafka_topic='topic') AS SELECT * FROM SOURCE;"); - - // When: - injector.inject(statement, builder); - - // Then: - verify(builder).withKsqlConfig(config); - } - - @Test - public void shouldPassThroughConfigToBuilderForCreate() { - // Given: - givenStatement("CREATE STREAM x (FOO VARCHAR) WITH(value_format='avro', kafka_topic='topic', partitions=2);"); - - // When: - injector.inject(statement, builder); - - // Then: - verify(builder).withKsqlConfig(config); - } - @Test public void shouldNotUseSourceTopicForCreateMissingTopic() { // Given: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java index 4400ff199e0c..25b224dbccdf 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicPropertiesTest.java @@ -15,19 +15,6 @@ package io.confluent.ksql.topic; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.KSQL_CONFIG; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.KSQL_CONFIG_P; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.KSQL_CONFIG_R; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.NO_CONFIG; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.NO_OVERRIDES; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.NO_WITH; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.OVERRIDES; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.OVERRIDES_P; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.OVERRIDES_R; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.SOURCE; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.WITH; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.WITH_P; -import static io.confluent.ksql.topic.TopicPropertiesTest.Inject.WITH_R; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -35,23 +22,10 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import io.confluent.ksql.topic.TopicProperties.Builder; -import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionInfo; @@ -60,501 +34,184 @@ import org.junit.experimental.runners.Enclosed; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; -@RunWith(Enclosed.class) public class TopicPropertiesTest { - public static class Tests { - - public @Rule ExpectedException expectedException = ExpectedException.none(); - - private final KsqlConfig config = new KsqlConfig(ImmutableMap.of( - KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 1, - KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short) 1 - )); - - @Test - public void shouldUseNameFromWithClause() { - // When: - final TopicProperties properties = new TopicProperties.Builder() - .withWithClause( - Optional.of("name"), - Optional.empty(), - Optional.empty() - ) - .withKsqlConfig(config) - .build(); - - // Then: - assertThat(properties.getTopicName(), equalTo("name")); - } - - @Test - public void shouldUseNameFromWithClauseWhenNameIsAlsoPresent() { - // When: - final TopicProperties properties = new TopicProperties.Builder() - .withName("oh no!") - .withWithClause( - Optional.of("name"), - Optional.empty(), - Optional.empty() - ) - .withKsqlConfig(config) - .build(); - - // Then: - assertThat(properties.getTopicName(), equalTo("name")); - } - - @Test - public void shouldUseNameIfNoWIthClause() { - // When: - final TopicProperties properties = new TopicProperties.Builder() - .withName("name") - .withKsqlConfig(config) - .build(); - - // Then: - assertThat(properties.getTopicName(), equalTo("name")); - } - - @Test - public void shouldFailIfNoNameSupplied() { - // Expect: - expectedException.expect(NullPointerException.class); - expectedException.expectMessage("Was not supplied with any valid source for topic name!"); - - // When: - new TopicProperties.Builder() - .withKsqlConfig(config) - .build(); - } - - @Test - public void shouldFailIfEmptyNameSupplied() { - // Expect: - expectedException.expect(KsqlException.class); - expectedException.expectMessage("Must have non-empty topic name."); - - // When: - new TopicProperties.Builder() - .withName("") - .withKsqlConfig(config) - .build(); - } - - @Test - public void shouldFailIfNoPartitionsSupplied() { - // Given: - final KsqlConfig config = new KsqlConfig(ImmutableMap.of( - KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, (short) 1 - )); - - // Expect: - expectedException.expect(KsqlException.class); - expectedException.expectMessage("Cannot determine partitions for creating topic"); - - // When: - new TopicProperties.Builder() - .withName("name") - .withKsqlConfig(config) - .build(); - } - - @Test - public void shouldDefaultIfNoReplicasSupplied() { - // Given: - final KsqlConfig config = new KsqlConfig(ImmutableMap.of( - KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 1 - )); - - // When: - final TopicProperties properties = new Builder() - .withName("name") - .withKsqlConfig(config) - .build(); - - // Then: - assertThat(properties.getReplicas(), is(TopicProperties.DEFAULT_REPLICAS)); - } - - @Test - public void shouldNotMakeRemoteCallIfUnnecessary() { - // When: - final TopicProperties properties = new TopicProperties.Builder() - .withWithClause( - Optional.of("name"), - Optional.of(1), - Optional.of((short) 1) - ) - .withKsqlConfig(config) - .withSource(() -> { - throw new RuntimeException(); - }) - .build(); - - // Then: - assertThat(properties.getPartitions(), equalTo(1)); - assertThat(properties.getReplicas(), equalTo((short) 1)); - } - - @SuppressWarnings("unchecked") - @Test - public void shouldNotMakeMultipleRemoteCalls() { - // Given: - final Supplier source = mock(Supplier.class); - when(source.get()) - .thenReturn( - new TopicDescription( - "", - false, - ImmutableList.of( - new TopicPartitionInfo( - 0, - null, - ImmutableList.of(new Node(1, "", 1)), - ImmutableList.of())))) - .thenThrow(); - - // When: - final TopicProperties properties = new TopicProperties.Builder() - .withName("name") - .withSource(source) - .build(); - - // Then: - assertThat(properties.getPartitions(), equalTo(1)); - assertThat(properties.getReplicas(), equalTo((short) 1)); - } - - @Test - public void shouldHandleStringOverrides() { - // Given: - final Map propertyOverrides = ImmutableMap.of( - KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, "1", - KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, "2" - ); - - // When: - final TopicProperties properties = new TopicProperties.Builder() - .withName("name") - .withOverrides(propertyOverrides) - .build(); - - // Then: - assertThat(properties.getPartitions(), is(1)); - assertThat(properties.getReplicas(), is((short)2)); - } - - @Test - public void shouldHandleNumberOverrides() { - // Given: - final Map propertyOverrides = ImmutableMap.of( - KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, 1, - KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, 2 - ); - - // When: - final TopicProperties properties = new TopicProperties.Builder() - .withName("name") - .withOverrides(propertyOverrides) - .build(); - - // Then: - assertThat(properties.getPartitions(), is(1)); - assertThat(properties.getReplicas(), is((short)2)); - } - - @Test - public void shouldThrowOnInvalidPartitionsOverride() { - // Given: - final Map propertyOverrides = ImmutableMap.of( - KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, "I ain't no number", - KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, "2" - ); - - // Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Failed to parse property override 'ksql.sink.partitions': " - + "For input string: \"I ain't no number\""); - - // When: - new TopicProperties.Builder() - .withName("name") - .withOverrides(propertyOverrides) - .build(); - } - - @Test - public void shouldThrowOnInvalidReplicasOverride() { - // Given: - final Map propertyOverrides = ImmutableMap.of( - KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, "1", - KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, "I ain't no number" - ); - - // Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage( - "Failed to parse property override 'ksql.sink.replicas': " - + "For input string: \"I ain't no number\""); - - // When: - new TopicProperties.Builder() - .withName("name") - .withOverrides(propertyOverrides) - .build(); - } + public @Rule ExpectedException expectedException = ExpectedException.none(); + + @Test + public void shouldPreferWithClauseToSourceReplicas() { + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withWithClause(Optional.of("name"), Optional.empty(), Optional.of((short) 3)) + .withSource(() -> new TopicDescription( + "", + false, + ImmutableList.of( + new TopicPartitionInfo( + 0, new Node(0, "", 0), ImmutableList.of(new Node(0, "", 0)), ImmutableList.of())))) + .build(); + + // Then: + assertThat(properties.getReplicas(), is((short) 3)); + assertThat(properties.getPartitions(), is(1)); } - @RunWith(Parameterized.class) - public static class PartitionsAndReplicasPrecedence { - - @Parameters(name = "given {0} -> expect({2} partitions, {3} replicas)") - public static Iterable data() { - final Object[][] data = new Object[][]{ - // THIS LIST WAS GENERATED BY RUNNING Inject#main - // - // Given: Overrides Expect: [Partitions, Replicas ] - {new Inject[]{WITH, OVERRIDES, KSQL_CONFIG }, WITH , WITH }, - {new Inject[]{WITH, OVERRIDES, KSQL_CONFIG_P }, WITH , WITH }, - {new Inject[]{WITH, OVERRIDES, KSQL_CONFIG_R }, WITH , WITH }, - {new Inject[]{WITH, OVERRIDES, NO_CONFIG }, WITH , WITH }, - {new Inject[]{WITH, OVERRIDES_P, KSQL_CONFIG }, WITH , WITH }, - {new Inject[]{WITH, OVERRIDES_P, KSQL_CONFIG_P }, WITH , WITH }, - {new Inject[]{WITH, OVERRIDES_P, KSQL_CONFIG_R }, WITH , WITH }, - {new Inject[]{WITH, OVERRIDES_P, NO_CONFIG }, WITH , WITH }, - {new Inject[]{WITH, OVERRIDES_R, KSQL_CONFIG }, WITH , WITH }, - {new Inject[]{WITH, OVERRIDES_R, KSQL_CONFIG_P }, WITH , WITH }, - {new Inject[]{WITH, OVERRIDES_R, KSQL_CONFIG_R }, WITH , WITH }, - {new Inject[]{WITH, OVERRIDES_R, NO_CONFIG }, WITH , WITH }, - {new Inject[]{WITH, NO_OVERRIDES, KSQL_CONFIG }, WITH , WITH }, - {new Inject[]{WITH, NO_OVERRIDES, KSQL_CONFIG_P }, WITH , WITH }, - {new Inject[]{WITH, NO_OVERRIDES, KSQL_CONFIG_R }, WITH , WITH }, - {new Inject[]{WITH, NO_OVERRIDES, NO_CONFIG }, WITH , WITH }, - {new Inject[]{WITH_P, OVERRIDES, KSQL_CONFIG }, WITH_P , OVERRIDES }, - {new Inject[]{WITH_P, OVERRIDES, KSQL_CONFIG_P }, WITH_P , OVERRIDES }, - {new Inject[]{WITH_P, OVERRIDES, KSQL_CONFIG_R }, WITH_P , OVERRIDES }, - {new Inject[]{WITH_P, OVERRIDES, NO_CONFIG }, WITH_P , OVERRIDES }, - {new Inject[]{WITH_P, OVERRIDES_P, KSQL_CONFIG }, WITH_P , KSQL_CONFIG }, - {new Inject[]{WITH_P, OVERRIDES_P, KSQL_CONFIG_P }, WITH_P , SOURCE }, - {new Inject[]{WITH_P, OVERRIDES_P, KSQL_CONFIG_R }, WITH_P , KSQL_CONFIG_R }, - {new Inject[]{WITH_P, OVERRIDES_P, NO_CONFIG }, WITH_P , SOURCE }, - {new Inject[]{WITH_P, OVERRIDES_R, KSQL_CONFIG }, WITH_P , OVERRIDES_R }, - {new Inject[]{WITH_P, OVERRIDES_R, KSQL_CONFIG_P }, WITH_P , OVERRIDES_R }, - {new Inject[]{WITH_P, OVERRIDES_R, KSQL_CONFIG_R }, WITH_P , OVERRIDES_R }, - {new Inject[]{WITH_P, OVERRIDES_R, NO_CONFIG }, WITH_P , OVERRIDES_R }, - {new Inject[]{WITH_P, NO_OVERRIDES, KSQL_CONFIG }, WITH_P , KSQL_CONFIG }, - {new Inject[]{WITH_P, NO_OVERRIDES, KSQL_CONFIG_P }, WITH_P , SOURCE }, - {new Inject[]{WITH_P, NO_OVERRIDES, KSQL_CONFIG_R }, WITH_P , KSQL_CONFIG_R }, - {new Inject[]{WITH_P, NO_OVERRIDES, NO_CONFIG }, WITH_P , SOURCE }, - {new Inject[]{WITH_R, OVERRIDES, KSQL_CONFIG }, OVERRIDES , WITH_R }, - {new Inject[]{WITH_R, OVERRIDES, KSQL_CONFIG_P }, OVERRIDES , WITH_R }, - {new Inject[]{WITH_R, OVERRIDES, KSQL_CONFIG_R }, OVERRIDES , WITH_R }, - {new Inject[]{WITH_R, OVERRIDES, NO_CONFIG }, OVERRIDES , WITH_R }, - {new Inject[]{WITH_R, OVERRIDES_P, KSQL_CONFIG }, OVERRIDES_P , WITH_R }, - {new Inject[]{WITH_R, OVERRIDES_P, KSQL_CONFIG_P }, OVERRIDES_P , WITH_R }, - {new Inject[]{WITH_R, OVERRIDES_P, KSQL_CONFIG_R }, OVERRIDES_P , WITH_R }, - {new Inject[]{WITH_R, OVERRIDES_P, NO_CONFIG }, OVERRIDES_P , WITH_R }, - {new Inject[]{WITH_R, OVERRIDES_R, KSQL_CONFIG }, KSQL_CONFIG , WITH_R }, - {new Inject[]{WITH_R, OVERRIDES_R, KSQL_CONFIG_P }, KSQL_CONFIG_P , WITH_R }, - {new Inject[]{WITH_R, OVERRIDES_R, KSQL_CONFIG_R }, SOURCE , WITH_R }, - {new Inject[]{WITH_R, OVERRIDES_R, NO_CONFIG }, SOURCE , WITH_R }, - {new Inject[]{WITH_R, NO_OVERRIDES, KSQL_CONFIG }, KSQL_CONFIG , WITH_R }, - {new Inject[]{WITH_R, NO_OVERRIDES, KSQL_CONFIG_P }, KSQL_CONFIG_P , WITH_R }, - {new Inject[]{WITH_R, NO_OVERRIDES, KSQL_CONFIG_R }, SOURCE , WITH_R }, - {new Inject[]{WITH_R, NO_OVERRIDES, NO_CONFIG }, SOURCE , WITH_R }, - {new Inject[]{NO_WITH, OVERRIDES, KSQL_CONFIG }, OVERRIDES , OVERRIDES }, - {new Inject[]{NO_WITH, OVERRIDES, KSQL_CONFIG_P }, OVERRIDES , OVERRIDES }, - {new Inject[]{NO_WITH, OVERRIDES, KSQL_CONFIG_R }, OVERRIDES , OVERRIDES }, - {new Inject[]{NO_WITH, OVERRIDES, NO_CONFIG }, OVERRIDES , OVERRIDES }, - {new Inject[]{NO_WITH, OVERRIDES_P, KSQL_CONFIG }, OVERRIDES_P , KSQL_CONFIG }, - {new Inject[]{NO_WITH, OVERRIDES_P, KSQL_CONFIG_P }, OVERRIDES_P , SOURCE }, - {new Inject[]{NO_WITH, OVERRIDES_P, KSQL_CONFIG_R }, OVERRIDES_P , KSQL_CONFIG_R }, - {new Inject[]{NO_WITH, OVERRIDES_P, NO_CONFIG }, OVERRIDES_P , SOURCE }, - {new Inject[]{NO_WITH, OVERRIDES_R, KSQL_CONFIG }, KSQL_CONFIG , OVERRIDES_R }, - {new Inject[]{NO_WITH, OVERRIDES_R, KSQL_CONFIG_P }, KSQL_CONFIG_P , OVERRIDES_R }, - {new Inject[]{NO_WITH, OVERRIDES_R, KSQL_CONFIG_R }, SOURCE , OVERRIDES_R }, - {new Inject[]{NO_WITH, OVERRIDES_R, NO_CONFIG }, SOURCE , OVERRIDES_R }, - {new Inject[]{NO_WITH, NO_OVERRIDES, KSQL_CONFIG }, KSQL_CONFIG , KSQL_CONFIG }, - {new Inject[]{NO_WITH, NO_OVERRIDES, KSQL_CONFIG_P }, KSQL_CONFIG_P , SOURCE }, - {new Inject[]{NO_WITH, NO_OVERRIDES, KSQL_CONFIG_R }, SOURCE , KSQL_CONFIG_R }, - {new Inject[]{NO_WITH, NO_OVERRIDES, NO_CONFIG }, SOURCE , SOURCE }, - }; - - // generate the description from the given injections and put it at the beginning - return Lists.newArrayList(data) - .stream() - .map(params -> Lists.asList( - Arrays.stream((Inject[]) params[0]) - .map(Objects::toString) - .collect(Collectors.joining(", ")), - params)) - .map(List::toArray) - .collect(Collectors.toList()); - } - - @Parameter - public String description; - - @Parameter(1) - public Inject[] injects; - - @Parameter(2) - public Inject expectedPartitions; - - @Parameter(3) - public Inject expectedReplicas; - - private KsqlConfig ksqlConfig = new KsqlConfig(new HashMap<>()); - private final Map propertyOverrides = new HashMap<>(); - private Optional withClausePartitionCount = Optional.empty(); - private Optional withClauseReplicationFactor = Optional.empty(); - - @Test - public void shouldInferCorrectPartitionsAndReplicas() { - // Given: - Arrays.stream(injects).forEach(this::givenInject); - - // When: - final TopicProperties properties = new TopicProperties.Builder() - .withName("name") - .withWithClause( - Optional.empty(), - withClausePartitionCount, - withClauseReplicationFactor - ) - .withOverrides(propertyOverrides) - .withKsqlConfig(ksqlConfig) - .withSource(() -> source(SOURCE)) - .build(); - - // Then: - assertThat(properties.getPartitions(), equalTo(expectedPartitions.partitions.orElse(null))); - assertThat(properties.getReplicas(), equalTo(expectedReplicas.replicas.orElse(null))); - } - - private void givenInject(final Inject inject) { - switch (inject.type) { - case WITH: - withClausePartitionCount = inject.partitions; - withClauseReplicationFactor = inject.replicas; - break; - case OVERRIDES: - inject.partitions.ifPresent(partitions -> - propertyOverrides.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, partitions) - ); - - inject.replicas.ifPresent(replicas -> - propertyOverrides.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, replicas) - ); - break; - case KSQL_CONFIG: - final Map cfg = new HashMap<>(); - inject.partitions.ifPresent(partitions -> - cfg.put(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, partitions) - ); - - inject.replicas.ifPresent(replicas -> - cfg.put(KsqlConfig.SINK_NUMBER_OF_REPLICAS_PROPERTY, replicas) - ); - ksqlConfig = new KsqlConfig(cfg); - break; - case SOURCE: - default: - throw new IllegalArgumentException(inject.type.toString()); - } - } - - public static TopicDescription source(final Inject source) { - return new TopicDescription( - "source", - false, - Collections.nCopies(source.partitions.get(), - new TopicPartitionInfo( - 0, - null, - Collections.nCopies(source.replicas.get(), new Node(0, "", 0)), - ImmutableList.of()) - ) - ); - } + @Test + public void shouldPreferWithClauseToSourcePartitions() { + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withWithClause(Optional.of("name"), Optional.of(3), Optional.empty()) + .withSource(() -> new TopicDescription( + "", + false, + ImmutableList.of( + new TopicPartitionInfo( + 0, new Node(0, "", 0), ImmutableList.of(new Node(0, "", 0)), ImmutableList.of())))) + .build(); + + // Then: + assertThat(properties.getReplicas(), is((short) 1)); + assertThat(properties.getPartitions(), is(3)); } - enum Inject { - SOURCE(Type.SOURCE, 1, (short) 1), - SOURCE2(Type.SOURCE, 12, (short) 12), - - WITH(Type.WITH, 2, (short) 2), - OVERRIDES(Type.OVERRIDES, 3, (short) 3), - KSQL_CONFIG(Type.KSQL_CONFIG, 4, (short) 4), + @Test + public void shouldUseNameFromWithClause() { + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withWithClause( + Optional.of("name"), + Optional.of(1), + Optional.empty() + ) + .build(); + + // Then: + assertThat(properties.getTopicName(), equalTo("name")); + } - WITH_P(Type.WITH, 5, null), - OVERRIDES_P(Type.OVERRIDES, 6, null), - KSQL_CONFIG_P(Type.KSQL_CONFIG, 7, null), + @Test + public void shouldUseNameFromWithClauseWhenNameIsAlsoPresent() { + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withName("oh no!") + .withWithClause( + Optional.of("name"), + Optional.of(1), + Optional.empty() + ) + .build(); + + // Then: + assertThat(properties.getTopicName(), equalTo("name")); + } - WITH_R(Type.WITH, null, (short) 8), - OVERRIDES_R(Type.OVERRIDES, null, (short) 9), - KSQL_CONFIG_R(Type.KSQL_CONFIG, null, (short) 10), + @Test + public void shouldUseNameIfNoWIthClause() { + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withName("name") + .withWithClause(Optional.empty(), Optional.of(1), Optional.empty()) + .build(); - NO_WITH(Type.WITH, null, null), - NO_OVERRIDES(Type.OVERRIDES, null, null), - NO_CONFIG(Type.KSQL_CONFIG, null, null) - ; + // Then: + assertThat(properties.getTopicName(), equalTo("name")); + } - final Type type; - final Optional partitions; - final Optional replicas; + @Test + public void shouldFailIfNoNameSupplied() { + // Expect: + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("Was not supplied with any valid source for topic name!"); - Inject(final Type type, final Integer partitions, final Short replicas) { - this.type = type; - this.partitions = Optional.ofNullable(partitions); - this.replicas = Optional.ofNullable(replicas); - } + // When: + new TopicProperties.Builder() + .build(); + } - enum Type { - WITH, - OVERRIDES, - KSQL_CONFIG, - SOURCE - } + @Test + public void shouldFailIfEmptyNameSupplied() { + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Must have non-empty topic name."); - /** - * Generates code for all combinations of Injects - */ - public static void main(String[] args) { - final List withs = EnumSet.allOf(Inject.class) - .stream().filter(i -> i.type == Type.WITH).collect(Collectors.toList()); - final List overrides = EnumSet.allOf(Inject.class) - .stream().filter(i -> i.type == Type.OVERRIDES).collect(Collectors.toList()); - final List ksqlConfigs = EnumSet.allOf(Inject.class) - .stream().filter(i -> i.type == Type.KSQL_CONFIG).collect(Collectors.toList()); + // When: + new TopicProperties.Builder() + .withName("") + .build(); + } - for (List injects : Lists.cartesianProduct(withs, overrides, ksqlConfigs)) { - // sort by precedence order - injects = new ArrayList<>(injects); - injects.sort(Comparator.comparing(i -> i.type)); + @Test + public void shouldFailIfNoPartitionsSupplied() { + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Cannot determine partitions for creating topic"); + + // When: + new TopicProperties.Builder() + .withName("name") + .withWithClause(Optional.empty(), Optional.empty(), Optional.of((short) 1)) + .build(); + } - final Inject expectedPartitions = injects.stream() - .filter(i -> i.partitions.isPresent()) - .findFirst() - .orElse(Inject.SOURCE); + @Test + public void shouldDefaultIfNoReplicasSupplied() { + // Given: + // When: + final TopicProperties properties = new Builder() + .withName("name") + .withWithClause(Optional.empty(), Optional.of(1), Optional.empty()) + .build(); + + // Then: + assertThat(properties.getReplicas(), is(TopicProperties.DEFAULT_REPLICAS)); + } - final Inject expectedReplicas = injects.stream() - .filter(i -> i.replicas.isPresent()) - .findFirst() - .orElse(Inject.SOURCE); + @Test + public void shouldNotMakeRemoteCallIfUnnecessary() { + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withWithClause( + Optional.of("name"), + Optional.of(1), + Optional.of((short) 1) + ) + .withSource(() -> { + throw new RuntimeException(); + }) + .build(); + + // Then: + assertThat(properties.getPartitions(), equalTo(1)); + assertThat(properties.getReplicas(), equalTo((short) 1)); + } - System.out.println(String.format("{new Inject[]{%-38s}, %-15s, %-15s},", - injects.stream().map(Objects::toString).collect(Collectors.joining(", ")), - expectedPartitions, - expectedReplicas) - ); - } - } + @SuppressWarnings("unchecked") + @Test + public void shouldNotMakeMultipleRemoteCalls() { + // Given: + final Supplier source = mock(Supplier.class); + when(source.get()) + .thenReturn( + new TopicDescription( + "", + false, + ImmutableList.of( + new TopicPartitionInfo( + 0, + null, + ImmutableList.of(new Node(1, "", 1)), + ImmutableList.of())))) + .thenThrow(); + + // When: + final TopicProperties properties = new TopicProperties.Builder() + .withName("name") + .withSource(source) + .build(); + + // Then: + assertThat(properties.getPartitions(), equalTo(1)); + assertThat(properties.getReplicas(), equalTo((short) 1)); } } diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java index f4d784c58f0e..7c6b36a4d133 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java @@ -80,9 +80,6 @@ public class TestExecutor implements Closeable { .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") .put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0) .put(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "some.ksql.service.id") - .put( - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS_ON) .put(StreamsConfig.TOPOLOGY_OPTIMIZATION, "all") .build(); @@ -395,13 +392,10 @@ private static void processRecordsForTopic( final StubKafkaService stubKafkaService, final SchemaRegistryClient schemaRegistryClient ) { - final boolean legacySessionWindow = isLegacySessionWindow(testCase.properties()); - while (true) { final ProducerRecord producerRecord = topologyTestDriver.readOutput( sinkTopic.getName(), - sinkTopic.getKeyDeserializer(schemaRegistryClient, - legacySessionWindow), + sinkTopic.getKeyDeserializer(schemaRegistryClient), sinkTopic.getValueDeserializer(schemaRegistryClient) ); if (producerRecord == null) { @@ -417,11 +411,6 @@ private static void processRecordsForTopic( } } - private static boolean isLegacySessionWindow(final Map properties) { - final Object config = properties.get(KsqlConfig.KSQL_WINDOWED_SESSION_KEY_LEGACY_CONFIG); - return config != null && Boolean.parseBoolean(config.toString()); - } - private static Object getKey(final StubKafkaRecord stubKafkaRecord) { return stubKafkaRecord.getProducerRecord() == null ? stubKafkaRecord.getTestRecord().key() diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/Topic.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/Topic.java index 4b3464e159de..9d3781b19f2c 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/Topic.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/Topic.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; @SuppressWarnings("rawtypes") @@ -107,27 +106,15 @@ public Serializer getKeySerializer(final SchemaRegistryClient schemaRegistryClie return serializer; } - public Deserializer getKeyDeserializer( - final SchemaRegistryClient schemaRegistryClient, - final boolean isLegacySessionWindow) { - final Deserializer deserializer = createKeyDeserializer( - schemaRegistryClient, - isLegacySessionWindow); + public Deserializer getKeyDeserializer(final SchemaRegistryClient schemaRegistryClient) { + final Deserializer deserializer = createKeyDeserializer(schemaRegistryClient); deserializer.configure(ImmutableMap.of(), true); return deserializer; } - private Deserializer createKeyDeserializer( - final SchemaRegistryClient schemaRegistryClient, - final boolean isLegacySessionWindow) { + private Deserializer createKeyDeserializer(final SchemaRegistryClient schemaRegistryClient) { final Deserializer deserializer = keySerdeFactory.getDeserializer(schemaRegistryClient); - if (deserializer instanceof SessionWindowedDeserializer) { - if (!isLegacySessionWindow) { - return deserializer; - } else { - return new TimeWindowedDeserializer<>(new StringDeserializer(), Long.MAX_VALUE); - } - } else if (!(deserializer instanceof TimeWindowedDeserializer)) { + if (!(deserializer instanceof TimeWindowedDeserializer)) { return deserializer; } diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java index 785b805c8314..b200cf6e5c26 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java @@ -260,7 +260,7 @@ private Optional sendQueryStatement( private void verifyOutput(final RestTestCase testCase) { testCase.getOutputsByTopic().forEach((topic, records) -> { final Deserializer keyDeserializer = - topic.getKeyDeserializer(serviceContext.getSchemaRegistryClient(), false); + topic.getKeyDeserializer(serviceContext.getSchemaRegistryClient()); final Deserializer valueDeserializer = topic.getValueDeserializer(serviceContext.getSchemaRegistryClient()); diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/session-windows.json b/ksql-functional-tests/src/test/resources/query-validation-tests/session-windows.json index db682d7b4c87..a5a2e2980ff2 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/session-windows.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/session-windows.json @@ -23,9 +23,6 @@ {"topic": "S2", "key": 1, "value": null, "timestamp": 10000, "window": {"start": 10000, "end": 10000, "type": "session"}}, {"topic": "S2", "key": 1, "value": "1,200", "timestamp": 40000, "window": {"start": 10000, "end": 40000, "type": "session"}} ], - "properties": { - "ksql.windowed.session.key.legacy": "false" - }, "post": { "sources": [ { @@ -78,35 +75,6 @@ {"topic": "S2", "key": 1, "value": "1,200,1 : Window{start=10000 end=40000}", "timestamp": 40000, "window": {"start": 10000, "end": 40000, "type": "session"}} ] }, - { - "name": "legacy session", - "comment": [ - "in version 5.1 and before the key of session windowed data was actually a TimeWindow, ", - "i.e. it serialized with no end time. This was fixed in later versions, but this breaks ", - "compatibility with data running in older queries." - ], - "properties": { - "ksql.windowed.session.key.legacy": "true" - }, - "statements": [ - "CREATE STREAM TEST (ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", - "CREATE TABLE S2 as SELECT id, max(value) FROM test WINDOW SESSION (30 SECONDS) group by id;" - ], - "inputs": [ - {"topic": "test_topic", "key": 0, "value": "0,zero,0", "timestamp": 0}, - {"topic": "test_topic", "key": 0, "value": "0,100,5", "timestamp": 10000}, - {"topic": "test_topic", "key": 1, "value": "1,100,100", "timestamp": 10000}, - {"topic": "test_topic", "key": 1, "value": "1,100,200", "timestamp": 40000} - ], - "outputs": [ - {"topic": "S2", "key": 0, "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 9223372036854775807, "type": "time"}}, - {"topic": "S2", "key": 0, "value": null, "timestamp": 0, "window": {"start": 0, "end": 9223372036854775807, "type": "time"}}, - {"topic": "S2", "key": 0, "value": "0,5", "timestamp": 10000, "window": {"start": 0, "end": 9223372036854775807, "type": "time"}}, - {"topic": "S2", "key": 1, "value": "1,100", "timestamp": 10000, "window": {"start": 10000, "end": 9223372036854775807, "type": "time"}}, - {"topic": "S2", "key": 1, "value": null, "timestamp": 10000, "window": {"start": 10000, "end": 9223372036854775807, "type": "time"}}, - {"topic": "S2", "key": 1, "value": "1,200", "timestamp": 40000, "window": {"start": 10000, "end": 9223372036854775807, "type": "time"}} - ] - }, { "name": "inherit windowed keys", "statements": [ @@ -122,9 +90,6 @@ {"topic": "S3", "key": 0, "value": "0,0", "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}}, {"topic": "S3", "key": 0, "value": null, "timestamp": 0, "window": {"start": 0, "end": 0, "type": "session"}} ], - "properties": { - "ksql.windowed.session.key.legacy": "false" - }, "post": { "sources": [ { diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/substring.json b/ksql-functional-tests/src/test/resources/query-validation-tests/substring.json index 94e5e94ac602..8d4ae1e6b813 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/substring.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/substring.json @@ -9,9 +9,6 @@ { "name": "do substring with just pos", "format": ["JSON"], - "properties": { - "ksql.functions.substring.legacy.args": false - }, "statements": [ "CREATE STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS FROM TEST;" @@ -30,9 +27,7 @@ { "name": "do substring with pos and length", "format": ["JSON"], - "properties": { - "ksql.functions.substring.legacy.args": false - },"statements": [ + "statements": [ "CREATE STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" ], @@ -47,48 +42,6 @@ {"topic": "OUTPUT", "value": {"SUBSTRING":"", "NULL_STR":null, "NULL_POS":null, "NULL_LEN":null}} ] }, - { - "name": "do substring in legacy mode with just start index", - "format": ["JSON"], - "properties": { - "ksql.functions.substring.legacy.args": true - }, - "statements": [ - "CREATE STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT SUBSTRING(source, 5) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_START FROM TEST;" - ], - "inputs": [ - {"topic": "test_topic", "value": {"source": "some_string"}}, - {"topic": "test_topic", "value": {"source": "another"}}, - {"topic": "test_topic", "value": {"source": "short"}} - ], - "outputs": [ - {"topic": "OUTPUT", "value": {"SUBSTRING":"string", "NULL_STR":null, "NULL_START":null}}, - {"topic": "OUTPUT", "value": {"SUBSTRING":"er", "NULL_STR":null, "NULL_START":null}}, - {"topic": "OUTPUT", "value": {"SUBSTRING":"", "NULL_STR":null, "NULL_START":null}} - ] - }, - { - "name": "do substring in legacy mode with start and end indexes", - "format": ["JSON"], - "properties": { - "ksql.functions.substring.legacy.args": true - }, - "statements": [ - "CREATE STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", - "CREATE STREAM OUTPUT AS SELECT SUBSTRING(source, 1, 6) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_START, SUBSTRING(source, 6, null) AS NULL_END FROM TEST;" - ], - "inputs": [ - {"topic": "test_topic", "value": {"source": "some_string"}}, - {"topic": "test_topic", "value": {"source": "another"}}, - {"topic": "test_topic", "value": {"source": "short"}} - ], - "outputs": [ - {"topic": "OUTPUT", "value": {"SUBSTRING":"ome_s", "NULL_STR":null, "NULL_START":null, "NULL_END":null}}, - {"topic": "OUTPUT", "value": {"SUBSTRING":"nothe", "NULL_STR":null, "NULL_START":null, "NULL_END":null}}, - {"topic": "OUTPUT", "value": {"SUBSTRING":null, "NULL_STR":null, "NULL_START":null, "NULL_END":null}} - ] - }, { "name": "should default to current mode for new queries", "format": ["JSON"], diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java index a3fa31e69f9e..241a87e57e43 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.java @@ -333,11 +333,6 @@ private PersistentQueryMetadata startQuery( ) { final KsqlConfig mergedConfig = buildMergedConfig(command); - if (QueryCapacityUtil.exceedsPersistentQueryCapacity(ksqlEngine, mergedConfig,1)) { - QueryCapacityUtil.throwTooManyActivePersistentQueriesException( - ksqlEngine, mergedConfig, statement.getStatementText()); - } - final ConfiguredStatement configured = ConfiguredStatement.of( statement, command.getOverwriteProperties(), mergedConfig); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java index da3aee586aa8..c53ae02a07d6 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/InteractiveStatementExecutorTest.java @@ -81,6 +81,7 @@ import java.util.concurrent.TimeUnit; import kafka.zookeeper.ZooKeeperClientException; import org.apache.kafka.streams.StreamsConfig; +import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.easymock.IArgumentMatcher; import org.easymock.Mock; @@ -279,7 +280,6 @@ public void shouldBuildQueriesWithPersistedConfig() { csasStatement, emptyMap(), expectedConfig); expect(mockParser.parseSingleStatement(statementText)).andReturn(csasStatement); - expect(mockEngine.getPersistentQueries()).andReturn(ImmutableList.of()); final KsqlPlan plan = Mockito.mock(KsqlPlan.class); expect(mockEngine.plan(eq(serviceContext), eq(configuredCsas))).andReturn(plan); expect(mockEngine.execute(eq(serviceContext), eqConfigured(plan))) @@ -489,7 +489,6 @@ private PersistentQueryMetadata mockReplayCSAS( expect(mockParser.parseSingleStatement(statement)) .andReturn(csas); expect(mockMetaStore.getSource(SourceName.of(name))).andStubReturn(null); - expect(mockEngine.getPersistentQueries()).andReturn(ImmutableList.of()); expect(mockEngine.plan(eq(serviceContext), eqConfigured(csas))) .andReturn(mockPlan); expect(mockEngine.execute(eq(serviceContext), eqConfigured(mockPlan))) @@ -634,55 +633,6 @@ public void shouldNotCascadeDropStreamCommand() { verify(mockParser, mockEngine, mockMetaStore); } - @Test - public void shouldFailCreateAsSelectIfExceedActivePersistentQueriesLimit() { - // Given: - createStreamsAndStartTwoPersistentQueries(); - // Prepare to try adding a third - final KsqlConfig cmdConfig = - givenCommandConfig(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG, 2); - final Command csasCommand = - givenCommand("CREATE STREAM user2pv AS select * from pageview;", cmdConfig); - final CommandId csasCommandId = - new CommandId(CommandId.Type.STREAM, "_CSASGen2", CommandId.Action.CREATE); - - // When: - handleStatement(csasCommand, csasCommandId, Optional.empty(), 2); - - // Then: - final CommandStatus commandStatus = getCommandStatus(csasCommandId); - assertThat("CSAS statement should fail since exceeds limit of 2 active persistent queries", - commandStatus.getStatus(), is(CommandStatus.Status.ERROR)); - assertThat( - commandStatus.getMessage(), - containsString("would cause the number of active, persistent queries " - + "to exceed the configured limit")); - } - - @Test - public void shouldFailInsertIntoIfExceedActivePersistentQueriesLimit() { - // Given: - createStreamsAndStartTwoPersistentQueries(); - // Set limit and prepare to try adding a query that exceeds the limit - final KsqlConfig cmdConfig = - givenCommandConfig(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG, 1); - final Command insertIntoCommand = - givenCommand("INSERT INTO user1pv select * from pageview;", cmdConfig); - final CommandId insertIntoCommandId = - new CommandId(CommandId.Type.STREAM, "_InsertQuery1", CommandId.Action.CREATE); - - // When: - handleStatement(insertIntoCommand, insertIntoCommandId, Optional.empty(), 2); - - // Then: statement should fail since exceeds limit of 1 active persistent query - final CommandStatus commandStatus = getCommandStatus(insertIntoCommandId); - assertThat(commandStatus.getStatus(), is(CommandStatus.Status.ERROR)); - assertThat( - commandStatus.getMessage(), - containsString("would cause the number of active, persistent queries " - + "to exceed the configured limit")); - } - @Test public void shouldHandleLegacyRunScriptCommand() { // Given: diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java index e7eeb1174d4e..a9fad3d77ba9 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java @@ -49,13 +49,12 @@ public class AvroDataTranslator implements DataTranslator { AvroDataTranslator( final Schema schema, - final String schemaFullName, - final boolean useNamedMaps + final String schemaFullName ) { this.ksqlSchema = throwOnInvalidSchema(Objects.requireNonNull(schema, "schema")); this.avroCompatibleSchema = AvroSchemas.getAvroCompatibleConnectSchema( - schema, schemaFullName, useNamedMaps + schema, schemaFullName ); this.innerTranslator = new ConnectDataTranslator(avroCompatibleSchema); diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemas.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemas.java index 3bdf752f7d74..b1124dbbfb07 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemas.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemas.java @@ -35,19 +35,18 @@ public static org.apache.avro.Schema getAvroSchema( final PersistenceSchema schema, final String name, final KsqlConfig ksqlConfig) { - final boolean useNamedMaps = ksqlConfig.getBoolean(KsqlConfig.KSQL_USE_NAMED_AVRO_MAPS); return new AvroData(0).fromConnectSchema( - getAvroCompatibleConnectSchema(schema.serializedSchema(), name, useNamedMaps) + getAvroCompatibleConnectSchema(schema.serializedSchema(), name) ); } public static Schema getAvroCompatibleConnectSchema( final Schema schema, - final String schemaFullName, - final boolean useNamedMaps) { + final String schemaFullName + ) { return buildAvroCompatibleSchema( schema, - new Context(Collections.singleton(schemaFullName), useNamedMaps, true) + new Context(Collections.singleton(schemaFullName), true) ); } @@ -58,21 +57,18 @@ private static final class Context { static final String MAP_VALUE_NAME = "MapValue"; private final Iterable names; - private final boolean useNamedMaps; private boolean root; private Context( final Iterable names, - final boolean useNamedMaps, final boolean root ) { this.names = requireNonNull(names, "names"); - this.useNamedMaps = useNamedMaps; this.root = root; } Context with(final String name) { - return new Context(Iterables.concat(names, ImmutableList.of(name)), useNamedMaps, root); + return new Context(Iterables.concat(names, ImmutableList.of(name)), root); } public String name() { @@ -138,10 +134,7 @@ private static SchemaBuilder buildAvroCompatibleMap( valueSchema ); - if (context.useNamedMaps) { - schemaBuilder.name(context.name()); - } - + schemaBuilder.name(context.name()); return schemaBuilder; } diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/KsqlAvroSerdeFactory.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/KsqlAvroSerdeFactory.java index bb9662ab2961..ebffc631de0c 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/KsqlAvroSerdeFactory.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/KsqlAvroSerdeFactory.java @@ -130,9 +130,7 @@ private AvroDataTranslator createAvroTranslator( final PersistenceSchema schema, final KsqlConfig ksqlConfig ) { - final boolean useNamedMaps = ksqlConfig.getBoolean(KsqlConfig.KSQL_USE_NAMED_AVRO_MAPS); - - return new AvroDataTranslator(schema.serializedSchema(), fullSchemaName, useNamedMaps); + return new AvroDataTranslator(schema.serializedSchema(), fullSchemaName); } private static AvroConverter getAvroConverter( diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/avro/AvroDataTranslatorTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/avro/AvroDataTranslatorTest.java index 8d650412e63b..b836991a539c 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/avro/AvroDataTranslatorTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/avro/AvroDataTranslatorTest.java @@ -40,8 +40,8 @@ public void shoudRenameSourceDereference() { final AvroDataTranslator dataTranslator = new AvroDataTranslator( schema, - KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME, - true); + KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME + ); final Struct ksqlRow = new Struct(schema) .put("STREAM_NAME.COLUMN_NAME", 123); @@ -98,8 +98,8 @@ public void shouldAddNamesToSchema() { final AvroDataTranslator dataTranslator = new AvroDataTranslator( schema, - KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME, - true); + KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME + ); final Struct ksqlRow = new Struct(schema) .put("ARRAY", ImmutableList.of(arrayInnerStruct)) @@ -168,8 +168,8 @@ public void shouldReplaceNullWithNull() { final AvroDataTranslator dataTranslator = new AvroDataTranslator( schema, - KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME, - true); + KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME + ); final Struct ksqlRow = new Struct(schema); @@ -192,8 +192,8 @@ public void shoudlReplacePrimitivesCorrectly() { final AvroDataTranslator dataTranslator = new AvroDataTranslator( schema, - KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME, - true); + KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME + ); final Struct ksqlRow = new Struct(schema) .put("COLUMN_NAME", 123L); @@ -217,7 +217,7 @@ public void shouldUseExplicitSchemaName() { String schemaFullName = "com.custom.schema"; - final AvroDataTranslator dataTranslator = new AvroDataTranslator(schema, schemaFullName, true); + final AvroDataTranslator dataTranslator = new AvroDataTranslator(schema, schemaFullName); final Struct ksqlRow = new Struct(schema) .put("COLUMN_NAME", 123L); @@ -235,7 +235,7 @@ public void shouldDropOptionalFromRootPrimitiveSchema() { // When: final AvroDataTranslator translator = - new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME, true); + new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME); // Then: assertThat("Root required", translator.getAvroCompatibleSchema().isOptional(), is(false)); @@ -251,7 +251,7 @@ public void shouldDropOptionalFromRootArraySchema() { // When: final AvroDataTranslator translator = - new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME, true); + new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME); // Then: assertThat("Root required", translator.getAvroCompatibleSchema().isOptional(), is(false)); @@ -267,7 +267,7 @@ public void shouldDropOptionalFromRootMapSchema() { // When: final AvroDataTranslator translator = - new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME, true); + new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME); // Then: assertThat("Root required", translator.getAvroCompatibleSchema().isOptional(), is(false)); @@ -284,7 +284,7 @@ public void shouldDropOptionalFromRootStructSchema() { // When: final AvroDataTranslator translator = - new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME, true); + new AvroDataTranslator(schema, KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME); // Then: assertThat("Root required", translator.getAvroCompatibleSchema().isOptional(), is(false)); diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/avro/KsqlAvroSerializerTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/avro/KsqlAvroSerializerTest.java index 68ed8450994c..9022098e8166 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/avro/KsqlAvroSerializerTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/avro/KsqlAvroSerializerTest.java @@ -856,18 +856,6 @@ public void shouldSerializeMapFieldWithName() { shouldSerializeMap(avroSchema); } - @Test - public void shouldSerializeMapFieldWithoutNameIfDisabled() { - ksqlConfig = new KsqlConfig(ImmutableMap.of( - KsqlConfig.KSQL_USE_NAMED_AVRO_MAPS, false - )); - - final org.apache.avro.Schema avroSchema = - AvroTestUtil.connectOptionalKeyMapSchema(legacyMapEntrySchema()); - - shouldSerializeMap(avroSchema); - } - @Test public void shouldSerializeMultipleMapFields() { final org.apache.avro.Schema avroInnerSchema0 diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/GroupedFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/GroupedFactory.java index 7c72e0c38ca6..e860593ca4e4 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/GroupedFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/GroupedFactory.java @@ -15,30 +15,18 @@ package io.confluent.ksql.execution.streams; -import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Grouped; public interface GroupedFactory { Grouped create(String name, Serde keySerde, Serde valSerde); - static GroupedFactory create(final KsqlConfig ksqlConfig) { - return create(ksqlConfig, Grouped::with); + static GroupedFactory create() { + return create(Grouped::with); } - static GroupedFactory create(final KsqlConfig ksqlConfig, final Grouper grouper) { - if (StreamsUtil.useProvidedName(ksqlConfig)) { - return grouper::groupedWith; - } - return new GroupedFactory() { - @Override - public Grouped create( - final String name, - final Serde keySerde, - final Serde valSerde) { - return grouper.groupedWith(null, keySerde, valSerde); - } - }; + static GroupedFactory create(final Grouper grouper) { + return grouper::groupedWith; } @FunctionalInterface diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/JoinedFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/JoinedFactory.java index d3325401f215..69c7064e231c 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/JoinedFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/JoinedFactory.java @@ -15,7 +15,6 @@ package io.confluent.ksql.execution.streams; -import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Joined; @@ -26,24 +25,12 @@ Joined create( Serde rightSerde, String name); - static JoinedFactory create(final KsqlConfig ksqlConfig) { - return create(ksqlConfig, Joined::with); + static JoinedFactory create() { + return create(Joined::with); } - static JoinedFactory create(final KsqlConfig ksqlConfig, final Joiner joiner) { - if (StreamsUtil.useProvidedName(ksqlConfig)) { - return joiner::joinedWith; - } - return new JoinedFactory() { - @Override - public Joined create( - final Serde keySerde, - final Serde leftSerde, - final Serde rightSerde, - final String name) { - return joiner.joinedWith(keySerde, leftSerde, rightSerde, null); - } - }; + static JoinedFactory create(final Joiner joiner) { + return joiner::joinedWith; } @FunctionalInterface diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java index ada79f5fe82d..35eeb835176a 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/MaterializedFactory.java @@ -16,7 +16,6 @@ package io.confluent.ksql.execution.streams; import io.confluent.ksql.GenericRow; -import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.StateStore; @@ -25,9 +24,8 @@ public interface MaterializedFactory { Materialized create( Serde keySerde, Serde valSerde, String name); - static MaterializedFactory create(final KsqlConfig ksqlConfig) { + static MaterializedFactory create() { return create( - ksqlConfig, new Materializer() { @Override public Materialized materializedWith( @@ -45,29 +43,16 @@ public Materialized materializedAs( ); } - static MaterializedFactory create( - final KsqlConfig ksqlConfig, - final Materializer materializer) { - if (StreamsUtil.useProvidedName(ksqlConfig)) { - return new MaterializedFactory() { - @Override - public Materialized create( - final Serde keySerde, - final Serde valSerde, - final String name) { - return materializer.materializedAs(name) - .withKeySerde(keySerde) - .withValueSerde(valSerde); - } - }; - } + static MaterializedFactory create(final Materializer materializer) { return new MaterializedFactory() { @Override public Materialized create( final Serde keySerde, final Serde valSerde, final String name) { - return materializer.materializedWith(keySerde, valSerde); + return materializer.materializedAs(name) + .withKeySerde(keySerde) + .withValueSerde(valSerde); } }; } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamJoinedFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamJoinedFactory.java index 02883ff7c1f6..feef1eda4da1 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamJoinedFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamJoinedFactory.java @@ -15,7 +15,6 @@ package io.confluent.ksql.execution.streams; -import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.StreamJoined; @@ -28,21 +27,7 @@ StreamJoined create( String storeName); - static StreamJoinedFactory create(final KsqlConfig ksqlConfig) { - if (StreamsUtil.useProvidedName(ksqlConfig)) { - return new StreamJoinedFactory() { - @Override - public StreamJoined create( - final Serde keySerde, - final Serde leftSerde, - final Serde rightSerde, - final String name, - final String storeName) { - return StreamJoined.with(keySerde, leftSerde, rightSerde) - .withName(name); - } - }; - } + static StreamJoinedFactory create() { return new StreamJoinedFactory() { @Override public StreamJoined create( @@ -51,7 +36,8 @@ public StreamJoined create( final Serde rightSerde, final String name, final String storeName) { - return StreamJoined.with(keySerde, leftSerde, rightSerde); + return StreamJoined.with(keySerde, leftSerde, rightSerde) + .withName(name); } }; } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSourceBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSourceBuilder.java index cd4b793c1540..eb61639a6a5b 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSourceBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamSourceBuilder.java @@ -26,7 +26,6 @@ import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.serde.KeySerde; -import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; import java.util.function.Function; import org.apache.kafka.common.serialization.Serde; @@ -158,18 +157,9 @@ private static KStream buildKStream( final Consumed consumed, final Function rowKeyGenerator ) { - // for really old topologies, we must inject a dummy step to sure state store names, - // which use the node id in the store name, stay consistent: - final boolean legacy = queryBuilder.getKsqlConfig() - .getBoolean(KsqlConfig.KSQL_INJECT_LEGACY_MAP_VALUES_NODE); - KStream stream = queryBuilder.getStreamsBuilder() .stream(streamSource.getTopicName(), consumed); - if (legacy) { - stream = stream.mapValues(value -> value); - } - return stream .transformValues(new AddKeyAndTimestampColumns<>(rowKeyGenerator)); } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsFactories.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsFactories.java index a1b361773620..a4ae750ab200 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsFactories.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsFactories.java @@ -27,10 +27,10 @@ public class StreamsFactories { public static StreamsFactories create(final KsqlConfig ksqlConfig) { Objects.requireNonNull(ksqlConfig); return new StreamsFactories( - GroupedFactory.create(ksqlConfig), - JoinedFactory.create(ksqlConfig), - MaterializedFactory.create(ksqlConfig), - StreamJoinedFactory.create(ksqlConfig) + GroupedFactory.create(), + JoinedFactory.create(), + MaterializedFactory.create(), + StreamJoinedFactory.create() ); } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsUtil.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsUtil.java index e1f2d884a738..6c9b09888b31 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsUtil.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamsUtil.java @@ -16,20 +16,11 @@ package io.confluent.ksql.execution.streams; import io.confluent.ksql.execution.context.QueryContext; -import io.confluent.ksql.util.KsqlConfig; -import java.util.Objects; public final class StreamsUtil { private StreamsUtil() { } - public static boolean useProvidedName(final KsqlConfig ksqlConfig) { - return Objects.equals( - ksqlConfig.getString(KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS), - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS_ON - ); - } - public static String buildOpName(final QueryContext opContext) { return String.join("-", opContext.getContext()); } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java index d1a3a37b0723..04a244359188 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/MaterializedFactoryTest.java @@ -21,11 +21,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableMap; import io.confluent.ksql.GenericRow; -import io.confluent.ksql.util.KsqlConfig; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.StateStore; import org.junit.Test; @@ -44,40 +41,11 @@ public class MaterializedFactoryTest { private Serde rowSerde; @Mock private MaterializedFactory.Materializer materializer; - @Mock - private Materialized materialized; - - @Test - public void shouldCreateMaterializedCorrectlyWhenOptimizationsDisabled() { - // Given: - final KsqlConfig ksqlConfig = new KsqlConfig( - ImmutableMap.of( - StreamsConfig.TOPOLOGY_OPTIMIZATION, - StreamsConfig.NO_OPTIMIZATION, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS_OFF) - ); - when(materializer.materializedWith(keySerde, rowSerde)).thenReturn(materialized); - - // When: - final Materialized returned - = MaterializedFactory.create(ksqlConfig, materializer).create( - keySerde, rowSerde, OP_NAME); - - // Then: - assertThat(returned, is(materialized)); - verify(materializer).materializedWith(keySerde, rowSerde); - } @Test @SuppressWarnings("unchecked") public void shouldCreateJoinedCorrectlyWhenOptimizationsEnabled() { // Given: - final KsqlConfig ksqlConfig = new KsqlConfig( - ImmutableMap.of( - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS, - KsqlConfig.KSQL_USE_NAMED_INTERNAL_TOPICS_ON) - ); final Materialized asName = mock(Materialized.class); when(materializer.materializedAs(OP_NAME)).thenReturn(asName); final Materialized withKeySerde = mock(Materialized.class); @@ -87,7 +55,7 @@ public void shouldCreateJoinedCorrectlyWhenOptimizationsEnabled() { // When: final Materialized returned - = MaterializedFactory.create(ksqlConfig, materializer).create( + = MaterializedFactory.create(materializer).create( keySerde, rowSerde, OP_NAME); // Then: diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSourceBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSourceBuilderTest.java index c90ea64215cd..037fb334724b 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSourceBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSourceBuilderTest.java @@ -104,10 +104,6 @@ public class StreamSourceBuilderTest { SOURCE_SCHEMA.withMetaAndKeyColsInValue().withAlias(ALIAS); private static final KsqlConfig KSQL_CONFIG = new KsqlConfig(ImmutableMap.of()); - private static final KsqlConfig LEGACY_KSQL_CONFIG = new KsqlConfig(ImmutableMap.of( - KsqlConfig.KSQL_INJECT_LEGACY_MAP_VALUES_NODE, - true - )); private final Set SERDE_OPTIONS = new HashSet<>(); private final PhysicalSchema PHYSICAL_SCHEMA = PhysicalSchema.from(SOURCE_SCHEMA, SERDE_OPTIONS); @@ -221,48 +217,6 @@ public void shouldReturnCorrectSchemaForUnwindowedSource() { assertThat(builtKstream.getSchema(), is(SCHEMA)); } - @Test - @SuppressWarnings("unchecked") - public void shouldApplyCorrectTransformationsToLegacySourceStream() { - // Given: - givenLegacyQuery(); - givenUnwindowedSource(); - - // When: - final KStreamHolder builtKstream = streamSource.build(planBuilder); - - // Then: - assertThat(builtKstream.getStream(), is(kStream)); - final InOrder validator = inOrder(streamsBuilder, kStream); - validator.verify(streamsBuilder).stream(eq(TOPIC_NAME), consumedCaptor.capture()); - validator.verify(kStream).mapValues(any(ValueMapper.class)); - validator.verify(kStream).transformValues(any(ValueTransformerWithKeySupplier.class)); - final Consumed consumed = consumedCaptor.getValue(); - final Consumed expected = Consumed - .with(keySerde, valueSerde) - .withTimestampExtractor(extractor) - .withOffsetResetPolicy(AutoOffsetReset.EARLIEST); - assertThat(consumed, equalTo(expected)); - } - - @Test - public void shouldAddPassThroughValueMapperForLegacy() { - // Given: - givenLegacyQuery(); - givenUnwindowedSource(); - - final ValueMapper mapper = - getMapperFromStreamSource(streamSource); - - final GenericRow expected = new GenericRow(new ArrayList<>(row.getColumns())); - - // When: - final GenericRow result = mapper.apply(row); - - // Then: - assertThat(result, is(expected)); - } - @Test @SuppressWarnings("unchecked") public void shouldNotBuildWithOffsetResetIfNotProvided() { @@ -532,7 +486,4 @@ private ValueMapper getMapperFromStreamSource( return mapperCaptor.getValue(); } - private void givenLegacyQuery() { - when(queryBuilder.getKsqlConfig()).thenReturn(LEGACY_KSQL_CONFIG); - } }