-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: fix repartition semantics #4816
Merged
big-andy-coates
merged 2 commits into
confluentinc:master
from
big-andy-coates:paratition_by_semantics
Mar 20, 2020
Merged
fix: fix repartition semantics #4816
big-andy-coates
merged 2 commits into
confluentinc:master
from
big-andy-coates:paratition_by_semantics
Mar 20, 2020
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Fixes: confluentinc#4749 ##### Background This change fixes an issue with our repartition semantics. Old style query semantics for partition by are broken: S1: ROWKEY => B, C (Meaning S1 has a schema with ROWKEY as the key column, and B and C as value columns - types aren't important). ```sql CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B; ``` S2: ROWKEY => B, C As you can see the schema of S2 is still the same. However, the old data in the key has been lost as its been overridden with the data from B, and the key now duplicates the data stored in B. This loss of data on a `SELECT * .. PARTITION BY` needed fixing. Secondly, with new primitive key [work to remove the restriction on key column naming](confluentinc#3536), the same query semantics do not work. e.g. S1: A => B, C ```sql CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B; ``` S2: B => B, C This fails as the `B` value column clashes with the `B` key column! ##### The fix This commit fixes the PARTITION BY semantics so that any PARTITION BY on a specific column sees the old key column being moved to the value and the new key column being moved from the value to the key. For example, S1: A => B, C ```sql CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY B; ``` Results in the schema: S2: B => C, A. If a PARTITION BY is an expression other than a column reference, then ksql will synthesis a new column name. For example, S1: A => B, C ```sql CREATE STREAM S2 AS SELECT * FROM S1 PARTITION BY CAST(B AS INT); ``` Results in the schema: S2: KSQL_COL_0 => B, C, A. [This github issue](confluentinc#4813) will add the ability to use aliases in PARTITION BY expressions, allowing a custom name to be assigned.
This was referenced Mar 19, 2020
agavra
reviewed
Mar 19, 2020
ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java
Show resolved
Hide resolved
ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java
Show resolved
Hide resolved
...db-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java
Show resolved
Hide resolved
ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json
Show resolved
Hide resolved
agavra
approved these changes
Mar 20, 2020
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanations! LGTM
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description
Fixes: #4749. (Note: the feature is currently disabled behind the 'allow any key column name' feature flag).
Background
This change fixes an issue with our repartition semantics.
Old style query semantics for partition by are broken:
S1: ROWKEY => B, C (Meaning S1 has a schema with ROWKEY as the key column, and B and C as value columns - types aren't important).
S2: ROWKEY => B, C
As you can see the schema of S2 is still the same. However, the old data in the key has been lost as its been overridden with the data from B, and the key now duplicates the data stored in B.
This loss of data on a
SELECT * .. PARTITION BY
needed fixing.Secondly, with new primitive key work to remove the restriction on key column naming, the same query semantics do not work. e.g.
S1: A => B, C
S2: B => B, C
This fails as the
B
value column clashes with theB
key column!The fix
This commit fixes the PARTITION BY semantics so that any PARTITION BY on a specific column sees the old key column being moved to the value and the new key column being moved from the value to the key. For example,
S1: A => B, C
Results in the schema: S2: B => C, A.
If a PARTITION BY is an expression other than a column reference, then ksql will synthesis a new column name. For example,
S1: A => B, C
Results in the schema: S2: KSQL_COL_0 => B, C, A.
This github issue will add the ability to use aliases in PARTITION BY expressions, allowing a custom name to be assigned.
The approach
There are main changes:
The LogicalPlanner has been updated to build the new repartitioned schema via a new
PartitionByParamsFactory
class.The streams topology is built differently by introducing a new version of the
SelectKey
plan step. The data passed is the same as the old version. However, the new version is handled by a new builder, which knows to build the streams topology in the right way.It would also have been possible to achieve the second step by adding a defaulted flag to the existing
SelectKey
step. However, it was felt that clear separation was better. This means once we go version 1.0 we can just delete the old V1 step, rather than trying to unpick the code that handled a boolean flag.Testing done
Usual.
Reviewer checklist