Skip to content

Commit

Permalink
test: don't allow FK-outer joins and add QTT cases for FK-join with f…
Browse files Browse the repository at this point in the history
…lipped join condition (#7526)
  • Loading branch information
mjsax authored May 18, 2021
1 parent d7b4bbc commit b073bbc
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,15 @@ public static final class JoinInfo {
private final Optional<WithinExpression> withinExpression;
private final AliasedDataSource leftSource;
private final AliasedDataSource rightSource;
private final boolean flippedJoinCondition;

JoinInfo(
final AliasedDataSource leftSource,
final Expression leftJoinExpression,
final AliasedDataSource rightSource,
final Expression rightJoinExpression,
final JoinType type,
final boolean flippedJoinCondition,
final Optional<WithinExpression> withinExpression

) {
Expand All @@ -415,6 +417,7 @@ public static final class JoinInfo {
this.leftJoinExpression = requireNonNull(leftJoinExpression, "leftJoinExpression");
this.rightJoinExpression = requireNonNull(rightJoinExpression, "rightJoinExpression");
this.type = requireNonNull(type, "type");
this.flippedJoinCondition = flippedJoinCondition;
this.withinExpression = requireNonNull(withinExpression, "withinExpression");
}

Expand All @@ -438,6 +441,10 @@ public JoinType getType() {
return type;
}

/**
 * Reports the flipped-join-condition flag that this instance was constructed with.
 *
 * @return the value of the {@code flippedJoinCondition} field
 */
public boolean hasFlippedJoinCondition() {
  return this.flippedJoinCondition;
}

/**
 * Returns the within-expression holder supplied at construction time.
 *
 * @return the stored {@code Optional}; the constructor rejects a {@code null} holder
 */
public Optional<WithinExpression> getWithinExpression() {
  return this.withinExpression;
}
Expand All @@ -449,6 +456,7 @@ public JoinInfo flip() {
leftSource,
leftJoinExpression,
type,
true,
withinExpression
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ && isEqualityJoin(((LogicalBinaryExpression) joinExp).getRight())) {
right,
comparisonExpression.getRight(),
joinType,
flipped,
node.getWithinExpression()
);
analysis.addJoin(flipped ? joinInfo.flip() : joinInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,8 @@ private Optional<ColumnReferenceExp> verifyJoin(
"Invalid join condition:"
+ " stream-table joins require to join on the table's primary key."
+ " Got %s = %s.",
leftExpression,
rightExpression
joinInfo.hasFlippedJoinCondition() ? rightExpression : leftExpression,
joinInfo.hasFlippedJoinCondition() ? leftExpression : rightExpression
));
}
}
Expand All @@ -692,8 +692,8 @@ private Optional<ColumnReferenceExp> verifyJoin(
"Invalid join condition:"
+ " table-table joins require to join on the primary key of the right input"
+ " table. Got %s = %s.",
leftExpression,
rightExpression
joinInfo.hasFlippedJoinCondition() ? rightExpression : leftExpression,
joinInfo.hasFlippedJoinCondition() ? leftExpression : rightExpression
));
}

Expand All @@ -702,6 +702,17 @@ private Optional<ColumnReferenceExp> verifyJoin(

if (ksqlConfig.getBoolean(KsqlConfig.KSQL_FOREIGN_KEY_JOINS_ENABLED)) {

if (joinInfo.getType().equals(JoinType.OUTER)) {
throw new KsqlException(String.format(
"Invalid join type:"
+ " full-outer join not supported for foreign-key table-table join."
+ " Got %s %s %s.",
joinInfo.getLeftSource().getDataSource().getName().text(),
joinType,
joinInfo.getRightSource().getDataSource().getName().text()
));
}

// after we lift this n-way join restriction, we should be able to support FK-joins
// at any level in the join tree, even after we add right-deep/bushy join tree support,
// because a FK-join output table has the same PK as its left input table
Expand All @@ -710,8 +721,8 @@ private Optional<ColumnReferenceExp> verifyJoin(
throw new KsqlException(String.format(
"Invalid join condition: foreign-key table-table joins are not "
+ "supported as part of n-way joins. Got %s = %s.",
leftExpression,
rightExpression
joinInfo.hasFlippedJoinCondition() ? rightExpression : leftExpression,
joinInfo.hasFlippedJoinCondition() ? leftExpression : rightExpression
));
}

Expand All @@ -727,8 +738,8 @@ private Optional<ColumnReferenceExp> verifyJoin(
throw new KsqlException(String.format(
"Invalid join condition:"
+ " foreign-key table-table joins are not supported. Got %s = %s.",
leftExpression,
rightExpression
joinInfo.hasFlippedJoinCondition() ? rightExpression : leftExpression,
joinInfo.hasFlippedJoinCondition() ? leftExpression : rightExpression
));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,99 +1,105 @@
{
"tests": [
{
"name": "Should fail on right non-key attribute for inner-join",
"name": "Should fail on left non-key attribute for inner-join",
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table JOIN right_table ON id1 = f2;"
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table JOIN right_table ON f1 = id2;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: table-table joins require to join on the primary key of the right input table. Got LEFT_TABLE.ID1 = RIGHT_TABLE.F2."
"message": "Invalid join condition: foreign-key table-table joins are not supported. Got LEFT_TABLE.F1 = RIGHT_TABLE.ID2."
}
},
{
"name": "Should fail on right non-key attribute for left-join",
"name": "Should fail on left non-key attribute for inner-join -- revers join condition order",
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table LEFT JOIN right_table ON id1 = f2;"
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table JOIN right_table ON id2 = f1;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: table-table joins require to join on the primary key of the right input table. Got LEFT_TABLE.ID1 = RIGHT_TABLE.F2."
"message": "Invalid join condition: foreign-key table-table joins are not supported. Got RIGHT_TABLE.ID2 = LEFT_TABLE.F1."
}
},
{
"name": "Should fail on right non-key attribute for outer-join",
"name": "Should fail on left non-key attribute for left-join",
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table FULL OUTER JOIN right_table ON id1 = f2;"
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table LEFT JOIN right_table ON f1 = id2;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: table-table joins require to join on the primary key of the right input table. Got LEFT_TABLE.ID1 = RIGHT_TABLE.F2."
"message": "Invalid join condition: foreign-key table-table joins are not supported. Got LEFT_TABLE.F1 = RIGHT_TABLE.ID2."
}
},
{
"name": "Should fail on left non-key attribute for inner-join",
"name": "Should fail on left non-key attribute for left-join -- revers join condition order",
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table JOIN right_table ON f1 = id2;"
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table LEFT JOIN right_table ON id2 = f1;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: foreign-key table-table joins are not supported. Got LEFT_TABLE.F1 = RIGHT_TABLE.ID2."
"message": "Invalid join condition: foreign-key table-table joins are not supported. Got RIGHT_TABLE.ID2 = LEFT_TABLE.F1."
}
},
{
"name": "Should fail on left non-key attribute for left-join",
"name": "Should fail on left non-key attribute for outer-join",
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table LEFT JOIN right_table ON f1 = id2;"
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table FULL OUTER JOIN right_table ON f1 = id2;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: foreign-key table-table joins are not supported. Got LEFT_TABLE.F1 = RIGHT_TABLE.ID2."
}
},
{
"name": "Should fail on left non-key attribute for outer-join",
"name": "Should fail on left non-key attribute for outer-join -- revers join condition order",
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table FULL OUTER JOIN right_table ON f1 = id2;"
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table FULL OUTER JOIN right_table ON id2 = f1;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: foreign-key table-table joins are not supported. Got LEFT_TABLE.F1 = RIGHT_TABLE.ID2."
"message": "Invalid join condition: foreign-key table-table joins are not supported. Got RIGHT_TABLE.ID2 = LEFT_TABLE.F1."
}
},
{
"name": "Should fail on right non-key attribute for inner-join with qualifiers",
"name": "Should fail on outer-join",
"properties": {
"ksql.joins.foreign.key.enable": true
},
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table JOIN right_table ON left_table.id1 = right_table.f2;"
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table FULL OUTER JOIN right_table ON f1 = id2;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: table-table joins require to join on the primary key of the right input table. Got LEFT_TABLE.ID1 = RIGHT_TABLE.F2."
"message": "Invalid join type: full-outer join not supported for foreign-key table-table join. Got LEFT_TABLE [FULL] OUTER JOIN RIGHT_TABLE."
}
},
{
"name": "Should fail on right non-key attribute for inner-join with alias",
"name": "Should fail on outer-join -- reverse join condition",
"properties": {
"ksql.joins.foreign.key.enable": true
},
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table AS lt JOIN right_table AS rt ON lt.id1 = rt.f2;"
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table FULL OUTER JOIN right_table ON id2 = f1;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: table-table joins require to join on the primary key of the right input table. Got LT.ID1 = RT.F2."
"message": "Invalid join type: full-outer join not supported for foreign-key table-table join. Got LEFT_TABLE [FULL] OUTER JOIN RIGHT_TABLE."
}
},
{
Expand Down Expand Up @@ -136,101 +142,6 @@
"message": "Invalid join condition: foreign-key table-table joins are not supported. Got LEFT_TABLE.F1 = RIGHT_TABLE.ID3."
}
},
{
"name": "Should fail on right non-key attribute for inner-join - feature flag enabled",
"properties": {
"ksql.joins.foreign.key.enable": true
},
"comments": [
"TODO: this test should be deleted once the feature is ungated and the feature flag is removed",
"as it duplicates another test above (without the feature flag)"
],
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table JOIN right_table ON id1 = f2;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: table-table joins require to join on the primary key of the right input table. Got LEFT_TABLE.ID1 = RIGHT_TABLE.F2."
}
},
{
"name": "Should fail on right non-key attribute for left-join - feature flag enabled",
"properties": {
"ksql.joins.foreign.key.enable": true
},
"comments": [
"TODO: this test should be deleted once the feature is ungated and the feature flag is removed",
"as it duplicates another test above (without the feature flag)"
],
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table LEFT JOIN right_table ON id1 = f2;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: table-table joins require to join on the primary key of the right input table. Got LEFT_TABLE.ID1 = RIGHT_TABLE.F2."
}
},
{
"name": "Should fail on right non-key attribute for outer-join - feature flag enabled",
"properties": {
"ksql.joins.foreign.key.enable": true
},
"comments": [
"TODO: this test should be deleted once the feature is ungated and the feature flag is removed",
"as it duplicates another test above (without the feature flag)"
],
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table FULL OUTER JOIN right_table ON id1 = f2;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: table-table joins require to join on the primary key of the right input table. Got LEFT_TABLE.ID1 = RIGHT_TABLE.F2."
}
},
{
"name": "Should fail on right non-key attribute for inner-join with qualifiers - feature flag enabled",
"properties": {
"ksql.joins.foreign.key.enable": true
},
"comments": [
"TODO: this test should be deleted once the feature is ungated and the feature flag is removed",
"as it duplicates another test above (without the feature flag)"
],
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table JOIN right_table ON left_table.id1 = right_table.f2;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: table-table joins require to join on the primary key of the right input table. Got LEFT_TABLE.ID1 = RIGHT_TABLE.F2."
}
},
{
"name": "Should fail on right non-key attribute for inner-join with alias - feature flag enabled",
"properties": {
"ksql.joins.foreign.key.enable": true
},
"comments": [
"TODO: this test should be deleted once the feature is ungated and the feature flag is removed",
"as it duplicates another test above (without the feature flag)"
],
"statements": [
"CREATE TABLE left_table (id1 BIGINT PRIMARY KEY, f1 BIGINT) WITH (kafka_topic='left_topic', format='JSON');",
"CREATE TABLE right_table (id2 BIGINT PRIMARY KEY, f2 BIGINT) WITH (kafka_topic='right_topic', format='JSON');",
"CREATE TABLE output AS SELECT id1, id2, f1, f2 FROM left_table AS lt JOIN right_table AS rt ON lt.id1 = rt.f2;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Invalid join condition: table-table joins require to join on the primary key of the right input table. Got LT.ID1 = RT.F2."
}
},
{
"name": "Should fail on n-way join (fk outer) - feature flag enabled",
"properties": {
Expand Down
Loading

0 comments on commit b073bbc

Please sign in to comment.