Author: @big-andy-coates | Release Target: 0.13 | Status: Approved | Discussion: confluentinc#6017
tl;dr: ksqlDB currently only supports keys compatible with the `KAFKA` format. This limits the data ksqlDB can work with. Extending the set of key formats ksqlDB supports immediately opens up the use of ksqlDB with previously incompatible datasets.
Data stored in Kafka has a key and a value. These can be serialized using different formats, but generally use a common format. ksqlDB supports multiple value data formats, but requires the key data format to be `KAFKA`.
This limitation is particularly problematic for tables. ksqlDB is unable to access changelog topics in Kafka that have non-`KAFKA` formatted keys. As the key of the Kafka record is the PRIMARY KEY of the table, it is essential that the key can be read if the changelog is to be materialised into a table. When changelog topics have non-`KAFKA` key formats, the limitation precludes ksqlDB as a solution.
The limitation is less damaging for streams. However, it is still the case that the user loses the ability to access the data in the Kafka record's key. If this data is not duplicated in the record's value, which generally seems to be the case, then the data is not accessible at all. If the data is required, then the limitation precludes ksqlDB as a solution.
As well as unsupported input key formats, ksqlDB is equally precluded should a solution require the output to have a non-`KAFKA` key format. ksqlDB is often used as the glue between disparate systems, even though it is limited to changing the value's format and structure. Supporting other key formats opens this up to also transforming the key into a different format.
In some cases users are able to work around this limitation. This may involve changing upstream code, or introducing pre-processing, or, in the case of Connect, using SMTs to convert the key format. All such solutions tend to increase the complexity of the system, and generally hurt performance.
To open ksqlDB up to new problem spaces and to drive adoption, ksqlDB should support other key formats.
- Addition of a new optional `KEY_FORMAT` property in the `WITH` clause, to set the key format.
- Addition of a new optional `FORMAT` property in the `WITH` clause, to set both the key & value formats.
- Addition of new server configuration to provide defaults for key format.
- Support for additional key column data types, where the key format supports it:
  - `DECIMAL`
  - `BOOLEAN`
- Support of the following key formats:
  - `KAFKA`: the current key format.
  - `DELIMITED`: single key columns as a single string value.
  - `JSON`/`JSON_SR`: single key column as an anonymous value, i.e. not within a JSON object.
  - `AVRO`: single key column as an anonymous value, i.e. not within an Avro record.
  - `NONE`: special format indicating no data, or ignored data, e.g. a key-less stream.
- Storing and retrieving key schemas from the Schema Registry for formats that support the integration.
- Full support of these key formats for all supported SQL syntax.
- Automatic repartitioning of streams and tables for joins where key formats do not match.
- Support for reading & writing key schemas to & from the schema registry.
- Enhancements to QTT and the ksqlDB testing tool to allow for keys with formats beyond `KAFKA`.
- Support for multiple key columns: this will come later.
- Support for single key columns wrapped in an envelope of some kind: this will come later.
- Support for complex key column data types, i.e. array, struct and map: this will come later.
- Support for `PROTOBUF` keys, as this requires support for wrapped keys: this will come later.
- Enhancing DataGen to support non-`KAFKA` keys.
- Key schema evolution. (See key schema evolution in the compatibility section.)
- Support for right-outer joins. This may be covered in a future KLIP.
We know from customers and community members that there are a lot of people that have data with non-`KAFKA` formatted keys. This is the first step to unlocking that data and those use-cases.
With support for `AVRO` and `JSON` key formats, a lot of existing use-cases suddenly no longer require pre-processing or tricky Connect SMTs, and new use-cases, for which ksqlDB was previously unsuitable (as documented in the motivation section), can now be handled.
The following new properties will be accepted in the `WITH` clause of `CREATE` statements for streams and tables:

- `KEY_FORMAT`: sets the key format; works along the same lines as the existing `VALUE_FORMAT`.
- `FORMAT`: sets both the key and value format with a single property.
`KEY_FORMAT` will not be a required property if `ksql.persistence.default.format.key` is set.
Providing `FORMAT` will set both the key and value formats. Providing `FORMAT` along with either `KEY_FORMAT` or `VALUE_FORMAT` will result in an error.
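For example, a minimal sketch of the proposed property (the stream names and columns are illustrative):

```sql
-- OK: FORMAT sets both the key and the value format to JSON:
CREATE STREAM CLICKS (USER_ID BIGINT KEY, URL STRING)
  WITH (KAFKA_TOPIC='clicks', FORMAT='JSON');

-- Error: FORMAT conflicts with an explicit VALUE_FORMAT:
CREATE STREAM CLICKS2 (USER_ID BIGINT KEY, URL STRING)
  WITH (KAFKA_TOPIC='clicks', FORMAT='JSON', VALUE_FORMAT='AVRO');
```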
The following new configuration options will be added. These configurations can be set globally, within the application property file, or locally, via the `SET` command.

- `ksql.persistence.default.format.key`: the default key format.
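For instance, a sketch of setting the proposed default for the current session (the equivalent entry in the server's property file would set it globally):

```sql
SET 'ksql.persistence.default.format.key'='JSON';
```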
The new `KEY_FORMAT` or `FORMAT` property will be supported wherever the current `VALUE_FORMAT` is supported. Namely:

- In `CREATE STREAM` and `CREATE TABLE` statements.
- In `CREATE STREAM AS SELECT` and `CREATE TABLE AS SELECT` statements.
The key format will follow the same inheritance rules as the current value format. Namely: any derived stream will inherit the format of its leftmost source, unless the format is explicitly set in the `WITH` clause.
For example:
```sql
-- Creates a table over a changelog topic with AVRO key and JSON value:
CREATE TABLE USERS (
  ID BIGINT PRIMARY KEY,
  NAME STRING
) WITH (
  KAFKA_TOPIC='USERS',
  KEY_FORMAT='AVRO',
  VALUE_FORMAT='JSON'
);

-- Creates a stream over a topic with JSON key and value:
CREATE STREAM BIDS (
  ITEM_ID BIGINT KEY,
  USER_ID BIGINT,
  AMOUNT INT
) WITH (
  KAFKA_TOPIC='BIDS',
  FORMAT='JSON'
);

-- Change the key format of a stream:
CREATE STREAM AVRO_BIDS
WITH (
  KEY_FORMAT='AVRO'
) AS
SELECT * FROM BIDS;

-- Creates an enriched stream. The key format is inherited from the leftmost source, i.e. JSON:
CREATE STREAM ENRICHED_BIDS AS
SELECT *
FROM BIDS
  JOIN USERS ON BIDS.USER_ID = USERS.ID;
```
For formats that support integration with the schema registry, the key schema will be read and registered with the Schema Registry as needed, following the same pattern as the value schema in the current product.
In addition, where possible, key schemas will be marked as `READONLY` to avoid unintentional changes to the key schema id, which would break compatibility. If the Schema Registry is not configured to allow schema mutability to be set, the statement will still succeed, but a warning will be logged, with a link to the Schema Registry config that needs changing.
If a `CREATE TABLE` or `CREATE STREAM` statement does not include a `KEY_FORMAT` property, the key format is picked up from the `ksql.persistence.default.format.key` system configuration. If this is not set, an error is returned. Note: the server config will have this set to `KAFKA` by default, to maintain backwards compatibility with the current system.
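For example, a sketch assuming the shipped default (the stream name and columns are illustrative):

```sql
-- ksql.persistence.default.format.key=KAFKA (server default)

-- No KEY_FORMAT property, so the key format falls back to the default, i.e. KAFKA:
CREATE STREAM PAGE_VIEWS (USER_ID BIGINT KEY, PAGE STRING)
  WITH (KAFKA_TOPIC='page_views', VALUE_FORMAT='JSON');
```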
The system already serializes the key format of source, intermediate and sink topics as part of the query plan, meaning it should be fairly easy to plug in new formats.
Validation will be added to ensure only supported key formats are set, and that key column data types are supported by key formats.
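As an illustration of the second check, a sketch of a statement that would be rejected (the table name and columns are hypothetical; the `KAFKA` format supports only a limited set of primitive types):

```sql
-- Error: the KAFKA format does not support DECIMAL key columns:
CREATE TABLE PRICES (
  PRICE DECIMAL(10, 2) PRIMARY KEY,
  ITEM STRING
) WITH (
  KAFKA_TOPIC='prices',
  KEY_FORMAT='KAFKA',
  VALUE_FORMAT='JSON'
);
```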
Most existing functionality should just work, as the key format only comes into play during (de)serialization. The only areas where additional work is expected are joins and key-less streams.
Joins require the binary key of both sides of the join to match and both sides to be delivered to the same ksqlDB node. The former normally ensures the latter, unless a custom partitioning strategy has been used.
The introduction of additional key formats means that while the deserialized key from both sides of a join may match, the serialized binary data may differ if the key serialization format is different. To accommodate this, ksqlDB will automagically repartition one side of a join to match the key format of the other.
Many joins require an implicit repartition of one or both sides to facilitate the join. In such situations the change of key format can be performed in the same repartitioned step, avoiding any additional re-partitions. This means that joining sources with different key formats will only require an implicit repartition to converge the key formats if neither side is already being repartitioned.
Where one side must be repartitioned to correct the key format, choosing which side to repartition cannot be driven by the size of the data, as in a traditional database system, as the size of the data is unknown, and likely infinite. Ideally, for a streaming system it is the rate of change of the data, i.e. the throughput, that would drive the choice. Unfortunately, this too cannot be known upfront. For this reason, we propose repartitioning based on the order of sources within the query, with the source on the right being repartitioned.
A benefit of making the choice order-based is that, once the rule is learned, users can predict and control which side is re-partitioned in some situations, i.e. stream-stream and table-table joins.
Note: allowing users to freely switch left and right sources to control which side is repartitioned will work for all but left-outer joins. To support switching left-outer joins ksqlDB would need to support a right-outer join. The addition of this is deemed out of scope.
Repartitioning the right side was chosen over the left, as it means stream-table joins will repartition the table, which we propose will generally see a lower throughput of updates than the stream side.
Such repartitioning is possible and safe... ish, even for tables, because the logical key of the data will not have changed, only the serialization format. This ensures the ordering of updates to a specific key are maintained across the repartition. Of course, the repartitioning would introduce cross-key out-of-order data, as the records are shuffled across partitions. That is to say that even if the source partitions were correctly ordered by time, the re-partitioned partitions would see out-of-order records, though per-key ordering would be maintained. Thus time-tracking ("stream-time"), grace-period and retention-time might be affected. However, this phenomenon already exists, and is deemed acceptable, for other implicit re-partitions.
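For example, a sketch of the proposed behaviour (the source names and key formats are illustrative):

```sql
-- ORDERS is a stream with a JSON key; CUSTOMERS is a table with an AVRO key.
-- The right side (CUSTOMERS) would be implicitly repartitioned to the JSON
-- key format so the binary keys of both sides match:
CREATE STREAM ENRICHED_ORDERS AS
SELECT *
FROM ORDERS
  JOIN CUSTOMERS ON ORDERS.CUSTOMER_ID = CUSTOMERS.ID;
```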
To ensure query plans written after this work are forward compatible with future enhancements to support single key columns wrapped in a JSON object, an Avro record, etc., and ultimately multiple key columns, a new `UNWRAP_SINGLE_KEYS` value will be added to `SerdeOption` and explicitly set on all source, sink and internal topics. See Future multi-column key work below for more info / background.
A new `NONE` format will be introduced to allow users to provide a `KEY_FORMAT` that informs ksqlDB to ignore the key. This format will be rejected as a `VALUE_FORMAT` for now, as ksqlDB does not yet support value-less streams and tables. See Schema Inference below for more info / background.
This format is predominantly being added to allow users to declare key-less streams when the new `ksql.persistence.default.format.key` system configuration is set to a format that supports schema inference, i.e. loading the schema from the schema registry. If a user were not to explicitly set the key format to `NONE` and attempted to create a stream, ksqlDB would attempt to read the key schema from the schema registry, and report an error if the schema did not exist. The `NONE` format will allow users to override the default key format and explicitly inform ksqlDB to ignore the key:
```sql
SET 'ksql.persistence.default.format.key'='AVRO';

-- Only the value columns of CLICKS will be loaded from the schema registry.
CREATE STREAM CLICKS
  WITH (
    key_format='NONE',   -- Informs ksqlDB to ignore the key
    value_format='AVRO',
    ...
  );
```
Declaring a table with key format `NONE` will result in an error.
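For example (a sketch; the table name and properties are illustrative):

```sql
-- Error: a table must have a PRIMARY KEY, so its key format can never be NONE:
CREATE TABLE T WITH (
  KAFKA_TOPIC='t',
  KEY_FORMAT='NONE',
  VALUE_FORMAT='AVRO'
);
```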
Defining key columns in `CREATE TABLE` or `CREATE STREAM` statements where the key format is `NONE` will result in an error:
```sql
CREATE TABLE USER (
  ID INT PRIMARY KEY,
  NAME STRING
) WITH (
  key_format='NONE', -- Error! Can't define key columns with this format
  ...
);
```
`CREATE AS` statements that set the key, i.e. those containing `GROUP BY`, `PARTITION BY` or `JOIN`, where the source has a `NONE` key format, and which do not explicitly define a key format, will pick up their key format from the new `ksql.persistence.default.format.key` system configuration. If this setting is not set, the statement will generate an error.
```sql
CREATE STREAM KEY_LESS (
  NAME STRING
) WITH (
  key_format='NONE',
  ...
);

-- Table T will get its key format from the 'ksql.persistence.default.format.key' system config.
-- If the config is not set, an error will be generated.
CREATE TABLE T AS
SELECT
  NAME,
  COUNT(*)
FROM KEY_LESS
GROUP BY NAME;
```
`CREATE AS` statements that create key-less streams will now implicitly set the key format to `NONE`.
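For example (a sketch reusing the `KEY_LESS` stream from above):

```sql
-- The result has no key columns, so the sink's key format is implicitly NONE:
CREATE STREAM NAMES AS
SELECT NAME FROM KEY_LESS;
```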
Aside from the usual unit tests etc., the QTT suite of tests will be extended to cover the different key formats. Tests will be added to cover the new syntax and configuration combinations and permutations. Existing tests covering aggregations, re-partitions and joins will be extended to include variants with different key formats.
The KLIP will be broken down into the following deliverables:
- Basic JSON support (5 weeks): Support for the `JSON` key format, without:
  - schema registry integration
  - automatic repartitioning of streams and tables for joins where key formats do not match: such joins will result in an error initially.

  Included in this milestone:
  - Addition of a new optional `KEY_FORMAT` property in the `WITH` clause, to set the key format.
  - Addition of a new optional `FORMAT` property in the `WITH` clause, to set both the key & value formats.
  - Addition of new server configuration to provide defaults for key format.
  - Support for additional key column data types, as JSON supports them:
    - `DECIMAL`
    - `BOOLEAN`
  - Full support of the key format for all supported SQL syntax.
  - Enhancements to QTT and the ksqlDB testing tool.
  - REST and HTTP2 server endpoints and Java client to work with the new key format.
- NONE format (1 week): Supported on keys only. Needed to support key-less streams once we have SR integration.
- Schema Registry support (1 week): Adds support for reading and writing schemas to and from the schema registry.
- JSON_SR support (1 week): Adds support for the `JSON_SR` key format, inc. schema registry integration.
- Avro support (1 week): Adds support for the `AVRO` key format, inc. schema registry integration.
- Delimited support (1 week): Adds support for the `DELIMITED` key format.
- Auto-repartitioning on key format mismatch (1.5 weeks): Adds support for automatic repartitioning of streams and tables for joins where key formats do not match.
- Blog post (1 week): Write a blog post about the new features. Likely one post for everything but auto-repartitioning, and a second to cover this.
New server config and new `CREATE` properties will be added to the main docs site.
There are no incompatible changes within the proposal, so no demos or examples must change. However, it probably pays to update some to highlight the new features. We propose updating the Kafka micro site examples to leverage the new functionality, as these have automated testing. It may be worth changing the ksqlDB quickstart too.
As mentioned above, existing query plans already include key formats for all topics. So existing queries will continue to work.
Without `ksql.persistence.default.format.key` set to `KAFKA`, existing queries in the form:

```sql
CREATE TABLE USERS (
  ID BIGINT PRIMARY KEY,
  NAME STRING
) WITH (
  KAFKA_TOPIC='USERS',
  VALUE_FORMAT='JSON'
);
```

...will start failing, as they do not specify the `KEY_FORMAT`. We therefore propose the server config shipped with ccloud and on-prem releases has `ksql.persistence.default.format.key` set to `KAFKA`.
Assuming the default key format is set, existing SQL will run unchanged. The ksqlDB release will include this property set.
ksqlDB allows the user to choose between serializing a single value column as an anonymous value, or within an envelope of some kind, via the `WRAP_SINGLE_VALUE` property. For example, the following expects the value to be a JSON-serialized number, not a JSON object with a `foo` numeric field.
```sql
CREATE STREAM INPUT (
  K STRING KEY,
  foo INT
) WITH (
  WRAP_SINGLE_VALUE=false,
  kafka_topic='input_topic',
  value_format='JSON'
);
```
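For illustration, a sketch of the two JSON payload shapes this property selects between (note ksqlDB uppercases unquoted identifiers):

```sql
-- With WRAP_SINGLE_VALUE=false, the record's value is the anonymous number:  10
-- With WRAP_SINGLE_VALUE=true, it is wrapped in an object:                   {"FOO": 10}
```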
There is also a system configuration, `ksql.persistence.wrap.single.values`, that can be used to provide a default for wrapping / unwrapping single values.
Where the user explicitly requests wrapping or unwrapping of single values, either via the `WITH` clause property or the system configuration, the query plan's formats have either the `WRAP_SINGLE_VALUES` or `UNWRAP_SINGLE_VALUES` `SerdeOption` set on the source and/or sink topics. These options are used to ensure correct serialization and deserialization when the query is executed, and control the shape of the schema registered with the Schema Registry. If neither option is set, the format's default wrapping is used, e.g. `KAFKA` defaults to unwrapped, whereas `JSON` defaults to wrapped.
This KLIP adds the ability to serialize a single key column as an anonymous value. Future work will extend this to support wrapped single columns and then multiple columns. This future work will need to maintain backwards compatibility and allow users to choose how single key values should be serialized. Future work will introduce a `WRAP_SINGLE_KEY` property and a `ksql.persistence.wrap.single.keys` configuration.
To ensure query plans written by this KLIP are forwards compatible with this planned work, all query plans will explicitly set the `UNWRAP_SINGLE_KEYS` `SerdeOption` on all source, internal and sink topics, ensuring the correct (de)serialization options are maintained once ksqlDB supports these later features.
NB: ksqlDB makes no claims of guaranteeing future versions will use the same formats for internal topics for new queries.
Internal topics have their key format serialized as part of the query plan, i.e. all current plans have `KAFKA` as the key format. This means all existing plans are forward compatible with this KLIP.
When generating new query plans, internal topics inherit their key (and value) format from their leftmost parent. This KLIP does not propose to change this, except where an automatic repartition is required to align key formats to enable a join, as already noted. New plans generated after this KLIP may have key formats other than `KAFKA` for source, sink and internal topics.
No changes around internal topics are needed to maintain compatibility.
The introduction of key formats that support schema inference, i.e. loading the key schema from the schema registry, introduces an edge case we must account for: key-less streams.
ksqlDB already supports 'partial schemas', where the value format supports schema inference and the user explicitly provides the key definition:
```sql
-- table created with values using schema inference.
CREATE TABLE FOO (
  ID BIGINT PRIMARY KEY
) WITH (
  KAFKA_TOPIC='foo',
  KEY_FORMAT='KAFKA',
  VALUE_FORMAT='AVRO'
);
```
If the key format supports schema inference as well, then this would become:
```sql
-- table created with keys and values using schema inference.
CREATE TABLE FOO WITH (
  KAFKA_TOPIC='foo',
  KEY_FORMAT='AVRO',
  VALUE_FORMAT='AVRO'
);
```
No problem so far. However, a user can currently define a key-less stream with:
```sql
-- key-less stream with explicitly provided columns:
CREATE STREAM FOO (
  VAL STRING
) WITH (
  KAFKA_TOPIC='foo',
  VALUE_FORMAT='DELIMITED'
);

-- key-less stream with value columns using schema inference:
CREATE STREAM FOO WITH (
  KAFKA_TOPIC='foo',
  VALUE_FORMAT='AVRO'
);
```
But what happens once users can supply the key format? The key format currently defaults to `KAFKA`, but it doesn't make sense to force users to set `KEY_FORMAT` to `KAFKA` if there is no key!
```sql
-- Bad UX: forcing users to set the key format to KAFKA if there are no key columns, or the key
-- data is in a format ksqlDB can't read.
CREATE STREAM FOO WITH (
  KAFKA_TOPIC='foo',
  KEY_FORMAT='KAFKA',
  VALUE_FORMAT='AVRO'
);
```
The user may also have set a default key format, via the `ksql.persistence.default.format.key` system configuration, that supports schema inference. How then does a user declare a key-less stream, as opposed to a stream where the key schema is loaded from the Schema Registry?
```sql
SET 'ksql.persistence.default.format.key'='AVRO';

-- This would attempt to load the key schema from the Schema Registry:
CREATE STREAM FOO WITH (
  KAFKA_TOPIC='foo',
  VALUE_FORMAT='AVRO'
);
```
We propose adding a `NONE` key format to allow users to explicitly set the key format when they intend a stream to be key-less:
```sql
-- explicitly key-less stream:
CREATE STREAM FOO WITH (
  KAFKA_TOPIC='foo',
  KEY_FORMAT='NONE',
  VALUE_FORMAT='AVRO'
);
```
Declaring a stream with a key format that does not support schema inference, and with no key columns, will result in an error:
```sql
CREATE STREAM FOO WITH (
  KAFKA_TOPIC='foo',
  KEY_FORMAT='KAFKA', -- Error: no key column defined!
  VALUE_FORMAT='AVRO'
);
```
However, declaring a stream without any key columns and without an explicit key format, where the default key format does not support schema inference, will not result in an error. This is required to maintain backwards compatibility for current statements defining key-less streams.
```sql
SET 'ksql.persistence.default.format.key'='KAFKA';

-- Creates a key-less stream:
CREATE STREAM FOO WITH (
  KAFKA_TOPIC='foo',
  VALUE_FORMAT='AVRO'
);
```
Key formats that support schema inference through integration with the Schema Registry prepend the serialized key data with a magic byte and the id of the registered schema.
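For reference, a sketch of this layout, which is the standard Confluent Schema Registry wire format:

```sql
-- byte 0       magic byte (0x00)
-- bytes 1..4   schema id (big-endian 32-bit integer)
-- bytes 5..    serialized key data
```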
If the key schema evolves, the schema id will change. Hence the serialized bytes of the same logical key will change, meaning updates to the same logical key may now be spread across multiple partitions. For this reason, evolution of the key's schema is not supported.
Is not supporting key schema evolution a big concern? We propose not. If the schema of the key has changed, then in almost all cases the key itself has changed, e.g. a new column, or a change of column type. These, in themselves, will result in a different binary key. So the change of schema id seems a secondary issue.
There are schema evolutions that would be binary compatible were it not for the schema id in the key, e.g. changing the logical type of an Avro value. It would be possible to add a custom partitioner that ignored the magic byte and schema id. However, we propose these are niche enough that supporting them has little ROI, at present. Hence key schema evolution is out of scope.
None.