Unreadable state store when running a query with a source table with a schema-registry schema #5673
Steps to reproduce:

```sql
create table t_source (id int primary key, `col1` int) with (kafka_topic='t', value_format='avro', partitions=1, replicas=1);
create table t (id int primary key) with (kafka_topic='t', value_format='avro');
create stream s (id int key, col1 int) with (kafka_topic='s', value_format='avro', partitions=1, replicas=1);
insert into t_source VALUES (1, 1);
insert into s (id, col1) VALUES (1, 1);
SET 'auto.offset.reset'='earliest';
create stream j as select t.*, s.* from s join t on s.col1 = t.id;
--- at this point you need to delete your streams data dir and
--- restart the ksql application
insert into s (id, col1) VALUES (1, 1);
```

The following topics have been created:
At this point, schema registry will have the following information (note the output of `curl -XGET 'http://localhost:8081/subjects'`):

```
["_confluent-ksql-default_query_CSAS_J_0-KafkaTopic_Right-Reduce-changelog-value","t-value","s-value","_confluent-ksql-default_query_CSAS_J_0-Join-repartition-value","J-value"]
```

Importantly, notice the difference between
The logs will show the following error message:
Some notes:
I still need to confirm this hypothesis, but I think that for existing queries we should be able to register the exact source schema under the changelog subject, so that either (1) the data has the schema id that was registered in steady state, or (2) the data comes from a recovery and can then still find the correct schema id in the changelog subject. In the example above, that would mean registering the source's schema id under the changelog subject.

The concern is that this makes the state store during recovery potentially larger than the state store in normal operation (because a DDL may be projecting fewer fields than those in the topic itself), but I believe this is preferable to data loss. I'm still thinking about the best way to handle "new queries" that come after the fix for KAFKA-10179 comes in.
Confirmed that the following fixes the issue:

```
(13:56:18) Downloads ➤ cat schema.avsc
{"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"col1\",\"type\":[\"null\",\"int\"],\"default\":null}]}"}
(13:56:29) Downloads ➤ curl -X POST -d @schema.avsc -H "Accept: application/json" -H "Content-Type: application/vnd.schemaregistry.v1+json" http://localhost:8081/subjects/_confluent-ksql-default_query_CSAS_J_0-KafkaTopic_Right-Reduce-changelog-value/versions
{"id":3}
```

I need to test whether this will work in cases where we use a subset schema, but I suspect that it will (other than the side effect of increasing state store disk usage).
Confirmed that this fix works for partial schemas as well. This could be a viable workaround to recover any queries in the bad state, at the expense of extra disk space. @rodesai's comment:
It seems like this will happen by default; we don't need to do any work to take advantage of this. The concern with the approach in KAFKA-10179 is that we will be writing the changelog schema to the users' subject in schema registry, which may not be what we want. In general, I don't think there's any way for this optimization to be "correct" given the possibility that there will be asymmetry in the serde suite. There is an expectation that
Discussed offline with some folks: the short-term solution will be to proactively register the source topic schema in schema registry under the changelog value subject, so that we will be able to recover from this bug. Additionally, for all future queries we will add an additional map stage to prevent hitting this optimization (as there are other potential side effects we don't want from it, as discussed on https://issues.apache.org/jira/browse/KAFKA-10179). It is not currently on the Kafka Streams roadmap to rework the way this optimization is handled beyond the bug fix apache/kafka#8902.
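To illustrate what such an extra map stage looks like at the Streams level, here is a minimal sketch of the idea only, not ksql's actual topology-building code; the topic, store name, and serdes are made up. The table is built from a stream with a pass-through `mapValues` before materialization, so Streams cannot treat the source topic as the store's changelog and creates a real changelog topic instead:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class MapStageSketch {
  public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();

    // "t" and "t-materialized" are hypothetical names for illustration only.
    // builder.table("t", ...) with optimizations enabled lets Streams reuse the
    // source topic "t" as the store's changelog; routing the values through a
    // pass-through map stage first forces Streams to create its own changelog
    // topic, so restore replays bytes the store's serde actually wrote.
    builder.stream("t", Consumed.with(Serdes.Integer(), Serdes.String()))
        .mapValues(v -> v) // the "additional map stage"
        .toTable(Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as("t-materialized")
            .withKeySerde(Serdes.Integer())
            .withValueSerde(Serdes.String()));

    return builder.build();
  }
}
```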
Hi @agavra @rodesai @apurvam, I know I'm late on this ticket, but recently, while working on https://issues.apache.org/jira/browse/KAFKA-8037 as part of the restoration optimization, it occurred to me again that this issue may be worth a more general fix. In short, there's a discrepancy in source topic materialization: during normal processing we deserialize the data from the source topic with the registered deserializer (which is based on the source topic's registered schema), and then serialize it and put the bytes into rocksDB / the changelog topic with the registered serializer (which will set a schema for the changelog topic); whereas during restoration we copy the bytes from the source topic directly into the state store without deserializing / serializing again. This issue applies to both source KTables and GlobalKTables. In addition, it means that if there is any customized processor before materializing the state stores, it will be skipped during restoration as well. I'm actually not sure how KSQL does this today since it is not supported in the Streams DSL, but from your description

My current thoughts are:

1. We should detect source KTables / GlobalKTables that have customized logic, such that the bytes from the source topic are not the same as the bytes written to the state store (and hence to the changelog); if this is the case, we should still create a changelog topic and not piggy-back on the source topic.
2. Even if there is no customized logic, upon restoration from the source topic we should still check for corrupted data, since during normal processing the serde would still drop ill-formatted data.

WDYT? Does that resolve this issue?
For anyone following this discussion, it's continued on KAFKA-10179 and KAFKA-8037 as it's more relevant to Kafka Streams.
We recently discovered an issue in streams that impacts ksql (https://issues.apache.org/jira/browse/KAFKA-10179). KS can optimize materialization of a topology's source table by using the source topic as the table's change-log rather than creating a new change-log topic. This becomes an issue during restore, because the restore just copies the source topic data into rocksdb. This is a problem for schemas registered with schema-registry. The data in the source topic has a schema id that is registered under the source topic's schema. However, the deserializer for the state store is provided a subject for the store, which doesn't have that schema registered, and deserialization fails.
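For context on why the restored bytes carry a schema id tied to the source topic's subject: Confluent's Avro serializer prefixes every record with a magic byte and the 4-byte id it obtained when registering against the topic's value subject. The following is a small illustrative sketch (not ksql or Streams code) that peels that framing off a raw record value, i.e. the same bytes a restore copies verbatim into the state store:

```java
import java.nio.ByteBuffer;

public class WireFormatSketch {
  // Decodes the Confluent wire-format header of a raw Kafka record value:
  // byte 0 is a magic byte (0x0), bytes 1-4 are the big-endian schema id,
  // and the rest is the Avro-encoded payload. During restore these bytes are
  // copied into rocksdb unchanged, so the embedded id still belongs to the
  // schema registered under the *source topic's* value subject, not under
  // the state store / changelog subject the store's deserializer is given.
  public static int schemaIdOf(byte[] rawValue) {
    ByteBuffer buf = ByteBuffer.wrap(rawValue);
    byte magic = buf.get();
    if (magic != 0x0) {
      throw new IllegalArgumentException("not Confluent wire format, magic=" + magic);
    }
    return buf.getInt(); // schema id registered for the source topic's subject
  }
}
```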
Streams has proposed a fix in KAFKA-10179. However, the proposed fix - to associate the source topic name with the changelog state store - is not going to work for ksql. In ksql, the schema of the data going into the changelog may be different from the source topic’s schema (e.g. we may only be using some of the source’s fields), and we can’t register that schema with the source topic’s subject in the schema registry. We need to do our own work to solve the following:
How do we want to proceed for new queries? We could use the optimization, but then we must read and materialize the source using the source’s schema, and project out the needed fields in a downstream transformation. Alternatively, we could disable the optimization (provided streams gives us a way of doing this).
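On the second alternative: if the source-topic-reuse behavior is gated by the Streams topology-optimization config (as it is for tables created via `StreamsBuilder#table`), one coarse way to avoid it would be to not enable optimizations for these queries. A minimal sketch, assuming that is acceptable; the application id and bootstrap servers are made up, only the two optimization constants are standard Streams config:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class DisableOptimizationSketch {
  public static Properties streamsProps() {
    Properties props = new Properties();
    // Hypothetical ids/servers for illustration only.
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-query");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // Turning topology optimization off keeps Streams from reusing a source
    // table's input topic as the changelog for its state store, so restores
    // replay data written by the store's own serde instead of raw source bytes.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
    return props;
  }
}
```

This is blunt, though: it also disables the other optimizations (such as repartition-topic merging), which is presumably why a more targeted mechanism is worth asking Streams for.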
How do we want to handle existing queries? Existing queries may be in a corrupted state (e.g. corrupted state stores). How do we handle this? Some options: