diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 45e8830b9ba76..5689816d022a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -19,29 +19,24 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.planning.SQLConf
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.types.StructType
 import org.apache.spark.sql.catalyst.trees
 
-abstract class LogicalPlan extends QueryPlan[LogicalPlan] with SQLConf {
+abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
   self: Product =>
 
   // TODO: make a case class?
+  /**
+   * Estimates of various statistics. The default estimation logic simply sums up the corresponding
+   * statistic produced by the children. To override this behavior, override `statistics` and
+   * assign it an overridden version of `Statistics`.
+   */
   protected class Statistics {
     lazy val childrenStats = children.map(_.statistics)
 
-    lazy val numTuples: Long = childrenStats.map(_.numTuples).sum
-
-    lazy val sizeInBytes: Long = {
-      val sum = childrenStats.map(_.sizeInBytes).sum
-      if (sum == 0) statsDefaultSizeInBytes else sum
-    }
+    lazy val sizeInBytes: Long = childrenStats.map(_.sizeInBytes).sum
   }
-
-  /**
-   * Estimates of various statistics.
-   */
   lazy val statistics: Statistics = new Statistics
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
similarity index 97%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index c78c2439c2533..234cb2a445b49 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.catalyst.planning
+package org.apache.spark.sql
 
 import java.util.Properties
@@ -40,8 +40,8 @@ private object SQLConf {
 
 trait SQLConf {
   import SQLConf._
-  import SQLConf._
-  protected[spark] val settings = confSettings
+  import org.apache.spark.sql.SQLConf._
+  @transient protected[spark] val settings = confSettings
 
   /** ************************ Spark SQL Params/Hints ******************* */
   // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
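The LogicalPlan change above turns `Statistics` into a documented extension point: a node overrides `statistics` with a specialized subclass instead of relying on the summed-up default. As a rough sketch of that hook (not part of this patch; `KnownSizeRelation` and `bytesOnDisk` are made-up names), a leaf relation with a known footprint could report it directly:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

case class KnownSizeRelation(output: Seq[Attribute], bytesOnDisk: Long) extends LeafNode {
  override def references = Set.empty

  override lazy val statistics = new Statistics {
    // A leaf has no children to sum over, so report the known size directly.
    override lazy val sizeInBytes: Long = bytesOnDisk
  }
}
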
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d3718c55543fb..1a8357f31f5dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql
 
-
 import scala.language.implicitConversions
 
 import scala.reflect.runtime.universe.TypeTag
@@ -30,7 +29,6 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.planning.SQLConf
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index b07f0df7de4b9..7e6454d9c87f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Logging, Row}
+import org.apache.spark.sql.{Logging, Row, SQLConf}
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.GenericRow
@@ -67,7 +67,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
  */
 @DeveloperApi
 case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
-  extends LogicalPlan with MultiInstanceRelation {
+  extends LogicalPlan with MultiInstanceRelation with SQLConf {
 
   def output = alreadyPlanned.output
   override def references = Set.empty
@@ -80,6 +80,19 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
        case _ => sys.error("Multiple instance of the same relation detected.")
      }).asInstanceOf[this.type]
   }
+
+  override lazy val statistics = new Statistics {
+    // If this is wrapping around ExistingRdd and no reasonable estimation logic is implemented,
+    // return a default value.
+    override lazy val sizeInBytes: Long = {
+      val defaultSum = childrenStats.map(_.sizeInBytes).sum
+      alreadyPlanned match {
+        case e: ExistingRdd if defaultSum == 0 => statsDefaultSizeInBytes
+        case _ => defaultSum
+      }
+    }
+  }
+
 }
 
 private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {
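Mixing `SQLConf` into `SparkLogicalPlan` is what keeps `statsDefaultSizeInBytes` in scope for the `ExistingRdd` fallback above; every other consumer simply reads `statistics.sizeInBytes` off the logical plan. For instance, a planner-side size check might look roughly like this (a sketch only; `canBeBroadcast` and `broadcastThreshold` are made-up names standing in for whatever threshold a strategy actually uses):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// True when the plan's estimated output is small enough to ship to every node.
def canBeBroadcast(plan: LogicalPlan, broadcastThreshold: Long): Boolean =
  plan.statistics.sizeInBytes <= broadcastThreshold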