Skip to content

Commit

Permalink
fix: reserve WINDOWSTART and WINDOWEND as system column names (#4388)
Browse files Browse the repository at this point in the history

* fix: reserve `WINDOWSTART` and `WINDOWEND` as system column names

BREAKING CHANGE: `WINDOWSTART` and `WINDOWEND` are now reserved system column names. Any query that previously used those names will need to be changed: for example, alias the columns to a different name.

These column names are being reserved for use as system columns when dealing with streams and tables that have a windowed key.
  • Loading branch information
big-andy-coates authored Jan 27, 2020
1 parent b372af6 commit ea0a0ac
Show file tree
Hide file tree
Showing 16 changed files with 314 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ public Optional<Column> findValueColumn(final ColumnRef columnRef) {
return findColumnMatching(withNamespace(Namespace.VALUE).and(withRef(columnRef)));
}

/**
* Checks whether the value namespace contains any of the supplied names.
*
* <p>Only the columns returned by {@code value()} are inspected; each column's name is
* tested for membership in {@code names}.
*
* @param names the names to check for.
* @return {@code true} if <i>any</i> of the supplied names exists in the value namespace,
*     {@code false} otherwise.
*/
public boolean valueContainsAny(final Set<ColumnName> names) {
return value().stream()
.map(Column::name)
.anyMatch(names::contains);
}

/**
* Copies metadata and key columns to the value schema.
*
Expand Down
12 changes: 11 additions & 1 deletion ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,14 @@ public final class SchemaUtil {
public static final ColumnName ROWKEY_NAME = ColumnName.of("ROWKEY");
public static final ColumnName ROWTIME_NAME = ColumnName.of("ROWTIME");
public static final ColumnName WINDOWSTART_NAME = ColumnName.of("WINDOWSTART");
public static final ColumnName WINDOWEND_NAME = ColumnName.of("WINDOWEND");

public static final int ROWKEY_INDEX = 1;
private static final Set<ColumnName> SYSTEM_COLUMN_NAMES = ImmutableSet.of(
ROWKEY_NAME,
ROWTIME_NAME,
WINDOWSTART_NAME,
WINDOWEND_NAME
);

private static final Set<Schema.Type> ARITHMETIC_TYPES = ImmutableSet.of(
Type.INT8,
Expand All @@ -62,6 +68,10 @@ public final class SchemaUtil {
private SchemaUtil() {
}

/**
* Gets the set of reserved system column names.
*
* @return the {@code SYSTEM_COLUMN_NAMES} set held by this class.
*/
public static Set<ColumnName> systemColumnNames() {
return SYSTEM_COLUMN_NAMES;
}

// Do Not use in new code - use `SchemaConverters` directly.
public static Class<?> getJavaType(final Schema schema) {
return SchemaConverters.sqlToJavaConverter().toJavaType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

import com.google.common.collect.ImmutableSet;
import com.google.common.testing.EqualsTester;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.ksql.Column.Namespace;
Expand Down Expand Up @@ -632,6 +633,33 @@ public void shouldSupportCopyingColumnsFromOtherSchemas() {
));
}

@Test
public void shouldMatchAnyValueSchema() {
  // Given: a schema with value columns F0 and F1 and key column K0
  final LogicalSchema logicalSchema = LogicalSchema.builder()
      .valueColumn(F0, STRING)
      .keyColumn(K0, BIGINT)
      .valueColumn(F1, BIGINT)
      .build();

  // When: probing with a single value name, then with a larger set that includes value names
  final boolean matchesSingleName =
      logicalSchema.valueContainsAny(ImmutableSet.of(F0));
  final boolean matchesWithinLargerSet =
      logicalSchema.valueContainsAny(ImmutableSet.of(V0, F0, F1, V1));

  // Then:
  assertThat(matchesSingleName, is(true));
  assertThat(matchesWithinLargerSet, is(true));
}

@Test
public void shouldOnlyMatchValueSchema() {
  // Given: a schema with value columns F0 and F1 and key column K0
  final LogicalSchema logicalSchema = LogicalSchema.builder()
      .valueColumn(F0, STRING)
      .keyColumn(K0, BIGINT)
      .valueColumn(F1, BIGINT)
      .build();

  // When: probing only with names that were not added as value columns
  final boolean matched =
      logicalSchema.valueContainsAny(ImmutableSet.of(K0, V0, ROWTIME_NAME));

  // Then:
  assertThat(matched, is(false));
}

private static org.apache.kafka.connect.data.Field connectField(
final String fieldName,
final int index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,7 @@ public void validate() {

private void addSelectItem(final Expression exp, final ColumnName columnName) {
if (persistent) {
if (SchemaUtil.ROWTIME_NAME.equals(columnName)
|| SchemaUtil.ROWKEY_NAME.equals(columnName)) {
if (SchemaUtil.systemColumnNames().contains(columnName)) {
throw new KsqlException("Reserved column name in select: " + columnName + ". "
+ "Please remove or alias the column.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ private static LogicalSchema buildSchema(final TableElements tableElements) {
}

tableElements.forEach(e -> {
if (e.getName().equals(SchemaUtil.ROWTIME_NAME)) {
final boolean isRowKey = e.getName().equals(SchemaUtil.ROWKEY_NAME);

if (!isRowKey && SchemaUtil.systemColumnNames().contains(e.getName())) {
throw new KsqlException("'" + e.getName().name() + "' is a reserved column name.");
}

final boolean isRowKey = e.getName().equals(SchemaUtil.ROWKEY_NAME);

if (e.getNamespace() == Namespace.KEY) {
if (!isRowKey) {
throw new KsqlException("'" + e.getName().name() + "' is an invalid KEY column name. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,7 @@ private AggregateNode buildAggregateNode(final PlanNode sourcePlanNode) {
: null;

final Optional<ColumnName> keyFieldName = getSelectAliasMatching((expression, alias) ->
expression.equals(groupBy)
&& !SchemaUtil.isFieldName(alias.name(), SchemaUtil.ROWTIME_NAME.name())
&& !SchemaUtil.isFieldName(alias.name(), SchemaUtil.ROWKEY_NAME.name()),
expression.equals(groupBy) && !SchemaUtil.systemColumnNames().contains(alias),
sourcePlanNode.getSelectExpressions());

return new AggregateNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ KeyField findKeyField(final List<SelectExpression> selectExpressions) {
}

final Optional<ColumnRef> filtered = found
.filter(f -> !SchemaUtil.isFieldName(f.name().name(), SchemaUtil.ROWTIME_NAME.name()))
.filter(f -> !SchemaUtil.isFieldName(f.name().name(), SchemaUtil.ROWKEY_NAME.name()))
// System columns can not be key fields:
.filter(f -> !SchemaUtil.systemColumnNames().contains(f.name()))
.map(Column::ref);

return KeyField.of(filtered);
Expand Down Expand Up @@ -326,14 +326,14 @@ public SchemaKStream<Struct> selectKey(
final Expression keyExpression,
final QueryContext.Stacker contextStacker
) {
if (keyFormat.isWindowed()) {
throw new UnsupportedOperationException("Can not selectKey of windowed stream");
}

if (!needsRepartition(keyExpression)) {
return (SchemaKStream<Struct>) this;
}

if (keyFormat.isWindowed()) {
throw new UnsupportedOperationException("Can not selectKey of windowed stream");
}

final StreamSelectKey step = ExecutionStepFactory.streamSelectKey(
contextStacker,
sourceStep,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import static io.confluent.ksql.parser.tree.TableElement.Namespace.KEY;
import static io.confluent.ksql.parser.tree.TableElement.Namespace.VALUE;
import static io.confluent.ksql.schema.ksql.ColumnMatchers.keyColumn;
import static io.confluent.ksql.schema.ksql.types.SqlTypes.BIGINT;
import static io.confluent.ksql.serde.Format.AVRO;
import static io.confluent.ksql.serde.Format.JSON;
import static io.confluent.ksql.serde.Format.KAFKA;
import static io.confluent.ksql.util.SchemaUtil.ROWKEY_NAME;
import static io.confluent.ksql.util.SchemaUtil.ROWTIME_NAME;
import static io.confluent.ksql.util.SchemaUtil.WINDOWEND_NAME;
import static io.confluent.ksql.util.SchemaUtil.WINDOWSTART_NAME;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -94,10 +97,10 @@ public class CreateSourceFactoryTest {
tableElement(Namespace.KEY, ROWKEY_NAME.name(), new Type(SqlTypes.INTEGER));

private static final TableElement ELEMENT1 =
tableElement(Namespace.VALUE, "bob", new Type(SqlTypes.STRING));
tableElement(VALUE, "bob", new Type(SqlTypes.STRING));

private static final TableElement ELEMENT2 =
tableElement(Namespace.VALUE, "hojjat", new Type(SqlTypes.BIGINT));
tableElement(VALUE, "hojjat", new Type(BIGINT));

private static final TableElements ONE_ELEMENTS = TableElements.of(ELEMENT1);

Expand All @@ -107,7 +110,7 @@ public class CreateSourceFactoryTest {
private static final LogicalSchema EXPECTED_SCHEMA = LogicalSchema.builder()
.keyColumn(ROWKEY_NAME, SqlTypes.INTEGER)
.valueColumn(ColumnName.of("bob"), SqlTypes.STRING)
.valueColumn(ColumnName.of("hojjat"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("hojjat"), BIGINT)
.build();

private static final String TOPIC_NAME = "some topic";
Expand Down Expand Up @@ -186,8 +189,8 @@ public void shouldCreateCommandForCreateTable() {
// Given:
final CreateTable ddlStatement = new CreateTable(SOME_NAME,
TableElements.of(
tableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)),
tableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))),
tableElement(VALUE, "COL1", new Type(BIGINT)),
tableElement(VALUE, "COL2", new Type(SqlTypes.STRING))),
true, withProperties);

// When:
Expand Down Expand Up @@ -578,7 +581,7 @@ public void shouldBuildSchemaWithExplicitKeyFieldForStream() {
assertThat(result.getSchema(), is(LogicalSchema.builder()
.keyColumn(ColumnName.of("ROWKEY"), SqlTypes.STRING)
.valueColumn(ColumnName.of("bob"), SqlTypes.STRING)
.valueColumn(ColumnName.of("hojjat"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("hojjat"), BIGINT)
.build()
));
}
Expand Down Expand Up @@ -732,7 +735,7 @@ public void shouldThrowOnRowTimeValueColumn() {
// Given:
final CreateStream statement = new CreateStream(
SOME_NAME,
TableElements.of(tableElement(Namespace.VALUE, ROWTIME_NAME.name(), new Type(SqlTypes.BIGINT))),
TableElements.of(tableElement(VALUE, ROWTIME_NAME.name(), new Type(BIGINT))),
true,
withProperties
);
Expand All @@ -750,7 +753,7 @@ public void shouldThrowOnRowTimeKeyColumn() {
// Given:
final CreateStream statement = new CreateStream(
SOME_NAME,
TableElements.of(tableElement(Namespace.KEY, ROWTIME_NAME.name(), new Type(SqlTypes.BIGINT))),
TableElements.of(tableElement(Namespace.KEY, ROWTIME_NAME.name(), new Type(BIGINT))),
true,
withProperties
);
Expand All @@ -763,6 +766,42 @@ public void shouldThrowOnRowTimeKeyColumn() {
createSourceFactory.createStreamCommand(statement, ksqlConfig);
}

@Test
public void shouldThrowOnWindowStartValueColumn() {
  // Given: a CreateStream statement whose only value column is named WINDOWSTART
  final TableElements elements = TableElements.of(
      tableElement(VALUE, WINDOWSTART_NAME.name(), new Type(BIGINT)));
  final CreateStream createStream =
      new CreateStream(SOME_NAME, elements, true, withProperties);

  // Then:
  expectedException.expect(KsqlException.class);
  expectedException.expectMessage("'WINDOWSTART' is a reserved column name.");

  // When:
  createSourceFactory.createStreamCommand(createStream, ksqlConfig);
}

@Test
public void shouldThrowOnWindowEndValueColumn() {
  // Given: a CreateStream statement whose only value column is named WINDOWEND
  final TableElements elements = TableElements.of(
      tableElement(VALUE, WINDOWEND_NAME.name(), new Type(BIGINT)));
  final CreateStream createStream =
      new CreateStream(SOME_NAME, elements, true, withProperties);

  // Then:
  expectedException.expect(KsqlException.class);
  expectedException.expectMessage("'WINDOWEND' is a reserved column name.");

  // When:
  createSourceFactory.createStreamCommand(createStream, ksqlConfig);
}

@Test
public void shouldThrowOnRowKeyValueColumn() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ public Optional<WindowInfo> getWindowInfo() {
}

private static void validate(final LogicalSchema schema, final Optional<ColumnName> keyField) {
if (schema.findValueColumn(ColumnRef.of(SchemaUtil.ROWKEY_NAME)).isPresent()
|| schema.findValueColumn(ColumnRef.of(SchemaUtil.ROWTIME_NAME)).isPresent()) {
throw new IllegalArgumentException("Schema contains implicit columns in value schema");
if (schema.valueContainsAny(SchemaUtil.systemColumnNames())) {
throw new IllegalArgumentException("Schema contains system columns in value schema");
}

if (schema.key().size() != 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.util.Optional;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -35,17 +36,20 @@ public class CreateSourceCommandTest {
private static final SourceName SOURCE_NAME = SourceName.of("bob");
private static final String TOPIC_NAME = "vic";
private static final Formats FORAMTS = mock(Formats.class);
private static final ColumnName K0 = ColumnName.of("k0");
private static final ColumnName K1 = ColumnName.of("k1");
private static final ColumnName KEY_FIELD = ColumnName.of("keyField");

@Rule
public final ExpectedException expectedException = ExpectedException.none();
private static final ColumnName KEY_FIELD = ColumnName.of("keyField");


@Test(expected = UnsupportedOperationException.class)
public void shouldThrowOnMultipleKeyColumns() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.keyColumn(ColumnName.of("k0"), SqlTypes.STRING)
.keyColumn(ColumnName.of("k1"), SqlTypes.STRING)
.keyColumn(K0, SqlTypes.STRING)
.keyColumn(K1, SqlTypes.STRING)
.build();

// When:
Expand All @@ -66,7 +70,7 @@ public void shouldThrowIfKeyFieldDoesNotMatchRowKeyType() {
final ColumnName keyField = ColumnName.of("keyField");

final LogicalSchema schema = LogicalSchema.builder()
.keyColumn(ColumnName.of("k0"), SqlTypes.INTEGER)
.keyColumn(K0, SqlTypes.INTEGER)
.valueColumn(keyField, SqlTypes.STRING)
.build();

Expand Down Expand Up @@ -96,7 +100,7 @@ public void shouldThrowIfKeyFieldDoesNotMatchRowKeyType() {
public void shouldNotThrowIfKeyFieldMatchesRowKeyType() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.keyColumn(ColumnName.of("k0"), SqlTypes.INTEGER)
.keyColumn(K0, SqlTypes.INTEGER)
.valueColumn(KEY_FIELD, SqlTypes.INTEGER)
.build();

Expand All @@ -114,6 +118,54 @@ public void shouldNotThrowIfKeyFieldMatchesRowKeyType() {
// Then: builds without error
}

@Test
public void shouldThrowOnWindowStartColumn() {
  // Given: a schema whose value namespace contains a WINDOWSTART column
  final LogicalSchema schemaWithWindowStart = LogicalSchema.builder()
      .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.INTEGER)
      .valueColumn(SchemaUtil.WINDOWSTART_NAME, SqlTypes.INTEGER)
      .build();

  // Expect:
  expectedException.expect(IllegalArgumentException.class);
  expectedException.expectMessage("Schema contains system columns in value schema");

  // When: constructing the command with that schema
  new TestCommand(
      SOURCE_NAME, schemaWithWindowStart, Optional.empty(), Optional.empty(),
      TOPIC_NAME, FORAMTS, Optional.empty());
}

@Test
public void shouldThrowOnWindowEndColumn() {
  // Given: a schema whose value namespace contains a WINDOWEND column
  final LogicalSchema schemaWithWindowEnd = LogicalSchema.builder()
      .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.INTEGER)
      .valueColumn(SchemaUtil.WINDOWEND_NAME, SqlTypes.INTEGER)
      .build();

  // Expect:
  expectedException.expect(IllegalArgumentException.class);
  expectedException.expectMessage("Schema contains system columns in value schema");

  // When: constructing the command with that schema
  new TestCommand(
      SOURCE_NAME, schemaWithWindowEnd, Optional.empty(), Optional.empty(),
      TOPIC_NAME, FORAMTS, Optional.empty());
}

private static final class TestCommand extends CreateSourceCommand {

TestCommand(
Expand Down
Loading

0 comments on commit ea0a0ac

Please sign in to comment.