Skip to content

Commit

Permalink
feat: drop legacy key field functionality (#3764)
Browse files Browse the repository at this point in the history
* feat: drop legacy key field functionality

BREAKING CHANGE

This drops functionality that maintained a legacy version of the key field.
This legacy version was often wrong and resulted in unnecessary repartition steps being performed.
The functionality was previously required so that existing legacy queries kept performing these unnecessary repartitions, which guaranteed that no data was lost.

With a major version bump this can now be dropped.
  • Loading branch information
big-andy-coates authored Nov 6, 2019
1 parent 1dfdb68 commit 5369dc2
Show file tree
Hide file tree
Showing 50 changed files with 397 additions and 2,124 deletions.
13 changes: 0 additions & 13 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_USE_NAMED_AVRO_MAPS = "ksql.avro.maps.named";
private static final String KSQL_USE_NAMED_AVRO_MAPS_DOC = "";

public static final String KSQL_USE_LEGACY_KEY_FIELD = "ksql.query.fields.key.legacy";
public static final String KSQL_LEGACY_REPARTITION_ON_GROUP_BY_ROWKEY =
"ksql.query.stream.groupby.rowkey.repartition";
public static final String KSQL_INJECT_LEGACY_MAP_VALUES_NODE =
Expand Down Expand Up @@ -271,18 +270,6 @@ public class KsqlConfig extends AbstractConfig {
Optional.empty(),
KSQL_USE_NAMED_AVRO_MAPS_DOC
),
new CompatibilityBreakingConfigDef(
KSQL_USE_LEGACY_KEY_FIELD,
ConfigDef.Type.BOOLEAN,
false,
false,
ConfigDef.Importance.LOW,
Optional.empty(),
"Determines if the legacy key field is used when building queries. "
+ "This setting is automatically applied for persistent queries started by "
+ "older versions of KSQL. "
+ "This setting should not be set manually."
),
new CompatibilityBreakingConfigDef(
KSQL_LEGACY_REPARTITION_ON_GROUP_BY_ROWKEY,
ConfigDef.Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlType;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -74,7 +72,7 @@ public DdlCommandResult executeCreateStream(final CreateStreamCommand createStre
createStream.getSourceName(),
createStream.getSchema(),
createStream.getSerdeOptions(),
getKeyField(createStream.getKeyField(), createStream.getSchema()),
getKeyField(createStream.getKeyField()),
createStream.getTimestampExtractionPolicy(),
createStream.getTopic()
);
Expand All @@ -89,7 +87,7 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable)
createTable.getSourceName(),
createTable.getSchema(),
createTable.getSerdeOptions(),
getKeyField(createTable.getKeyField(), createTable.getSchema()),
getKeyField(createTable.getKeyField()),
createTable.getTimestampExtractionPolicy(),
createTable.getTopic()
);
Expand Down Expand Up @@ -129,25 +127,11 @@ public DdlCommandResult executeDropType(final DropTypeCommand dropType) {
: new DdlCommandResult(true, "Type '" + typeName + "' does not exist");
}

private KeyField getKeyField(
final Optional<ColumnName> keyFieldName,
final LogicalSchema schema
) {
if (keyFieldOverride.isPresent()) {
return keyFieldOverride.get();
}
if (keyFieldName.isPresent()) {
// for DDL commands, the key name is never specified with a source
final ColumnRef columnRef = ColumnRef.withoutSource(keyFieldName.get());
final Column keyColumn = schema.findValueColumn(columnRef)
.orElseThrow(() -> new IllegalStateException(
"The KEY column set in the WITH clause does not exist in the schema: '"
+ keyFieldName + "'"
));
return KeyField.of(columnRef, keyColumn);
} else {
return KeyField.none();
}
private KeyField getKeyField(final Optional<ColumnName> keyFieldName) {
return keyFieldOverride
.orElseGet(() -> keyFieldName
.map(columnName -> KeyField.of(ColumnRef.withoutSource(columnName)))
.orElseGet(KeyField::none));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ private void validateExistingSink(
}

enforceKeyEquivalence(
existing.getKeyField().resolve(existingSchema, ksqlConfig),
keyField.resolve(resultSchema, ksqlConfig)
existing.getKeyField().resolve(existingSchema),
keyField.resolve(resultSchema)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ private KeyField buildOutputKeyField(
final LogicalSchema schema = sourcePlanNode.getSchema();

if (schema.isMetaColumn(partitionBy.name())) {
return sourceKeyField.withName(Optional.empty());
return KeyField.none();
}

if (schema.isKeyColumn(partitionBy.name())) {
return sourceKeyField;
}

return sourceKeyField.withName(partitionBy);
return KeyField.of(partitionBy);
}

private TimestampExtractionPolicy getTimestampExtractionPolicy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,7 @@ public AggregateNode(
this.finalSelectExpressions =
requireNonNull(finalSelectExpressions, "finalSelectExpressions");
this.havingExpressions = havingExpressions;
this.keyField = KeyField.of(
requireNonNull(keyFieldName, "keyFieldName"),
Optional.empty())
this.keyField = KeyField.of(requireNonNull(keyFieldName, "keyFieldName"))
.validateKeyExistsIn(schema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,8 @@ public DataSourceNode(
// and a KS transformValues to add the implicit fields
this.schema = StreamSource.getSchemaWithMetaAndKeyFields(alias, dataSource.getSchema());

final Optional<ColumnRef> keyFieldName = dataSource.getKeyField()
this.keyField = dataSource.getKeyField()
.withAlias(alias)
.ref();

this.keyField = KeyField.of(keyFieldName, dataSource.getKeyField().legacy())
.validateKeyExistsIn(schema.getSchema());

this.schemaKStreamFactory = requireNonNull(schemaKStreamFactory, "schemaKStreamFactory");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KeyField.LegacyField;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.schema.ksql.Column;
Expand Down Expand Up @@ -73,7 +72,7 @@ public JoinNode(
final Optional<WithinExpression> withinExpression
) {
super(id, calculateSinkType(left, right));
this.joinType = joinType;
this.joinType = Objects.requireNonNull(joinType, "joinType");
this.left = Objects.requireNonNull(left, "left");
this.right = Objects.requireNonNull(right, "right");
this.leftJoinFieldName = Objects.requireNonNull(leftJoinFieldName, "leftJoinFieldName");
Expand All @@ -84,9 +83,11 @@ public JoinNode(
final Column leftKeyCol = validateSchemaColumn(leftJoinFieldName, left.getSchema());
validateSchemaColumn(rightJoinFieldName, right.getSchema());

this.keyField = KeyField
.of(leftKeyCol.ref(),
LegacyField.of(leftKeyCol.ref(), leftKeyCol.type()));
this.keyField = joinType == JoinType.OUTER
? KeyField.none() // Both source key columns can be null, hence neither can be the keyField
: left.getSchema().isKeyColumn(leftKeyCol.name())
? left.getKeyField()
: KeyField.of(leftKeyCol.ref());

this.schema = buildSchema(left, right);
}
Expand Down Expand Up @@ -216,18 +217,19 @@ private abstract static class Joiner<K> {

public abstract SchemaKStream<K> join();

protected SchemaKStream<K> buildStream(
SchemaKStream<K> buildStream(
final PlanNode node,
final ColumnRef joinFieldName
) {
return maybeRePartitionByKey(
node.buildStream(builder),
joinFieldName,
contextStacker);
contextStacker
);
}

@SuppressWarnings("unchecked")
protected SchemaKTable<K> buildTable(
SchemaKTable<K> buildTable(
final PlanNode node,
final ColumnRef joinFieldName,
final SourceName tableName
Expand All @@ -244,7 +246,7 @@ protected SchemaKTable<K> buildTable(

final Optional<Column> keyColumn = schemaKStream
.getKeyField()
.resolve(schemaKStream.getSchema(), builder.getKsqlConfig());
.resolve(schemaKStream.getSchema());

final ColumnRef rowKey = ColumnRef.of(
tableName,
Expand Down Expand Up @@ -281,12 +283,6 @@ static <K> SchemaKStream<K> maybeRePartitionByKey(
final ColumnRef joinFieldName,
final Stacker contextStacker
) {
final LogicalSchema schema = stream.getSchema();

schema.findValueColumn(joinFieldName)
.orElseThrow(() ->
new KsqlException("couldn't find key field: " + joinFieldName + " in schema"));

return stream.selectKey(joinFieldName, true, contextStacker);
}

Expand All @@ -295,39 +291,6 @@ static ValueFormat getFormatForSource(final DataSourceNode sourceNode) {
.getKsqlTopic()
.getValueFormat();
}

/**
* The key field of the resultant joined stream.
*
* @param leftAlias the alias of the left source.
* @param leftKeyField the key field of the left source.
* @return the key field that should be used by the resultant joined stream.
*/
static KeyField getJoinedKeyField(final SourceName leftAlias, final KeyField leftKeyField) {
final Optional<ColumnRef> latest = Optional
.of(leftKeyField.ref().orElse(ColumnRef.withoutSource(SchemaUtil.ROWKEY_NAME)));

return KeyField.of(latest, leftKeyField.legacy())
.withAlias(leftAlias);
}

/**
* The key field of the resultant joined stream for OUTER joins.
*
* <p>Note: for outer joins neither source's key field can be used as they may be null.
*
* @param leftAlias the alias of the left source.
* @param leftKeyField the key field of the left source.
* @return the key field that should be used by the resultant joined stream.
*/
static KeyField getOuterJoinedKeyField(
final SourceName leftAlias,
final KeyField leftKeyField
) {
return KeyField.none()
.withLegacy(leftKeyField.legacy())
.withAlias(leftAlias);
}
}

private static final class StreamToStreamJoiner<K> extends Joiner<K> {
Expand Down Expand Up @@ -360,7 +323,7 @@ public SchemaKStream<K> join() {
return leftStream.leftJoin(
rightStream,
joinNode.schema,
getJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()),
joinNode.keyField,
joinNode.withinExpression.get().joinWindow(),
getFormatForSource(joinNode.left),
getFormatForSource(joinNode.right),
Expand All @@ -370,7 +333,7 @@ public SchemaKStream<K> join() {
return leftStream.outerJoin(
rightStream,
joinNode.schema,
getOuterJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()),
joinNode.keyField,
joinNode.withinExpression.get().joinWindow(),
getFormatForSource(joinNode.left),
getFormatForSource(joinNode.right),
Expand All @@ -380,7 +343,7 @@ public SchemaKStream<K> join() {
return leftStream.join(
rightStream,
joinNode.schema,
getJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()),
joinNode.keyField,
joinNode.withinExpression.get().joinWindow(),
getFormatForSource(joinNode.left),
getFormatForSource(joinNode.right),
Expand Down Expand Up @@ -421,7 +384,7 @@ public SchemaKStream<K> join() {
return leftStream.leftJoin(
rightTable,
joinNode.schema,
getJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()),
joinNode.keyField,
getFormatForSource(joinNode.left),
contextStacker
);
Expand All @@ -430,7 +393,7 @@ public SchemaKStream<K> join() {
return leftStream.join(
rightTable,
joinNode.schema,
getJoinedKeyField(joinNode.left.getAlias(), leftStream.getKeyField()),
joinNode.keyField,
getFormatForSource(joinNode.left),
contextStacker
);
Expand Down Expand Up @@ -472,19 +435,19 @@ public SchemaKTable<K> join() {
return leftTable.leftJoin(
rightTable,
joinNode.schema,
getJoinedKeyField(joinNode.left.getAlias(), leftTable.getKeyField()),
joinNode.keyField,
contextStacker);
case INNER:
return leftTable.join(
rightTable,
joinNode.schema,
getJoinedKeyField(joinNode.left.getAlias(), leftTable.getKeyField()),
joinNode.keyField,
contextStacker);
case OUTER:
return leftTable.outerJoin(
rightTable,
joinNode.schema,
getOuterJoinedKeyField(joinNode.left.getAlias(), leftTable.getKeyField()),
joinNode.keyField,
contextStacker);
default:
throw new KsqlException("Invalid join type encountered: " + joinNode.joinType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.ThreadLocalRandom;

Expand All @@ -39,7 +38,7 @@ public KsqlBareOutputNode(
final TimestampExtractionPolicy extractionPolicy
) {
super(id, source, schema, limit, extractionPolicy);
this.keyField = KeyField.of(source.getKeyField().ref(), Optional.empty())
this.keyField = KeyField.of(source.getKeyField().ref())
.validateKeyExistsIn(schema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,23 +141,15 @@ private SchemaKStream<?> createOutputStream(
final SchemaKStream schemaKStream,
final QueryContext.Stacker contextStacker
) {

if (schemaKStream instanceof SchemaKTable) {
return schemaKStream;
}

final KeyField resultKeyField = KeyField.of(
schemaKStream.getKeyField().ref(),
getKeyField().legacy()
);

final SchemaKStream result = schemaKStream.withKeyField(resultKeyField);

if (!partitionByField.isPresent()) {
return result;
return schemaKStream;
}

return result.selectKey(partitionByField.get(), false, contextStacker);
return schemaKStream.selectKey(partitionByField.get(), false, contextStacker);
}

private void validatePartitionByField() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ public ProjectNode(
this.source = requireNonNull(source, "source");
this.schema = requireNonNull(schema, "schema");
this.projectExpressions = ImmutableList.copyOf(source.getSelectExpressions());
this.keyField = KeyField.of(
requireNonNull(keyFieldName, "keyFieldName"),
source.getKeyField().legacy())
this.keyField = KeyField.of(requireNonNull(keyFieldName, "keyFieldName"))
.validateKeyExistsIn(schema);

if (schema.value().size() != projectExpressions.size()) {
Expand Down
Loading

0 comments on commit 5369dc2

Please sign in to comment.