From ea0a0ace917ae73d58143a7ab15ce5d8cc9f17c7 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Mon, 27 Jan 2020 22:49:48 +0000 Subject: [PATCH] fix: reserve `WINDOWSTART` and `WINDOWEND` as system column names (#4388) * fix: reserve `WINDOWSTART` and `WINDOWEND` as system column names BREAKING CHANGE: `WINDOWSTART` and `WINDOWEND` are now reserved system column names. Any query that previously used those names will need to be changed: for example, alias the columns to a different name. These column names are being reserved for use as system columns when dealing with streams and tables that have a windowed key. --- .../ksql/schema/ksql/LogicalSchema.java | 12 ++++ .../io/confluent/ksql/util/SchemaUtil.java | 12 +++- .../ksql/schema/ksql/LogicalSchemaTest.java | 28 ++++++++ .../io/confluent/ksql/analyzer/Analyzer.java | 3 +- .../ddl/commands/CreateSourceFactory.java | 6 +- .../ksql/planner/LogicalPlanner.java | 4 +- .../ksql/structured/SchemaKStream.java | 12 ++-- .../ddl/commands/CreateSourceFactoryTest.java | 55 +++++++++++--- .../ddl/commands/CreateSourceCommand.java | 5 +- .../ddl/commands/CreateSourceCommandTest.java | 62 ++++++++++++++-- .../query-validation-tests/elements.json | 71 +++++++++++++++++-- .../metastore/model/StructuredDataSource.java | 6 +- .../model/StructuredDataSourceTest.java | 30 ++++++++ .../ksql/parser/tree/SingleColumn.java | 4 +- .../ksql/parser/tree/SingleColumnTest.java | 45 ++++++++++++ .../rest/entity/TableRowsEntityFactory.java | 6 +- 16 files changed, 314 insertions(+), 47 deletions(-) create mode 100644 ksql-parser/src/test/java/io/confluent/ksql/parser/tree/SingleColumnTest.java diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java index ecd7e19de6d7..a3ad16ad39cd 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java @@ -119,6 +119,18 @@ public Optional findValueColumn(final ColumnRef columnRef) { return findColumnMatching(withNamespace(Namespace.VALUE).and(withRef(columnRef))); } + /** + * Checks to see if value namespace contain any of the supplied names. + * + * @param names the names to check for. + * @return {@code true} if any of the supplied names exist in the value namespace. + */ + public boolean valueContainsAny(final Set names) { + return value().stream() + .map(Column::name) + .anyMatch(names::contains); + } + /** * Copies metadata and key columns to the value schema. * diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java index 63fb7625a764..9fd63577c0bc 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java @@ -45,8 +45,14 @@ public final class SchemaUtil { public static final ColumnName ROWKEY_NAME = ColumnName.of("ROWKEY"); public static final ColumnName ROWTIME_NAME = ColumnName.of("ROWTIME"); public static final ColumnName WINDOWSTART_NAME = ColumnName.of("WINDOWSTART"); + public static final ColumnName WINDOWEND_NAME = ColumnName.of("WINDOWEND"); - public static final int ROWKEY_INDEX = 1; + private static final Set SYSTEM_COLUMN_NAMES = ImmutableSet.of( + ROWKEY_NAME, + ROWTIME_NAME, + WINDOWSTART_NAME, + WINDOWEND_NAME + ); private static final Set ARITHMETIC_TYPES = ImmutableSet.of( Type.INT8, @@ -62,6 +68,10 @@ public final class SchemaUtil { private SchemaUtil() { } + public static Set systemColumnNames() { + return SYSTEM_COLUMN_NAMES; + } + // Do Not use in new code - use `SchemaConverters` directly. public static Class getJavaType(final Schema schema) { return SchemaConverters.sqlToJavaConverter().toJavaType( diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java index b9df155c0a7b..ffa879ac13aa 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java @@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import com.google.common.collect.ImmutableSet; import com.google.common.testing.EqualsTester; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.Column.Namespace; @@ -632,6 +633,33 @@ public void shouldSupportCopyingColumnsFromOtherSchemas() { )); } + @Test + public void shouldMatchAnyValueSchema() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .valueColumn(F0, STRING) + .keyColumn(K0, BIGINT) + .valueColumn(F1, BIGINT) + .build(); + + // Then: + assertThat(schema.valueContainsAny(ImmutableSet.of(F0)), is(true)); + assertThat(schema.valueContainsAny(ImmutableSet.of(V0, F0, F1, V1)), is(true)); + } + + @Test + public void shouldOnlyMatchValueSchema() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .valueColumn(F0, STRING) + .keyColumn(K0, BIGINT) + .valueColumn(F1, BIGINT) + .build(); + + // Then: + assertThat(schema.valueContainsAny(ImmutableSet.of(K0, V0, ROWTIME_NAME)), is(false)); + } + private static org.apache.kafka.connect.data.Field connectField( final String fieldName, final int index, diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index 768f089446ac..cb368a419629 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -569,8 +569,7 @@ public void validate() { private void addSelectItem(final Expression exp, final ColumnName columnName) { if (persistent) { - if (SchemaUtil.ROWTIME_NAME.equals(columnName) - || SchemaUtil.ROWKEY_NAME.equals(columnName)) { + if (SchemaUtil.systemColumnNames().contains(columnName)) { throw new KsqlException("Reserved column name in select: " + columnName + ". " + "Please remove or alias the column."); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java index 9e75682cc685..7c9290388c69 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java @@ -171,12 +171,12 @@ private static LogicalSchema buildSchema(final TableElements tableElements) { } tableElements.forEach(e -> { - if (e.getName().equals(SchemaUtil.ROWTIME_NAME)) { + final boolean isRowKey = e.getName().equals(SchemaUtil.ROWKEY_NAME); + + if (!isRowKey && SchemaUtil.systemColumnNames().contains(e.getName())) { throw new KsqlException("'" + e.getName().name() + "' is a reserved column name."); } - final boolean isRowKey = e.getName().equals(SchemaUtil.ROWKEY_NAME); - if (e.getNamespace() == Namespace.KEY) { if (!isRowKey) { throw new KsqlException("'" + e.getName().name() + "' is an invalid KEY column name. " diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java index ba7fb6cf6d4f..6a20650c3ef3 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java @@ -175,9 +175,7 @@ private AggregateNode buildAggregateNode(final PlanNode sourcePlanNode) { : null; final Optional keyFieldName = getSelectAliasMatching((expression, alias) -> - expression.equals(groupBy) - && !SchemaUtil.isFieldName(alias.name(), SchemaUtil.ROWTIME_NAME.name()) - && !SchemaUtil.isFieldName(alias.name(), SchemaUtil.ROWKEY_NAME.name()), + expression.equals(groupBy) && !SchemaUtil.systemColumnNames().contains(alias), sourcePlanNode.getSelectExpressions()); return new AggregateNode( diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index eb5901251b21..c9e5950228b0 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -188,8 +188,8 @@ KeyField findKeyField(final List selectExpressions) { } final Optional filtered = found - .filter(f -> !SchemaUtil.isFieldName(f.name().name(), SchemaUtil.ROWTIME_NAME.name())) - .filter(f -> !SchemaUtil.isFieldName(f.name().name(), SchemaUtil.ROWKEY_NAME.name())) + // System columns can not be key fields: + .filter(f -> !SchemaUtil.systemColumnNames().contains(f.name())) .map(Column::ref); return KeyField.of(filtered); @@ -326,14 +326,14 @@ public SchemaKStream selectKey( final Expression keyExpression, final QueryContext.Stacker contextStacker ) { - if (keyFormat.isWindowed()) { - throw new UnsupportedOperationException("Can not selectKey of windowed stream"); - } - if (!needsRepartition(keyExpression)) { return (SchemaKStream) this; } + if (keyFormat.isWindowed()) { + throw new UnsupportedOperationException("Can not selectKey of windowed stream"); + } + final StreamSelectKey step = ExecutionStepFactory.streamSelectKey( contextStacker, sourceStep, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java index 013005a8a4c3..7c14f6e08bc8 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceFactoryTest.java @@ -21,11 +21,14 @@ import static io.confluent.ksql.parser.tree.TableElement.Namespace.KEY; import static io.confluent.ksql.parser.tree.TableElement.Namespace.VALUE; import static io.confluent.ksql.schema.ksql.ColumnMatchers.keyColumn; +import static io.confluent.ksql.schema.ksql.types.SqlTypes.BIGINT; import static io.confluent.ksql.serde.Format.AVRO; import static io.confluent.ksql.serde.Format.JSON; import static io.confluent.ksql.serde.Format.KAFKA; import static io.confluent.ksql.util.SchemaUtil.ROWKEY_NAME; import static io.confluent.ksql.util.SchemaUtil.ROWTIME_NAME; +import static io.confluent.ksql.util.SchemaUtil.WINDOWEND_NAME; +import static io.confluent.ksql.util.SchemaUtil.WINDOWSTART_NAME; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; @@ -94,10 +97,10 @@ public class CreateSourceFactoryTest { tableElement(Namespace.KEY, ROWKEY_NAME.name(), new Type(SqlTypes.INTEGER)); private static final TableElement ELEMENT1 = - tableElement(Namespace.VALUE, "bob", new Type(SqlTypes.STRING)); + tableElement(VALUE, "bob", new Type(SqlTypes.STRING)); private static final TableElement ELEMENT2 = - tableElement(Namespace.VALUE, "hojjat", new Type(SqlTypes.BIGINT)); + tableElement(VALUE, "hojjat", new Type(BIGINT)); private static final TableElements ONE_ELEMENTS = TableElements.of(ELEMENT1); @@ -107,7 +110,7 @@ public class CreateSourceFactoryTest { private static final LogicalSchema EXPECTED_SCHEMA = LogicalSchema.builder() .keyColumn(ROWKEY_NAME, SqlTypes.INTEGER) .valueColumn(ColumnName.of("bob"), SqlTypes.STRING) - .valueColumn(ColumnName.of("hojjat"), SqlTypes.BIGINT) + .valueColumn(ColumnName.of("hojjat"), BIGINT) .build(); private static final String TOPIC_NAME = "some topic"; @@ -186,8 +189,8 @@ public void shouldCreateCommandForCreateTable() { // Given: final CreateTable ddlStatement = new CreateTable(SOME_NAME, TableElements.of( - tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)), - tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))), + tableElement(VALUE, "COL1", new Type(BIGINT)), + tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))), true, withProperties); // When: @@ -578,7 +581,7 @@ public void shouldBuildSchemaWithExplicitKeyFieldForStream() { assertThat(result.getSchema(), is(LogicalSchema.builder() .keyColumn(ColumnName.of("ROWKEY"), SqlTypes.STRING) .valueColumn(ColumnName.of("bob"), SqlTypes.STRING) - .valueColumn(ColumnName.of("hojjat"), SqlTypes.BIGINT) + .valueColumn(ColumnName.of("hojjat"), BIGINT) .build() )); } @@ -732,7 +735,7 @@ public void shouldThrowOnRowTimeValueColumn() { // Given: final CreateStream statement = new CreateStream( SOME_NAME, - TableElements.of(tableElement(Namespace.VALUE, ROWTIME_NAME.name(), new Type(SqlTypes.BIGINT))), + TableElements.of(tableElement(VALUE, ROWTIME_NAME.name(), new Type(BIGINT))), true, withProperties ); @@ -750,7 +753,7 @@ public void shouldThrowOnRowTimeKeyColumn() { // Given: final CreateStream statement = new CreateStream( SOME_NAME, - TableElements.of(tableElement(Namespace.KEY, ROWTIME_NAME.name(), new Type(SqlTypes.BIGINT))), + TableElements.of(tableElement(Namespace.KEY, ROWTIME_NAME.name(), new Type(BIGINT))), true, withProperties ); @@ -763,6 +766,42 @@ public void shouldThrowOnRowTimeKeyColumn() { createSourceFactory.createStreamCommand(statement, ksqlConfig); } + @Test + public void shouldThrowOnWindowStartValueColumn() { + // Given: + final CreateStream statement = new CreateStream( + SOME_NAME, + TableElements.of(tableElement(VALUE, WINDOWSTART_NAME.name(), new Type(BIGINT))), + true, + withProperties + ); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("'WINDOWSTART' is a reserved column name."); + + // When: + createSourceFactory.createStreamCommand(statement, ksqlConfig); + } + + @Test + public void shouldThrowOnWindowEndValueColumn() { + // Given: + final CreateStream statement = new CreateStream( + SOME_NAME, + TableElements.of(tableElement(VALUE, WINDOWEND_NAME.name(), new Type(BIGINT))), + true, + withProperties + ); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("'WINDOWEND' is a reserved column name."); + + // When: + createSourceFactory.createStreamCommand(statement, ksqlConfig); + } + @Test public void shouldThrowOnRowKeyValueColumn() { // Given: diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateSourceCommand.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateSourceCommand.java index 60c775f2d395..ee3f8f7cab77 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateSourceCommand.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/ddl/commands/CreateSourceCommand.java @@ -92,9 +92,8 @@ public Optional getWindowInfo() { } private static void validate(final LogicalSchema schema, final Optional keyField) { - if (schema.findValueColumn(ColumnRef.of(SchemaUtil.ROWKEY_NAME)).isPresent() - || schema.findValueColumn(ColumnRef.of(SchemaUtil.ROWTIME_NAME)).isPresent()) { - throw new IllegalArgumentException("Schema contains implicit columns in value schema"); + if (schema.valueContainsAny(SchemaUtil.systemColumnNames())) { + throw new IllegalArgumentException("Schema contains system columns in value schema"); } if (schema.key().size() != 1) { diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/ddl/commands/CreateSourceCommandTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/ddl/commands/CreateSourceCommandTest.java index e83462fca350..44adaa7e6c7c 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/ddl/commands/CreateSourceCommandTest.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/ddl/commands/CreateSourceCommandTest.java @@ -25,6 +25,7 @@ import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.serde.WindowInfo; import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.SchemaUtil; import java.util.Optional; import org.junit.Rule; import org.junit.Test; @@ -35,17 +36,20 @@ public class CreateSourceCommandTest { private static final SourceName SOURCE_NAME = SourceName.of("bob"); private static final String TOPIC_NAME = "vic"; private static final Formats FORAMTS = mock(Formats.class); + private static final ColumnName K0 = ColumnName.of("k0"); + private static final ColumnName K1 = ColumnName.of("k1"); + private static final ColumnName KEY_FIELD = ColumnName.of("keyField"); @Rule public final ExpectedException expectedException = ExpectedException.none(); - private static final ColumnName KEY_FIELD = ColumnName.of("keyField"); + @Test(expected = UnsupportedOperationException.class) public void shouldThrowOnMultipleKeyColumns() { // Given: final LogicalSchema schema = LogicalSchema.builder() - .keyColumn(ColumnName.of("k0"), SqlTypes.STRING) - .keyColumn(ColumnName.of("k1"), SqlTypes.STRING) + .keyColumn(K0, SqlTypes.STRING) + .keyColumn(K1, SqlTypes.STRING) .build(); // When: @@ -66,7 +70,7 @@ public void shouldThrowIfKeyFieldDoesNotMatchRowKeyType() { final ColumnName keyField = ColumnName.of("keyField"); final LogicalSchema schema = LogicalSchema.builder() - .keyColumn(ColumnName.of("k0"), SqlTypes.INTEGER) + .keyColumn(K0, SqlTypes.INTEGER) .valueColumn(keyField, SqlTypes.STRING) .build(); @@ -96,7 +100,7 @@ public void shouldThrowIfKeyFieldDoesNotMatchRowKeyType() { public void shouldNotThrowIfKeyFieldMatchesRowKeyType() { // Given: final LogicalSchema schema = LogicalSchema.builder() - .keyColumn(ColumnName.of("k0"), SqlTypes.INTEGER) + .keyColumn(K0, SqlTypes.INTEGER) .valueColumn(KEY_FIELD, SqlTypes.INTEGER) .build(); @@ -114,6 +118,54 @@ public void shouldNotThrowIfKeyFieldMatchesRowKeyType() { // Then: builds without error } + @Test + public void shouldThrowOnWindowStartColumn() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.INTEGER) + .valueColumn(SchemaUtil.WINDOWSTART_NAME, SqlTypes.INTEGER) + .build(); + + // Expect: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Schema contains system columns in value schema"); + + // When: + new TestCommand( + SOURCE_NAME, + schema, + Optional.empty(), + Optional.empty(), + TOPIC_NAME, + FORAMTS, + Optional.empty() + ); + } + + @Test + public void shouldThrowOnWindowEndColumn() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.INTEGER) + .valueColumn(SchemaUtil.WINDOWEND_NAME, SqlTypes.INTEGER) + .build(); + + // Expect: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Schema contains system columns in value schema"); + + // When: + new TestCommand( + SOURCE_NAME, + schema, + Optional.empty(), + Optional.empty(), + TOPIC_NAME, + FORAMTS, + Optional.empty() + ); + } + private static final class TestCommand extends CreateSourceCommand { TestCommand( diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json index ddbbf25eb14c..8f49b7297d23 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json @@ -399,7 +399,29 @@ } }, { - "name": "non-join leaves aliased ROWKEY and ROWTIME in output's value schema", + "name": "non-join should reject WINDOWSTART in projection", + "statements": [ + "CREATE STREAM INPUT (F0 INT) WITH (kafka_topic='input', value_format='JSON', window_type='session');", + "CREATE STREAM OUTPUT AS SELECT WINDOWSTART FROM INPUT;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlException", + "message": "Reserved column name in select: `WINDOWSTART`. Please remove or alias the column." + } + }, + { + "name": "non-join should reject WINDOWEND in projection", + "statements": [ + "CREATE STREAM INPUT (F0 INT) WITH (kafka_topic='input', value_format='JSON', window_type='session');", + "CREATE STREAM OUTPUT AS SELECT WINDOWEND FROM INPUT;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlException", + "message": "Reserved column name in select: `WINDOWEND`. Please remove or alias the column." + } + }, + { + "name": "non-join leaves aliased system columns in output's value schema", "statements": [ "CREATE STREAM INPUT (F0 INT) WITH (kafka_topic='input', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT F0, ROWTIME AS TIME, ROWKEY AS KEY FROM INPUT;" @@ -432,17 +454,41 @@ } }, { - "name": "join leaves aliased ROWKEY and ROWTIME in output's value schema", + "name": "join should reject WINDOWSTART in projection", "statements": [ - "CREATE STREAM LEFT_STREAM (F0 INT) WITH (kafka_topic='left', value_format='JSON');", - "CREATE STREAM RIGHT_STREAM (F1 INT) WITH (kafka_topic='right', value_format='JSON');", + "CREATE STREAM LEFT_STREAM (F0 INT) WITH (kafka_topic='left', value_format='JSON', window_type='tumbling', window_size='1 second');", + "CREATE STREAM RIGHT_STREAM (F1 INT) WITH (kafka_topic='right', value_format='JSON', window_type='tumbling', window_size='1 second');", + "CREATE STREAM OUTPUT as SELECT l.WINDOWSTART AS WINDOWSTART, f0 FROM left_stream l join right_stream r WITHIN 1 seconds ON l.rowkey = r.rowkey;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlException", + "message": "Reserved column name in select: `WINDOWSTART`. Please remove or alias the column." + } + }, + { + "name": "join should reject WINDOWEND in projection", + "statements": [ + "CREATE STREAM LEFT_STREAM (F0 INT) WITH (kafka_topic='left', value_format='JSON', window_type='tumbling', window_size='1 second');", + "CREATE STREAM RIGHT_STREAM (F1 INT) WITH (kafka_topic='right', value_format='JSON', window_type='tumbling', window_size='1 second');", + "CREATE STREAM OUTPUT as SELECT l.WINDOWEND AS WINDOWEND, f0 FROM left_stream l join right_stream r WITHIN 1 seconds ON l.rowkey = r.rowkey;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlException", + "message": "Reserved column name in select: `WINDOWEND`. Please remove or alias the column." + } + }, + { + "name": "join leaves aliased system columns in output's value schema", + "statements": [ + "CREATE STREAM LEFT_STREAM (F0 INT) WITH (kafka_topic='left', value_format='JSON', window_type='tumbling', window_size='1 second');", + "CREATE STREAM RIGHT_STREAM (F1 INT) WITH (kafka_topic='right', value_format='JSON', window_type='tumbling', window_size='1 second');", "CREATE STREAM OUTPUT as SELECT l.ROWTIME AS TIME, l.ROWKEY AS KEY, f0, f1 FROM left_stream l join right_stream r WITHIN 1 seconds ON l.rowkey = r.rowkey;" ], "inputs": [ - {"topic": "left", "key": "k", "value": {"F0": 4}, "timestamp": 1}, - {"topic": "right", "key": "k", "value": {"F1": 6}, "timestamp": 1} + {"topic": "left", "key": "k", "value": {"F0": 4}, "timestamp": 1, "window": {"start": 0, "end": 1000, "type": "time"}}, + {"topic": "right", "key": "k", "value": {"F1": 6}, "timestamp": 2, "window": {"start": 0, "end": 1000, "type": "time"}} ], - "outputs": [{"topic": "OUTPUT", "key": "k", "value": {"F0": 4, "F1": 6, "TIME": 1, "KEY": "k"}, "timestamp": 1}] + "outputs": [{"topic": "OUTPUT", "key": "k", "value": {"F0": 4, "F1": 6, "TIME": 1, "KEY": "k : Window{start=0 end=-}"}, "timestamp": 2, "window": {"start": 0, "end": 1000, "type": "time"}}] }, { "name": "group-by rejects ROWKEY in projection", @@ -455,6 +501,17 @@ "message": "Reserved column name in select: `ROWKEY`. Please remove or alias the column." } }, + { + "name": "group-by rejects window bounds in projection", + "statements": [ + "CREATE STREAM INPUT (F0 INT) WITH (kafka_topic='input', value_format='JSON');", + "CREATE TABLE OUTPUT AS SELECT WINDOWSTART, COUNT(*) AS COUNT FROM INPUT GROUP BY WINDOWSTART;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlException", + "message": "Reserved column name in select: `WINDOWSTART`. Please remove or alias the column." + } + }, { "name": "non-join qualified select star", "statements": [ diff --git a/ksql-metastore/src/main/java/io/confluent/ksql/metastore/model/StructuredDataSource.java b/ksql-metastore/src/main/java/io/confluent/ksql/metastore/model/StructuredDataSource.java index e29336eb5855..033797829d25 100644 --- a/ksql-metastore/src/main/java/io/confluent/ksql/metastore/model/StructuredDataSource.java +++ b/ksql-metastore/src/main/java/io/confluent/ksql/metastore/model/StructuredDataSource.java @@ -22,7 +22,6 @@ import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.execution.timestamp.TimestampColumn; import io.confluent.ksql.name.SourceName; -import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.util.SchemaUtil; @@ -64,9 +63,8 @@ abstract class StructuredDataSource implements DataSource { this.serdeOptions = ImmutableSet.copyOf(requireNonNull(serdeOptions, "serdeOptions")); this.casTarget = casTarget; - if (schema.findValueColumn(ColumnRef.of(SchemaUtil.ROWKEY_NAME)).isPresent() - || schema.findValueColumn(ColumnRef.of(SchemaUtil.ROWTIME_NAME)).isPresent()) { - throw new IllegalArgumentException("Schema contains implicit columns in value schema"); + if (schema.valueContainsAny(SchemaUtil.systemColumnNames())) { + throw new IllegalArgumentException("Schema contains system columns in value schema"); } } diff --git a/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java b/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java index 85b509555d23..3eaaecaf59c0 100644 --- a/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java +++ b/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java @@ -83,6 +83,36 @@ public void shouldThrowIfSchemaContainsRowKey() { ); } + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfSchemaContainsWindowStart() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .valueColumn(SchemaUtil.WINDOWSTART_NAME, SqlTypes.STRING) + .valueColumn(ColumnName.of("f0"), SqlTypes.BIGINT) + .build(); + + // When: + new TestStructuredDataSource( + schema, + keyField + ); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfSchemaContainsWindowEnd() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .valueColumn(SchemaUtil.WINDOWEND_NAME, SqlTypes.STRING) + .valueColumn(ColumnName.of("f0"), SqlTypes.BIGINT) + .build(); + + // When: + new TestStructuredDataSource( + schema, + keyField + ); + } + /** * Test class to allow the abstract base class to be instantiated. */ diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/SingleColumn.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/SingleColumn.java index 413bc5f41a75..0ded91929e27 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/SingleColumn.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/SingleColumn.java @@ -46,8 +46,8 @@ public SingleColumn( ) { super(location); - checkForReservedToken(expression, alias, SchemaUtil.ROWTIME_NAME); - checkForReservedToken(expression, alias, SchemaUtil.ROWKEY_NAME); + SchemaUtil.systemColumnNames() + .forEach(columnName -> checkForReservedToken(expression, alias, columnName)); this.expression = requireNonNull(expression, "expression"); this.alias = requireNonNull(alias, "alias"); diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/SingleColumnTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/SingleColumnTest.java new file mode 100644 index 000000000000..c8b64720a9d1 --- /dev/null +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/SingleColumnTest.java @@ -0,0 +1,45 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser.tree; + +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.StringLiteral; +import io.confluent.ksql.parser.NodeLocation; +import io.confluent.ksql.parser.exception.ParseFailedException; +import io.confluent.ksql.util.SchemaUtil; +import java.util.Optional; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class SingleColumnTest { + + private static final Optional A_LOCATION = Optional.empty(); + private static final Expression AN_EXPRESSION = new StringLiteral("foo"); + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Test + public void shouldThrowIfAliasIsSystemColumnName() { + // Expect: + expectedException.expect(ParseFailedException.class); + expectedException.expectMessage("is a reserved token for implicit column."); + + // When: + new SingleColumn(A_LOCATION, AN_EXPRESSION, Optional.of(SchemaUtil.WINDOWSTART_NAME)); + } +} \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/TableRowsEntityFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/TableRowsEntityFactory.java index 87aed81d40f8..cb4147f2db5c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/TableRowsEntityFactory.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/TableRowsEntityFactory.java @@ -18,10 +18,10 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.execution.streams.materialization.TableRow; import io.confluent.ksql.model.WindowType; -import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.util.SchemaUtil; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -37,12 +37,12 @@ public final class TableRowsEntityFactory { @SuppressWarnings("deprecation") private static final List TIME_WINDOW_COLUMNS = ImmutableList - .of(Column.legacySystemWindowColumn(ColumnName.of("WINDOWSTART"), SqlTypes.BIGINT)); + .of(Column.legacySystemWindowColumn(SchemaUtil.WINDOWSTART_NAME, SqlTypes.BIGINT)); @SuppressWarnings("deprecation") private static final List SESSION_WINDOW_COLUMNS = ImmutableList.builder() .addAll(TIME_WINDOW_COLUMNS) - .add(Column.legacySystemWindowColumn(ColumnName.of("WINDOWEND"), SqlTypes.BIGINT)) + .add(Column.legacySystemWindowColumn(SchemaUtil.WINDOWEND_NAME, SqlTypes.BIGINT)) .build(); private TableRowsEntityFactory() {