fix: do not allow inserts into tables with null key
Fixes: confluentinc#3021

Tables do not support rows with `null` keys, so users should not be able to insert such rows. `INSERT INTO ... VALUES` statements against a table are now rejected if they would produce a `null` `ROWKEY`.
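A minimal sketch of the behaviour this enforces, assuming a hypothetical table `FOO` (the source name and columns below are illustrative, not taken from this commit); the error text mirrors the message asserted in the new tests:

```sql
-- Given a table FOO (COL0 STRING, COL1 BIGINT) with no KEY field declared:

-- Rejected after this change: ROWKEY is not supplied, so the row would have a null key.
INSERT INTO FOO (COL0, COL1) VALUES ('str', 2);
-- Fails with roughly: Failed to insert values into 'FOO'. Value for ROWKEY is required for tables

-- Accepted: ROWKEY is supplied explicitly.
INSERT INTO FOO (ROWKEY, COL0, COL1) VALUES ('key', 'str', 2);

-- Streams are unaffected: the same INSERT without ROWKEY still works against a stream.
```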
big-andy-coates committed Oct 17, 2019
1 parent 56b46de commit b3a9b14
Showing 2 changed files with 138 additions and 25 deletions.
InsertValuesExecutor.java
@@ -27,6 +27,7 @@
import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.parser.tree.InsertValues;
@@ -237,6 +238,11 @@ private RowData extractRow(

handleExplicitKeyField(values, dataSource.getKeyField());

if (dataSource.getDataSourceType() == DataSourceType.KTABLE) {
  throwOnTableMissingRowKey(values);
}

final long ts = (long) values.getOrDefault(SchemaUtil.ROWTIME_NAME, clock.getAsLong());

final Struct key = buildKey(schema, values);
@@ -340,6 +346,13 @@ private static void handleExplicitKeyField(
}
}

private static void throwOnTableMissingRowKey(final Map<ColumnName, ?> values) {
final Object rowKeyValue = values.get(SchemaUtil.ROWKEY_NAME);
if (rowKeyValue == null) {
throw new KsqlException("Value for ROWKEY is required for tables");
}
}

private static SqlType columnType(final ColumnName column, final LogicalSchema schema) {
return schema
.findColumn(ColumnRef.withoutSource(column))
InsertValuesExecutorTest.java
@@ -41,6 +41,7 @@
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
@@ -95,17 +96,18 @@
@RunWith(MockitoJUnitRunner.class)
public class InsertValuesExecutorTest {

private static final ColumnName COL0 = ColumnName.of("COL0");
private static final LogicalSchema SINGLE_FIELD_SCHEMA = LogicalSchema.builder()
.valueColumn(ColumnName.of("COL0"), SqlTypes.STRING)
.valueColumn(COL0, SqlTypes.STRING)
.build();

private static final LogicalSchema SCHEMA = LogicalSchema.builder()
.valueColumn(ColumnName.of("COL0"), SqlTypes.STRING)
.valueColumn(COL0, SqlTypes.STRING)
.valueColumn(ColumnName.of("COL1"), SqlTypes.BIGINT)
.build();

private static final LogicalSchema BIG_SCHEMA = LogicalSchema.builder()
.valueColumn(ColumnName.of("COL0"), SqlTypes.STRING) // named COL0 for auto-ROWKEY
.valueColumn(COL0, SqlTypes.STRING) // named COL0 for auto-ROWKEY
.valueColumn(ColumnName.of("INT"), SqlTypes.INTEGER)
.valueColumn(ColumnName.of("BIGINT"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("DOUBLE"), SqlTypes.DOUBLE)
@@ -164,7 +166,7 @@ public void setup() {
when(serviceContext.getKafkaClientSupplier()).thenReturn(kafkaClientSupplier);
when(serviceContext.getSchemaRegistryClientFactory()).thenReturn(srClientFactory);

givenDataSourceWithSchema(SCHEMA, SerdeOption.none(), Optional.of(ColumnName.of("COL0")));
givenSourceStreamWithSchema(SCHEMA, SerdeOption.none(), Optional.of(COL0));

when(valueSerdeFactory.create(any(), any(), any(), any(), any(), any()))
.thenReturn(valueSerde);
@@ -200,7 +202,7 @@ public void shouldHandleFullRow() {
@Test
public void shouldInsertWrappedSingleField() {
// Given:
givenDataSourceWithSchema(SINGLE_FIELD_SCHEMA, SerdeOption.none(), Optional.of(ColumnName.of("COL0")));
givenSourceStreamWithSchema(SINGLE_FIELD_SCHEMA, SerdeOption.none(), Optional.of(COL0));

final ConfiguredStatement<InsertValues> statement = givenInsertValues(
valueFieldNames(SINGLE_FIELD_SCHEMA),
@@ -219,8 +221,11 @@ public void shouldInsertWrappedSingleField() {
@Test
public void shouldInsertUnwrappedSingleField() {
// Given:
givenDataSourceWithSchema(SINGLE_FIELD_SCHEMA,
SerdeOption.of(SerdeOption.UNWRAP_SINGLE_VALUES), Optional.of(ColumnName.of("COL0")));
givenSourceStreamWithSchema(
    SINGLE_FIELD_SCHEMA,
    SerdeOption.of(SerdeOption.UNWRAP_SINGLE_VALUES),
    Optional.of(COL0));

final ConfiguredStatement<InsertValues> statement = givenInsertValues(
valueFieldNames(SINGLE_FIELD_SCHEMA),
@@ -398,9 +403,10 @@ public void shouldHandleAllSortsOfLiterals() {
}

@Test
public void shouldHandleNullKey() {
public void shouldHandleNullKeyForSourceWithKeyField() {
// Given:
givenDataSourceWithSchema(BIG_SCHEMA, SerdeOption.none(), Optional.of(ColumnName.of("COL0")));
givenSourceStreamWithSchema(BIG_SCHEMA, SerdeOption.none(), Optional.of(COL0));

final ConfiguredStatement<InsertValues> statement = givenInsertValues(
allFieldNames(BIG_SCHEMA),
ImmutableList.of(
@@ -431,7 +437,7 @@ public void shouldHandleNullKey() {
@Test
public void shouldAllowUpcast() {
// Given:
givenDataSourceWithSchema(SCHEMA, SerdeOption.none(), Optional.of(ColumnName.of("COL0")));
givenSourceStreamWithSchema(SCHEMA, SerdeOption.none(), Optional.of(COL0));

final ConfiguredStatement<InsertValues> statement = givenInsertValuesStrings(
ImmutableList.of("COL0", "COL1"),
@@ -580,7 +586,7 @@ public void shouldThrowIfNotEnoughValuesSuppliedWithNoSchema() {
@Test
public void shouldFailOnDowncast() {
// Given:
givenDataSourceWithSchema(BIG_SCHEMA, SerdeOption.none(), Optional.of(ColumnName.of("COL0")));
givenSourceStreamWithSchema(BIG_SCHEMA, SerdeOption.none(), Optional.of(COL0));

final ConfiguredStatement<InsertValues> statement = givenInsertValuesStrings(
ImmutableList.of("INT"),
@@ -598,9 +604,31 @@ public void shouldFailOnDowncast() {
}

@Test
public void shouldHandleSourcesWithNoKeyField() {
public void shouldHandleStreamsWithNoKeyField() {
// Given:
givenSourceStreamWithSchema(SCHEMA, SerdeOption.none(), Optional.empty());

final ConfiguredStatement<InsertValues> statement = givenInsertValuesStrings(
ImmutableList.of("ROWKEY", "COL0", "COL1"),
ImmutableList.of(
new StringLiteral("key"),
new StringLiteral("str"),
new LongLiteral(2L))
);

// When:
executor.execute(statement, ImmutableMap.of(), engine, serviceContext);

// Then:
verify(keySerializer).serialize(TOPIC_NAME, keyStruct("key"));
verify(valueSerializer).serialize(TOPIC_NAME, new GenericRow(ImmutableList.of("str", 2L)));
verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, KEY, VALUE));
}

@Test
public void shouldHandleTablesWithNoKeyField() {
// Given:
givenDataSourceWithSchema(SCHEMA, SerdeOption.none(), Optional.empty());
givenSourceTableWithSchema(SCHEMA, SerdeOption.none(), Optional.empty());

final ConfiguredStatement<InsertValues> statement = givenInsertValuesStrings(
ImmutableList.of("ROWKEY", "COL0", "COL1"),
@@ -620,9 +648,9 @@ public void shouldHandleSourcesWithNoKeyField() {
}

@Test
public void shouldHandleSourcesWithNoKeyFieldAndNoRowKeyProvided() {
public void shouldHandleStreamsWithNoKeyFieldAndNoRowKeyProvided() {
// Given:
givenDataSourceWithSchema(SCHEMA, SerdeOption.none(), Optional.empty());
givenSourceStreamWithSchema(SCHEMA, SerdeOption.none(), Optional.empty());

final ConfiguredStatement<InsertValues> statement = givenInsertValuesStrings(
ImmutableList.of("COL0", "COL1"),
@@ -640,6 +668,47 @@ public void shouldHandleSourcesWithNoKeyFieldAndNoRowKeyProvided() {
verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, KEY, VALUE));
}

@Test
public void shouldThrowOnTablesWithNoKeyFieldAndNoRowKeyProvided() {
// Given:
givenSourceTableWithSchema(SCHEMA, SerdeOption.none(), Optional.empty());

final ConfiguredStatement<InsertValues> statement = givenInsertValuesStrings(
ImmutableList.of("COL0", "COL1"),
ImmutableList.of(
new StringLiteral("str"),
new LongLiteral(2L))
);

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage(
"Failed to insert values into 'TOPIC'. Value for ROWKEY is required for tables");

// When:
executor.execute(statement, ImmutableMap.of(), engine, serviceContext);
}

@Test
public void shouldThrowOnTablesWithKeyFieldAndNullKeyFieldValueProvided() {
// Given:
givenSourceTableWithSchema(SCHEMA, SerdeOption.none(), Optional.of(COL0));

final ConfiguredStatement<InsertValues> statement = givenInsertValuesStrings(
ImmutableList.of("COL1"),
ImmutableList.of(
new LongLiteral(2L))
);

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage(
"Failed to insert values into 'TOPIC'. Value for ROWKEY is required for tables");

// When:
executor.execute(statement, ImmutableMap.of(), engine, serviceContext);
}

@Test
public void shouldBuildCorrectSerde() {
// Given:
@@ -694,10 +763,27 @@ private static ConfiguredStatement<InsertValues> givenInsertValues(
);
}

private void givenDataSourceWithSchema(
private void givenSourceStreamWithSchema(
final LogicalSchema schema,
final Set<SerdeOption> serdeOptions,
final Optional<ColumnName> keyField
) {
givenDataSourceWithSchema(schema, serdeOptions, keyField, false);
}

private void givenSourceTableWithSchema(
final LogicalSchema schema,
final Set<SerdeOption> serdeOptions,
final Optional<ColumnName> keyField
) {
givenDataSourceWithSchema(schema, serdeOptions, keyField, true);
}

private void givenDataSourceWithSchema(
final LogicalSchema schema,
final Set<SerdeOption> serdeOptions,
final Optional<ColumnName> keyField,
final boolean table
) {
final KsqlTopic topic = new KsqlTopic(
TOPIC_NAME,
Expand All @@ -711,15 +797,29 @@ private void givenDataSourceWithSchema(
ColumnRef.withoutSource(kf),
schema.findValueColumn(ColumnRef.withoutSource(kf)).get()))
.orElse(KeyField.none());
final DataSource<?> dataSource = new KsqlStream<>(
"",
SourceName.of("TOPIC"),
schema,
serdeOptions,
valueKeyField,
new MetadataTimestampExtractionPolicy(),
topic
);

final DataSource<?> dataSource;
if (table) {
dataSource = new KsqlTable<>(
"",
SourceName.of("TOPIC"),
schema,
serdeOptions,
valueKeyField,
new MetadataTimestampExtractionPolicy(),
topic
);
} else {
dataSource = new KsqlStream<>(
"",
SourceName.of("TOPIC"),
schema,
serdeOptions,
valueKeyField,
new MetadataTimestampExtractionPolicy(),
topic
);
}

final MetaStoreImpl metaStore = new MetaStoreImpl(TestFunctionRegistry.INSTANCE.get());
metaStore.putSource(dataSource);
