Skip to content

Commit

Permalink
[SPARK-14004][SQL] NamedExpressions should have at most one qualifier
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This is a more aggressive version of PR #11820, which not only fixes the original problem, but also makes the following changes to enforce the at-most-one-qualifier constraint:

- Renames `NamedExpression.qualifiers` to `NamedExpression.qualifier`
- Uses `Option[String]` rather than `Seq[String]` for `NamedExpression.qualifier`

Quoted PR description of #11820 here:

> The current implementations of `AttributeReference.sql` and `Alias.sql` join all available qualifiers, which is logically wrong. However, this implementation mistake doesn't cause any real SQL generation bugs, since there is always at most one qualifier for any given `AttributeReference` or `Alias`.

## How was this patch tested?

Existing tests should be enough.

Author: Cheng Lian <[email protected]>

Closes #11822 from liancheng/spark-14004-aggressive.
  • Loading branch information
liancheng authored and yhuai committed Mar 21, 2016
1 parent 43ebf7a commit 5d8de16
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ class Analyzer(
} transformUp {
case other => other transformExpressions {
case a: Attribute =>
attributeRewrites.get(a).getOrElse(a).withQualifiers(a.qualifiers)
attributeRewrites.get(a).getOrElse(a).withQualifier(a.qualifier)
}
}
newRight
Expand Down Expand Up @@ -1467,8 +1467,7 @@ object CleanupAliases extends Rule[LogicalPlan] {

def trimNonTopLevelAliases(e: Expression): Expression = e match {
case a: Alias =>
Alias(trimAliases(a.child), a.name)(
a.exprId, a.qualifiers, a.explicitMetadata, a.isGenerated)
a.withNewChildren(trimAliases(a.child) :: Nil)
case other => trimAliases(other)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
if (table == null) {
throw new AnalysisException("Table not found: " + tableName)
}
val tableWithQualifiers = SubqueryAlias(tableName, table)
val qualifiedTable = SubqueryAlias(tableName, table)

// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
// properly qualified with this alias.
alias
.map(a => SubqueryAlias(a, tableWithQualifiers))
.getOrElse(tableWithQualifiers)
.map(a => SubqueryAlias(a, qualifiedTable))
.getOrElse(qualifiedTable)
}

override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
Expand Down Expand Up @@ -149,11 +149,11 @@ trait OverrideCatalog extends Catalog {
getOverriddenTable(tableIdent) match {
case Some(table) =>
val tableName = getTableName(tableIdent)
val tableWithQualifiers = SubqueryAlias(tableName, table)
val qualifiedTable = SubqueryAlias(tableName, table)

// If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes
// are properly qualified with this alias.
alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)

case None => super.lookupRelation(tableIdent, alias)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute with Un
override def exprId: ExprId = throw new UnresolvedException(this, "exprId")
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
override def qualifiers: Seq[String] = throw new UnresolvedException(this, "qualifiers")
override def qualifier: Option[String] = throw new UnresolvedException(this, "qualifier")
override lazy val resolved = false

override def newInstance(): UnresolvedAttribute = this
override def withNullability(newNullability: Boolean): UnresolvedAttribute = this
override def withQualifiers(newQualifiers: Seq[String]): UnresolvedAttribute = this
override def withQualifier(newQualifier: Option[String]): UnresolvedAttribute = this
override def withName(newName: String): UnresolvedAttribute = UnresolvedAttribute.quoted(newName)

override def toString: String = s"'$name"
Expand Down Expand Up @@ -158,7 +158,7 @@ abstract class Star extends LeafExpression with NamedExpression {
override def exprId: ExprId = throw new UnresolvedException(this, "exprId")
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
override def qualifiers: Seq[String] = throw new UnresolvedException(this, "qualifiers")
override def qualifier: Option[String] = throw new UnresolvedException(this, "qualifier")
override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute")
override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance")
override lazy val resolved = false
Expand Down Expand Up @@ -188,7 +188,7 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
case None => input.output
// If there is a table, pick out attributes that are part of this table.
case Some(t) => if (t.size == 1) {
input.output.filter(_.qualifiers.exists(resolver(_, t.head)))
input.output.filter(_.qualifier.exists(resolver(_, t.head)))
} else {
List()
}
Expand Down Expand Up @@ -243,7 +243,7 @@ case class MultiAlias(child: Expression, names: Seq[String])

override def nullable: Boolean = throw new UnresolvedException(this, "nullable")

override def qualifiers: Seq[String] = throw new UnresolvedException(this, "qualifiers")
override def qualifier: Option[String] = throw new UnresolvedException(this, "qualifier")

override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute")

Expand Down Expand Up @@ -298,7 +298,7 @@ case class UnresolvedAlias(child: Expression, aliasName: Option[String] = None)
extends UnaryExpression with NamedExpression with Unevaluable {

override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute")
override def qualifiers: Seq[String] = throw new UnresolvedException(this, "qualifiers")
override def qualifier: Option[String] = throw new UnresolvedException(this, "qualifier")
override def exprId: ExprId = throw new UnresolvedException(this, "exprId")
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
override def dataType: DataType = throw new UnresolvedException(this, "dataType")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
} else {
tempTables.get(name.table)
}
val tableWithQualifiers = SubqueryAlias(name.table, relation)
val qualifiedTable = SubqueryAlias(name.table, relation)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
// attributes are properly qualified with this alias.
alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ trait NamedExpression extends Expression {
* multiple qualifiers, it is possible that there are other possible ways to refer to this
* attribute.
*/
def qualifiedName: String = (qualifiers.headOption.toSeq :+ name).mkString(".")
def qualifiedName: String = (qualifier.toSeq :+ name).mkString(".")

/**
* All possible qualifiers for the expression.
* Optional qualifier for the expression.
*
* For now, since we do not allow using original table name to qualify a column name once the
* table is aliased, this can only be:
Expand All @@ -73,7 +73,7 @@ trait NamedExpression extends Expression {
* e.g. top level attributes aliased in the SELECT clause, or column from a LocalRelation.
* 2. Single element: either the table name or the alias name of the table.
*/
def qualifiers: Seq[String]
def qualifier: Option[String]

def toAttribute: Attribute

Expand Down Expand Up @@ -102,7 +102,7 @@ abstract class Attribute extends LeafExpression with NamedExpression {
override def references: AttributeSet = AttributeSet(this)

def withNullability(newNullability: Boolean): Attribute
def withQualifiers(newQualifiers: Seq[String]): Attribute
def withQualifier(newQualifier: Option[String]): Attribute
def withName(newName: String): Attribute

override def toAttribute: Attribute = this
Expand All @@ -122,15 +122,15 @@ abstract class Attribute extends LeafExpression with NamedExpression {
* @param name The name to be associated with the result of computing [[child]].
* @param exprId A globally unique id used to check if an [[AttributeReference]] refers to this
* alias. Auto-assigned if left blank.
* @param qualifiers A list of strings that can be used to referred to this attribute in a fully
* @param qualifier An optional string that can be used to refer to this attribute in a fully
* qualified way. Consider the examples tableName.name, subQueryAlias.name.
* tableName and subQueryAlias are possible qualifiers.
* @param explicitMetadata Explicit metadata associated with this alias that overwrites child's.
* @param isGenerated A flag to indicate if this alias is generated by Catalyst
*/
case class Alias(child: Expression, name: String)(
val exprId: ExprId = NamedExpression.newExprId,
val qualifiers: Seq[String] = Nil,
val qualifier: Option[String] = None,
val explicitMetadata: Option[Metadata] = None,
override val isGenerated: java.lang.Boolean = false)
extends UnaryExpression with NamedExpression {
Expand Down Expand Up @@ -158,12 +158,12 @@ case class Alias(child: Expression, name: String)(

def newInstance(): NamedExpression =
Alias(child, name)(
qualifiers = qualifiers, explicitMetadata = explicitMetadata, isGenerated = isGenerated)
qualifier = qualifier, explicitMetadata = explicitMetadata, isGenerated = isGenerated)

override def toAttribute: Attribute = {
if (resolved) {
AttributeReference(name, child.dataType, child.nullable, metadata)(
exprId, qualifiers, isGenerated)
exprId, qualifier, isGenerated)
} else {
UnresolvedAttribute(name)
}
Expand All @@ -172,19 +172,19 @@ case class Alias(child: Expression, name: String)(
override def toString: String = s"$child AS $name#${exprId.id}$typeSuffix"

override protected final def otherCopyArgs: Seq[AnyRef] = {
exprId :: qualifiers :: explicitMetadata :: isGenerated :: Nil
exprId :: qualifier :: explicitMetadata :: isGenerated :: Nil
}

override def equals(other: Any): Boolean = other match {
case a: Alias =>
name == a.name && exprId == a.exprId && child == a.child && qualifiers == a.qualifiers &&
name == a.name && exprId == a.exprId && child == a.child && qualifier == a.qualifier &&
explicitMetadata == a.explicitMetadata
case _ => false
}

override def sql: String = {
val qualifiersString = if (qualifiers.isEmpty) "" else qualifiers.head + "."
s"${child.sql} AS $qualifiersString${quoteIdentifier(name)}"
val qualifierPrefix = qualifier.map(_ + ".").getOrElse("")
s"${child.sql} AS $qualifierPrefix${quoteIdentifier(name)}"
}
}

Expand All @@ -197,9 +197,9 @@ case class Alias(child: Expression, name: String)(
* @param metadata The metadata of this attribute.
* @param exprId A globally unique id used to check if different AttributeReferences refer to the
* same attribute.
* @param qualifiers A list of strings that can be used to referred to this attribute in a fully
* qualified way. Consider the examples tableName.name, subQueryAlias.name.
* tableName and subQueryAlias are possible qualifiers.
* @param qualifier An optional string that can be used to refer to this attribute in a fully
* qualified way. Consider the examples tableName.name, subQueryAlias.name.
* tableName and subQueryAlias are possible qualifiers.
* @param isGenerated A flag to indicate if this reference is generated by Catalyst
*/
case class AttributeReference(
Expand All @@ -208,7 +208,7 @@ case class AttributeReference(
nullable: Boolean = true,
override val metadata: Metadata = Metadata.empty)(
val exprId: ExprId = NamedExpression.newExprId,
val qualifiers: Seq[String] = Nil,
val qualifier: Option[String] = None,
override val isGenerated: java.lang.Boolean = false)
extends Attribute with Unevaluable {

Expand All @@ -220,7 +220,7 @@ case class AttributeReference(
override def equals(other: Any): Boolean = other match {
case ar: AttributeReference =>
name == ar.name && dataType == ar.dataType && nullable == ar.nullable &&
metadata == ar.metadata && exprId == ar.exprId && qualifiers == ar.qualifiers
metadata == ar.metadata && exprId == ar.exprId && qualifier == ar.qualifier
case _ => false
}

Expand All @@ -241,13 +241,13 @@ case class AttributeReference(
h = h * 37 + nullable.hashCode()
h = h * 37 + metadata.hashCode()
h = h * 37 + exprId.hashCode()
h = h * 37 + qualifiers.hashCode()
h = h * 37 + qualifier.hashCode()
h
}

override def newInstance(): AttributeReference =
AttributeReference(name, dataType, nullable, metadata)(
qualifiers = qualifiers, isGenerated = isGenerated)
qualifier = qualifier, isGenerated = isGenerated)

/**
* Returns a copy of this [[AttributeReference]] with changed nullability.
Expand All @@ -256,39 +256,39 @@ case class AttributeReference(
if (nullable == newNullability) {
this
} else {
AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifiers, isGenerated)
AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifier, isGenerated)
}
}

override def withName(newName: String): AttributeReference = {
if (name == newName) {
this
} else {
AttributeReference(newName, dataType, nullable, metadata)(exprId, qualifiers, isGenerated)
AttributeReference(newName, dataType, nullable, metadata)(exprId, qualifier, isGenerated)
}
}

/**
* Returns a copy of this [[AttributeReference]] with new qualifiers.
* Returns a copy of this [[AttributeReference]] with a new qualifier.
*/
override def withQualifiers(newQualifiers: Seq[String]): AttributeReference = {
if (newQualifiers.toSet == qualifiers.toSet) {
override def withQualifier(newQualifier: Option[String]): AttributeReference = {
if (newQualifier == qualifier) {
this
} else {
AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifiers, isGenerated)
AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifier, isGenerated)
}
}

def withExprId(newExprId: ExprId): AttributeReference = {
if (exprId == newExprId) {
this
} else {
AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifiers, isGenerated)
AttributeReference(name, dataType, nullable, metadata)(newExprId, qualifier, isGenerated)
}
}

override protected final def otherCopyArgs: Seq[AnyRef] = {
exprId :: qualifiers :: isGenerated :: Nil
exprId :: qualifier :: isGenerated :: Nil
}

override def toString: String = s"$name#${exprId.id}$typeSuffix"
Expand All @@ -298,8 +298,8 @@ case class AttributeReference(
override def simpleString: String = s"$name#${exprId.id}: ${dataType.simpleString}"

override def sql: String = {
val qualifiersString = if (qualifiers.isEmpty) "" else qualifiers.head + "."
s"$qualifiersString${quoteIdentifier(name)}"
val qualifierPrefix = qualifier.map(_ + ".").getOrElse("")
s"$qualifierPrefix${quoteIdentifier(name)}"
}
}

Expand All @@ -324,10 +324,10 @@ case class PrettyAttribute(
override def withNullability(newNullability: Boolean): Attribute =
throw new UnsupportedOperationException
override def newInstance(): Attribute = throw new UnsupportedOperationException
override def withQualifiers(newQualifiers: Seq[String]): Attribute =
override def withQualifier(newQualifier: Option[String]): Attribute =
throw new UnsupportedOperationException
override def withName(newName: String): Attribute = throw new UnsupportedOperationException
override def qualifiers: Seq[String] = throw new UnsupportedOperationException
override def qualifier: Option[String] = throw new UnsupportedOperationException
override def exprId: ExprId = throw new UnsupportedOperationException
override def nullable: Boolean = throw new UnsupportedOperationException
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ object PhysicalOperation extends PredicateHelper {
expr.transform {
case a @ Alias(ref: AttributeReference, name) =>
aliases.get(ref)
.map(Alias(_, name)(a.exprId, a.qualifiers, isGenerated = a.isGenerated))
.map(Alias(_, name)(a.exprId, a.qualifier, isGenerated = a.isGenerated))
.getOrElse(a)

case a: AttributeReference =>
aliases.get(a)
.map(Alias(_, a.name)(a.exprId, a.qualifiers, isGenerated = a.isGenerated)).getOrElse(a)
.map(Alias(_, a.name)(a.exprId, a.qualifier, isGenerated = a.isGenerated)).getOrElse(a)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
// As the root of the expression, Alias will always take an arbitrary exprId, we need
// to erase that for equality testing.
val cleanedExprId =
Alias(a.child, a.name)(ExprId(-1), a.qualifiers, isGenerated = a.isGenerated)
Alias(a.child, a.name)(ExprId(-1), a.qualifier, isGenerated = a.isGenerated)
BindReferences.bindReference(cleanedExprId, allAttributes, allowFailures = true)
case other =>
BindReferences.bindReference(other, allAttributes, allowFailures = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
resolver: Resolver,
attribute: Attribute): Option[(Attribute, List[String])] = {
assert(nameParts.length > 1)
if (attribute.qualifiers.exists(resolver(_, nameParts.head))) {
if (attribute.qualifier.exists(resolver(_, nameParts.head))) {
// The qualifier matches. See if the remaining parts match.
val remainingParts = nameParts.tail
resolveAsColumn(remainingParts, resolver, attribute)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ case class Generate(
def output: Seq[Attribute] = {
val qualified = qualifier.map(q =>
// the new qualifier replaces the existing one, if any
generatorOutput.map(a => a.withQualifiers(q +: a.qualifiers))
generatorOutput.map(a => a.withQualifier(Some(q)))
).getOrElse(generatorOutput)

if (join) child.output ++ qualified else qualified
Expand Down Expand Up @@ -615,7 +615,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo

case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode {

override def output: Seq[Attribute] = child.output.map(_.withQualifiers(alias :: Nil))
override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class SubexpressionEliminationSuite extends SparkFunSuite {
val a: AttributeReference = AttributeReference("name", IntegerType)()
val b1 = a.withName("name2").withExprId(id)
val b2 = a.withExprId(id)
val b3 = a.withQualifiers("qualifierName" :: Nil)
val b3 = a.withQualifier(Some("qualifierName"))

assert(b1 != b2)
assert(a != b1)
Expand Down
Loading

0 comments on commit 5d8de16

Please sign in to comment.