Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SQL][SPARK-7088] Fix analysis for 3rd party logical plan. #6853

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ class Analyzer(
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")

val (oldRelation, newRelation) = right.collect {
right.collect {
// Handle base relations that might appear more than once.
case oldVersion: MultiInstanceRelation
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
Expand All @@ -308,25 +308,27 @@ class Analyzer(
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
.nonEmpty =>
(oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
}.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
sys.error(
s"""
|Failure when resolving conflicting references in Join:
|$plan
|
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)
}

val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
val newRight = right transformUp {
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
}
// Only handle first case, others will be fixed on the next pass.
.headOption match {
case None =>
/*
* No result implies that there is a logical plan node that produces new references
* that this rule cannot handle. When that is the case, there must be another rule
* that resolves these conflicts. Otherwise, the analysis will fail.
*/
j
case Some((oldRelation, newRelation)) =>
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
val newRight = right transformUp {
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
}
}
j.copy(right = newRight)
}
j.copy(right = newRight)

// When resolve `SortOrder`s in Sort based on child, don't report errors as
// we still have chance to resolve it based on grandchild
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ trait CheckAnalysis {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {

case operator: LogicalPlan =>
operator transformExpressionsUp {
case a: Attribute if !a.resolved =>
Expand Down Expand Up @@ -121,6 +122,17 @@ trait CheckAnalysis {

case _ => // Analysis successful!
}

// Special handling for cases when self-join introduce duplicate expression ids.
case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not reachable according to the compiler. @smola can you submit a fix for this? thanks.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NAVER - http://www.naver.com/

[email protected] 님께 보내신 메일 <Re: [spark] [SQL][SPARK-7088] Fix analysis for 3rd party logical plan. (#6853)> 이 다음과 같은 이유로 전송 실패했습니다.


받는 사람이 회원님의 메일을 수신차단 하였습니다.


val conflictingAttributes = left.outputSet.intersect(right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references in Join:
|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)

}
extendedCheckRules.foreach(_(plan))
}
Expand Down