Revert "[SPARK-7320] [SQL] Add Cube / Rollup for dataframe"
This reverts commit 10698e1.
pwendell committed May 20, 2015
1 parent 829f1d9 commit 6338c40
Showing 3 changed files with 28 additions and 230 deletions.
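
For context before the diffs: a minimal, self-contained sketch (not part of the commit) of the API surface this revert removes. The rollup/cube calls mirror the ScalaDoc examples in the diff below; the local-mode setup, object name, and sample data are assumptions for illustration, and the rollup/cube lines compile only on builds that still contain 10698e1.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object RollupCubeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Assumed sample data: (department, gender, salary, age).
    val df = sc.parallelize(Seq(
      ("eng", "m", 100000, 30),
      ("eng", "f", 110000, 34),
      ("sales", "m", 90000, 41)
    )).toDF("department", "gender", "salary", "age")

    // Unaffected by the revert: plain groupBy, one row per (department, gender) pair.
    df.groupBy($"department", $"gender").avg("salary").show()

    // Removed by the revert: rollup adds per-department subtotal rows and a grand
    // total; cube adds subtotal rows for every combination of the grouping columns.
    df.rollup($"department", $"gender").avg("salary").show()
    df.cube($"department", $"gender").agg(Map("salary" -> "avg", "age" -> "max")).show()
  }
}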
104 changes: 2 additions & 102 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -685,53 +685,7 @@ class DataFrame private[sql](
   * @since 1.3.0
   */
  @scala.annotation.varargs
-  def groupBy(cols: Column*): GroupedData = {
-    GroupedData(this, cols.map(_.expr), GroupedData.GroupByType)
-  }
-
-  /**
-   * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns,
-   * so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns rolluped by department and group.
-   *   df.rollup($"department", $"group").avg()
-   *
-   *   // Compute the max age and average salary, rolluped by department and gender.
-   *   df.rollup($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def rollup(cols: Column*): GroupedData = {
-    GroupedData(this, cols.map(_.expr), GroupedData.RollupType)
-  }
-
-  /**
-   * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns,
-   * so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns cubed by department and group.
-   *   df.cube($"department", $"group").avg()
-   *
-   *   // Compute the max age and average salary, cubed by department and gender.
-   *   df.cube($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def cube(cols: Column*): GroupedData = GroupedData(this, cols.map(_.expr), GroupedData.CubeType)
+  def groupBy(cols: Column*): GroupedData = new GroupedData(this, cols.map(_.expr))

  /**
   * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
@@ -756,61 +710,7 @@ class DataFrame private[sql](
  @scala.annotation.varargs
  def groupBy(col1: String, cols: String*): GroupedData = {
    val colNames: Seq[String] = col1 +: cols
-    GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.GroupByType)
-  }
-
-  /**
-   * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns,
-   * so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * This is a variant of rollup that can only group by existing columns using column names
-   * (i.e. cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns rolluped by department and group.
-   *   df.rollup("department", "group").avg()
-   *
-   *   // Compute the max age and average salary, rolluped by department and gender.
-   *   df.rollup($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def rollup(col1: String, cols: String*): GroupedData = {
-    val colNames: Seq[String] = col1 +: cols
-    GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.RollupType)
-  }
-
-  /**
-   * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns,
-   * so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * This is a variant of cube that can only group by existing columns using column names
-   * (i.e. cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns cubed by department and group.
-   *   df.cube("department", "group").avg()
-   *
-   *   // Compute the max age and average salary, cubed by department and gender.
-   *   df.cube($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def cube(col1: String, cols: String*): GroupedData = {
-    val colNames: Seq[String] = col1 +: cols
-    GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.CubeType)
+    new GroupedData(this, colNames.map(colName => resolve(colName)))
  }

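The shape of the change in DataFrame.scala: the reverted commit routed groupBy, rollup, and cube through a single GroupedData factory tagged with a GroupType, which toDF in GroupedData.scala (next file) matched into different logical operators; the revert hard-wires plain group-by again. A dependency-free sketch of that dispatch pattern (the sealed hierarchy and String-valued plans are simplifications for illustration, not Spark's types):

object GroupTypeSketch {
  sealed trait GroupType
  case object GroupByType extends GroupType
  case object RollupType extends GroupType
  case object CubeType extends GroupType

  // Stand-ins for the catalyst logical operators Aggregate / Rollup / Cube.
  def plan(groupType: GroupType, keys: Seq[String]): String = groupType match {
    case GroupByType => s"Aggregate(${keys.mkString(", ")})"
    case RollupType  => s"Rollup(${keys.mkString(", ")})"
    case CubeType    => s"Cube(${keys.mkString(", ")})"
  }

  def main(args: Array[String]): Unit = {
    println(plan(RollupType, Seq("department", "gender")))  // Rollup(department, gender)
  }
}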
92 changes: 26 additions & 66 deletions sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -23,40 +23,9 @@ import scala.language.implicitConversions
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.Star
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{Rollup, Cube, Aggregate}
+import org.apache.spark.sql.catalyst.plans.logical.Aggregate
import org.apache.spark.sql.types.NumericType

-/**
- * Companion object for GroupedData
- */
-private[sql] object GroupedData {
-  def apply(
-      df: DataFrame,
-      groupingExprs: Seq[Expression],
-      groupType: GroupType): GroupedData = {
-    new GroupedData(df, groupingExprs, groupType: GroupType)
-  }
-
-  /**
-   * The Grouping Type
-   */
-  trait GroupType
-
-  /**
-   * To indicate it's the GroupBy
-   */
-  object GroupByType extends GroupType
-
-  /**
-   * To indicate it's the CUBE
-   */
-  object CubeType extends GroupType
-
-  /**
-   * To indicate it's the ROLLUP
-   */
-  object RollupType extends GroupType
-}

/**
 * :: Experimental ::
@@ -65,37 +34,19 @@ private[sql] object GroupedData {
 * @since 1.3.0
 */
@Experimental
-class GroupedData protected[sql](
-    df: DataFrame,
-    groupingExprs: Seq[Expression],
-    private val groupType: GroupedData.GroupType) {
+class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) {

-  private[this] def toDF(aggExprs: Seq[NamedExpression]): DataFrame = {
-    val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
-      val retainedExprs = groupingExprs.map {
-        case expr: NamedExpression => expr
-        case expr: Expression => Alias(expr, expr.prettyString)()
-      }
-      retainedExprs ++ aggExprs
-    } else {
-      aggExprs
-    }
-
-    groupType match {
-      case GroupedData.GroupByType =>
-        DataFrame(
-          df.sqlContext, Aggregate(groupingExprs, aggregates, df.logicalPlan))
-      case GroupedData.RollupType =>
-        DataFrame(
-          df.sqlContext, Rollup(groupingExprs, df.logicalPlan, aggregates))
-      case GroupedData.CubeType =>
-        DataFrame(
-          df.sqlContext, Cube(groupingExprs, df.logicalPlan, aggregates))
-    }
+  private[sql] implicit def toDF(aggExprs: Seq[NamedExpression]): DataFrame = {
+    val namedGroupingExprs = groupingExprs.map {
+      case expr: NamedExpression => expr
+      case expr: Expression => Alias(expr, expr.prettyString)()
+    }
+    DataFrame(
+      df.sqlContext, Aggregate(groupingExprs, namedGroupingExprs ++ aggExprs, df.logicalPlan))
  }

  private[this] def aggregateNumericColumns(colNames: String*)(f: Expression => Expression)
-    : DataFrame = {
+    : Seq[NamedExpression] = {

    val columnExprs = if (colNames.isEmpty) {
      // No columns specified. Use all numeric columns.
@@ -112,10 +63,10 @@
        namedExpr
      }
    }
-    toDF(columnExprs.map { c =>
+    columnExprs.map { c =>
      val a = f(c)
      Alias(a, a.prettyString)()
-    })
+    }
  }

  private[this] def strToExpr(expr: String): (Expression => Expression) = {
@@ -168,10 +119,10 @@
   * @since 1.3.0
   */
  def agg(exprs: Map[String, String]): DataFrame = {
-    toDF(exprs.map { case (colName, expr) =>
+    exprs.map { case (colName, expr) =>
      val a = strToExpr(expr)(df(colName).expr)
      Alias(a, a.prettyString)()
-    }.toSeq)
+    }.toSeq
  }

  /**
@@ -224,10 +175,19 @@
   */
  @scala.annotation.varargs
  def agg(expr: Column, exprs: Column*): DataFrame = {
-    toDF((expr +: exprs).map(_.expr).map {
+    val aggExprs = (expr +: exprs).map(_.expr).map {
      case expr: NamedExpression => expr
      case expr: Expression => Alias(expr, expr.prettyString)()
-    })
+    }
+    if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
+      val retainedExprs = groupingExprs.map {
+        case expr: NamedExpression => expr
+        case expr: Expression => Alias(expr, expr.prettyString)()
+      }
+      DataFrame(df.sqlContext, Aggregate(groupingExprs, retainedExprs ++ aggExprs, df.logicalPlan))
+    } else {
+      DataFrame(df.sqlContext, Aggregate(groupingExprs, aggExprs, df.logicalPlan))
+    }
  }
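
The restored agg above also inlines the dataFrameRetainGroupColumns check that the reverted commit had hoisted into toDF. A hedged sketch of what that flag does, reusing df and sqlContext from the sketch near the top (the config key spark.sql.retainGroupColumns and its default of true are as in Spark 1.4; output column names are approximate):

import org.apache.spark.sql.functions.avg

// Default: grouping columns are retained alongside the aggregates.
sqlContext.setConf("spark.sql.retainGroupColumns", "true")
df.groupBy($"department").agg(avg($"salary")).show()   // department, AVG(salary)

// Disabled: only the aggregate columns come back.
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
df.groupBy($"department").agg(avg($"salary")).show()   // AVG(salary)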

  /**
@@ -236,7 +196,7 @@
   *
   * @since 1.3.0
   */
-  def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)), "count")()))
+  def count(): DataFrame = Seq(Alias(Count(Literal(1)), "count")())

  /**
   * Compute the average value for each numeric columns for each group. This is an alias for `avg`.
@@ -296,5 +256,5 @@ class GroupedData protected[sql](
  @scala.annotation.varargs
  def sum(colNames: String*): DataFrame = {
    aggregateNumericColumns(colNames:_*)(Sum)
  }
}
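
One subtlety the revert restores: toDF becomes an implicit conversion again, which is how the restored count() can return a Seq[NamedExpression] where a DataFrame is expected; the file's import scala.language.implicitConversions (visible in the first hunk's context) exists for this. A self-contained sketch of the mechanism with simplified stand-in types (NamedExpr and Frame are assumptions, not Spark's types):

import scala.language.implicitConversions

object ImplicitReturnSketch {
  final case class NamedExpr(name: String)
  final case class Frame(cols: Seq[String])

  // Analogue of the restored `private[sql] implicit def toDF`: the compiler
  // inserts this conversion at count()'s return position.
  implicit def toFrame(exprs: Seq[NamedExpr]): Frame = Frame(exprs.map(_.name))

  def count(): Frame = Seq(NamedExpr("count"))  // converted via toFrame

  def main(args: Array[String]): Unit = {
    println(count())  // Frame(List(count))
  }
}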

This file was deleted. (Its name is not preserved in this capture; it accounts for the remaining 62 deletions.)
