Unreadable state store when running a query with a source table with a schema-registry schema #5673

Closed
rodesai opened this issue Jun 24, 2020 · 6 comments · Fixed by #5823
Labels: avro (Issues involving Avro serialisation), blocker, bug, requires-streams-change
Milestone: 0.11.0

rodesai (Contributor) commented Jun 24, 2020

We recently discovered an issue in Kafka Streams that impacts ksql (https://issues.apache.org/jira/browse/KAFKA-10179). Kafka Streams can optimize materialization of a topology's source table by using the source topic as the table's changelog rather than creating a new changelog topic. This becomes an issue during restore, because the restore just copies the source topic data into RocksDB. That is a problem for schemas registered with schema registry: the data in the source topic carries a schema id that is registered under the source topic's subject, but the deserializer for the state store is given the store's own subject, which doesn't have that schema registered, so deserialization fails.
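
To make the mechanism concrete, here is a small illustration, assuming the default TopicNameStrategy of the Confluent Avro serde (schema-registry client 5.4+ API): the subject used for schema lookups is derived purely from the topic name handed to the (de)serializer, so bytes copied verbatim from the source topic into the store get looked up under the wrong subject. The topic names below are placeholders, not the actual ksql-generated names.

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;

public class SubjectNamingExample {
  public static void main(final String[] args) {
    final TopicNameStrategy strategy = new TopicNameStrategy();

    // Records in the source topic carry schema ids registered under "<source-topic>-value"...
    System.out.println(strategy.subjectName("source-topic", false, (ParsedSchema) null));

    // ...but a state store restored by copying those bytes resolves ids against the
    // store's changelog subject instead, where the id may not be registered.
    System.out.println(strategy.subjectName("my-store-changelog", false, (ParsedSchema) null));
  }
}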

Streams has proposed a fix in KAFKA-10179. However, the proposed fix - to associate the source topic name with the changelog state store - is not going to work for ksql. In ksql, the schema of the data going into the changelog may be different from the source topic’s schema (e.g. we may only be using some of the source’s fields), and we can’t register that schema with the source topic’s subject in the schema registry. We need to do our own work to solve the following:

How do we want to proceed for new queries? We could use the optimization, but then we must read and materialize the source using the source’s schema, and project out the needed fields in a downstream transformation. Alternatively, we could disable the optimization (provided streams gives us a way of doing this).
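
A minimal sketch of the second option (disabling the optimization), assuming the source-topic changelog reuse is gated on the Streams topology.optimization config; the exact constant names vary slightly across Kafka Streams versions, and ksql's own config plumbing is not shown:

import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class DisableSourceTopicReuse {

  public static Topology buildWithoutOptimizations(final StreamsBuilder builder) {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // "none" turns off DSL optimizations, including reusing a source topic as a
    // table's changelog, at the cost of an extra changelog topic per source table.
    props.put("topology.optimization", "none"); // i.e. StreamsConfig.NO_OPTIMIZATION

    // Optimizations are applied (or skipped) when the topology is built with the props.
    return builder.build(props);
  }
}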

How do we want to handle existing queries? Existing queries may be in a corrupted state (e.g. corrupted state stores). How do we handle this? Some options:

  • try to detect this state and mark the affected queries as failed
  • detect this from the deserializer and drop the corrupted rows
  • have the deserializer look for the schema in both subjects during deserialization (a rough sketch follows this list)
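
For the last option, a rough sketch of what such a fallback could look like, assuming the store's inner deserializer is a schema-registry-aware Avro deserializer that derives its subject from the topic name it is given (the default TopicNameStrategy); the class and constructor here are hypothetical, not ksql code:

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class FallbackSubjectDeserializer<T> implements Deserializer<T> {

  private final Deserializer<T> inner;  // e.g. a wrapped Avro deserializer
  private final String sourceTopic;     // the ksql source's Kafka topic

  public FallbackSubjectDeserializer(final Deserializer<T> inner, final String sourceTopic) {
    this.inner = inner;
    this.sourceTopic = sourceTopic;
  }

  @Override
  public T deserialize(final String topic, final byte[] bytes) {
    try {
      // Normal path: resolve the schema id against the changelog's subject.
      return inner.deserialize(topic, bytes);
    } catch (final SerializationException e) {
      // Restored bytes were written with the source topic's schema id,
      // so retry the lookup under the source topic's subject instead.
      return inner.deserialize(sourceTopic, bytes);
    }
  }
}

The obvious downside is that this can mask genuinely corrupt data, since any subject-lookup failure silently falls back to the source subject.
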
rodesai added the bug, avro (Issues involving Avro serialisation), and needs-triage labels on Jun 24, 2020
rodesai added this to the 0.11.0 milestone on Jun 24, 2020
agavra (Contributor) commented Jun 26, 2020

Steps to reproduce:

create table t_source (id int primary key, `col1` int) with (kafka_topic='t', value_format='avro', partitions=1, replicas=1);
create table t (id int primary key) with (kafka_topic='t', value_format='avro');
create stream s (id int key, col1 int) with (kafka_topic='s', value_format='avro', partitions=1, replicas=1);
insert into t_source  VALUES (1, 1);
insert into s (id, col1) VALUES (1, 1);
SET 'auto.offset.reset'='earliest';
create stream j as select t.*, s.* from s join t on s.col1 = t.id;

--- at this point you need to delete your streams data dir and
--- restart the ksql application

insert into s (id, col1) VALUES (1, 1);

The following topics have been created:

ksql> SHOW ALL TOPICS;

 Kafka Topic                                                              | Partitions | Partition Replicas
------------------------------------------------------------------------------------------------
 J                                                                        | 1          | 1
 __confluent.support.metrics                                              | 1          | 1
 _confluent-ksql-default__command_topic                                   | 1          | 1
 _confluent-ksql-default_query_CSAS_J_0-Join-repartition                  | 1          | 1
 _schemas                                                                 | 1          | 1
 default_ksql_processing_log                                              | 1          | 1
 s                                                                        | 1          | 1
 t                                                                        | 1          | 1
------------------------------------------------------------------------------------------------

At this point, schema registry will have the following information (note that _confluent-ksql-default_query_CSAS_J_0-KafkaTopic_Right-Reduce-changelog-value is registered in schema registry even though the topic does not exist):

curl -XGET 'http://localhost:8081/subjects'
["_confluent-ksql-default_query_CSAS_J_0-KafkaTopic_Right-Reduce-changelog-value","t-value","s-value","_confluent-ksql-default_query_CSAS_J_0-Join-repartition-value","J-value"]

Importantly, notice the difference between t-value and _confluent-ksql-default_query_CSAS_J_0-KafkaTopic_Right-Reduce-changelog-value, specifically that the field names are upper-cased in the latter and the ids of the two schemas diverge:

curl -XGET 'http://localhost:8081/subjects/t-value/versions/1'
{"subject":"...","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"col1\",\"type\":[\"null\",\"int\"],\"default\":null}],\"connect.name\":\"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema\"}"}

curl -XGET 'http://localhost:8081/subjects/_confluent-ksql-default_query_CSAS_J_0-Join-repartition-value/versions/1'
{"subject":"...","version":1,"id":4,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"COL1\",\"type\":[\"null\",\"int\"],\"default\":null}]}"}

The logs will show the following error message:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: _confluent-ksql-default_query_CSAS_J_0-KafkaTopic_Right-Reduce-changelog
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic _confluent-ksql-default_query_CSAS_J_0-KafkaTopic_Right-Reduce-changelog to Avro: 
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:118)
	at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:45)
	at io.confluent.ksql.serde.tls.ThreadLocalDeserializer.deserialize(ThreadLocalDeserializer.java:37)
	at io.confluent.ksql.serde.GenericRowSerDe$GenericRowDeserializer.deserialize(GenericRowSerDe.java:304)
	at io.confluent.ksql.serde.GenericRowSerDe$GenericRowDeserializer.deserialize(GenericRowSerDe.java:289)
	at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:45)
	at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:54)
	at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:27)
	at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:210)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:880)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133)
	at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:78)
	at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
	at org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesGetter.get(KTableTransformValues.java:144)
	at org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesGetter.get(KTableTransformValues.java:144)
	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:77)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:865)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:100)
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:716)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:865)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:716)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:884)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:695)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema version for id 3
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:351)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:436)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:423)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:228)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:356)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:113)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:155)
	at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:162)
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:101)
	at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:45)

Some notes:

  • the serializer that serializes the messages into the state store will serialize them under the subject _confluent-ksql-default_query_CSAS_J_0-KafkaTopic_Right-Reduce-changelog-value using the schema of the ksql source (not the existing topic schema).
  • if the schema that is being serialized to the state store is exactly the same as the schema of the source topic, then schema registry will reuse the same schemaId (this is why it is necessary to differentiate between t and t_source in the example above)

I still need to confirm this hypothesis, but I think that for existing queries we should be able to register the exact source schema under the changelog subject, so that either (1) the data carries the schema id that was registered during steady-state operation, or (2) the data comes from a recovery, in which case the deserializer can still find the correct schema id under the changelog subject. In the example above, that would mean registering schema id 1 under the _confluent-ksql-default_query_CSAS_J_0-KafkaTopic_Right-Reduce-changelog-value subject.
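
A sketch of what that registration could look like via the Confluent schema-registry Java client (5.4+ API, Avro); this illustrates the idea and is not the eventual ksql fix:

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegisterSourceSchemaUnderChangelogSubject {

  public static void main(final String[] args) throws Exception {
    final SchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://localhost:8081", 100);

    // Fetch the schema currently registered under the source topic's value subject.
    final SchemaMetadata source = client.getLatestSchemaMetadata("t-value");

    // Register the exact same schema under the changelog's value subject; schema registry
    // reuses the existing global id because the schema is identical, so restored records
    // (which carry the source subject's schema id) can still be resolved.
    final int id = client.register(
        "_confluent-ksql-default_query_CSAS_J_0-KafkaTopic_Right-Reduce-changelog-value",
        new AvroSchema(source.getSchema()));

    System.out.println("registered under changelog subject with id " + id);
  }
}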

The concern is that this makes the state store built during recovery potentially larger than the state store built during normal operation (because a DDL may project fewer fields than the topic itself contains), but I believe this is preferable to data loss.

I'm still thinking about the best way to handle "new queries" that come after the fix for KAFKA-10179 comes in.

agavra (Contributor) commented Jun 26, 2020

Confirmed that the following fixes the issue:

(13:56:18) Downloads ➤ cat schema.avsc
{"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"col1\",\"type\":[\"null\",\"int\"],\"default\":null}]}"}

(13:56:29) Downloads ➤ curl -X POST -d @schema.avsc -H "Accept: application/json" -H "Content-Type: application/vnd.schemaregistry.v1+json" http://localhost:8081/subjects/_confluent-ksql-default_query_CSAS_J_0-KafkaTopic_Right-Reduce-changelog-value/versions
{"id":3}

I need to test whether this will work in cases where we use a subset schema, but I suspect that it will (other than the side effect of increasing the state store's disk usage).

agavra (Contributor) commented Jun 26, 2020

Confirmed that this fix works for partial schemas as well. This could be a viable workaround to recover any queries in the bad state, at the expense of extra disk space.

@rodesai's comment:

We could use the optimization, but then we must read and materialize the source using the source’s schema, and project out the needed fields in a downstream transformation.

It seems like this will happen by default; we don't need to do any work to take advantage of it. The concern with the approach in KAFKA-10179 is that we would be writing the changelog schema to the users' subject in schema registry, which may not be what we want.

In general, I don't think there's any way for this optimization to be "correct" given the possibility that there will be asymmetry in the serde suite. There is an expectation that serialize(deserialize(topic_data)) == topic_data for the serde, but there's no way for Kafka Streams or for KSQL to enforce this, so there will always be a potential for bugs in this recovery optimization. My opinion is to require this optimization to be enabled on a case-by-case basis via some streams property config.

agavra (Contributor) commented Jul 7, 2020

Discussed offline with some folks: the short-term solution will be to proactively register the source topic schema in schema registry under the changelog value subject so that we can recover from this bug. Additionally, for all future queries we will add an additional map stage to avoid hitting this optimization (there are other potential side effects we don't want from it, as discussed on https://issues.apache.org/jira/browse/KAFKA-10179). It is not currently on the Kafka Streams roadmap to rework how this optimization is handled beyond the bug fix apache/kafka#8902.
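
For reference, a rough sketch of the "additional map stage" idea in the plain Streams DSL (not ksql's actual physical plan), assuming topology optimization is enabled: materializing the table only after a trivial mapValues keeps Streams from reusing the source topic as the store's changelog. Topic, store, and serde choices are illustrative.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ExtraMapStageExample {

  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();

    final KTable<String, String> source =
        builder.table("t", Consumed.with(Serdes.String(), Serdes.String()));

    // The no-op mapValues means the materialized store is no longer a byte-for-byte
    // copy of the source topic, so it is backed by its own changelog topic and
    // restores go through the store's serde instead of raw source-topic bytes.
    final KTable<String, String> materialized = source.mapValues(
        value -> value,
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("t-materialized")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String()));

    // Printing the topology description shows the extra processor and dedicated store.
    System.out.println(builder.build().describe());
  }
}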

guozhangwang (Contributor) commented:

Hi @agavra @rodesai @apurvam I know I'm late on this ticket, but recently, while working on https://issues.apache.org/jira/browse/KAFKA-8037 as part of the restoration optimization, it occurred to me again that this issue may be worth a more general fix. In short, there's a discrepancy in how source topics are materialized: during normal processing we deserialize the data from the source topic with the registered deserializer (which is based on the source topic's registered schema), then serialize it and put the bytes into RocksDB / the changelog topic with the registered serializer (which sets a schema for the changelog topic); whereas during restoration we just copy the bytes from the source topic into the state store without deserializing / serializing again. This issue applies to both source KTables and global KTables.

In addition, it means that any customized processors that run before the state store is materialized will be ignored during restoration as well. I'm actually not sure how KSQL does this today, since it is not supported in the Streams DSL, but from your description ("schema of the data going into the changelog may be different from the source topic's schema (e.g. we may only be using some of the source's fields)") I'm assuming that's indeed the case; could you point me to the relevant code showing how you do this customized filtering or other logic before materialization? So although the original motivation of just copying bytes over to the state store is good for saving serde costs, we cannot always blindly do that, and this is a general issue that is worth its own fix.

My current thoughts are: 1) we should detect source KTables / GlobalKTables that have customized logic, such that the bytes from the source topic are not the same as the bytes written to the state store (and hence the changelog); in that case, we should still create a changelog topic and not piggy-back on the source topic. 2) Even when there's no customized logic, upon restoration from the source topic we should still check for corrupted data, since during normal processing ill-formatted data would be dropped by the serde.

WDYT? Does that resolve this issue?

agavra (Contributor) commented Jul 20, 2020

For anyone following this discussion: it has continued on KAFKA-10179 and KAFKA-8037, as it is more relevant to Kafka Streams.
