-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23564][SQL] Add isNotNull check for left anti and outer joins #20717
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -341,6 +341,26 @@ case class Join( | |
case UsingJoin(_, _) => false | ||
case _ => resolvedExceptNatural | ||
} | ||
|
||
/**
 * Extends the inherited constraint set with IsNotNull constraints inferred from the join
 * condition that apply to only one input of the join.
 *
 * For LeftAnti and LeftOuter joins with a defined condition, the inferred constraints are kept
 * only when all of their references belong to the right child's output; for RightOuter joins
 * with a defined condition, only when they belong to the left child's output. Every other
 * case contributes nothing extra.
 *
 * @return the constraints from `super.constructAllConstraints` plus the one-sided constraints
 */
override protected def constructAllConstraints: Set[Expression] = { | ||
// additional constraints which are not enforced on the result of join operations, but can be | ||
// enforced either on the left or the right side | ||
val additionalConstraints = joinType match { | ||
// The guard ensures `condition` is defined before `condition.get` is evaluated below.
case LeftAnti | LeftOuter if condition.isDefined => | ||
// Split the condition into its conjuncts, infer IsNotNull constraints from each one, and
// keep only those that reference right-side attributes exclusively.
splitConjunctivePredicates(condition.get).flatMap(inferIsNotNullConstraints).filter( | ||
_.references.subsetOf(right.outputSet)) | ||
case RightOuter if condition.isDefined => | ||
// Mirror image of the case above: keep only constraints over left-side attributes.
splitConjunctivePredicates(condition.get).flatMap(inferIsNotNullConstraints).filter( | ||
_.references.subsetOf(left.outputSet)) | ||
// Any other join type, or a join without a condition, adds no one-sided constraints.
case _ => Seq.empty[Expression] | ||
} | ||
super.constructAllConstraints ++ additionalConstraints | ||
} | ||
|
||
/**
 * Constraints reported for this join's output, computed lazily.
 *
 * Built from `super.constructAllConstraints`, not from this class's own
 * `constructAllConstraints` override, so the one-sided constraints that the override appends
 * are not included here.
 * NOTE(review): this appears deliberate, since those constraints are described as not being
 * enforced on the result of the join -- confirm before changing the call target.
 */
override lazy val constraints: ExpressionSet = ExpressionSet( | ||
super.constructAllConstraints.filter { c => | ||
// Keep a constraint only if it references at least one attribute, references nothing outside
// this operator's output, and is deterministic.
c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic | ||
}) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you add more test cases (or statements) for this code path? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks, I added some statements to the |
||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -204,4 +204,40 @@ class InferFiltersFromConstraintsSuite extends PlanTest { | |
val optimized = Optimize.execute(originalQuery) | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
// Checks that optimizing a LeftAnti equi-join on `a` yields a plan where only the right input
// gains an IsNotNull('a) filter; the left input is expected to stay unchanged.
test("SPARK-23564: left anti join should filter out null join keys on right side") { | ||
val x = testRelation.subquery('x) | ||
val y = testRelation.subquery('y) | ||
val condition = Some("x.a".attr === "y.a".attr) | ||
val originalQuery = x.join(y, LeftAnti, condition).analyze | ||
// Expected plan: left side as-is, right side wrapped in the null-filtering predicate.
val left = x | ||
val right = y.where(IsNotNull('a)) | ||
val correctAnswer = left.join(right, LeftAnti, condition).analyze | ||
val optimized = Optimize.execute(originalQuery) | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
// Checks that optimizing a LeftOuter equi-join on `a` yields a plan where only the right input
// gains an IsNotNull('a) filter; the left input is expected to stay unchanged.
test("SPARK-23564: left outer join should filter out null join keys on right side") { | ||
val x = testRelation.subquery('x) | ||
val y = testRelation.subquery('y) | ||
val condition = Some("x.a".attr === "y.a".attr) | ||
val originalQuery = x.join(y, LeftOuter, condition).analyze | ||
// Expected plan: left side as-is, right side wrapped in the null-filtering predicate.
val left = x | ||
val right = y.where(IsNotNull('a)) | ||
val correctAnswer = left.join(right, LeftOuter, condition).analyze | ||
val optimized = Optimize.execute(originalQuery) | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
// Checks that optimizing a RightOuter equi-join on `a` yields a plan where only the left input
// gains an IsNotNull('a) filter; the right input is expected to stay unchanged.
test("SPARK-23564: right outer join should filter out null join keys on left side") { | ||
val x = testRelation.subquery('x) | ||
val y = testRelation.subquery('y) | ||
val condition = Some("x.a".attr === "y.a".attr) | ||
val originalQuery = x.join(y, RightOuter, condition).analyze | ||
// Expected plan: left side wrapped in the null-filtering predicate, right side as-is.
val left = x.where(IsNotNull('a)) | ||
val right = y | ||
val correctAnswer = left.join(right, RightOuter, condition).analyze | ||
val optimized = Optimize.execute(originalQuery) | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is a simple repetition of the previous private def testConstraints(
x: LogicalPlan, y: LogicalPlan, left: LogicalPlan, right: LogicalPlan, joinType: JoinType) = {
val condition = Some("x.a".attr === "y.a".attr)
val originalQuery = x.join(y, joinType, condition).analyze
val correctAnswer = left.join(right, joinType, condition).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
test("SPARK-23405: left-semi equal-join should filter out null join keys on both sides") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
testConstraints(x, y, x.where(IsNotNull('a)), y.where(IsNotNull('a)), LeftSemi)
}
test("SPARK-23564: left anti join should filter out null join keys on right side") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
testConstraints(x, y, x, y.where(IsNotNull('a)), LeftAnti)
}
test("SPARK-23564: left outer join should filter out null join keys on right side") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
testConstraints(x, y, x, y.where(IsNotNull('a)), LeftOuter)
}
test("SPARK-23564: right outer join should filter out null join keys on left side") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
testConstraints(x, y, x.where(IsNotNull('a)), y, RightOuter)
} |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep this `private`, because it is used only in this class.