Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow ksqlDB to detect FK-join table-table join condition #7452

Merged
merged 13 commits into from
May 10, 2021
Merged

feat: Allow ksqlDB to detect FK-join table-table join condition #7452

merged 13 commits into from
May 10, 2021

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Apr 29, 2021

Description

First PR to add FK-join support. This PR only aims to detect the FK-join condition, and throws an informative error message for now. This PR also includes error message improvements for existing joins.

I am not happy with everything is this PR yet, but wanted to open it to get early feedback. In particular, we cannot handle the case when an expression is use in the join condition yet. -- I am also not sure if there might be some existing helper method that I could re-use to simplify the code.

Testing done

Added new QueryTranslationTest cases.

Missing: we still need to test with qualifiers and aliases. Should we add it to this PR, or do a follow up PR?

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@mjsax mjsax requested a review from a team as a code owner April 29, 2021 14:11

// added outer check to make `multi-joins.json` pass
// the test uses expressions -> need to follow up on this issue
if (root.getInfo().getRightJoinExpression() instanceof ColumnReferenceExp) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to fix this, to be able to process BinaryArithmeticExpressions and maybe others?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

joinOnNonKeyAttribute() should always return true if the expression is not a ColumnReferenceExp, right? Why don't we just pass the entire expression in and let joinOnNonKeyAttribute() perform the check?

Copy link
Member Author

@mjsax mjsax Apr 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding was different. If we do t1 JOIN t2 ON t1.key = t2.key + 1 it's still a PK-join not a FK-join right?However, 1 JOIN t2 ON t1.a = t2.key + 1 would be a FK-joins. For both cases, it seems we would repartition t2 onto new key key + 1?

I am not sure what expression we support atm and what parsing on the expression we would need to do to cover all cases? I would assume that the expression can only contain a single "field/columnName"? So maybe it's sufficient to parse the full expression-tree and extract the single field?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is a ColumnReferenceExp then I think it would not be arithmetic, like t2.key + 1 atm (just reading through the sqlBase here).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. If it's a ColumnReferenceExp we do the check, but if not we skip it, but IMHO we need to do the check even if it's not a ColumnReferenceExp. (For now, I just put this guard in place to avoid that multi-joins.json pass, but it's just a workaround to make it pass -- we need to properly address those cases.)

final String joinAttributeName = joinExpression.getColumnName().text();
final List<DataSourceNode> dataSourceNodes = node.getSourceNodes().collect(Collectors.toList());
final String singleAttributeKeyName;
if (dataSourceNodes.size() == 1) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the binary-join case, for which PlanNode access a stream/table directly (ie, no nested join in PlanNode)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know that we've already performed the self-join check by this point? If not, is it possible that we are looking at a PlanNode representing a join, rather than a single-data source? A more reliable way to perform this check would be to check whether the PlanNode is a JoinNode or a DataSourceNode.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The self-join check happens before IIRC -- we have test in place about it that still pass so it seems we don't break anything at this point. Can try out to check the node type as suggested.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did look into this, but we cannot do the instanceof check, because the JoinNode is "hidden" by the ProJoinRepartitionNode (and not exposed).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we check for instanceOf JoiningNode instead of JoinNode?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node.getSource() instanceOf JoiningNode?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... we have only a PlanNode at hand, but getSource() is a method on SingleSourcePlanNode (that extends PlanNode).

Also, digging a little more, it seems we could have either a JoinNode, or PreJoinRepartitionNode, or PreJoinProjectionNode here and it's unclear how many layer to go down to find a JoinNode.

Maybe better to hold off for now and keep the simple check as-is. We can still refactor in a follow up if we think its required.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About self-joins: I actually think they won't break this logic. We want to figure out if the join happens on the PK, and if there is self-join we may have just one data source, but it still seems to be correct to execute the else of this statement to figure out correctly if the join happens on the PK or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A self-join can change the primary key of the result. For example, a table-table outer self-join would result in a new, system-generated primary key. If this output is then used in another join, that join should check that we are joining on the PK of the output of the join, not the original PK, right? (Or is that not supported, in which case this isn't a concern from a correctness point of view; it'd just be confusing since the method name would be a bit misleading in certain cases.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Damn you, outer-joins...

@@ -296,8 +296,9 @@ private static SourceName getSourceName(final PlanNode node) {
final DataSourceType rightType) {

return joinerMap.getOrDefault(new Pair<>(leftType, rightType), () -> {
throw new KsqlException("Join between invalid operands requested: left type: "
+ leftType + ", right type: " + rightType);
throw new KsqlException(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a check in the new verifyJoin method and thus should never hit this line any longer, ie, the user should never see this error message again.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If users will never hit it, whey do we throw it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could remove it but would hit a NPE (if there is a bug) for this case -- thus, it seems better to just throw if we hit a bug instead of crashing with NPE?

It also seems safer to just keep the exception to void introducing a regression by accident (ie, if we miss a case and user could still hit this, even if they should not).

Not sure how this is handled in ksqlDB normally?

@@ -1232,18 +1232,6 @@
]
}
},
{
"name": "to table using something other than key column",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is covered in the newly added stream-table-join.json now

@@ -1703,18 +1691,6 @@
]
}
},
{
"name": "on non-key table column",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above (note that this test seems to be redundant to the above one anyway?)

@@ -3096,7 +3072,7 @@
"statements": [
"CREATE TABLE T1 (ID INT PRIMARY KEY, VAL STRING) WITH (kafka_topic='t1', key_format='AVRO', value_format='JSON');",
"CREATE TABLE T2 (ID BIGINT PRIMARY KEY, FOO INT) WITH (kafka_topic='t2', key_format='AVRO', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT T2.ID, T1.VAL FROM T1 JOIN T2 ON T1.ID = T2.ID;"
"CREATE TABLE OUTPUT AS SELECT T2.ID, T1.VAL FROM T1 JOIN T2 ON T1.ID = T2.ID;"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side fix (was not an issue, as we fail before we verify the query output type)

@@ -3234,18 +3210,6 @@
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Implicit repartitioning of windowed sources is not supported. See https://github.com/confluentinc/ksql/issues/4385."
}
},
{
"name": "stream-table key-to-key with necessary repartition - SR-enabled key format",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test also seems redundant to the two from above?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is not redundant from the other two. We have special logic that permits repartitioning tables for joins on sources with SR-enabled key formats (in order to ensure copartitioning in light of potentially different schema IDs). This test ensures that this feature does not allow changing the primary key column.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand you comment correctly, the test does not test what it is supposed to test? With this PR, we would fail in verifyJoin already as we try to join on a non-key table attribute (T.VAL) that is always disallowed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the changes in this PR, yeah. I'm fine removing this test from here. It'd be nice to update the new test for this case (that we only allow joins on the primary key of the table) to use an SR-enabled key format as an extra safe-guard.

@@ -222,19 +222,6 @@
"message": "The query used to build `OUTPUT` must include the join expression ROWKEY in its projection."
}
},
{
"name": "full-full missing join expression in projection",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was invalid to begin with, because stream-table full outer joins are not supported.

@mjsax mjsax mentioned this pull request Apr 29, 2021
Copy link
Contributor

@vcrfxia vcrfxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mjsax . Initial questions inline.


// added outer check to make `multi-joins.json` pass
// the test uses expressions -> need to follow up on this issue
if (root.getInfo().getRightJoinExpression() instanceof ColumnReferenceExp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

joinOnNonKeyAttribute() should always return true if the expression is not a ColumnReferenceExp, right? Why don't we just pass the entire expression in and let joinOnNonKeyAttribute() perform the check?

final List<DataSourceNode> dataSourceNodes = node.getSourceNodes().collect(Collectors.toList());
final String singleAttributeKeyName;
if (dataSourceNodes.size() == 1) {
singleAttributeKeyName = dataSourceNodes.get(0).getSchema().key().get(0).name().text();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we assuming there's a single key? Shouldn't we check whether there are multiple, and return true if so?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't support joins on structured keys yet, and if one tries to do this, we should not reach this point in the code but fail earlier.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding a TODO in case we forget to update it when adding multi-key join support.

final String joinAttributeName = joinExpression.getColumnName().text();
final List<DataSourceNode> dataSourceNodes = node.getSourceNodes().collect(Collectors.toList());
final String singleAttributeKeyName;
if (dataSourceNodes.size() == 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know that we've already performed the self-join check by this point? If not, is it possible that we are looking at a PlanNode representing a join, rather than a single-data source? A more reliable way to perform this check would be to check whether the PlanNode is a JoinNode or a DataSourceNode.

} else {
final List<DataSourceNode> qualifiedNodes;

if (joinExpression.maybeQualifier().isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we manipulating qualifiers here? All we want is to know whether we're joining on the key column of the node, right? Why don't we check that more directly, similar to

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't manipulating qualifiers: if a qualifier is given, we limit the check for this single stream/table instead of the need to check all input schemas to find the attribute.

@@ -3234,18 +3210,6 @@
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Implicit repartitioning of windowed sources is not supported. See https://github.com/confluentinc/ksql/issues/4385."
}
},
{
"name": "stream-table key-to-key with necessary repartition - SR-enabled key format",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is not redundant from the other two. We have special logic that permits repartitioning tables for joins on sources with SR-enabled key formats (in order to ensure copartitioning in light of potentially different schema IDs). This test ensures that this feature does not allow changing the primary key column.

@mjsax mjsax requested review from spena and vcrfxia April 29, 2021 16:15

// added outer check to make `multi-joins.json` pass
// the test uses expressions -> need to follow up on this issue
if (root.getInfo().getRightJoinExpression() instanceof ColumnReferenceExp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is a ColumnReferenceExp then I think it would not be arithmetic, like t2.key + 1 atm (just reading through the sqlBase here).

private JoinKey buildJoinKey(final Join join) {
//
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove

final List<DataSourceNode> dataSourceNodes = node.getSourceNodes().collect(Collectors.toList());
final String singleAttributeKeyName;
if (dataSourceNodes.size() == 1) {
singleAttributeKeyName = dataSourceNodes.get(0).getSchema().key().get(0).name().text();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding a TODO in case we forget to update it when adding multi-key join support.

Copy link
Contributor

@vcrfxia vcrfxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mjsax -- LGTM overall. Some minor comments/questions inline.

// at any level in the join tree, even after we add right-deep/bushy join tree support,
// because a FK-join output table has the same PK as its left input table
throw new KsqlException("Invalid join condition:"
+ " foreign-key table-table joins are not supported.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: (to clarify that there is nothing fundamentally unsupported about this, just that we haven't added support yet)

Suggested change
+ " foreign-key table-table joins are not supported.");
+ " foreign-key table-table joins are not yet supported.");

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we add them shortly anyway...

.collect(Collectors.toList());

if (allNodes.size() != 1) {
// double check this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove comment.

qualifiedNode = allNodes.get(0);
} else {
final List<DataSourceNode> allNodes = dataSourceNodes.stream()
.filter(n -> n.getSchema().columns().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use n.getSchema().findColumn() to simplify this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my understanding, as we have multiple source node, we first need to identify the correct source node base on the qualifier from the joinExpression, and can match the column name afterward . The ColumnName itself is just a plain String without qualifier and thus if we use n.getSchema().findColumn() we might find the column multiple times and/or in the wrong data source schema?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand. Why is

.filter(n -> n.getSchema().columns().stream()
    .map(column -> column.name().text()).collect(Collectors.toList())
    .contains(joinAttributeName))

(as is currently written in the PR) different from

.filter(n -> n.getSchema().findColumn(ColumnName.of(joinAttributeName)).isPresent())

?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad... I was looking into the wrong part of the code...

.collect(Collectors.toList());

if (allNodes.size() != 1) {
// double check this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove comment.

}

private boolean isInnerNode(final PlanNode node) {
return node.getSourceNodes().count() > 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still uncomfortable with the fact that this logic is brittle. If one day we decide to allow self-joins, then this breaks and it's very plausible that the person implementing self-joins won't realize they'll have broken the logic for detecting FK joins. Let's continue the discussion in #7452 (comment).

}
}
]
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the test coverage! Can we verify that we either already have tests for, or add tests for the following:

  • ambiguous join attribute
  • invalid data source qualifier
  • invalid data source alias
  • positive test cases for qualifiers and aliases (hopefully these already exist)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems there is only tests for "field does not exist" in joins.json -- happy to add those test, but seems orthogonal to this PR to close test gaps?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that these tests should have already existed before this PR, but the fact that this PR changes logic that could impact these test cases means I'd feel more confident in the correctness of this PR if we had the test coverage. Feel free to add additional test coverage in a follow-up PR -- in general, these types of requests are not blocking, especially if there is other work blocked behind the functionality of the PR (which there is in this case).

Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments while I'm looking at it :) didn't review the tests or really the algorithms part of it, just the code structure


// stream-table join detected

if (root.getInfo().getType().equals(JoinType.OUTER)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we checking this here? shouldn't this be enforced elsewhere? I'm asking because it's another place we'll need to update if we do somehow ever support it in the future and I'm not sure what value it adds

same for a few other conditions below (table-stream join)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are two places where we could check: in the Analyzer or here. However, it think that the LogicalPlanner may be the better place. From my understanding, the Analyzer won't even have enough context information?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess my question was, where does it fail today?

Copy link
Member Author

@mjsax mjsax May 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to dig it out first... Currently, it fails later when we build the physical plan, ie, when we call StreamToTableJoiner#join(). (Will update the PR and remove the check from there...)

However, it seems much more reasonable to through in the logical planner... We should always ensure that we build a valid logical plan IMHO, and building the physical plan should never fail. Or is there any reason why we would need to fail when building the physical plan? Cannot think of any reason.

Copy link
Contributor

@agavra agavra May 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for digging into that!

I think I disagree here - the limitation is at the physical layer, not logical, so it's not unreasonable to throw at that level (imagine we had different back ends, say a pull query executor that just uses a network shuffle - would this limitation apply there?)

EDIT (addendum): to me, if it logically doesn't make sense (no physical plan would ever be able to implement it) then we should fail when we build the logical plan. If it's a technical limitation, it makes sense to fail at the physcial plan

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a stream-table join, a OUTER join is not defined, so it is really a logical issue. (If we could define it, KafkaStreams would have implemented it).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense then!

Comment on lines 593 to 613
if (joinOnNonKeyAttribute(root.getInfo().getRightJoinExpression(), right)) {
throw new KsqlException("Invalid join condition:"
+ " stream-table joins require to join on the table's primary key.");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should also never happen, right? the stream will be re-partitioned such that we'll always join on the table key

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have ON stream.attributes = table.someAttribute this conditions verifies table.someAttribute and would throw, because we can only use table.key for stream-table join. It does not matter here, if we use the stream-side key or some other attribute.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it! (same comment as below then, let's call out what the key is and what we tried to join on)

Comment on lines 610 to 611
throw new KsqlException("Invalid join condition:"
+ " table-table joins require to join on the primary key of the right input table."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make this more actionable by also outputting what the join criteria was and what the keys of each are?

}
final ColumnReferenceExp simpleJoinExpression = (ColumnReferenceExp) joinExpression;

final String joinAttributeName = simpleJoinExpression.getColumnName().text();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we avoid using .text()? making all the Strings type-safe was a goal of wrapping them all in classes and we really shouldn't be comparing ColumnName to anything other than that (same for SourceName below)

}
}

private boolean joinOnNonKeyAttribute(final Expression joinExpression,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without looking into the code, it feels to me like this should be part of JoinTree logic (and we should output this information in JoinTree.Node). Ideally, anything that has to do with "resolving" a Join should be outside of LogicalPlanner since this class is already getting a little unwieldly before this change - and also I think we can leverage the recursion used there to make this a little cleaner

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+10 here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we might want to do some refactoring, but it should not be part of this PR IMHO.

In particular, the logical planner should use a visitor pattern IMHO to build the logical plan. Just pushing out "random methods" into "random helper" classed, does make LogicalPlanner shorter, but does not really improve the code structure IMHO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to defend my original comment 😝 I don't think JoinTree is a random helper, it already has the logic to extract keys and traverse N-way join trees. That's why I thought this would fit in there. Up to you if you feel there's a better way and when you want to do it (which PR)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I keep it in mind but don't want to hold off this PR any longer, as it blocks follow up PRs for FK-joins.

qualifier
));
}
qualifiedNode = allNodes.get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Iterables.getOnlyElement is useful to make sure this survives refactoring


// n-way join sub-tree (ie, not a leaf)
if (isInnerNode(node)) {
final DataSourceNode qualifiedNode;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own understanding: is DataSourceNode the only node type whose getSources() return empty today?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. (Double checked the code).

Copy link
Contributor

@vcrfxia vcrfxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mjsax -- great test coverage! Couple minor points inline but don't let those block you from merging this since this PR is blocking other work.

throw new KsqlException(String.format(
"Invalid join condition:"
+ " table-table joins require to join on the primary key of the right input"
+ " table: %s = %s.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error message seems confusing. It's printing what the user passed in after the colon, so a user might think that they're already passing in a valid join condition even though that's not the case. Ideally we'd clarify by printing what the "primary key of the right input" is but that's more work. A minor clarification in the meantime could be the following:

Suggested change
+ " table: %s = %s.",
+ " table. Got: %s = %s.",

Copy link
Member Author

@mjsax mjsax May 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"primary key of the right input

I tried that but it's not so simple, because there might be "tree"... (And we cannot use the "output schema" of the repartition node, because it might have different keys than the sources...)

if (joinOnNonKeyAttribute(rightExpression, rightNode)) {
throw new KsqlException(String.format(
"Invalid join condition:"
+ " stream-table joins require to join on the table's primary key: %s = %s.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above (below).

@vcrfxia vcrfxia merged commit 344d36d into confluentinc:master May 10, 2021
@mjsax mjsax deleted the add-fk-join branch May 10, 2021 22:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants