From 56a8e6ee1fcf8c54dc5c941eb17e3aaa866fba1a Mon Sep 17 00:00:00 2001
From: Zongheng Yang
Date: Tue, 24 Jun 2014 16:32:27 -0700
Subject: [PATCH] Prototype implementation of estimates for Catalyst logical
 plans.

- Also add simple size getters for ParquetRelation and MetastoreRelation.
- Add a rule that auto-converts equi-joins to BroadcastHashJoin when one
  side's estimated size is at most sqlContext.autoConvertJoinSize, based on
  the above getters (currently exercised for MetastoreRelation).
---
 .../catalyst/plans/logical/LogicalPlan.scala  | 12 ++++++
 .../spark/sql/execution/SparkStrategies.scala |  8 ++--
 .../spark/sql/parquet/ParquetRelation.scala   | 15 +++++++
 .../org/apache/spark/sql/JoinSuite.scala      |  9 ++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 45 +++++++++++++------
 .../sql/hive/execution/HiveQuerySuite.scala   | 24 ++++++++++-
 6 files changed, 97 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index edc37e3877c0e..0e52ec9f3c814 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -26,6 +26,18 @@ import org.apache.spark.sql.catalyst.trees
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
   self: Product =>
 
+  protected class Estimates {
+    lazy val childrenEstimates = children.map(_.estimates)
+    lazy val cardinality: Long = childrenEstimates.map(_.cardinality).sum
+    lazy val numTuples: Long = childrenEstimates.map(_.numTuples).sum
+    lazy val size: Long = childrenEstimates.map(_.size).sum // output size in bytes
+  }
+
+  /**
+   * Estimates of various statistics; by default each sums the children's values.
+   */
+  lazy val estimates: Estimates = new Estimates
+
   /**
    * Returns the set of attributes that are referenced by this node
    * during evaluation.
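Note: the Estimates defaults above compose bottom-up. Inner operators sum over
their children, and only leaf operators (ParquetRelation and MetastoreRelation
below) override `size` with a measured value. A minimal, self-contained sketch
of that propagation, using illustrative Plan/Leaf/Join stand-ins rather than
the real Catalyst classes:

    object EstimatesSketch extends App {
      abstract class Plan {
        def children: Seq[Plan]
        // Mirrors the patch: the defaults sum the children's estimates.
        class Estimates {
          lazy val childrenEstimates: Seq[Plan#Estimates] = children.map(_.estimates)
          lazy val numTuples: Long = childrenEstimates.map(_.numTuples).sum
          lazy val size: Long = childrenEstimates.map(_.size).sum
        }
        lazy val estimates: Estimates = new Estimates
      }

      // A leaf overrides the defaults with measured values.
      case class Leaf(bytesOnDisk: Long, rows: Long) extends Plan {
        def children: Seq[Plan] = Nil
        override lazy val estimates = new Estimates {
          override lazy val size: Long = bytesOnDisk
          override lazy val numTuples: Long = rows
        }
      }

      // An inner node simply inherits the summing defaults.
      case class Join(left: Plan, right: Plan) extends Plan {
        def children: Seq[Plan] = Seq(left, right)
      }

      val plan = Join(Leaf(bytesOnDisk = 10, rows = 2), Leaf(bytesOnDisk = 32, rows = 5))
      println(plan.estimates.size)      // 42
      println(plan.estimates.numTuples) // 7
    }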
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c078e71fe0290..809ccf5b54013 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -71,8 +71,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           condition,
           left,
           right @ PhysicalOperation(_, _, b: BaseRelation))
-          if broadcastTables.contains(b.tableName) =>
-        broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
+          if broadcastTables.contains(b.tableName)
+            || (right.estimates.size <= sqlContext.autoConvertJoinSize) =>
+        broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
 
       case ExtractEquiJoinKeys(
             Inner,
@@ -81,7 +82,8 @@
           condition,
           left @ PhysicalOperation(_, _, b: BaseRelation),
           right)
-          if broadcastTables.contains(b.tableName) =>
+          if broadcastTables.contains(b.tableName)
+            || (left.estimates.size <= sqlContext.autoConvertJoinSize) =>
         broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 9c4771d1a9846..91610ea95e747 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -22,11 +22,14 @@ import java.io.IOException
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.mapreduce.Job
 
 import parquet.hadoop.ParquetOutputFormat
 import parquet.hadoop.metadata.CompressionCodecName
+import parquet.hadoop.util.ContextUtil
 import parquet.schema.MessageType
 
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
@@ -43,12 +46,24 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
  *
  * @param path The path to the Parquet file.
  */
+// TODO: make this a BaseRelation so the broadcast HashJoin strategy can match it?
 private[sql] case class ParquetRelation(
     path: String,
     @transient conf: Option[Configuration] = None)
   extends LeafNode with MultiInstanceRelation {
 
   self: Product =>
+  @transient override lazy val estimates = new Estimates {
+    // TODO: investigate getting encoded column statistics in the parquet file?
+    override lazy val size: Long = {
+      val hdfsPath = new Path(path)
+      val fs = hdfsPath.getFileSystem(conf.getOrElse(ContextUtil.getConfiguration(new Job())))
+      fs.getContentSummary(hdfsPath).getLength // total length in bytes
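+      // Note: getContentSummary walks hdfsPath recursively, so directory-backed
+      // Parquet tables made up of many part-files are summed correctly.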
+    }
+  }
+
   /** Schema derived from ParquetFile */
   def parquetSchema: MessageType =
     ParquetTypesConverter
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index e17ecc87fd52a..e7e3fa9e7a617 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter, Inner}
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 
@@ -28,6 +29,14 @@ class JoinSuite extends QueryTest {
   // Ensures tables are loaded.
   TestData
 
+  test("estimates the size of a ParquetRelation") {
+    val data = parquetFile("../../points.parquet") // NOTE: relies on a local file outside the repo
+    val sizes = data.logicalPlan.collect { case j: ParquetRelation =>
+      j.newInstance.estimates.size // also works without .newInstance
+    }.toSeq
+    assert(sizes.size === 1 && sizes(0) > 0)
+  }
+
   test("equi-join is hash-join") {
     val x = testData2.as('x)
     val y = testData2.as('y)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 156b090712df2..7b6a48cf269d7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.hive
 
 import scala.util.parsing.combinator.RegexParsers
 
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, SerDeInfo}
 import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.Deserializer
 
 import org.apache.spark.annotation.DeveloperApi
@@ -64,9 +65,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     // Since HiveQL is case insensitive for table names we make them all lowercase.
     MetastoreRelation(
-      databaseName,
-      tblName,
-      alias)(table.getTTable, partitions.map(part => part.getTPartition))
+      databaseName, tblName, alias)(
+      table.getTTable, partitions.map(part => part.getTPartition))(
+      hive.hiveconf, table.getPath)
   }
 
   def createTable(
@@ -251,7 +252,11 @@ object HiveMetastoreTypes extends RegexParsers {
 private[hive] case class MetastoreRelation
   (databaseName: String, tableName: String, alias: Option[String])
   (val table: TTable, val partitions: Seq[TPartition])
+  (@transient hiveConf: HiveConf, @transient path: Path)
   extends BaseRelation {
+
+  self: Product =>
+
   // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and
   // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions.
   // Right now, using org.apache.hadoop.hive.ql.metadata.Table and
@@ -264,6 +269,22 @@ private[hive] case class MetastoreRelation
     new Partition(hiveQlTable, p)
   }
 
+  // TODO: are there any stats in hiveQlTable.getSkewedInfo that we can use?
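+  // Estimation strategy: prefer the `totalSize` property that the Hive
+  // metastore maintains for the table; if it is missing or unparsable, fall
+  // back to asking the filesystem for the total size of the table's files.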
+  @transient override lazy val estimates = new Estimates {
+    // Size getters adapted from
+    // https://github.com/apache/hive/blob/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java
+    override lazy val size: Long =
+      maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path)
+
+    private[this] def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = {
+      val res = try { Some(size.toLong) } catch { case _: Exception => None } // null or garbage => None
+      res.getOrElse { path.getFileSystem(conf).getContentSummary(path).getLength }
+    }
+  }
+
   val tableDesc = new TableDesc(
     Class.forName(hiveQlTable.getSerializationLib).asInstanceOf[Class[Deserializer]],
     hiveQlTable.getInputFormatClass,
@@ -275,14 +296,14 @@ private[hive] case class MetastoreRelation
     hiveQlTable.getMetadata
   )
 
-   implicit class SchemaAttribute(f: FieldSchema) {
-     def toAttribute = AttributeReference(
-       f.getName,
-       HiveMetastoreTypes.toDataType(f.getType),
-       // Since data can be dumped in randomly with no validation, everything is nullable.
-       nullable = true
-     )(qualifiers = tableName +: alias.toSeq)
-   }
+  implicit class SchemaAttribute(f: FieldSchema) {
+    def toAttribute = AttributeReference(
+      f.getName,
+      HiveMetastoreTypes.toDataType(f.getType),
+      // Since data can be dumped in randomly with no validation, everything is nullable.
+      nullable = true
+    )(qualifiers = tableName +: alias.toSeq)
+  }
 
   // Must be a stable value since new attributes are born here.
   val partitionKeys = hiveQlTable.getPartitionKeys.map(_.toAttribute)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index a022a1e2dc70e..eba4f6fdda3f3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.hive.execution
 
 import scala.util.Try
 
+import org.apache.spark.sql.{SchemaRDD, Row}
+import org.apache.spark.sql.execution.BroadcastHashJoin
+import org.apache.spark.sql.hive.MetastoreRelation
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.{SchemaRDD, Row}
 
 case class TestData(a: Int, b: String)
 
@@ -48,6 +50,26 @@ class HiveQuerySuite extends HiveComparisonTest {
       "Incorrect number of rows in created table")
   }
 
+  // TODO: put me in a separate EstimatesSuite?
+  test("auto-converts equi-joins on a small table to BroadcastHashJoin") {
+    hql("""SET spark.sql.join.broadcastTables=""") // reset the explicit broadcast hint
+    // TODO: use two different tables?
+    // Assumes the src table is small enough to fall under autoConvertJoinSize.
+    val rdd = hql("""SELECT * FROM src a JOIN src b ON a.key = b.key""")
+    val physical = rdd.queryExecution.sparkPlan
+    val bhj = physical.collect { case j: BroadcastHashJoin => j }
+    assert(bhj.size === 1)
+  }
+
+  // TODO: put me in a separate EstimatesSuite?
+  test("estimates the size of a MetastoreRelation") {
+    val rdd = hql("""SELECT * FROM src""")
+    val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
+      mr.estimates.size
+    }.toSeq
+    assert(sizes.size === 1 && sizes(0) > 0)
+  }
+
   createQueryTest("between",
     "SELECT * FROM src WHERE key Between 1 and 2")
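Note on the planner change: with the size estimates in place, the SparkStrategies
cases above convert an equi-join whenever the explicit broadcastTables hint matches
or one side's estimated size is at most sqlContext.autoConvertJoinSize, checking
the right side first. A minimal standalone sketch of that decision rule (the
BuildSide stand-ins and the helper name are illustrative, not the patch's code):

    object BroadcastDecisionSketch extends App {
      sealed trait BuildSide
      case object BuildLeft extends BuildSide
      case object BuildRight extends BuildSide

      /** Mirrors the pattern-guard order above: the right side is considered first. */
      def chooseBroadcastSide(
          leftSizeBytes: Long,
          rightSizeBytes: Long,
          autoConvertJoinSize: Long): Option[BuildSide] =
        if (rightSizeBytes <= autoConvertJoinSize) Some(BuildRight)
        else if (leftSizeBytes <= autoConvertJoinSize) Some(BuildLeft)
        else None

      val threshold = 10L << 20 // e.g. a 10 MB threshold
      println(chooseBroadcastSide(100L << 20, 1L << 20, threshold))   // Some(BuildRight)
      println(chooseBroadcastSide(1L << 20, 100L << 20, threshold))   // Some(BuildLeft)
      println(chooseBroadcastSide(100L << 20, 100L << 20, threshold)) // None
    }

Because the right side is tested first, a query where both sides qualify builds
the hash table on the right relation; preferring the smaller of the two would be
a possible follow-up refinement.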