Skip to content

Commit

Permalink
chore: remove all CompatibilityBreakingConfigs (#3933)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Nov 21, 2019
1 parent f35e2c0 commit 8cc913a
Show file tree
Hide file tree
Showing 38 changed files with 278 additions and 1,649 deletions.
6 changes: 3 additions & 3 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -849,11 +849,11 @@ public void shouldDescribeOverloadedScalarFunction() {
assertThat(output, containsString(
"\tVariation : SUBSTRING(str VARCHAR, pos INT)\n"
+ "\tReturns : VARCHAR\n"
+ "\tDescription : Returns a substring of str that starts at pos and continues to the end"
+ "\tDescription : Returns a substring of str from pos to the end of str"
));
assertThat(output, containsString(
"\tstr : The source string. If null, then function returns null.\n"
+ "\tpos : The base-one position the substring starts from."
"\tstr : The source string.\n"
+ "\tpos : The base-one position to start from."
));
}

Expand Down
170 changes: 17 additions & 153 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ public class KsqlConfig extends AbstractConfig {
public static final String METRIC_REPORTER_CLASSES_DOC =
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;

public static final String SINK_NUMBER_OF_PARTITIONS_PROPERTY = "ksql.sink.partitions";

public static final String SINK_NUMBER_OF_REPLICAS_PROPERTY = "ksql.sink.replicas";

public static final String KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY = "ksql.internal.topic.replicas";

public static final String KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY =
Expand Down Expand Up @@ -97,6 +93,8 @@ public class KsqlConfig extends AbstractConfig {
KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG = "ksql.persistent.prefix";
public static final String
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT = "query_";
public static final String
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DOC = "Prefixes persistent queries with this value.";

public static final String
KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG = "ksql.transient.prefix";
Expand All @@ -111,28 +109,6 @@ public class KsqlConfig extends AbstractConfig {
+ "'CREATE STREAM S AS ...' will create a topic 'thing-S', where as the statement "
+ "'CREATE STREAM S WITH(KAFKA_TOPIC = 'foo') AS ...' will create a topic 'foo'.";

public static final String KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG =
KSQL_FUNCTIONS_PROPERTY_PREFIX + "substring.legacy.args";
private static final String
KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_DOCS = "Switch the SUBSTRING function into legacy mode,"
+ " i.e. back to how it was in version 5.0 and earlier of KSQL."
+ " Up to version 5.0.x substring took different args:"
+ " VARCHAR SUBSTRING(str VARCHAR, startIndex INT, endIndex INT), where startIndex and"
+ " endIndex were both base-zero indexed, e.g. a startIndex of '0' selected the start of the"
+ " string, and the last argument is a character index, rather than the length of the"
+ " substring to extract. Later versions of KSQL use:"
+ " VARCHAR SUBSTRING(str VARCHAR, pos INT, length INT), where pos is base-one indexed,"
+ " and the last argument is the length of the substring to extract.";

public static final String KSQL_WINDOWED_SESSION_KEY_LEGACY_CONFIG =
KSQL_CONFIG_PROPERTY_PREFIX + "windowed.session.key.legacy";

private static final String KSQL_WINDOWED_SESSION_KEY_LEGACY_DOC = ""
+ "Version 5.1 of KSQL and earlier incorrectly excluded the end time in the record key in "
+ "Kafka for session windowed data. Setting this value to true will make KSQL expect and "
+ "continue to store session keys without the end time. With the default value of false "
+ "new queries will now correctly store the session end time as part of the key";

public static final String KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG =
"ksql.query.persistent.active.limit";
private static final int KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DEFAULT = Integer.MAX_VALUE;
Expand All @@ -141,22 +117,6 @@ public class KsqlConfig extends AbstractConfig {
+ "in interactive mode. Once this limit is reached, any further persistent queries will not "
+ "be accepted.";

public static final String KSQL_USE_NAMED_INTERNAL_TOPICS = "ksql.named.internal.topics";
private static final String KSQL_USE_NAMED_INTERNAL_TOPICS_DOC = "";
public static final String KSQL_USE_NAMED_INTERNAL_TOPICS_ON = "on";
public static final String KSQL_USE_NAMED_INTERNAL_TOPICS_OFF = "off";
private static final Validator KSQL_USE_NAMED_INTERNAL_TOPICS_VALIDATOR = ValidString.in(
KSQL_USE_NAMED_INTERNAL_TOPICS_ON, KSQL_USE_NAMED_INTERNAL_TOPICS_OFF
);

public static final String KSQL_USE_NAMED_AVRO_MAPS = "ksql.avro.maps.named";
private static final String KSQL_USE_NAMED_AVRO_MAPS_DOC = "";

public static final String KSQL_LEGACY_REPARTITION_ON_GROUP_BY_ROWKEY =
"ksql.query.stream.groupby.rowkey.repartition";
public static final String KSQL_INJECT_LEGACY_MAP_VALUES_NODE =
"ksql.query.inject.legacy.map.values.node";

public static final String KSQL_WRAP_SINGLE_VALUES =
"ksql.persistence.wrap.single.values";

Expand Down Expand Up @@ -224,99 +184,7 @@ public class KsqlConfig extends AbstractConfig {
+ "milliseconds when waiting for rebalancing of the stream store during a pull query";

public static final Collection<CompatibilityBreakingConfigDef> COMPATIBLY_BREAKING_CONFIG_DEFS
= ImmutableList.of(
new CompatibilityBreakingConfigDef(
KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG,
ConfigDef.Type.STRING,
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT,
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT,
ConfigDef.Importance.MEDIUM,
Optional.empty(),
"Second part of the prefix for persistent queries. For instance if "
+ "the prefix is query_ the query name will be ksql_query_1."),
new CompatibilityBreakingConfigDef(
KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG,
ConfigDef.Type.BOOLEAN,
false,
false,
ConfigDef.Importance.LOW,
Optional.empty(),
KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_DOCS),
new CompatibilityBreakingConfigDef(
KSQL_WINDOWED_SESSION_KEY_LEGACY_CONFIG,
ConfigDef.Type.BOOLEAN,
false,
false,
ConfigDef.Importance.LOW,
Optional.empty(),
KSQL_WINDOWED_SESSION_KEY_LEGACY_DOC),
new CompatibilityBreakingConfigDef(
KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG,
ConfigDef.Type.INT,
KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DEFAULT,
KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DEFAULT,
ConfigDef.Importance.LOW,
Optional.empty(),
KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DOC),
new CompatibilityBreakingConfigDef(
KSQL_USE_NAMED_INTERNAL_TOPICS,
ConfigDef.Type.STRING,
KSQL_USE_NAMED_INTERNAL_TOPICS_ON,
KSQL_USE_NAMED_INTERNAL_TOPICS_ON,
ConfigDef.Importance.LOW,
KSQL_USE_NAMED_INTERNAL_TOPICS_DOC,
Optional.empty(),
KSQL_USE_NAMED_INTERNAL_TOPICS_VALIDATOR),
new CompatibilityBreakingConfigDef(
SINK_NUMBER_OF_PARTITIONS_PROPERTY,
Type.INT,
null,
null,
Importance.LOW,
Optional.empty(),
"The legacy default number of partitions for the topics created by KSQL"
+ "in 5.2 and earlier versions."
+ "This property should not be set for 5.3 and later versions."),
new CompatibilityBreakingConfigDef(
SINK_NUMBER_OF_REPLICAS_PROPERTY,
ConfigDef.Type.SHORT,
null,
null,
ConfigDef.Importance.LOW,
Optional.empty(),
"The default number of replicas for the topics created by KSQL "
+ "in 5.2 and earlier versions."
+ "This property should not be set for 5.3 and later versions."
),
new CompatibilityBreakingConfigDef(
KSQL_USE_NAMED_AVRO_MAPS,
ConfigDef.Type.BOOLEAN,
true,
true,
ConfigDef.Importance.LOW,
Optional.empty(),
KSQL_USE_NAMED_AVRO_MAPS_DOC
),
new CompatibilityBreakingConfigDef(
KSQL_LEGACY_REPARTITION_ON_GROUP_BY_ROWKEY,
ConfigDef.Type.BOOLEAN,
false,
false,
ConfigDef.Importance.LOW,
Optional.empty(),
"Ensures legacy queries that perform a 'GROUP BY ROWKEY' continue to "
+ "perform an unnecessary repartition step"
),
new CompatibilityBreakingConfigDef(
KSQL_INJECT_LEGACY_MAP_VALUES_NODE,
ConfigDef.Type.BOOLEAN,
false,
false,
ConfigDef.Importance.LOW,
Optional.empty(),
"Ensures legacy queries maintian the same topology"
)
);
= ImmutableList.of();

private enum ConfigGeneration {
LEGACY,
Expand Down Expand Up @@ -399,12 +267,7 @@ void defineCurrent(final ConfigDef configDef) {
}

private static final Collection<CompatibilityBreakingStreamsConfig>
COMPATIBILITY_BREAKING_STREAMS_CONFIGS = ImmutableList.of(
new CompatibilityBreakingStreamsConfig(
StreamsConfig.TOPOLOGY_OPTIMIZATION,
StreamsConfig.OPTIMIZE,
StreamsConfig.OPTIMIZE)
);
COMPATIBILITY_BREAKING_STREAMS_CONFIGS = ImmutableList.of();

private static final class CompatibilityBreakingStreamsConfig {
final String name;
Expand Down Expand Up @@ -617,8 +480,21 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DEFAULT,
Importance.LOW,
KSQL_PULL_QUERIES_SKIP_ACCESS_VALIDATOR_DOC
).define(
KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG,
Type.STRING,
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT,
Importance.LOW,
KSQL_PERSISTENT_QUERY_NAME_PREFIX_DOC
).define(
KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG,
Type.INT,
KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DEFAULT,
Importance.MEDIUM,
KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
: COMPATIBLY_BREAKING_CONFIG_DEFS) {
if (generation == ConfigGeneration.CURRENT) {
Expand Down Expand Up @@ -722,7 +598,6 @@ private KsqlConfig(final ConfigGeneration generation, final Map<?, ?> props) {
generation == ConfigGeneration.CURRENT
? config.defaultValueCurrent : config.defaultValueLegacy));
this.ksqlStreamConfigProps = buildStreamingConfig(streamsConfigDefaults, originals());
validate();
}

private boolean getBooleanConfig(final String config, final boolean defaultValue) {
Expand All @@ -740,17 +615,6 @@ private KsqlConfig(final ConfigGeneration generation,
this.ksqlStreamConfigProps = ksqlStreamConfigProps;
}

private void validate() {
final Object optimizationsConfig = getKsqlStreamConfigProps().get(
StreamsConfig.TOPOLOGY_OPTIMIZATION);
final Object useInternalNamesConfig = get(KSQL_USE_NAMED_INTERNAL_TOPICS);
if (Objects.equals(optimizationsConfig, StreamsConfig.OPTIMIZE)
&& useInternalNamesConfig.equals(KSQL_USE_NAMED_INTERNAL_TOPICS_OFF)) {
throw new RuntimeException(
"Internal topic naming must be enabled if streams optimizations enabled");
}
}

public Map<String, Object> getKsqlStreamConfigProps() {
final Map<String, Object> props = new HashMap<>();
for (final ConfigValue config : ksqlStreamConfigProps.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,21 @@ public void setUp() {

@Test
public void shouldResolveKsqlProperty() {
assertThat(resolver.resolve(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, true),
is(resolvedItem(KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY, KSQL_CONFIG_DEF)));
assertThat(resolver.resolve(KsqlConfig.CONNECT_URL_PROPERTY, true),
is(resolvedItem(KsqlConfig.CONNECT_URL_PROPERTY, KSQL_CONFIG_DEF)));
}

@Test
public void shouldNotFindPrefixedKsqlProperty() {
assertNotFound(
KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX + KsqlConfig.SINK_NUMBER_OF_PARTITIONS_PROPERTY);
KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX + KsqlConfig.CONNECT_URL_PROPERTY);
}

@Test
public void shouldNotFindUnknownKsqlProperty() {
assertNotFound(KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX + "you.won't.find.me...right");
}

@Test
public void shouldResolveKnownKsqlFunctionProperty() {
assertThat(resolver.resolve(KsqlConfig.KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG, true),
is(resolvedItem(KsqlConfig.KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG, KSQL_CONFIG_DEF)));
}

@Test
public void shouldReturnUnresolvedForOtherKsqlFunctionProperty() {
assertThat(
Expand Down
Loading

0 comments on commit 8cc913a

Please sign in to comment.