Skip to content

Commit

Permalink
fix rank dese_rank lead lag
Browse files Browse the repository at this point in the history
  • Loading branch information
guowei2 committed Apr 3, 2015
1 parent a512f49 commit 40edde7
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ private object HiveContext {
toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
}.toSeq.sorted.mkString("{", ",", "}")
case (null, _) => "NULL"
case (o, ArrayType(typ, _)) => toHiveStructString(o, typ)
case (d: Int, DateType) => new DateWritable(d).toString
case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,17 @@ case class WindowAggregate(

case class ComputedWindow(
unboundFunction: Expression,
windowFunctionInfo: WindowFunctionInfo,
pivotResult: Boolean,
windowSpec: WindowSpec,
boundedFunction: Expression,
computedAttribute: AttributeReference)

case class WindowFunctionInfo(
supportsWindow: Boolean,
pivotResult: Boolean,
impliesOrder: Boolean)

private[this] val computedWindows = windowExpressions.collect{

case Alias(expr @ WindowExpression(func, spec), _) =>
val wfi = func match {
case HiveGenericUdaf(_, wfi, _) =>
WindowFunctionInfo(wfi.isSupportsWindow, wfi.isPivotResult, wfi.isImpliesOrder)
case _ => WindowFunctionInfo(true, false, false)
case HiveGenericUdaf(_, wfi, _) => wfi.isPivotResult
case _ => false
}
ComputedWindow(
func,
Expand Down Expand Up @@ -138,7 +132,7 @@ case class WindowAggregate(
}
}.getOrElse {
val function = baseExpr.newInstance()
if (window.windowFunctionInfo.pivotResult) {
if (window.pivotResult) {
rows.foreach(function.update)
function.eval(EmptyRow).asInstanceOf[Seq[Any]].iterator
} else if (ifSortInOnePartition) {
Expand Down
15 changes: 13 additions & 2 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ private[spark] object ResolveWindowUdaf extends Rule[LogicalPlan] {
HiveGenericUdaf(wrapper, wfi, children ++ sortExpr.map(_.child))
case Sort(sortExpr, _, _) =>
HiveGenericUdaf(wrapper, wfi, children ++ sortExpr.map(_.child))
case _ => sys.error(s"udaf $name with impliesOrder need sort expressions")
case _ =>
sys.error(s"udaf $wrapper.functionClassName need sort expressions")
}
// if function computed with window is `HiveGenericUdf`, we need to check whether
// it has HiveGenericUadf one, such as lead, lag
Expand Down Expand Up @@ -248,7 +249,17 @@ private[hive] case class HiveGenericUdaf(
@transient
protected lazy val inspectors = children.map(toInspector)

def dataType: DataType = inspectorToDataType(objectInspector)
protected val pivotResult = windowFunctionInfo.isPivotResult

def dataType: DataType =
if (!pivotResult) inspectorToDataType(objectInspector)
else {
inspectorToDataType(objectInspector) match {
case ArrayType(dt, _) => dt
case _ => sys.error(s"error resolve the data type of udaf $funcWrapper.functionClassName")
}
}


def nullable: Boolean = true

Expand Down

0 comments on commit 40edde7

Please sign in to comment.