Skip to content

Commit

Permalink
[SPARK-5383][SQL] Support alias for udtfs
Browse files Browse the repository at this point in the history
Add support for alias of udtfs, such as
```
select stack(2, key, value, key, value) as (a, b) from src limit 5;

select a, b from (select stack(2, key, value, key, value) as (a, b) from src) t limit 5

```

Author: wangfei <[email protected]>
Author: scwf <[email protected]>
Author: Fei Wang <[email protected]>

Closes #4186 from scwf/multi-alias-names and squashes the following commits:

c35e922 [wangfei] fix conflicts
adc8311 [wangfei] minor format fix
2783aed [wangfei] convert it to a Generate instead of leaving it inside of a Project clause
a87668a [wangfei] minor improvement
b25d9b3 [wangfei] resolve conflicts
d38f041 [wangfei] style fix
8cfcebf [wangfei] minor improvement
12a239e [wangfei] fix test case
050177f [wangfei] added extendedCheckRules
3d69329 [wangfei] added CheckMultiAlias to analyzer
324150d [wangfei] added multi alias node
74f5a81 [Fei Wang] imports order fix
5bc3f59 [scwf] style fix
3daec28 [scwf] support alias for udfs with multi output columns
  • Loading branch information
scwf authored and marmbrus committed Feb 3, 2015
1 parent ca7a6cd commit 5adbb39
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ class Analyzer(catalog: Catalog,
typeCoercionRules ++
extendedRules : _*),
Batch("Check Analysis", Once,
CheckResolution,
CheckAggregation),
CheckResolution ::
CheckAggregation ::
Nil: _*),
Batch("AnalysisOperators", fixedPoint,
EliminateAnalysisOperators)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,44 @@ case class UnresolvedStar(table: Option[String]) extends Star {
override def toString = table.map(_ + ".").getOrElse("") + "*"
}

/**
* Used to assign new names to Generator's output, such as hive udtf.
* For example the SQL expression "stack(2, key, value, key, value) as (a, b)" could be represented
* as follows:
* MultiAlias(stack_function, Seq(a, b))
* @param child the computation being performed
* @param names the names to be associated with each output of computing [[child]].
*/
case class MultiAlias(child: Expression, names: Seq[String])
extends Attribute with trees.UnaryNode[Expression] {

override def name = throw new UnresolvedException(this, "name")

override def exprId = throw new UnresolvedException(this, "exprId")

override def dataType = throw new UnresolvedException(this, "dataType")

override def nullable = throw new UnresolvedException(this, "nullable")

override def qualifiers = throw new UnresolvedException(this, "qualifiers")

override lazy val resolved = false

override def newInstance = this

override def withNullability(newNullability: Boolean) = this

override def withQualifiers(newQualifiers: Seq[String]) = this

override def withName(newName: String) = this

override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

override def toString: String = s"$child AS $names"

}

/**
* Represents all the resolved input attributes to a given relational operator. This is used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ abstract class Attribute extends NamedExpression {
/**
* Used to assign a new name to a computation.
* For example the SQL expression "1 + 1 AS a" could be represented as follows:
* Alias(Add(Literal(1), Literal(1), "a")()
* Alias(Add(Literal(1), Literal(1)), "a")()
*
* Note that exprId and qualifiers are in a separate parameter list because
* we only pattern match on child and name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.CreateTables ::
catalog.PreInsertionCasts ::
ExtractPythonUdfs ::
ResolveUdtfsAlias ::
Nil
}

Expand Down
16 changes: 12 additions & 4 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import java.sql.Date
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Context
Expand Down Expand Up @@ -968,14 +969,21 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
}

protected def selExprNodeToExpr(node: Node): Option[Expression] = node match {
case Token("TOK_SELEXPR",
e :: Nil) =>
case Token("TOK_SELEXPR", e :: Nil) =>
Some(nodeToExpr(e))

case Token("TOK_SELEXPR",
e :: Token(alias, Nil) :: Nil) =>
case Token("TOK_SELEXPR", e :: Token(alias, Nil) :: Nil) =>
Some(Alias(nodeToExpr(e), cleanIdentifier(alias))())

case Token("TOK_SELEXPR", e :: aliasChildren) =>
var aliasNames = ArrayBuffer[String]()
aliasChildren.foreach { _ match {
case Token(name, Nil) => aliasNames += cleanIdentifier(name)
case _ =>
}
}
Some(MultiAlias(nodeToExpr(e), aliasNames))

/* Hints are ignored */
case Token("TOK_HINTLIST", _) => None

Expand Down
19 changes: 18 additions & 1 deletion sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Generate, Project, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils.getContextOrSparkClassLoader
import org.apache.spark.sql.catalyst.analysis.MultiAlias
import org.apache.spark.sql.catalyst.errors.TreeNodeException

/* Implicit conversions */
import scala.collection.JavaConversions._
Expand Down Expand Up @@ -321,6 +324,20 @@ private[hive] case class HiveGenericUdtf(
override def toString = s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
}

/**
* Resolve Udtfs Alias.
*/
private[spark] object ResolveUdtfsAlias extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan) = plan transform {
case p @ Project(projectList, _)
if projectList.exists(_.isInstanceOf[MultiAlias]) && projectList.size != 1 =>
throw new TreeNodeException(p, "only single Generator supported for SELECT clause")

case Project(Seq(MultiAlias(udtf @ HiveGenericUdtf(_, _, _), names)), child) =>
Generate(udtf.copy(aliasNames = names), join = false, outer = false, None, child)
}
}

private[hive] case class HiveUdafFunction(
funcWrapper: HiveFunctionWrapper,
exprs: Seq[Expression],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,18 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
assert(sql("select key from src having key > 490").collect().size < 100)
}

test("SPARK-5383 alias for udfs with multi output columns") {
assert(
sql("select stack(2, key, value, key, value) as (a, b) from src limit 5")
.collect()
.size == 5)

assert(
sql("select a, b from (select stack(2, key, value, key, value) as (a, b) from src) t limit 5")
.collect()
.size == 5)
}

test("SPARK-5367: resolve star expression in udf") {
assert(sql("select concat(*) from src limit 5").collect().size == 5)
assert(sql("select array(*) from src limit 5").collect().size == 5)
Expand Down

0 comments on commit 5adbb39

Please sign in to comment.