Skip to content

Commit

Permalink
support injection histogram information to statistics apache#10
Browse files Browse the repository at this point in the history
  • Loading branch information
GH-Gloway committed Oct 18, 2017
1 parent 01a2e50 commit 4aa79d6
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,16 +352,19 @@ object CatalogTable {
case class CatalogStatistics(
sizeInBytes: BigInt,
rowCount: Option[BigInt] = None,
colStats: Map[String, ColumnStat] = Map.empty) {
colStats: Map[String, ColumnStat] = Map.empty,
histograms: Map[String, Histogram] = Map.empty) {

/**
* Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
* on column names.
*/
def toPlanStats(planOutput: Seq[Attribute]): Statistics = {
val matched = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
val matched2 = planOutput.flatMap(a => histograms.get(a.name).map(a -> _))
Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
attributeStats = AttributeMap(matched))
attributeStats = AttributeMap(matched),
histograms = AttributeMap(matched2))
}

/** Readable string representation for the CatalogStatistics. */
Expand Down
23 changes: 21 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Histogram}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.command.DDLUtils
Expand All @@ -54,6 +54,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
val STATISTICS_HISTOGRAM_PREFIX = STATISTICS_PREFIX + "histogram."

/**
* Specifies the input data source format.
Expand Down Expand Up @@ -214,6 +215,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val columnStats = new scala.collection.mutable.HashMap[String, ColumnStat]() {
override def default(key: String): ColumnStat = ColumnStat(0, None, None, 0, 0, 0)
}

val histograms = new scala.collection.mutable.HashMap[String, Histogram]() {
override def default(key: String): Histogram = Histogram(bucket = Nil,
distinctCount = Nil, height = 0.0)
}
var catalogStatistics = CatalogStatistics(0)
extraOptions.foreach(entry => {
val key = entry._1
Expand All @@ -237,9 +243,22 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
catalogStatistics = catalogStatistics.copy(rowCount = Some(BigInt(value)))
} else if (key == STATISTICS_TOTAL_SIZE) {
catalogStatistics = catalogStatistics.copy(sizeInBytes = BigInt(value))
} else if (key.startsWith(STATISTICS_HISTOGRAM_PREFIX)) {
val temp = key.substring(STATISTICS_COL_STATS_PREFIX.length).split("\\.")
val fieldName = temp(0)
val parameter = temp(1)
val hist = histograms(fieldName)
val newHist = parameter match {
case "buckets" => hist.copy(
bucket = value.substring(1, value.length-1).split(",").map(_.trim.toDouble).toList)
case "distinctEachBucket" => hist.copy(
distinctCount = value.substring(1, value.length-1).split(",").map(_.trim.toLong).toList
)
case "height" => hist.copy(height = value.toLong)
}
}
})
catalogStatistics.copy(colStats = columnStats.toMap)
catalogStatistics.copy(colStats = columnStats.toMap, histograms = histograms.toMap)
}

private def fromExternalString(s: String, name: String, dataType: DataType): Any = {
Expand Down

0 comments on commit 4aa79d6

Please sign in to comment.