diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java index 01ee6835ea47..e36c14f0e668 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java @@ -323,13 +323,15 @@ private static void handleExplicitKeyField( final Object rowKeyValue = values.get(SchemaUtil.ROWKEY_NAME); if (keyValue != null ^ rowKeyValue != null) { - values.putIfAbsent(key, rowKeyValue); - values.putIfAbsent(SchemaUtil.ROWKEY_NAME, keyValue); - } else if (!Objects.equals(keyValue, rowKeyValue)) { - throw new KsqlException( - String.format( - "Expected ROWKEY and %s to match but got %s and %s respectively.", - key, rowKeyValue, keyValue)); + if (keyValue == null) { + values.put(key, rowKeyValue); + } else { + values.put(SchemaUtil.ROWKEY_NAME, keyValue.toString()); + } + } else if (keyValue != null && !Objects.equals(keyValue.toString(), rowKeyValue)) { + throw new KsqlException(String.format( + "Expected ROWKEY and %s to match but got %s and %s respectively.", + key, rowKeyValue, keyValue)); } } } diff --git a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json index f15ab0f40d88..e65f7ec74adf 100644 --- a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json +++ b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json @@ -66,6 +66,138 @@ {"topic": "test_topic", "key": "10", "value": {"ID": null}} ] }, + { + "name": "rowkey should be set when stream has int key and only key specified in insert", + "statements": [ + "CREATE STREAM TEST (ID INT) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "INSERT INTO TEST (ID) VALUES (10);" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": "10", "value": {"ID": 10}} + ] + }, + { + "name": "rowkey should be set when stream has String key and only key specified in insert", + "statements": [ + "CREATE STREAM TEST (ID VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "INSERT INTO TEST (ID) VALUES ('10');" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": "10", "value": {"ID": "10"}} + ] + }, + { + "name": "rowkey should be set when stream has double key and only key specified in insert", + "statements": [ + "CREATE STREAM TEST (ID DOUBLE) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "INSERT INTO TEST (ID) VALUES (1.23);" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": "1.23", "value": {"ID": 1.23}} + ] + }, + { + "name": "rowkey should be set when stream has bigint key and only key specified in insert", + "statements": [ + "CREATE STREAM TEST (ID BIGINT) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "INSERT INTO TEST (ID) VALUES (10);" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": "10", "value": {"ID": 10}} + ] + }, + { + "name": "rowkey should be set when stream has boolean key and only key specified in insert", + "statements": [ + "CREATE STREAM TEST (ID BOOLEAN) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "INSERT INTO TEST (ID) VALUES (TRUE);" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": "true", "value": {"ID": true}} + ] + }, + { + "name": "keyfield should be set when stream has string key and only rowkey specified in insert", + "statements": [ + "CREATE STREAM TEST (ID STRING) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "INSERT INTO TEST (ROWKEY) VALUES ('10');" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": "10", "value": {"ID": "10"}} + ] + }, + { + "name": "rowkey and key should match when stream has int key", + "statements": [ + "CREATE STREAM TEST (ID INT) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "INSERT INTO TEST (ROWKEY, ID) VALUES ('10', 10);" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": "10", "value": {"ID": 10}} + ] + }, + { + "name": "rowkey and key should match when stream has String key", + "statements": [ + "CREATE STREAM TEST (ID STRING) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "INSERT INTO TEST (ROWKEY, ID) VALUES ('10', '10');" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": "10", "value": {"ID": "10"}} + ] + }, + { + "name": "rowkey and key should match when stream has double key", + "statements": [ + "CREATE STREAM TEST (ID DOUBLE) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "INSERT INTO TEST (ROWKEY, ID) VALUES ('1.23', 1.23);" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": "1.23", "value": {"ID": 1.23}} + ] + }, + { + "name": "rowkey and key should match when stream has bigint key", + "statements": [ + "CREATE STREAM TEST (ID BIGINT) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "INSERT INTO TEST (ROWKEY, ID) VALUES ('10', 10);" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": "10", "value": {"ID": 10}} + ] + }, + { + "name": "rowkey and key should match when stream has boolean key", + "statements": [ + "CREATE STREAM TEST (ID BOOLEAN) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "INSERT INTO TEST (ROWKEY, ID) VALUES ('true', true);" + ], + "inputs": [ + ], + "outputs": [ + {"topic": "test_topic", "key": "true", "value": {"ID": true}} + ] + }, { "name": "should fail on mismatch between explicit columns and value counts", "statements": [ @@ -77,6 +209,18 @@ "message": "Failed to prepare statement: Expected number columns and values to match: [ROWKEY, ID], ['10']", "status": 400 } + }, + { + "name": "should fail on mismatch between rowkey and key values when stream has key", + "statements": [ + "CREATE STREAM TEST (ID INT) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');", + "INSERT INTO TEST (ROWKEY, ID) VALUES ('10', 5);" + ], + "expectedError": { + "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", + "message": "Failed to insert values into 'TEST'. Expected ROWKEY and ID to match but got 10 and 5 respectively.", + "status": 400 + } } ] } \ No newline at end of file