Skip to content

Commit

Permalink
[SPARK-7088] Fix analysis for 3rd party logical plan.
Browse files Browse the repository at this point in the history
ResolveReferences analysis rule now does not throw when
it cannot resolve references in a self-join.
  • Loading branch information
smola committed Jun 16, 2015
1 parent 4c5889e commit c8c382a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ class Analyzer(
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")

val (oldRelation, newRelation) = right.collect {
right.collect {
// Handle base relations that might appear more than once.
case oldVersion: MultiInstanceRelation
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
Expand All @@ -315,25 +315,20 @@ class Analyzer(
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
.nonEmpty =>
(oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
}.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
sys.error(
s"""
|Failure when resolving conflicting references in Join:
|$plan
|
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)
}

val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
val newRight = right transformUp {
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
}
}.headOption match {
// Only handle first case, others will be fixed on the next pass.
case None => j
case Some((oldRelation, newRelation)) =>
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
val newRight = right transformUp {
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
}
}
j.copy(right = newRight)
}
j.copy(right = newRight)

case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ trait CheckAnalysis {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {

case operator: LogicalPlan =>
operator transformExpressionsUp {
case a: Attribute if !a.resolved =>
Expand Down Expand Up @@ -136,6 +137,17 @@ trait CheckAnalysis {

case _ => // Analysis successful!
}

// Special handling for cases when self-join introduce duplicate expression ids.
case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
failAnalysis(s"""
|Failure when resolving conflicting references in Join:
|$plan
|
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)

}
extendedCheckRules.foreach(_(plan))
}
Expand Down

0 comments on commit c8c382a

Please sign in to comment.