feat: Allow ksqlDB to detect FK-join table-table join condition #7452
Conversation
|
||
// added outer check to make `multi-joins.json` pass | ||
// the test uses expressions -> need to follow up on this issue | ||
if (root.getInfo().getRightJoinExpression() instanceof ColumnReferenceExp) { |
Need to fix this, to be able to process BinaryArithmeticExpressions
and maybe others?
joinOnNonKeyAttribute() should always return true if the expression is not a ColumnReferenceExp, right? Why don't we just pass the entire expression in and let joinOnNonKeyAttribute() perform the check?
My understanding was different. If we do t1 JOIN t2 ON t1.key = t2.key + 1, it's still a PK-join, not a FK-join, right? However, t1 JOIN t2 ON t1.a = t2.key + 1 would be a FK-join. For both cases, it seems we would repartition t2 onto the new key key + 1?
I am not sure which expressions we support atm and what parsing of the expression we would need to do to cover all cases. I would assume that the expression can only contain a single "field/columnName"? So maybe it's sufficient to parse the full expression tree and extract the single field?
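To make the "parse the full expression tree and extract the single field" idea concrete, here is a minimal sketch. The Expr, ColumnRef, Literal and BinaryArithmetic types and the JoinColumnExtractor helper are hypothetical stand-ins invented for illustration; they are not ksqlDB's actual expression classes, and real code would need to cover every expression type the grammar allows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-ins for an expression tree (not ksqlDB's classes).
interface Expr {}

final class ColumnRef implements Expr {
  final String name;

  ColumnRef(final String name) {
    this.name = name;
  }
}

final class Literal implements Expr {
  final int value;

  Literal(final int value) {
    this.value = value;
  }
}

final class BinaryArithmetic implements Expr {
  final Expr left;
  final Expr right;

  BinaryArithmetic(final Expr left, final Expr right) {
    this.left = left;
    this.right = right;
  }
}

final class JoinColumnExtractor {
  private JoinColumnExtractor() {}

  // Walks the whole tree and collects every column reference it finds.
  static List<String> columnsIn(final Expr expr) {
    final List<String> found = new ArrayList<>();
    collect(expr, found);
    return found;
  }

  private static void collect(final Expr expr, final List<String> found) {
    if (expr instanceof ColumnRef) {
      found.add(((ColumnRef) expr).name);
    } else if (expr instanceof BinaryArithmetic) {
      collect(((BinaryArithmetic) expr).left, found);
      collect(((BinaryArithmetic) expr).right, found);
    }
    // Literals contribute no columns.
  }

  // Returns the single referenced column, or empty if there are none or several.
  static Optional<String> singleColumn(final Expr expr) {
    final List<String> found = columnsIn(expr);
    return found.size() == 1 ? Optional.of(found.get(0)) : Optional.empty();
  }
}
```

With something like this, t2.key + 1 would resolve to the column key, and the key/non-key check could then run on that column regardless of the surrounding arithmetic.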
If it is a ColumnReferenceExp then I think it would not be arithmetic, like t2.key + 1, atm (just reading through the sqlBase here).
Yes. If it's a ColumnReferenceExp we do the check, but if not we skip it. IMHO we need to do the check even if it's not a ColumnReferenceExp. (For now, I just put this guard in place to keep multi-joins.json from failing, but it's just a workaround to make it pass -- we need to properly address those cases.)
final String joinAttributeName = joinExpression.getColumnName().text(); | ||
final List<DataSourceNode> dataSourceNodes = node.getSourceNodes().collect(Collectors.toList()); | ||
final String singleAttributeKeyName; | ||
if (dataSourceNodes.size() == 1) { |
This is the binary-join case, for which the PlanNode accesses a stream/table directly (ie, no nested join in the PlanNode).
Do we know that we've already performed the self-join check by this point? If not, is it possible that we are looking at a PlanNode representing a join, rather than a single-data source? A more reliable way to perform this check would be to check whether the PlanNode is a JoinNode or a DataSourceNode.
The self-join check happens before, IIRC -- we have tests in place for it that still pass, so it seems we don't break anything at this point. I can try checking the node type as suggested.
Did look into this, but we cannot do the instanceof check, because the JoinNode is "hidden" by the PreJoinRepartitionNode (and not exposed).
Why can't we check for instanceof JoiningNode instead of JoinNode?
node.getSource() instanceof JoiningNode?
Hmm... we have only a PlanNode at hand, but getSource() is a method on SingleSourcePlanNode (that extends PlanNode).
Also, digging a little more, it seems we could have either a JoinNode, or PreJoinRepartitionNode, or PreJoinProjectionNode here, and it's unclear how many layers to go down to find a JoinNode.
Maybe better to hold off for now and keep the simple check as-is. We can still refactor in a follow-up if we think it's required.
About self-joins: I actually think they won't break this logic. We want to figure out if the join happens on the PK, and if there is a self-join we may have just one data source, but it still seems to be correct to execute the else of this statement to figure out correctly if the join happens on the PK or not.
A self-join can change the primary key of the result. For example, a table-table outer self-join would result in a new, system-generated primary key. If this output is then used in another join, that join should check that we are joining on the PK of the output of the join, not the original PK, right? (Or is that not supported, in which case this isn't a concern from a correctness point of view; it'd just be confusing since the method name would be a bit misleading in certain cases.)
Good point! Damn you, outer-joins...
@@ -296,8 +296,9 @@ private static SourceName getSourceName(final PlanNode node) { | |||
final DataSourceType rightType) { | |||
|
|||
return joinerMap.getOrDefault(new Pair<>(leftType, rightType), () -> { | |||
throw new KsqlException("Join between invalid operands requested: left type: " | |||
+ leftType + ", right type: " + rightType); | |||
throw new KsqlException( |
We have a check in the new verifyJoin method and thus should never hit this line any longer, ie, the user should never see this error message again.
If users will never hit it, why do we throw it?
We could remove it but would hit an NPE (if there is a bug) for this case -- thus, it seems better to just throw if we hit a bug instead of crashing with an NPE?
It also seems safer to just keep the exception to avoid introducing a regression by accident (ie, if we miss a case and users could still hit this, even if they should not).
Not sure how this is handled in ksqlDB normally?
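For context on the trade-off being discussed, the following is a small self-contained sketch of the getOrDefault pattern with a throwing default, which turns a missing map entry into a descriptive exception rather than an NPE. The JoinerLookup class, its string keys, and the use of IllegalStateException are assumptions made for illustration only; the PR itself uses a Pair of DataSourceType values and KsqlException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical lookup table; keys and values are simplified to strings.
final class JoinerLookup {
  private static final Map<String, Supplier<String>> JOINERS = new HashMap<>();

  static {
    JOINERS.put("STREAM-TABLE", () -> "stream-table joiner");
    JOINERS.put("TABLE-TABLE", () -> "table-table joiner");
  }

  private JoinerLookup() {}

  static String joinerFor(final String leftType, final String rightType) {
    final String key = leftType + "-" + rightType;
    // The default supplier is only invoked when no joiner is registered, so an
    // unexpected combination fails with a clear message instead of a null dereference.
    return JOINERS.getOrDefault(key, () -> {
      throw new IllegalStateException("Join between invalid operands requested: left type: "
          + leftType + ", right type: " + rightType);
    }).get();
  }
}
```

If the earlier validation is complete, the default branch is dead code; it only matters as a safety net when a case was missed.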
@@ -1232,18 +1232,6 @@ | |||
] | |||
} | |||
}, | |||
{ | |||
"name": "to table using something other than key column", |
This is covered in the newly added stream-table-join.json now.
@@ -1703,18 +1691,6 @@ | |||
] | |||
} | |||
}, | |||
{ | |||
"name": "on non-key table column", |
As above (note that this test seems to be redundant to the above one anyway?)
@@ -3096,7 +3072,7 @@ | |||
"statements": [ | |||
"CREATE TABLE T1 (ID INT PRIMARY KEY, VAL STRING) WITH (kafka_topic='t1', key_format='AVRO', value_format='JSON');", | |||
"CREATE TABLE T2 (ID BIGINT PRIMARY KEY, FOO INT) WITH (kafka_topic='t2', key_format='AVRO', value_format='JSON');", | |||
"CREATE STREAM OUTPUT AS SELECT T2.ID, T1.VAL FROM T1 JOIN T2 ON T1.ID = T2.ID;" | |||
"CREATE TABLE OUTPUT AS SELECT T2.ID, T1.VAL FROM T1 JOIN T2 ON T1.ID = T2.ID;" |
Side fix (was not an issue, as we fail before we verify the query output type)
@@ -3234,18 +3210,6 @@ | |||
"type": "io.confluent.ksql.util.KsqlStatementException", | |||
"message": "Implicit repartitioning of windowed sources is not supported. See https://github.com/confluentinc/ksql/issues/4385." | |||
} | |||
}, | |||
{ | |||
"name": "stream-table key-to-key with necessary repartition - SR-enabled key format", |
This test also seems redundant to the two from above?
This test is not redundant from the other two. We have special logic that permits repartitioning tables for joins on sources with SR-enabled key formats (in order to ensure copartitioning in light of potentially different schema IDs). This test ensures that this feature does not allow changing the primary key column.
If I understand your comment correctly, the test does not test what it is supposed to test? With this PR, we would fail in verifyJoin already, as we try to join on a non-key table attribute (T.VAL), which is always disallowed.
With the changes in this PR, yeah. I'm fine removing this test from here. It'd be nice to update the new test for this case (that we only allow joins on the primary key of the table) to use an SR-enabled key format as an extra safe-guard.
@@ -222,19 +222,6 @@ | |||
"message": "The query used to build `OUTPUT` must include the join expression ROWKEY in its projection." | |||
} | |||
}, | |||
{ | |||
"name": "full-full missing join expression in projection", |
This test was invalid to begin with, because stream-table full outer joins are not supported.
Thanks @mjsax . Initial questions inline.
final List<DataSourceNode> dataSourceNodes = node.getSourceNodes().collect(Collectors.toList()); | ||
final String singleAttributeKeyName; | ||
if (dataSourceNodes.size() == 1) { | ||
singleAttributeKeyName = dataSourceNodes.get(0).getSchema().key().get(0).name().text(); |
Why are we assuming there's a single key? Shouldn't we check whether there are multiple, and return true if so?
We don't support joins on structured keys yet, and if one tries to do this, we should not reach this point in the code but fail earlier.
Maybe adding a TODO in case we forget to update it when adding multi-key join support.
} else { | ||
final List<DataSourceNode> qualifiedNodes; | ||
|
||
if (joinExpression.maybeQualifier().isPresent()) { |
Why are we manipulating qualifiers here? All we want is to know whether we're joining on the key column of the node, right? Why don't we check that more directly, similar to
private ColumnName getKeyColumnName() { |
We don't manipulate qualifiers: if a qualifier is given, we limit the check to this single stream/table instead of checking all input schemas to find the attribute.
private JoinKey buildJoinKey(final Join join) { | ||
// |
nit: remove
Thanks @mjsax -- LGTM overall. Some minor comments/questions inline.
// at any level in the join tree, even after we add right-deep/bushy join tree support, | ||
// because a FK-join output table has the same PK as its left input table | ||
throw new KsqlException("Invalid join condition:" | ||
+ " foreign-key table-table joins are not supported."); |
nit: (to clarify that there is nothing fundamentally unsupported about this, just that we haven't added support yet)
+ " foreign-key table-table joins are not supported."); | |
+ " foreign-key table-table joins are not yet supported."); |
Well, we add them shortly anyway...
.collect(Collectors.toList()); | ||
|
||
if (allNodes.size() != 1) { | ||
// double check this |
nit: remove comment.
qualifiedNode = allNodes.get(0); | ||
} else { | ||
final List<DataSourceNode> allNodes = dataSourceNodes.stream() | ||
.filter(n -> n.getSchema().columns().stream() |
Why not use n.getSchema().findColumn() to simplify this?
From my understanding, as we have multiple source nodes, we first need to identify the correct source node based on the qualifier from the joinExpression, and can match the column name afterward. The ColumnName itself is just a plain String without qualifier, and thus if we use n.getSchema().findColumn() we might find the column multiple times and/or in the wrong data source schema?
I don't understand. Why is
.filter(n -> n.getSchema().columns().stream()
.map(column -> column.name().text()).collect(Collectors.toList())
.contains(joinAttributeName))
(as is currently written in the PR) different from
.filter(n -> n.getSchema().findColumn(ColumnName.of(joinAttributeName)).isPresent())
?
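As a sanity check on the question above, the sketch below compares the two filter styles on a toy schema. SimpleSchema and ColumnFilters are hypothetical classes written only for this illustration (the findColumn here takes a plain String and is not ksqlDB's LogicalSchema API); under that assumption both filters select exactly the same schemas.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical minimal schema: just an ordered list of column names.
final class SimpleSchema {
  private final List<String> columns;

  SimpleSchema(final String... columns) {
    this.columns = Arrays.asList(columns);
  }

  List<String> columns() {
    return columns;
  }

  // Returns the column if a column with the given name exists.
  Optional<String> findColumn(final String name) {
    return columns.stream().filter(c -> c.equals(name)).findFirst();
  }
}

final class ColumnFilters {
  private ColumnFilters() {}

  // Mirrors the "collect all names, then contains" style.
  static List<SimpleSchema> viaContains(final List<SimpleSchema> schemas, final String name) {
    return schemas.stream()
        .filter(s -> s.columns().stream()
            .map(String::toString).collect(Collectors.toList())
            .contains(name))
        .collect(Collectors.toList());
  }

  // Mirrors the "findColumn(...).isPresent()" style.
  static List<SimpleSchema> viaFindColumn(final List<SimpleSchema> schemas, final String name) {
    return schemas.stream()
        .filter(s -> s.findColumn(name).isPresent())
        .collect(Collectors.toList());
  }
}
```

Neither variant looks at qualifiers, so any ambiguity between sources that share a column name has to be resolved separately in both cases.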
My bad... I was looking into the wrong part of the code...
.collect(Collectors.toList()); | ||
|
||
if (allNodes.size() != 1) { | ||
// double check this |
Remove comment.
ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
} | ||
|
||
private boolean isInnerNode(final PlanNode node) { | ||
return node.getSourceNodes().count() > 1; |
I'm still uncomfortable with the fact that this logic is brittle. If one day we decide to allow self-joins, then this breaks and it's very plausible that the person implementing self-joins won't realize they'll have broken the logic for detecting FK joins. Let's continue the discussion in #7452 (comment).
} | ||
} | ||
] | ||
} |
Thanks for the test coverage! Can we verify that we either already have tests for, or add tests for the following:
- ambiguous join attribute
- invalid data source qualifier
- invalid data source alias
- positive test cases for qualifiers and aliases (hopefully these already exist)
Seems there are only tests for "field does not exist" in joins.json -- happy to add those tests, but closing test gaps seems orthogonal to this PR?
Agreed that these tests should have already existed before this PR, but the fact that this PR changes logic that could impact these test cases means I'd feel more confident in the correctness of this PR if we had the test coverage. Feel free to add additional test coverage in a follow-up PR -- in general, these types of requests are not blocking, especially if there is other work blocked behind the functionality of the PR (which there is in this case).
Some comments while I'm looking at it :) didn't review the tests or really the algorithms part of it, just the code structure
|
||
// stream-table join detected | ||
|
||
if (root.getInfo().getType().equals(JoinType.OUTER)) { |
why are we checking this here? shouldn't this be enforced elsewhere? I'm asking because it's another place we'll need to update if we do somehow ever support it in the future and I'm not sure what value it adds
same for a few other conditions below (table-stream join)
I think there are two places where we could check: in the Analyzer or here. However, I think that the LogicalPlanner may be the better place. From my understanding, the Analyzer won't even have enough context information?
I guess my question was, where does it fail today?
Needed to dig it out first... Currently, it fails later when we build the physical plan, ie, when we call StreamToTableJoiner#join(). (Will update the PR and remove the check from there...)
However, it seems much more reasonable to throw in the logical planner... We should always ensure that we build a valid logical plan IMHO, and building the physical plan should never fail. Or is there any reason why we would need to fail when building the physical plan? Cannot think of any reason.
thanks for digging into that!
I think I disagree here - the limitation is at the physical layer, not logical, so it's not unreasonable to throw at that level (imagine we had different back ends, say a pull query executor that just uses a network shuffle - would this limitation apply there?)
EDIT (addendum): to me, if it logically doesn't make sense (no physical plan would ever be able to implement it) then we should fail when we build the logical plan. If it's a technical limitation, it makes sense to fail at the physical plan
For a stream-table join, an OUTER join is not defined, so it is really a logical issue. (If we could define it, KafkaStreams would have implemented it).
makes sense then!
if (joinOnNonKeyAttribute(root.getInfo().getRightJoinExpression(), right)) { | ||
throw new KsqlException("Invalid join condition:" | ||
+ " stream-table joins require to join on the table's primary key."); | ||
} |
this should also never happen, right? the stream will be re-partitioned such that we'll always join on the table key
If we have ON stream.attributes = table.someAttribute, this condition verifies table.someAttribute and would throw, because we can only use table.key for a stream-table join. It does not matter here whether we use the stream-side key or some other attribute.
got it! (same comment as below then, let's call out what the key is and what we tried to join on)
throw new KsqlException("Invalid join condition:" | ||
+ " table-table joins require to join on the primary key of the right input table." |
can we make this more actionable by also outputting what the join criteria was and what the keys of each are?
} | ||
final ColumnReferenceExp simpleJoinExpression = (ColumnReferenceExp) joinExpression; | ||
|
||
final String joinAttributeName = simpleJoinExpression.getColumnName().text(); |
can we avoid using .text()? making all the Strings type-safe was a goal of wrapping them all in classes and we really shouldn't be comparing ColumnName to anything other than that (same for SourceName below)
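To illustrate the type-safety point, here is a rough sketch of a name wrapper that compares by value against its own type only. TypedColumnName is a hypothetical class written for this example; it is not ksqlDB's ColumnName, just an approximation of the idea that names are compared as typed values rather than via their raw text.

```java
import java.util.Objects;

// Hypothetical value class wrapping a column name.
final class TypedColumnName {
  private final String text;

  private TypedColumnName(final String text) {
    this.text = Objects.requireNonNull(text, "text");
  }

  static TypedColumnName of(final String text) {
    return new TypedColumnName(text);
  }

  String text() {
    return text;
  }

  @Override
  public boolean equals(final Object other) {
    // Only another TypedColumnName can be equal; a raw String never is.
    return other instanceof TypedColumnName
        && text.equals(((TypedColumnName) other).text);
  }

  @Override
  public int hashCode() {
    return text.hashCode();
  }

  @Override
  public String toString() {
    return text;
  }
}
```

Comparing two wrappers with equals() keeps the comparison inside the type, whereas calling text() on both sides makes it easy to accidentally compare a column name with a source name or any other String.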
} | ||
} | ||
|
||
private boolean joinOnNonKeyAttribute(final Expression joinExpression, |
without looking into the code, it feels to me like this should be part of JoinTree logic (and we should output this information in JoinTree.Node). Ideally, anything that has to do with "resolving" a Join should be outside of LogicalPlanner since this class is already getting a little unwieldy before this change - and also I think we can leverage the recursion used there to make this a little cleaner
+10 here.
I agree that we might want to do some refactoring, but it should not be part of this PR IMHO.
In particular, the logical planner should use a visitor pattern IMHO to build the logical plan. Just pushing out "random methods" into "random helper" classes does make LogicalPlanner shorter, but does not really improve the code structure IMHO.
to defend my original comment 😝 I don't think JoinTree is a random helper, it already has the logic to extract keys and traverse N-way join trees. That's why I thought this would fit in there. Up to you if you feel there's a better way and when you want to do it (which PR)
I'll keep it in mind but don't want to hold up this PR any longer, as it blocks follow-up PRs for FK-joins.
qualifier | ||
)); | ||
} | ||
qualifiedNode = allNodes.get(0); |
nit: Iterables.getOnlyElement is useful to make sure this survives refactoring
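For readers unfamiliar with the helper, the sketch below is a stdlib-only stand-in that approximates the behaviour of Guava's Iterables.getOnlyElement: it returns the element when there is exactly one and throws otherwise. The OnlyElement class is hypothetical and exists only so the example runs without Guava on the classpath; in the PR one would call the Guava method directly.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in approximating Guava's Iterables.getOnlyElement.
final class OnlyElement {
  private OnlyElement() {}

  static <T> T getOnlyElement(final Iterable<T> items) {
    final Iterator<T> it = items.iterator();
    if (!it.hasNext()) {
      throw new NoSuchElementException("expected one element but was empty");
    }
    final T first = it.next();
    if (it.hasNext()) {
      throw new IllegalArgumentException("expected one element but found more");
    }
    return first;
  }
}
```

Compared with allNodes.get(0), this fails loudly if a later refactoring removes or changes the preceding size check, instead of silently picking the first of several nodes.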
|
||
// n-way join sub-tree (ie, not a leaf) | ||
if (isInnerNode(node)) { | ||
final DataSourceNode qualifiedNode; |
For my own understanding: is DataSourceNode the only node type whose getSources() returns empty today?
Yes. (Double checked the code).
Thanks @mjsax -- great test coverage! Couple minor points inline but don't let those block you from merging this since this PR is blocking other work.
throw new KsqlException(String.format( | ||
"Invalid join condition:" | ||
+ " table-table joins require to join on the primary key of the right input" | ||
+ " table: %s = %s.", |
This error message seems confusing. It's printing what the user passed in after the colon, so a user might think that they're already passing in a valid join condition even though that's not the case. Ideally we'd clarify by printing what the "primary key of the right input" is but that's more work. A minor clarification in the meantime could be the following:
+ " table: %s = %s.", | |
+ " table. Got: %s = %s.", |
Re "primary key of the right input": I tried that but it's not so simple, because there might be a "tree"... (And we cannot use the "output schema" of the repartition node, because it might have different keys than the sources...)
if (joinOnNonKeyAttribute(rightExpression, rightNode)) { | ||
throw new KsqlException(String.format( | ||
"Invalid join condition:" | ||
+ " stream-table joins require to join on the table's primary key: %s = %s.", |
Same comment as above (below).
Fixed bug exposed by test
Updated tests
Description
First PR to add FK-join support. This PR only aims to detect the FK-join condition, and throws an informative error message for now. This PR also includes error message improvements for existing joins.
I am not happy with everything in this PR yet, but wanted to open it to get early feedback. In particular, we cannot yet handle the case when an expression is used in the join condition. I am also not sure if there might be some existing helper method that I could re-use to simplify the code.
Testing done
Added new QueryTranslationTest cases.
Missing: we still need to test with qualifiers and aliases. Should we add it to this PR, or do a follow up PR?
Reviewer checklist