ksqlDB aggressively checks generated schema compatibility in CREATE STREAM statements #5801
Discussed this offline with @MichaelDrogalis and @colinhicks; there are two steps we're going to take:
I will also check to see if there's a regression from the last CP release (5.5.1) in other potential round-trip schema registry issues such as AVRO unions/enums - which hypothetically would fail down the line.
It seems that AVRO backwards compatibility is much looser than JSON backwards compatibility. Here's an enumeration of things I tried on 5.5.1 and whether or not they worked:

- Register AVRO Schema with lowercase fields: this works, ksqlDB will be able to read fields and if
- Register AVRO Schema with Enum Type: this works, ksqlDB will create a
- Register JSON Schema with lowercase fields: reading the fields works, writing the fields does not work:
- Register an incompatible JSON_SR Stream: 5.5.1 allowed us to create streams over topics with totally incompatible schemas. In the example below, we have registered a schema with `CREATE STREAM baz2 (foobar STRING) WITH (kafka_topic='baz', value_format='json_sr');`

ksql> INSERT INTO baz2 (foobar) VALUES ('123');
Failed to insert values into 'BAZ2'. Could not serialize row: [ '123' ]
ksql> SELECT * FROM baz2 EMIT CHANGES;
+----------------+----------------+----------------+
|ROWTIME         |ROWKEY          |FOOBAR          |
+----------------+----------------+----------------+
|1594316270888   |null            |null            |

These examples make me think that the schema compatibility model of AVRO is much more permissive than JSON_SR's. The biggest regression from 5.5.x is the ability to read JSON_SR schemas that are case sensitive. It probably makes sense to fix this, for now, as a targeted fix for JSON_SR case sensitivity.
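A hedged sketch of that case-sensitivity regression, using hypothetical names (a `flights` topic whose registered JSON_SR schema has a lowercase `latitude` field); nothing here is taken verbatim from the thread:

```sql
-- ksqlDB upper-cases the unquoted column to LATITUDE, so the round-tripped schema
-- no longer matches the registered lowercase field. On 5.5.1 the stream could still
-- be created and read; with the up-front compatibility check the statement is rejected.
CREATE STREAM flights (latitude DOUBLE)
  WITH (kafka_topic='flights', value_format='JSON_SR');
```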
One thing to note for AVRO: it looks like we compare the schemas with a different record name, so schema registry will always allow the two schemas to be registered even though they are incompatible:

{
"name": "MyRecord",
"namespace": "io.almog",
"type": "record",
"fields": [{
"name": "col1",
"type": "int"
}]
}

{
"type": "record",
"name": "KsqlDataSourceSchema",
"namespace": "io.confluent.ksql.avro_schemas",
"fields": [
{
"name": "COL1",
"type": [
"null",
"int"
],
"default": null
}
],
"connect.name": "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"
}

But if the first one is already registered, then the following will not work:

{
"name": "MyRecord",
"namespace": "io.almog",
"type": "record",
"fields": [{
"name": "COL1",
"type": "int"
}]
}
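If the record-name behaviour described above holds, a hypothetical way to see it from the ksql side (the `baz` topic and the `io.almog.MyRecord` schema are assumed to be the ones shown above):

```sql
-- With io.almog.MyRecord (lowercase col1) already registered under baz-value,
-- ksqlDB round-trips the declared column into a KsqlDataSourceSchema record with
-- an upper-cased, nullable COL1. Because the record name differs, schema registry
-- reports the two as compatible and the statement is accepted despite the mismatch.
CREATE STREAM baz_avro (col1 INT)
  WITH (kafka_topic='baz', value_format='AVRO');
```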
Talked offline with @MichaelDrogalis and @colinhicks - I think we will leave this behavior and instead fix incompatibility issues (such as #5798) one by one. The reasoning is that we can trade off between these two worlds:
Given that it's easier to work towards reducing the items that fall into the first bucket, we will prefer an overly aggressive check and fix the issues one by one. Closing out this ticket as "won't fix, maintain current behavior".
Thanks @agavra. Just making sure I understand correctly: we are going to leave the behavior where, when you do a
We are proposing to leave this behavior as is, and fix any stricter-than-necessary compatibility checks that cause step 3 to fail, one by one?
@apurvam your summary is spot on.
Thanks, that makes sense to me. Great summary in the original issue :)
In light of some new issues surrounding the compatibility check, and the other unknown unknowns that might come up, we've decided to change the behavior to the following:
This behavior will allow us to read anything that we were able to read in 5.5.1 and also fix the issues described in #4219. We are open to issues with
It feels to me like the right solution to this is being smarter in how we build the schema we want to send to SR to register or check compatibility. If there is no prior existing schema we can build our own, as we currently do, and register it at the time the CREATE statement is issued. If a prior schema does exist, then there are a few points where we care about this:
Point 1.a could be handled by custom code in ksqlDB to validate the user-supplied column list against the existing schema. However, it can also be achieved by building a new schema, as explained below, and then using SR to check compatibility. The latter approach leverages existing tech, and is hence the preferred approach.

The issues at the moment seem to me to be caused because ksqlDB builds the new Avro/JSON/Protobuf schema using only the column definitions stored in the metastore, ignoring the existing schema. If we take the existing schema into account, I think most/all of the issues go away (though I don't know all the issues in depth). I think we should look to build the new schema by combining any user-supplied set of columns with the existing schema using the following rules:

**Matching fields/columns**

SQL is case insensitive by default. We can preserve the case of field names from the existing schema for any column names that aren't quoted in the CREATE statement. This fixes issues such as #5798. The case-preserved schema should be used in all places. Quoted column names should be case-sensitive when matching field names in the schema. (Note: there's an edge case with the current identifiers implementation that we may choose to fix as part of this: we don't currently know if a fully uppercase column name has been quoted or not. If we don't fix this, we'd have to assume any fully uppercase column name was not quoted, and therefore it would match the first source field with the same case-insensitive name.)

**Additional fields in schema**

i.e. fields in the existing schema that aren't supplied by the user. The CREATE statement should preserve the missing fields in the schema it uses to check compatibility. This allows users to select a view of the data containing only the fields that they care about (see the sketch after this comment). We could pass these fields to the deserializer as well, or may choose to exclude them, allowing smarter deserializers to avoid the work of locating and coercing fields we don't actually need. Any INSERT INTO statement should additionally check that the data it will produce, i.e. data containing only the fields defined in the more limited view of the schema, is compatible with the full schema. This is probably best done by having each format be able to determine whether a specific column in a schema is optional/nullable. If the additional field is not optional, the INSERT INTO statement should fail. The serializer should use the full schema when producing to the topic. This avoids ksqlDB registering a new schema with only a subset of the fields, which can cause compatibility issues.

**Additional columns**

i.e. columns supplied by the user that aren't in the existing schema. (We could initially just fail.) The CREATE statement can include these columns in the schema used to check compatibility and passed to the deserializer. We could ignore them for the compatibility check, but I don't think the additional logic gets us any benefit. Any INSERT INTO statement would need to register a new schema with the additional columns defined and pass the schema to the deserializer.

**Unsupported columns**

Where the source schema contains columns with unsupported types, ksqlDB currently just ignores them. The CREATE statement should not ignore them. They should be included in the schema used when validating compatibility. We shouldn't be passing them to the deserializer (mainly because, in the fullness of time, the serde interfaces should not use the Connect schema and so would not be able to express any column with an unsupported type). Any INSERT INTO statement should fail if an unsupported column is not nullable (implemented by each Format knowing how to determine if a field is optional). The serializer should use the full schema, including the unsupported columns. This avoids ksqlDB registering a new schema with only a subset of fields.

**Enum columns**

Enum columns are weird because we can read them, but can't yet write them. I think Avro enum columns are the only enum type we read, and we coerce them to a string. This coercion is likely a thorn in our side. The CREATE statement should use the original enum field definition when checking compatibility. The deserializer should use VARCHAR. Any INSERT INTO statements are more tricky! Personally, until ksqlDB supports enums I'd reject INSERT INTO where an existing enum field is being written to by ksqlDB. It's just unsafe. Plus we likely want to remove INSERT INTO from ksqlDB in the future, removing this nasty edge case. I believe the current behaviour on an INSERT INTO would be to register a new schema with a VARCHAR field. This is very wrong IMHO.

**Sinks**

The same pattern of taking into account any existing schema can also be used when creating a sink's schema.
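A hedged sketch of the "additional fields in schema" idea above, with hypothetical names (an `orders` topic whose registered schema has `order_id`, `customer_id` and `amount`); this is an illustration of the proposal, not current ksqlDB behaviour:

```sql
-- The stream declares only the two fields the user cares about. Under the proposal,
-- the compatibility check (and the serializer) would still use the full registered
-- schema, so customer_id is preserved rather than dropped from a newly registered
-- subset schema.
CREATE STREAM orders_view (`order_id` STRING, `amount` DOUBLE)
  WITH (kafka_topic='orders', value_format='AVRO');
```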
Thanks for the detailed thoughts @big-andy-coates!
This frames the problem nicely, but I want to suggest a different solution: don't allow inline specification of columns in ksql for schemas already registered in schema registry. I think allowing this is really a bug - it introduces a lot of problems (including some more serious ones, like #5673) and there are compatibility features in some data formats that don't have a sql/ksql equivalent (e.g.

The big benefit is that we don't have to reason about compatibility - that's a hard problem and we should delegate it 100% to schema registry. We also don't want to be in the business of doing this per format, which is something we'd need to get into because each format has its own compatibility requirements.

We can get away with this if we separate the case-sensitivity from the serialization layer. We can always coerce the
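A minimal sketch of the direction suggested above, assuming a hypothetical `pageviews` topic whose value schema is already registered in schema registry:

```sql
-- No inline column list: the SQL schema is inferred from the registered
-- pageviews-value schema instead of being declared (and possibly mis-declared)
-- by the user, so compatibility stays entirely schema registry's concern.
CREATE STREAM pageviews WITH (kafka_topic='pageviews', value_format='JSON_SR');
```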
I like it :D
#5959 - I created an issue to track this, should get around to it sometime soon.
ksqlDB converts schemas in schema registry to SQL schemas when a `CREATE` statement is issued without a schema. Then, it generates a `ParsedSchema` from the SQL schema and checks that the round-trip schema is compatible with the original schema in schema registry.

This bug was probably introduced as fallout from #4717 and #5756 (which are present in 0.8.0 and 0.10.1 respectively; neither has made it into a version of CP yet as of 5.5.1). Previously, we were not checking the compatibility of schemas until we tried to produce to the topic; now we're checking it up front. Since most source topics aren't ever produced to, this bug was limited to `INSERT INTO` scenarios, but now it prevents users from reading their data.

It may not be feasible to remove this check, because users can declare subset schemas on topics with existing schema registry subjects (e.g. `CREATE STREAM (col3 VARCHAR) WITH (kafka_topic='foo', value_format='avro');` where `foo-value` in schema registry has `col1`, `col2` and `col3`). There is an additional class of bugs (see #5673 and discussion) caused by mismatched declared schemas and registered schemas, so it might be worthwhile to just prevent partial schema inference.

One way this bug manifests itself is that `srClient.testCompatibility` is case-sensitive for `JSON_SR`, so registering the following schema works:

But changing the field from `ICAO24` to `icao24` will fail with `Could not register schema for subject 'foo-value' because it is incompatible with existing schema.` (ksqlDB capitalizes all fields by default). As a targeted fix for the above, for `JSON_SR` schemas, ksql should preserve the case and backtick everything.

Another case where this fails is when the schema compatibility mode is stricter than `BACKWARDS` - for example `FULL_TRANSITIVE` makes it almost certain that ksql-registered schemas will not be compatible with previous schemas because the fields are nullable.
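A hedged sketch of the targeted `JSON_SR` fix described above, assuming the `foo` topic from the example, a registered schema whose field is the lowercase `icao24`, and that backticked identifiers keep their case in the schema ksqlDB generates:

```sql
-- Unquoted columns are capitalized, so the generated schema carries ICAO24 and the
-- compatibility check rejects it against the registered lowercase icao24 field:
CREATE STREAM foo_upper (icao24 VARCHAR)
  WITH (kafka_topic='foo', value_format='JSON_SR');

-- Backticking the column preserves its case, so the generated field matches icao24:
CREATE STREAM foo_quoted (`icao24` VARCHAR)
  WITH (kafka_topic='foo', value_format='JSON_SR');
```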