Skip to content

Commit

Permalink
Fix: Consistency between inserted rowkey and key values when insertin…
Browse files Browse the repository at this point in the history
…g into a table with a key (#3381)

* fix:#3347 If the table has an key and a value for it has been provided in an insert then make sure rowkey gets the string form of it

* fix: When ROWKEY and ID are both specified in insert into, then check rowkey is string form of id

* fix: Add a bunch of RQTT tests to test combinations of inserts with id and rowkey when stream has key

* fix:nit

* fix:nit

* fix:nits
  • Loading branch information
purplefox authored Sep 18, 2019
1 parent 9051f9b commit a909737
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,15 @@ private static void handleExplicitKeyField(
final Object rowKeyValue = values.get(SchemaUtil.ROWKEY_NAME);

if (keyValue != null ^ rowKeyValue != null) {
values.putIfAbsent(key, rowKeyValue);
values.putIfAbsent(SchemaUtil.ROWKEY_NAME, keyValue);
} else if (!Objects.equals(keyValue, rowKeyValue)) {
throw new KsqlException(
String.format(
"Expected ROWKEY and %s to match but got %s and %s respectively.",
key, rowKeyValue, keyValue));
if (keyValue == null) {
values.put(key, rowKeyValue);
} else {
values.put(SchemaUtil.ROWKEY_NAME, keyValue.toString());
}
} else if (keyValue != null && !Objects.equals(keyValue.toString(), rowKeyValue)) {
throw new KsqlException(String.format(
"Expected ROWKEY and %s to match but got %s and %s respectively.",
key, rowKeyValue, keyValue));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,138 @@
{"topic": "test_topic", "key": "10", "value": {"ID": null}}
]
},
{
"name": "rowkey should be set when stream has int key and only key specified in insert",
"statements": [
"CREATE STREAM TEST (ID INT) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');",
"INSERT INTO TEST (ID) VALUES (10);"
],
"inputs": [
],
"outputs": [
{"topic": "test_topic", "key": "10", "value": {"ID": 10}}
]
},
{
"name": "rowkey should be set when stream has String key and only key specified in insert",
"statements": [
"CREATE STREAM TEST (ID VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');",
"INSERT INTO TEST (ID) VALUES ('10');"
],
"inputs": [
],
"outputs": [
{"topic": "test_topic", "key": "10", "value": {"ID": "10"}}
]
},
{
"name": "rowkey should be set when stream has double key and only key specified in insert",
"statements": [
"CREATE STREAM TEST (ID DOUBLE) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');",
"INSERT INTO TEST (ID) VALUES (1.23);"
],
"inputs": [
],
"outputs": [
{"topic": "test_topic", "key": "1.23", "value": {"ID": 1.23}}
]
},
{
"name": "rowkey should be set when stream has bigint key and only key specified in insert",
"statements": [
"CREATE STREAM TEST (ID BIGINT) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');",
"INSERT INTO TEST (ID) VALUES (10);"
],
"inputs": [
],
"outputs": [
{"topic": "test_topic", "key": "10", "value": {"ID": 10}}
]
},
{
"name": "rowkey should be set when stream has boolean key and only key specified in insert",
"statements": [
"CREATE STREAM TEST (ID BOOLEAN) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');",
"INSERT INTO TEST (ID) VALUES (TRUE);"
],
"inputs": [
],
"outputs": [
{"topic": "test_topic", "key": "true", "value": {"ID": true}}
]
},
{
"name": "keyfield should be set when stream has string key and only rowkey specified in insert",
"statements": [
"CREATE STREAM TEST (ID STRING) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');",
"INSERT INTO TEST (ROWKEY) VALUES ('10');"
],
"inputs": [
],
"outputs": [
{"topic": "test_topic", "key": "10", "value": {"ID": "10"}}
]
},
{
"name": "rowkey and key should match when stream has int key",
"statements": [
"CREATE STREAM TEST (ID INT) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');",
"INSERT INTO TEST (ROWKEY, ID) VALUES ('10', 10);"
],
"inputs": [
],
"outputs": [
{"topic": "test_topic", "key": "10", "value": {"ID": 10}}
]
},
{
"name": "rowkey and key should match when stream has String key",
"statements": [
"CREATE STREAM TEST (ID STRING) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');",
"INSERT INTO TEST (ROWKEY, ID) VALUES ('10', '10');"
],
"inputs": [
],
"outputs": [
{"topic": "test_topic", "key": "10", "value": {"ID": "10"}}
]
},
{
"name": "rowkey and key should match when stream has double key",
"statements": [
"CREATE STREAM TEST (ID DOUBLE) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');",
"INSERT INTO TEST (ROWKEY, ID) VALUES ('1.23', 1.23);"
],
"inputs": [
],
"outputs": [
{"topic": "test_topic", "key": "1.23", "value": {"ID": 1.23}}
]
},
{
"name": "rowkey and key should match when stream has bigint key",
"statements": [
"CREATE STREAM TEST (ID BIGINT) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');",
"INSERT INTO TEST (ROWKEY, ID) VALUES ('10', 10);"
],
"inputs": [
],
"outputs": [
{"topic": "test_topic", "key": "10", "value": {"ID": 10}}
]
},
{
"name": "rowkey and key should match when stream has boolean key",
"statements": [
"CREATE STREAM TEST (ID BOOLEAN) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');",
"INSERT INTO TEST (ROWKEY, ID) VALUES ('true', true);"
],
"inputs": [
],
"outputs": [
{"topic": "test_topic", "key": "true", "value": {"ID": true}}
]
},
{
"name": "should fail on mismatch between explicit columns and value counts",
"statements": [
Expand All @@ -77,6 +209,18 @@
"message": "Failed to prepare statement: Expected number columns and values to match: [ROWKEY, ID], ['10']",
"status": 400
}
},
{
"name": "should fail on mismatch between rowkey and key values when stream has key",
"statements": [
"CREATE STREAM TEST (ID INT) WITH (kafka_topic='test_topic', value_format='JSON', key='ID');",
"INSERT INTO TEST (ROWKEY, ID) VALUES ('10', 5);"
],
"expectedError": {
"type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage",
"message": "Failed to insert values into 'TEST'. Expected ROWKEY and ID to match but got 10 and 5 respectively.",
"status": 400
}
}
]
}

0 comments on commit a909737

Please sign in to comment.