[SPARK-13916][SQL] Add a metric to WholeStageCodegen to measure duration. #11741

Closed
wants to merge 5 commits
Changes from 3 commits
BufferedRowIterator.java
@@ -34,6 +34,7 @@ public abstract class BufferedRowIterator {
protected LinkedList<InternalRow> currentRows = new LinkedList<>();
// used when there is no column in output
protected UnsafeRow unsafeRow = new UnsafeRow(0);
private long startTimeMs = System.currentTimeMillis();
public boolean hasNext() throws IOException {
if (currentRows.isEmpty()) {
@@ -46,6 +47,14 @@ public InternalRow next() {
return currentRows.remove();
}

/**
* Returns the elapsed time since this object was created. This object represents a pipeline,
* so this is a measure of how long the pipeline has been running.
*/
public long durationMs() {
return System.currentTimeMillis() - startTimeMs;
}

/**
* Initializes from an array of iterators of InternalRow.
*/
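The timing mechanism above is deliberately simple: the generated iterator records wall-clock time when it is constructed, and durationMs() reports the elapsed time on demand. A minimal standalone sketch of the pattern, using a hypothetical class name (not Spark code):

// Minimal sketch of the pattern above; TimedPipeline is a hypothetical name.
// Note this measures the lifetime of the whole object, including time spent
// blocked on upstream iterators, not just time inside generated code.
class TimedPipeline {
  private val startTimeMs = System.currentTimeMillis()

  def durationMs(): Long = System.currentTimeMillis() - startTimeMs
}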
WholeStageCodegen.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.toCommentSafeString
import org.apache.spark.sql.execution.aggregate.TungstenAggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
-import org.apache.spark.sql.execution.metric.LongSQLMetricValue
+import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf

/**
@@ -263,6 +263,10 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport
override def treeChildren: Seq[SparkPlan] = Nil
}

object WholeStageCodegen {
val PIPELINE_DURATION_METRIC = "duration"
}

/**
* WholeStageCodegen compiles a subtree of plans that support codegen together into a single Java
* function.
@@ -300,6 +304,10 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSupport
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override private[sql] lazy val metrics = Map(
Contributor: how does this work if there are two WholeStageCodegens in a single stage?

Author: Doesn't matter. That would be similar to having two projects in a single stage; the metrics are per instance.

"pipelineTime" -> SQLMetrics.createTimingMetric(sparkContext,
WholeStageCodegen.PIPELINE_DURATION_METRIC))
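
Picking up the review thread above: a rough sketch of why per-instance metrics make two WholeStageCodegens in one stage unproblematic. It assumes a live SparkContext named sparkContext; illustrative only, not part of the patch.

// Each call mints a distinct accumulator, so two WholeStageCodegen instances
// in the same stage report two independent "duration" timings in the UI.
val pipelineA = SQLMetrics.createTimingMetric(sparkContext, WholeStageCodegen.PIPELINE_DURATION_METRIC)
val pipelineB = SQLMetrics.createTimingMetric(sparkContext, WholeStageCodegen.PIPELINE_DURATION_METRIC)
assert(pipelineA ne pipelineB)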

override def doExecute(): RDD[InternalRow] = {
val ctx = new CodegenContext
val code = child.asInstanceOf[CodegenSupport].produce(ctx, this)
@@ -338,6 +346,8 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSupport
logDebug(s"${CodeFormatter.format(cleanedSource)}")
CodeGenerator.compile(cleanedSource)

val durationMs = longMetric("pipelineTime")

val rdds = child.asInstanceOf[CodegenSupport].upstreams()
assert(rdds.size <= 2, "Up to two upstream RDDs can be supported")
if (rdds.length == 1) {
@@ -346,7 +356,11 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSupport
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(Array(iter))
new Iterator[InternalRow] {
-override def hasNext: Boolean = buffer.hasNext
+override def hasNext: Boolean = {
+  val v = buffer.hasNext
+  if (!v) durationMs += buffer.durationMs()
+  v
+}
override def next: InternalRow = buffer.next()
}
}
@@ -357,7 +371,11 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSupport
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(Array(leftIter, rightIter))
new Iterator[InternalRow] {
-override def hasNext: Boolean = buffer.hasNext
+override def hasNext: Boolean = {
+  val v = buffer.hasNext
+  if (!v) durationMs += buffer.durationMs()
+  v
+}
override def next: InternalRow = buffer.next()
}
}
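Both branches above wrap the generated BufferedRowIterator identically: the pipeline metric is updated when hasNext observes exhaustion. The same wrapping pattern in isolation, as a generic sketch with hypothetical names (not the patch itself):

// Generic sketch: report elapsed wall-clock time to a callback once the
// underlying iterator runs dry. The `reported` flag makes repeated hasNext
// calls after exhaustion report only once; the patch itself adds the
// duration on each exhausted hasNext call.
def timedIterator[T](underlying: Iterator[T])(report: Long => Unit): Iterator[T] =
  new Iterator[T] {
    private val startTimeMs = System.currentTimeMillis()
    private var reported = false

    override def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !reported) {
        report(System.currentTimeMillis() - startTimeMs)
        reported = true
      }
      more
    }

    override def next(): T = underlying.next()
  }

In the patch, the report callback corresponds to durationMs += buffer.durationMs().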
SQLMetrics.scala
@@ -122,7 +122,7 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialVa

private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 0L)

-private object StaticsLongSQLMetricParam extends LongSQLMetricParam(
+private object StatisticsBytesSQLMetricParam extends LongSQLMetricParam(
(values: Seq[Long]) => {
// This is a workaround for SPARK-11013.
// We use -1 as initial value of the accumulator, if the accumulator is valid, we will update
@@ -140,6 +140,24 @@ private object StaticsLongSQLMetricParam extends LongSQLMetricParam(
s"\n$sum ($min, $med, $max)"
}, -1L)

private object StatisticsTimingSQLMetricParam extends LongSQLMetricParam(
  (values: Seq[Long]) => {
    // This is a workaround for SPARK-11013.
    // We use -1 as the initial value of the accumulator; if the accumulator is valid, we will
    // update it at the end of the task, and the value will be at least 0.
    val validValues = values.filter(_ >= 0)
    val Seq(sum, min, med, max) = {
      val metric = if (validValues.isEmpty) {
        Seq.fill(4)(0L)
      } else {
        val sorted = validValues.sorted
        Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
      }
      metric.map(Utils.msDurationToString)
    }
    s"\n$sum ($min, $med, $max)"
  }, -1L)
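
For concreteness, the aggregation above condenses per-task timings into sum/min/median/max before formatting. A worked example with invented values (the msDurationToString rendering shown is approximate):

val values = Seq(1000L, 800L, 2000L, 1200L)  // per-task timings in ms (invented)
val validValues = values.filter(_ >= 0)      // drop the -1 "not set" sentinel
val sorted = validValues.sorted              // Seq(800, 1000, 1200, 2000)
val Seq(sum, min, med, max) =
  Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
// sum = 5000, min = 800, med = 1200 (upper median), max = 2000; after
// Utils.msDurationToString, roughly: "5.0 s (800 ms, 1.2 s, 2.0 s)"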

private[sql] object SQLMetrics {

// Identifier for distinguishing SQL metrics from other accumulators
@@ -168,15 +186,24 @@ private[sql] object SQLMetrics {
// The final result of this metric in physical operator UI may look like:
// data size total (min, med, max):
// 100GB (100MB, 1GB, 10GB)
-createLongMetric(sc, s"$name total (min, med, max)", StaticsLongSQLMetricParam)
+createLongMetric(sc, s"$name total (min, med, max)", StatisticsBytesSQLMetricParam)
}

def createTimingMetric(sc: SparkContext, name: String): LongSQLMetric = {
// The final result of this metric in physical operator UI may look like:
// duration (min, med, max):
// 1s (800msMB, 1s, 2s)
Contributor: msMB?
createLongMetric(sc, s"$name total (min, med, max)", StatisticsTimingSQLMetricParam)
}

def getMetricParam(metricParamName: String): SQLMetricParam[SQLMetricValue[Any], Any] = {
val longSQLMetricParam = Utils.getFormattedClassName(LongSQLMetricParam)
-val staticsSQLMetricParam = Utils.getFormattedClassName(StaticsLongSQLMetricParam)
+val bytesSQLMetricParam = Utils.getFormattedClassName(StatisticsBytesSQLMetricParam)
+val timingsSQLMetricParam = Utils.getFormattedClassName(StatisticsTimingSQLMetricParam)
val metricParam = metricParamName match {
case `longSQLMetricParam` => LongSQLMetricParam
-case `staticsSQLMetricParam` => StaticsLongSQLMetricParam
+case `bytesSQLMetricParam` => StatisticsBytesSQLMetricParam
+case `timingsSQLMetricParam` => StatisticsTimingSQLMetricParam
}
metricParam.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]]
}
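getMetricParam is the UI-side inverse of metric creation: a SQLPlanMetric carries only the formatted class name of its param, and this lookup maps that name back to the singleton object. A sketch of the round trip (illustrative; these objects are private to this file, so this only compiles alongside them):

val name = Utils.getFormattedClassName(StatisticsTimingSQLMetricParam)
val param = SQLMetrics.getMetricParam(name)
assert(param eq StatisticsTimingSQLMetricParam)  // same singleton, recovered by name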
SparkPlanGraph.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable

import org.apache.commons.lang3.StringEscapeUtils

-import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.{SparkPlanInfo, WholeStageCodegen}
import org.apache.spark.sql.execution.metric.SQLMetrics

/**
@@ -79,12 +79,19 @@ private[sql] object SparkPlanGraph {
exchanges: mutable.HashMap[SparkPlanInfo, SparkPlanGraphNode]): Unit = {
planInfo.nodeName match {
case "WholeStageCodegen" =>
val metrics = planInfo.metrics.map { metric =>
SQLPlanMetric(metric.name, metric.accumulatorId,
SQLMetrics.getMetricParam(metric.metricParam))
}

val cluster = new SparkPlanGraphCluster(
nodeIdGenerator.getAndIncrement(),
planInfo.nodeName,
planInfo.simpleString,
-mutable.ArrayBuffer[SparkPlanGraphNode]())
+mutable.ArrayBuffer[SparkPlanGraphNode](),
+metrics)
nodes += cluster

buildSparkPlanGraphNode(
planInfo.children.head, nodeIdGenerator, nodes, edges, parent, cluster, exchanges)
case "InputAdapter" =>
@@ -166,13 +173,26 @@ private[ui] class SparkPlanGraphCluster(
id: Long,
name: String,
desc: String,
-val nodes: mutable.ArrayBuffer[SparkPlanGraphNode])
-extends SparkPlanGraphNode(id, name, desc, Map.empty, Nil) {
+val nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
+metrics: Seq[SQLPlanMetric])
+extends SparkPlanGraphNode(id, name, desc, Map.empty, metrics) {

override def makeDotNode(metricsValue: Map[Long, String]): String = {
  val duration = metrics.filter(_.name.startsWith(WholeStageCodegen.PIPELINE_DURATION_METRIC))
  val labelStr = if (duration.nonEmpty) {
    require(duration.length == 1)
    val id = duration(0).accumulatorId
    if (metricsValue.contains(id)) {
      name + "\n\n" + metricsValue(id)
    } else {
      name
    }
  } else {
    name
  }
s"""
| subgraph cluster${id} {
-| label="${StringEscapeUtils.escapeJava(name)}";
+| label="${StringEscapeUtils.escapeJava(labelStr)}";
| ${nodes.map(_.makeDotNode(metricsValue)).mkString(" \n")}
| }
""".stripMargin
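The user-visible effect: when the pipeline metric has a value, the codegen cluster's box in the SQL tab is labeled with the timing summary. With invented values, the label logic and the emitted DOT fragment look roughly like:

// Illustrative only, invented values.
val labelStr = "WholeStageCodegen" + "\n\n" + "5.0 s (800 ms, 1.2 s, 2.0 s)"
// makeDotNode would then emit approximately:
//   subgraph cluster7 {
//     label="WholeStageCodegen\n\n5.0 s (800 ms, 1.2 s, 2.0 s)";
//     ...
//   }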
SQLMetricsSuite.scala
@@ -321,7 +321,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
// Because "save" will create a new DataFrame internally, we cannot get the real metric id.
// However, we can still check the value.
-assert(metricValues.values.toSeq === Seq("2"))
+assert(metricValues.values.toSeq.exists(_ === "2"))
}
}

SQLListenerSuite.scala
@@ -81,7 +81,16 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {

test("basic") {
def checkAnswer(actual: Map[Long, String], expected: Map[Long, Long]): Unit = {
-assert(actual === expected.mapValues(_.toString))
+assert(actual.size == expected.size)
+expected.foreach { e =>
+  // The values in actual can be SQL metrics, meaning they contain additional formatting
+  // when converted to strings. Verify that they start with the expected value.
+  // TODO: this is brittle. There is no requirement that the actual string needs to start
+  // with the accumulator value.
+  assert(actual.contains(e._1))
+  val v = actual(e._1).trim
+  assert(v.startsWith(e._2.toString))
+}
}

val listener = new SQLListener(sqlContext.sparkContext.conf)