ksqlDB aggressively checks generated schema compatibility in CREATE STREAM statements #5801

Closed
agavra opened this issue Jul 9, 2020 · 12 comments

@agavra
Contributor

agavra commented Jul 9, 2020

ksqlDB converts schemas in schema registry to SQL schemas when a CREATE statement is issued without a schema. Then, it generates a ParsedSchema from the SQL schema and checks that the round-trip schema is compatible with the original schema in schema registry.

This bug was probably introduced as fallout from #4717 and #5756 (present in 0.8.0 and 0.10.1 respectively; as of 5.5.1, neither has made it into a CP release yet). Previously, we were not checking schema compatibility until we tried to produce to the topic; now we check it up-front. Since most source topics are never produced to, the bug used to be limited to INSERT INTO scenarios, but now it prevents users from reading their data.

It may not be feasible to remove this check, because users can declare subset schemas on topics with existing schema registry subjects (e.g. CREATE STREAM foo_subset (col3 VARCHAR) WITH (kafka_topic='foo', value_format='avro'); where foo-value in schema registry has col1, col2 and col3).

There is an additional class of bugs (see #5673 and the discussion there) caused by mismatches between declared schemas and registered schemas, so it might be worthwhile to simply prevent partial schema inference.
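For concreteness, a minimal sketch of the inference path described above (the stream name is a placeholder, and foo-value is assumed to already exist in schema registry):

-- No columns are declared, so ksqlDB fetches the latest schema for 'foo-value',
-- converts it to a SQL schema, converts that back into a ParsedSchema, and then
-- checks the round-trip ParsedSchema for compatibility with the registered one.
CREATE STREAM foo_inferred WITH (kafka_topic='foo', value_format='avro');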

One way this bug manifests itself is that srClient.testCompatibility is case-sensitive for JSON_SR, so registering the following schema works:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Flight",
  "type": "object",
  "properties": {
    "ICAO24": {
      "type": "string",
      "description": "Unique ICAO 24-bit address of the transponder in hex string representation."
    }
  }
}

But changing the field from ICAO24 to icao24 will fail with Could not register schema for subject 'foo-value' because it is incompatible with existing schema. (ksqlDB capitalizes all field names by default.) As a targeted fix for the above, for JSON_SR schemas ksql should preserve the case and backtick everything.
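As a rough sketch of what that targeted fix would mean in practice (the stream names are placeholders, and this assumes a registered 'foo-value' schema whose field is the lowercase icao24):

-- With case preserved, schema inference would keep the registered field name
-- icao24 (backticked internally) instead of capitalizing it to ICAO24, so the
-- generated schema round-trips cleanly against the registered JSON schema.
CREATE STREAM flights WITH (kafka_topic='foo', value_format='json_sr');

-- The equivalent explicit declaration, with the case-sensitive name backticked:
CREATE STREAM flights_explicit (`icao24` STRING)
  WITH (kafka_topic='foo', value_format='json_sr');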

Another case where this fails is when the schema compatibility mode is stricter than BACKWARD - for example, FULL_TRANSITIVE makes it almost certain that ksql-registered schemas will not be compatible with previous schemas, because ksql generates all fields as nullable.

@agavra agavra added the bug, needs-triage and P0 labels Jul 9, 2020
@agavra agavra changed the title ksqlDB should perserve case for schemas inferred from JSON_SR subjects in schema registry ksqlDB aggresively checks generated schema compatibility in CREATE STREAM statements Jul 9, 2020
@agavra
Contributor Author

agavra commented Jul 9, 2020

Discussed this offline with @MichaelDrogalis and @colinhicks, there are two steps we're going to take:

  • short term: we're going to shotgun fix issues that we know are round-trip incompatible (e.g. casing in JSON) but keep the compatibility check
  • long term: evaluate removing partial schema declarations and thereby allowing us to just remove the compatibility check altogether

I will also check to see if there's a regression from the last CP release (5.5.1) in other potential round-trip schema registry issues such as AVRO unions/enums - which hypothetically would fail down the line.

@agavra agavra self-assigned this Jul 9, 2020
@agavra
Contributor Author

agavra commented Jul 9, 2020

It seems that avro backwards compatibility is much looser than JSON backwards compatibility. Here's an enumeration of things I tried on 5.5.1 and whether or not they worked:

Register AVRO Schema with lowercase fields: this works. ksqlDB is able to read the fields, and if INSERT INTO is used it successfully registers a new schema with uppercase fields:

{"subject":"bar-value","version":1,"id":2,"schema":"{..."fields\":[{\"name\":\"col1\",\"type\":[\"null\",\"int\"]},{\"name\":\"col2\",\"type\":{\"type\":\"enum\",\"name\":\"Suit\",\"symbols\":[\"SPADES\",\"HEARTS\"]}}]}"}
{"subject":"bar-value","version":2,"id":3,"schema":"{..."fields\":[{\"name\":\"COL1\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"COL2\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}

Register AVRO Schema with Enum Type: this works. ksqlDB creates a VARCHAR field, and if INSERT INTO is used it successfully registers a new schema with a VARCHAR field:

{"subject":"bar-value","version":1,"id":2,"schema":"{..."fields\":[{\"name\":\"col1\",\"type\":[\"null\",\"int\"]},{\"name\":\"col2\",\"type\":{\"type\":\"enum\",\"name\":\"Suit\",\"symbols\":[\"SPADES\",\"HEARTS\"]}}]}"}
{"subject":"bar-value","version":3,"id":8,"schema":"{..."fields\":[{\"name\":\"col2\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}

Register JSON Schema with lowercase fields: reading the fields works, but writing them does not:

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering JSON schema: {"type":"object","properties":{"ICAO24":{"connect.index":0,"oneOf":[{"type":"null"},{"type":"string"}]}}}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
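For context, the registered lowercase schema in this experiment would look roughly like the following (a reconstruction, not the exact subject contents). JSON property names are case-sensitive, so icao24 and ICAO24 are different properties, which is why the uppercased schema that ksqlDB generates is rejected:

{
  "type": "object",
  "properties": {
    "icao24": {
      "type": "string"
    }
  }
}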

Register an incompatible JSON_SR Stream: 5.5.1 allowed us to create streams over topics with totally incompatible schemas. In the example below, we have registered a schema with "properties":{"icao24":{"type":"string"}}, and we can run the statement below, but inserting into it fails and reading the data returns incorrect results:

CREATE STREAM baz2 (foobar STRING) WITH (kafka_topic='baz', value_format='json_sr');
ksql> INSERT INTO baz2 (foobar) VALUES ('123');
Failed to insert values into 'BAZ2'. Could not serialize row: [ '123' ]
ksql> SELECT * FROM baz2 EMIT CHANGES;
+--------------------------------------------------------------+--------------------------------------------------------------+--------------------------------------------------------------+
|ROWTIME                                                       |ROWKEY                                                        |FOOBAR                                                        |
+--------------------------------------------------------------+--------------------------------------------------------------+--------------------------------------------------------------+
|1594316270888                                                 |null                                                          |null                                                          |

These examples make me think that the schema compatibility model of AVRO is much more permissive than that of JSON_SR. The biggest regression from 5.5.x is the ability to read JSON_SR schemas that are case-sensitive. It probably makes sense to apply a targeted fix for JSON_SR case sensitivity for now.

@agavra agavra added the P1 label and removed the P0 and needs-triage labels Jul 9, 2020
@agavra
Contributor Author

agavra commented Jul 9, 2020

One thing to note for AVRO: it looks like the schemas we compare have different record names, so schema registry will always allow the two schemas below to be registered even though they are incompatible:

{
    "name": "MyRecord",
    "namespace": "io.almog",
    "type": "record",
    "fields": [{
      "name": "col1",
      "type": "int"
    }]
}
{
  "type": "record",
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "fields": [
    {
      "name": "COL1",
      "type": [
        "null",
        "int"
      ],
      "default": null
    }
  ],
  "connect.name": "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"
}

But if the first one is already registered, then the following will not work:

{
    "name": "MyRecord",
    "namespace": "io.almog",
    "type": "record",
    "fields": [{
      "name": "COL1",
      "type": "int"
    }]
}

@agavra
Contributor Author

agavra commented Jul 15, 2020

Talked offline with @MichaelDrogalis and @colinhicks - I think we will leave this behavior and instead fix incompatibility issues (such as #5798) one by one. The reasoning is that we are trading off between these two worlds:

  • ksqlDB will fail to create sources for some schema-registry enabled topics that it should be able to handle, because the compatibility check is too strict
  • ksqlDB will create sources for schema-registry enabled topics that it should not be able to, and errors will happen mysteriously down the line

Given that it's easier to work towards reducing the items that fall into the first bucket, we will prefer an overly aggressive check and fix the issues one by one.

Closing out this ticket as "won't fix, maintain current behavior"

@apurvam
Contributor

apurvam commented Jul 16, 2020

Thanks @agavra. Just making sure I understand correctly: we are going to leave the behavior where, when you issue a CREATE statement without a schema, KSQL will do the following:

  1. It will retrieve the schema from schema registry.
  2. It will convert it to a SQL schema. I presume that this is the schema we store in the metastore?
  3. It will convert it back to a parsed schema, and then call SR to check compatibility with this parsed schema.

We are proposing to leave this behavior as is, and fix any stricter than necessary compatibility checks which cause step 3 to fail one by one?

@agavra
Contributor Author

agavra commented Jul 16, 2020

@apurvam your summary is spot on

@apurvam
Contributor

apurvam commented Jul 16, 2020

Thanks, that makes sense to me. Great summary in the original issue :)

@agavra agavra reopened this Jul 23, 2020
@agavra agavra added the blocker label and removed the P0 label Jul 23, 2020
@apurvam apurvam added this to the 0.11.0 milestone Jul 23, 2020
@agavra
Contributor Author

agavra commented Jul 23, 2020

In light of some new issues surrounding the compatibility check, and the other unknown unknowns that might come up, we've decided to change the behavior to the following:

  • if no schema exists for a topic, we will register one when we receive the statement. This is meaningful mostly for CSAS and CTAS statements, but it also helps the case where a CREATE statement declares the fields and there is no schema registered
  • If a schema already exists for a topic, we will not do any validation that our generated schema is register-compatible with the existing schema.

This behavior will allow us to read anything that we were able to read in 5.5.1 and also fixes the issues described in #4219. It leaves us open to issues with INSERT INTO compatibility, but that is not a regression from 5.5.
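As an illustration of the first bullet (all names are placeholders; this is only a sketch of the described behaviour, not the exact implementation):

-- Assuming no value schema is registered for the sink topic 'enriched' yet,
-- ksqlDB now registers its generated schema under 'enriched-value' when the
-- statement is accepted, rather than waiting until rows are produced.
CREATE STREAM enriched WITH (kafka_topic='enriched', value_format='avro') AS
  SELECT col1, col2 FROM source_stream;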

@agavra agavra closed this as completed Jul 23, 2020
@jocelyndrean jocelyndrean changed the title ksqlDB aggresively checks generated schema compatibility in CREATE STREAM statements ksqlDB aggressively checks generated schema compatibility in CREATE STREAM statements Jul 24, 2020
@big-andy-coates
Contributor

It feels to me like the right solution to this is being smarter in how we build the schema we want to send to SR to register or check compatibility.

If there is no prior existing schema we can build our own, as we currently do, and register it at the time the CREATE statement is issued.

If a prior schema does exist, then there are a few points where we care about this:

  1. CREATE statements where the user supplies a list of value columns:
    a. ksqlDB needs to ensure the supplied column definitions can read the data in the source topic.
    b. ksqlDB needs to configure the deserializer with an appropriate schema.
  2. INSERT INTO statements inserting into sources created by the above type of CREATE statement:
    a. ksqlDB needs to check it can write data that is compatible with the existing schema.
    b. ksqlDB needs to configure the serializer with an appropriate schema.

Point 1.a could be handled by custom code in ksqlDB that validates the user-supplied column list against the existing schema. However, it can also be achieved by building a new schema, as explained below, and then using SR to check compatibility. The latter approach leverages existing tech, and is hence preferred.

The issues at the moment seem to be caused by ksqlDB building the new Avro/JSON/Protobuf schema using only the column definitions stored in the metastore, ignoring the existing schema. If we take the existing schema into account, I think most/all of the issues go away (though I don't know all the issues in depth).

I think we should look to build the new schema by combining any user supplied set of columns with the existing schema using the following rules:

Matching fields/columns

SQL is case insensitive by default. We can preserve the case of field names from the existing schema for any column names that aren't quoted in the CREATE statement. This fixes issues such as #5798.

The case-preserved schema should be used in all places.

Quoted column names should be case-sensitive when matching field names in the schema.

(Note: there's an edge case in the current identifiers implementation that we may choose to fix as part of this: we don't currently know whether a fully uppercase column name was quoted or not. If we don't fix this, we'd have to assume any fully uppercase column name was not quoted, and it would therefore match the first source field with the same case-insensitive name.)
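A hedged illustration of the proposed matching rules (stream, topic and field names are placeholders; the registered 'flights-value' schema is assumed to contain a field named icao24):

-- Unquoted identifier: matched case-insensitively, so the existing field's
-- casing (icao24) would be preserved in the schema used for the compatibility
-- check and by the deserializer.
CREATE STREAM flights_a (icao24 STRING)
  WITH (kafka_topic='flights', value_format='json_sr');

-- Quoted identifier: matched case-sensitively, so this would only match a
-- field literally named ICAO24 and would not match icao24.
CREATE STREAM flights_b (`ICAO24` STRING)
  WITH (kafka_topic='flights', value_format='json_sr');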

Additional fields in schema

i.e. Fields in the existing schema that aren't supplied by the user.

The CREATE statement should preserve the missing fields in the schema it uses to check compatibility. This allows users to select a view of the data containing only the fields they care about. We could pass these fields to the deserializer as well, or we may choose to exclude them, allowing smarter deserializers to avoid the work of locating and coercing fields we don't actually need.

Any INSERT INTO statement should additionally check that the data it will produce, i.e. data containing only the fields defined in the more limited view of the schema, is compatible with the full schema. This is probably best done by having each format be able to determine whether a specific field in a schema is optional/nullable. If an additional field is not optional, the INSERT INTO statement should fail. The serializer should use the full schema when producing to the topic. This avoids ksqlDB registering a new schema with only a subset of the fields, which can cause compatibility issues.
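A sketch of how these rules would play out (names are placeholders; the registered 'orders-value' schema is assumed to contain id, amount and note):

-- Only a subset of the registered fields is declared; under the proposal the
-- schema used for the compatibility check still carries the missing 'note' field.
CREATE STREAM orders_view (id BIGINT, amount DOUBLE)
  WITH (kafka_topic='orders', value_format='avro');

-- If 'note' is optional in the registered schema, this INSERT would serialize
-- using the full schema (with note = null); if 'note' is required, the statement
-- should be rejected rather than registering a narrower schema.
INSERT INTO orders_view (id, amount) VALUES (1, 9.99);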

Additional columns

i.e. Columns supplied by the user that aren't in the existing schema.

(Could initially just fail)

The CREATE statement can include these columns in the schema used to check compatibility and passed to the deserializer. We could ignore them for the compatibility check, but I don't think the additional logic buys us any benefit.

Any INSERT INTO statement would need to register a new schema with the additional columns defined and pass that schema to the serializer.
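Again as a sketch (same placeholder names; 'region' is assumed not to exist in the registered 'orders-value' schema):

-- The extra 'region' column would be included in the schema used for the
-- compatibility check and passed to the deserializer.
CREATE STREAM orders_extended (id BIGINT, amount DOUBLE, region VARCHAR)
  WITH (kafka_topic='orders', value_format='avro');

-- An INSERT INTO this stream would first need to register a new schema version
-- that adds the 'region' field before producing.
INSERT INTO orders_extended (id, amount, region) VALUES (2, 4.50, 'EU');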

Unsupported columns

Where the source schema contains columns with unsupported types, ksqlDB currently just ignores them.

The CREATE statement should not ignore them. They should be included in the schema used when validating compatibility. We shouldn't pass them to the deserializer, mainly because in the fullness of time the serde interfaces should not use the Connect schema, and so would not be able to express a column with an unsupported type.

Any INSERT INTO statement should fail if an unsupported column is not nullable (implemented by each Format knowing how to determine whether a field is optional). The serializer should use the full schema, including the unsupported columns. This avoids ksqlDB registering a new schema with only a subset of fields.

Enum columns

Enum columns are weird because we can read them, but can't yet write them.

I think Avro enum columns are the only enum type we read, and we coerce them to a string. This coercion is likely a thorn in our side.

The CREATE statement should use the original enum field definition when checking compatibility. The deserializer should use VARCHAR.

INSERT INTO statements are trickier! Personally, until ksql supports enums I'd reject any INSERT INTO where an existing enum field would be written to by ksqlDB. It's just unsafe. Plus we likely want to remove INSERT INTO from ksqlDB in the future, removing this nasty edge case.

I believe the current behaviour on an INSERT INTO would be to register a new schema with a VARCHAR field. This is very wrong IMHO.

Sinks

The same pattern of taking into account any existing schema can also be used when creating a sink's schema.

@agavra
Contributor Author

agavra commented Aug 4, 2020

Thanks for the detailed thoughts @big-andy-coates!

If a prior schema does exist, then there are a few points where we care about this:

  1. CREATE statements where the user supplies a list of value columns:
    a. ksqlDB needs to ensure the supplied column definitions can read the data in the source topic.
    b. ksqlDB needs to configure the deserializer with an appropriate schema.
  2. INSERT INTO statements inserting into sources created by the above type of CREATE statement:
    a. ksqlDB needs to check it can write data that is compatible with the existing schema.
    b. ksqlDB needs to configure the serializer with an appropriate schema.

This frames the problem nicely, but I want to suggest a different solution: don't allow inline specification of columns in ksql for schemas already registered in schema registry. I think allowing this is really a bug - it introduces a lot of problems (including some more serious ones, like #5673), and there are compatibility features in some data formats that have no sql/ksql equivalent (e.g. additionalProperties, described in #5798). If you want a projection of fields in a schema (I'm guessing that's the only use case for specifying a subset of fields), you should express it as a projection - not do it sneakily in the serde suite.

The big benefit is that we don't have to reason about compatibility - that's a hard problem and we should delegate it 100% to schema registry. We also don't want to be in the business of doing this per format, which is something we'd need to get into because each format has its own compatibility requirements.

We can get away with this if we separate case sensitivity from the serialization layer. We can always coerce the GenericRow into the desired serialization schema and vice versa if the SQL schema and the serialization schema are compatible (which we know they are in CT/CS statements, because we generate the SQL schema from the serialization schema).
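A quick sketch of what that would look like in practice (names are placeholders):

-- A source over a topic with a registered schema is created by pure inference,
-- with no inline column list:
CREATE STREAM flights WITH (kafka_topic='flights', value_format='json_sr');

-- A subset view of the data is expressed as a projection, rather than as a
-- partial column list in the CREATE statement:
CREATE STREAM flights_icao AS SELECT icao24 FROM flights;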

@big-andy-coates
Contributor

I like it :D

@agavra
Contributor Author

agavra commented Aug 6, 2020

I created #5959 to track this; I should get around to it sometime soon.
