Skip to content

Commit

Permalink
[SPARK-3688][SQL]LogicalPlan can't resolve column correctlly
Browse files Browse the repository at this point in the history
This PR fixed the resolving problem described in https://issues.apache.org/jira/browse/SPARK-3688
```
CREATE TABLE t1(x INT);
CREATE TABLE t2(a STRUCT<x: INT>, k INT);
SELECT a.x FROM t1 a JOIN t2 b ON a.x = b.k;
```

Author: tianyi <[email protected]>

Closes apache#4524 from tianyi/SPARK-3688 and squashes the following commits:

237a256 [tianyi] resolve a name with table.column pattern first.
  • Loading branch information
tianyi authored and marmbrus committed Feb 11, 2015
1 parent a60d2b7 commit 44b2311
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,29 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
def resolve(name: String, resolver: Resolver): Option[NamedExpression] =
resolve(name, output, resolver)

def resolveAsTableColumn(
nameParts: Array[String],
resolver: Resolver,
attribute: Attribute): List[(Attribute, List[String])] = {
if (attribute.qualifiers.find(resolver(_, nameParts.head)).nonEmpty && nameParts.size > 1) {
val remainingParts = nameParts.drop(1)
resolveAsColumn(remainingParts, resolver, attribute)
} else {
Nil
}
}

def resolveAsColumn(
nameParts: Array[String],
resolver: Resolver,
attribute: Attribute): List[(Attribute, List[String])] = {
if (resolver(attribute.name, nameParts.head)) {
(attribute.withName(nameParts.head), nameParts.tail.toList) :: Nil
} else {
Nil
}
}

/** Performs attribute resolution given a name and a sequence of possible attributes. */
protected def resolve(
name: String,
Expand All @@ -136,24 +159,15 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {

val parts = name.split("\\.")

// Collect all attributes that are output by this nodes children where either the first part
// matches the name or where the first part matches the scope and the second part matches the
// name. Return these matches along with any remaining parts, which represent dotted access to
// struct fields.
val options = input.flatMap { option =>
// If the first part of the desired name matches a qualifier for this possible match, drop it.
val remainingParts =
if (option.qualifiers.find(resolver(_, parts.head)).nonEmpty && parts.size > 1) {
parts.drop(1)
} else {
parts
}

if (resolver(option.name, remainingParts.head)) {
// Preserve the case of the user's attribute reference.
(option.withName(remainingParts.head), remainingParts.tail.toList) :: Nil
} else {
Nil
// We will try to resolve this name as `table.column` pattern first.
var options = input.flatMap { option =>
resolveAsTableColumn(parts, resolver, option)
}

// If none of attributes match `table.column` pattern, we try to resolve it as a column.
if(options.isEmpty) {
options = input.flatMap { option =>
resolveAsColumn(parts, resolver, option)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ class HiveResolutionSuite extends HiveComparisonTest {
assert(sql("SELECT nestedArray[0].a FROM nestedRepeatedTest").collect().head(0) === 1)
}

createQueryTest("test ambiguousReferences resolved as hive",
"""
|CREATE TABLE t1(x INT);
|CREATE TABLE t2(a STRUCT<x: INT>, k INT);
|INSERT OVERWRITE TABLE t1 SELECT 1 FROM src LIMIT 1;
|INSERT OVERWRITE TABLE t2 SELECT named_struct("x",1),1 FROM src LIMIT 1;
|SELECT a.x FROM t1 a JOIN t2 b ON a.x = b.k;
""".stripMargin)

/**
* Negative examples. Currently only left here for documentation purposes.
* TODO(marmbrus): Test that catalyst fails on these queries.
Expand Down

0 comments on commit 44b2311

Please sign in to comment.