Skip to content

Commit

Permalink
Address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
shahidki31 committed May 17, 2021
1 parent 4a53ccb commit bcd628b
Showing 1 changed file with 47 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical.statsEstimation

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.expressions.AttributeMap
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics, Union}
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -70,10 +68,9 @@ object UnionEstimation {
None
}

val (attrToComputeMinMaxStats, attrToComputeNullCount) = getAttributeToComputeStats(union)
val attrStatsWithMinMax = computeMinMaxAttributeStatsMap(union, attrToComputeMinMaxStats)
val newAttrStats = updateAttributeStatsMapWithNullCount(union, attrStatsWithMinMax,
attrToComputeNullCount)
val newMinMaxStats = computeMinMaxStats(union)
val newNullCountStats = computeNullCountStats(union)
val newAttrStats = combineStats(newMinMaxStats, newNullCountStats)

Some(
Statistics(
Expand All @@ -82,7 +79,8 @@ object UnionEstimation {
attributeStats = newAttrStats))
}

private def getAttributeToComputeStats(union: Union) = {
/** This method computes the min-max statistics and return the attribute stats Map. */
private def computeMinMaxStats(union: Union) = {
val unionOutput = union.output
val attrToComputeMinMaxStats = union.children.map(_.output).transpose.zipWithIndex.filter {
case (attrs, outputIndex) => isTypeSupported(unionOutput(outputIndex).dataType) &&
Expand All @@ -93,24 +91,7 @@ object UnionEstimation {
attrStats.get(attr).isDefined && attrStats(attr).hasMinMaxStats
}
}
val attrToComputeNullCount = union.children.map(_.output).transpose.zipWithIndex.filter {
case (attrs, _) => attrs.zipWithIndex.forall {
case (attr, childIndex) =>
val attrStats = union.children(childIndex).stats.attributeStats
attrStats.get(attr).isDefined && attrStats(attr).nullCount.isDefined
}
}
(attrToComputeMinMaxStats, attrToComputeNullCount)
}

/** This method computes the min-max statistics and return the attribute stats Map. */
private def computeMinMaxAttributeStatsMap(
union: Union,
attrToComputeMinMaxStats: Seq[(Seq[Attribute], Int)]) = {
val unionOutput = union.output
val attrStatsWithMinMax = if (attrToComputeMinMaxStats.nonEmpty) {
val outputAttrStats = new ArrayBuffer[(Attribute, ColumnStat)]()
attrToComputeMinMaxStats.foreach {
val outputAttrStats = attrToComputeMinMaxStats.map {
case (attrs, outputIndex) =>
val dataType = unionOutput(outputIndex).dataType
val statComparator = createStatComparator(dataType)
Expand All @@ -130,49 +111,56 @@ object UnionEstimation {
(min, max)
}
val newStat = ColumnStat(min = minMaxValue._1, max = minMaxValue._2)
outputAttrStats += unionOutput(outputIndex) -> newStat
unionOutput(outputIndex) -> newStat
}
if (outputAttrStats.nonEmpty) {
AttributeMap(outputAttrStats.toSeq)
} else {
AttributeMap.empty[ColumnStat]
}
attrStatsWithMinMax
}

/** This method computes the null count statistics and update the attrStatsWithMinMax Map. */
private def updateAttributeStatsMapWithNullCount(
union: Union,
attrStatsWithMinMax: AttributeMap[ColumnStat],
attrToComputeNullCount: Seq[(Seq[Attribute], Int)]) = {
/** This method computes the null count statistics and return the attribute stats Map. */
private def computeNullCountStats(union: Union) = {
val unionOutput = union.output

val newAttrStats = if (attrToComputeNullCount.nonEmpty) {
val outputAttrStats = new ArrayBuffer[(Attribute, ColumnStat)]()
attrToComputeNullCount.foreach {
case (attrs, outputIndex) =>
val firstStat = union.children.head.stats.attributeStats(attrs.head)
val firstNullCount = firstStat.nullCount.get
val colWithNullStatValues = attrs.zipWithIndex.tail.foldLeft[BigInt](firstNullCount) {
case (totalNullCount, (attr, childIndex)) =>
val colStat = union.children(childIndex).stats.attributeStats(attr)
totalNullCount + colStat.nullCount.get
}

// If the attribute stats are already computed in min-max stats, update the
// statistics with null count.
if (attrStatsWithMinMax.get(unionOutput(outputIndex)).isDefined) {
val updatedColStat = attrStatsWithMinMax(unionOutput(outputIndex))
.copy(nullCount = Some(colWithNullStatValues))
outputAttrStats += (unionOutput(outputIndex) -> updatedColStat)
} else {
outputAttrStats += (unionOutput(outputIndex) ->
ColumnStat(nullCount = Some(colWithNullStatValues)))
}
val attrToComputeNullCount = union.children.map(_.output).transpose.zipWithIndex.filter {
case (attrs, _) => attrs.zipWithIndex.forall {
case (attr, childIndex) =>
val attrStats = union.children(childIndex).stats.attributeStats
attrStats.get(attr).isDefined && attrStats(attr).nullCount.isDefined
}
AttributeMap(attrStatsWithMinMax.toSeq ++ outputAttrStats.toSeq)
}
val outputAttrStats = attrToComputeNullCount.map {
case (attrs, outputIndex) =>
val firstStat = union.children.head.stats.attributeStats(attrs.head)
val firstNullCount = firstStat.nullCount.get
val colWithNullStatValues = attrs.zipWithIndex.tail.foldLeft[BigInt](firstNullCount) {
case (totalNullCount, (attr, childIndex)) =>
val colStat = union.children(childIndex).stats.attributeStats(attr)
totalNullCount + colStat.nullCount.get
}
val newStat = ColumnStat(nullCount = Some(colWithNullStatValues))
unionOutput(outputIndex) -> newStat
}
if (outputAttrStats.nonEmpty) {
AttributeMap(outputAttrStats.toSeq)
} else {
attrStatsWithMinMax
AttributeMap.empty[ColumnStat]
}
}

// Combine the two maps by updating the min-max stats map with null count stats.
private def combineStats(
minMaxStats: AttributeMap[ColumnStat],
nullCountStats: AttributeMap[ColumnStat]) = {
val updatedNullCountStats = nullCountStats.keys.map { key =>
if (minMaxStats.get(key).isDefined) {
val updatedColsStats = minMaxStats(key).copy(nullCount = nullCountStats(key).nullCount)
key -> updatedColsStats
} else {
key -> nullCountStats(key)
}
}
newAttrStats
AttributeMap(minMaxStats.toSeq ++ updatedNullCountStats)
}
}

0 comments on commit bcd628b

Please sign in to comment.