[SPARK-13916][SQL] Add a metric to WholeStageCodegen to measure duration.

## What changes were proposed in this pull request?

WholeStageCodegen naturally breaks the execution into pipelines whose duration is easy to
measure. This is more granular than the task timings (a task can span multiple pipelines)
and is integrated with the web UI.

We currently report the total time (across all tasks) and the min/max/median, to get a sense of how long each pipeline is taking.
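
The core idea is small: the generated pipeline iterator records its creation time and, once it is exhausted, adds the elapsed milliseconds to a SQL metric that the UI aggregates across tasks. Below is a minimal, self-contained Scala sketch of that pattern; it is simplified and the names are illustrative, not the actual Spark classes (those are `BufferedRowIterator.durationMs()` and the iterator wrappers in `WholeStageCodegen.doExecute()` in the diff below).

```scala
// Simplified sketch of the measurement pattern introduced by this patch.
// Names are illustrative; the real code reports into a SQLMetrics timing
// accumulator rather than a callback.
class TimedPipelineIterator[T](underlying: Iterator[T], reportMs: Long => Unit)
  extends Iterator[T] {

  private val startNs = System.nanoTime() // pipeline start, recorded at construction
  private var reported = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !reported) {
      // Pipeline is exhausted: report its duration once, in milliseconds.
      reportMs((System.nanoTime() - startNs) / (1000 * 1000))
      reported = true
    }
    more
  }

  override def next(): T = underlying.next()
}
```

In the actual change the per-task values reported this way are then summarized as total (min, med, max) in the web UI.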

## How was this patch tested?

Manually tested by looking at the web UI.

Author: Nong Li <[email protected]>

Closes apache#11741 from nongli/spark-13916.
nongli authored and roygao94 committed Mar 22, 2016
1 parent 159239e commit b73a7cd
Showing 6 changed files with 97 additions and 14 deletions.
@@ -34,6 +34,7 @@ public abstract class BufferedRowIterator {
protected LinkedList<InternalRow> currentRows = new LinkedList<>();
// used when there is no column in output
protected UnsafeRow unsafeRow = new UnsafeRow(0);
private long startTimeNs = System.nanoTime();

public boolean hasNext() throws IOException {
if (currentRows.isEmpty()) {
@@ -46,6 +47,14 @@ public InternalRow next() {
return currentRows.remove();
}

/**
* Returns the elapsed time since this object was created. This object represents a pipeline,
* so this is a measure of how long the pipeline has been running.
*/
public long durationMs() {
return (System.nanoTime() - startTimeNs) / (1000 * 1000);
}

/**
* Initializes from array of iterators of InternalRow.
*/
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.toCommentSafeString
import org.apache.spark.sql.execution.aggregate.TungstenAggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
import org.apache.spark.sql.execution.metric.LongSQLMetricValue
import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf

/**
@@ -264,6 +264,10 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport
override def treeChildren: Seq[SparkPlan] = Nil
}

object WholeStageCodegen {
val PIPELINE_DURATION_METRIC = "duration"
}

/**
* WholeStageCodegen compiles a subtree of plans that support codegen together into a single Java
* function.
@@ -301,6 +305,10 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override private[sql] lazy val metrics = Map(
"pipelineTime" -> SQLMetrics.createTimingMetric(sparkContext,
WholeStageCodegen.PIPELINE_DURATION_METRIC))

override def doExecute(): RDD[InternalRow] = {
val ctx = new CodegenContext
val code = child.asInstanceOf[CodegenSupport].produce(ctx, this)
@@ -339,6 +347,8 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
logDebug(s"${CodeFormatter.format(cleanedSource)}")
CodeGenerator.compile(cleanedSource)

val durationMs = longMetric("pipelineTime")

val rdds = child.asInstanceOf[CodegenSupport].upstreams()
assert(rdds.size <= 2, "Up to two upstream RDDs can be supported")
if (rdds.length == 1) {
@@ -347,7 +357,11 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(Array(iter))
new Iterator[InternalRow] {
override def hasNext: Boolean = buffer.hasNext
override def hasNext: Boolean = {
val v = buffer.hasNext
if (!v) durationMs += buffer.durationMs()
v
}
override def next: InternalRow = buffer.next()
}
}
@@ -358,7 +372,11 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(Array(leftIter, rightIter))
new Iterator[InternalRow] {
override def hasNext: Boolean = buffer.hasNext
override def hasNext: Boolean = {
val v = buffer.hasNext
if (!v) durationMs += buffer.durationMs()
v
}
override def next: InternalRow = buffer.next()
}
}
@@ -122,7 +122,7 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialVa

private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 0L)

private object StaticsLongSQLMetricParam extends LongSQLMetricParam(
private object StatisticsBytesSQLMetricParam extends LongSQLMetricParam(
(values: Seq[Long]) => {
// This is a workaround for SPARK-11013.
// We use -1 as initial value of the accumulator, if the accumulator is valid, we will update
@@ -140,6 +140,24 @@ private object StaticsLongSQLMetricParam extends LongSQLMetricParam(
s"\n$sum ($min, $med, $max)"
}, -1L)

private object StatisticsTimingSQLMetricParam extends LongSQLMetricParam(
(values: Seq[Long]) => {
// This is a workaround for SPARK-11013.
// We use -1 as initial value of the accumulator, if the accumulator is valid, we will update
// it at the end of task and the value will be at least 0.
val validValues = values.filter(_ >= 0)
val Seq(sum, min, med, max) = {
val metric = if (validValues.length == 0) {
Seq.fill(4)(0L)
} else {
val sorted = validValues.sorted
Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
}
metric.map(Utils.msDurationToString)
}
s"\n$sum ($min, $med, $max)"
}, -1L)

private[sql] object SQLMetrics {

// Identifier for distinguishing SQL metrics from other accumulators
@@ -168,15 +186,24 @@ private[sql] object SQLMetrics {
// The final result of this metric in physical operator UI may look like:
// data size total (min, med, max):
// 100GB (100MB, 1GB, 10GB)
createLongMetric(sc, s"$name total (min, med, max)", StaticsLongSQLMetricParam)
createLongMetric(sc, s"$name total (min, med, max)", StatisticsBytesSQLMetricParam)
}

def createTimingMetric(sc: SparkContext, name: String): LongSQLMetric = {
// The final result of this metric in physical operator UI may look like:
// duration(min, med, max):
// 5s (800ms, 1s, 2s)
createLongMetric(sc, s"$name total (min, med, max)", StatisticsTimingSQLMetricParam)
}

def getMetricParam(metricParamName: String): SQLMetricParam[SQLMetricValue[Any], Any] = {
val longSQLMetricParam = Utils.getFormattedClassName(LongSQLMetricParam)
val staticsSQLMetricParam = Utils.getFormattedClassName(StaticsLongSQLMetricParam)
val bytesSQLMetricParam = Utils.getFormattedClassName(StatisticsBytesSQLMetricParam)
val timingsSQLMetricParam = Utils.getFormattedClassName(StatisticsTimingSQLMetricParam)
val metricParam = metricParamName match {
case `longSQLMetricParam` => LongSQLMetricParam
case `staticsSQLMetricParam` => StaticsLongSQLMetricParam
case `bytesSQLMetricParam` => StatisticsBytesSQLMetricParam
case `timingsSQLMetricParam` => StatisticsTimingSQLMetricParam
}
metricParam.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]]
}
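
To illustrate the summary computed by `StatisticsTimingSQLMetricParam` above, here is a stripped-down, stand-alone version of the same sum/min/median/max logic. It is illustrative only: plain millisecond strings are used instead of Spark's `Utils.msDurationToString` formatter.

```scala
// Stand-alone illustration of the timing summary above (not the Spark class itself).
object TimingSummary {
  def format(values: Seq[Long]): String = {
    // -1 is the initial accumulator value (SPARK-11013 workaround); only updated values count.
    val valid = values.filter(_ >= 0)
    val Seq(sum, min, med, max) =
      if (valid.isEmpty) Seq.fill(4)(0L)
      else {
        val sorted = valid.sorted
        Seq(sorted.sum, sorted.head, sorted(valid.length / 2), sorted.last)
      }
    s"$sum ms ($min ms, $med ms, $max ms)"
  }

  def main(args: Array[String]): Unit = {
    // Three tasks ran the pipeline for 800 ms, 2000 ms and 1000 ms; one never updated the metric.
    println(format(Seq(800L, 2000L, 1000L, -1L))) // prints: 3800 ms (800 ms, 1000 ms, 2000 ms)
  }
}
```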
@@ -23,7 +23,7 @@ import scala.collection.mutable

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.{SparkPlanInfo, WholeStageCodegen}
import org.apache.spark.sql.execution.metric.SQLMetrics

/**
@@ -79,12 +79,19 @@ private[sql] object SparkPlanGraph {
exchanges: mutable.HashMap[SparkPlanInfo, SparkPlanGraphNode]): Unit = {
planInfo.nodeName match {
case "WholeStageCodegen" =>
val metrics = planInfo.metrics.map { metric =>
SQLPlanMetric(metric.name, metric.accumulatorId,
SQLMetrics.getMetricParam(metric.metricParam))
}

val cluster = new SparkPlanGraphCluster(
nodeIdGenerator.getAndIncrement(),
planInfo.nodeName,
planInfo.simpleString,
mutable.ArrayBuffer[SparkPlanGraphNode]())
mutable.ArrayBuffer[SparkPlanGraphNode](),
metrics)
nodes += cluster

buildSparkPlanGraphNode(
planInfo.children.head, nodeIdGenerator, nodes, edges, parent, cluster, exchanges)
case "InputAdapter" =>
@@ -166,13 +173,26 @@ private[ui] class SparkPlanGraphCluster(
id: Long,
name: String,
desc: String,
val nodes: mutable.ArrayBuffer[SparkPlanGraphNode])
extends SparkPlanGraphNode(id, name, desc, Map.empty, Nil) {
val nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
metrics: Seq[SQLPlanMetric])
extends SparkPlanGraphNode(id, name, desc, Map.empty, metrics) {

override def makeDotNode(metricsValue: Map[Long, String]): String = {
val duration = metrics.filter(_.name.startsWith(WholeStageCodegen.PIPELINE_DURATION_METRIC))
val labelStr = if (duration.nonEmpty) {
require(duration.length == 1)
val id = duration(0).accumulatorId
if (metricsValue.contains(duration(0).accumulatorId)) {
name + "\n\n" + metricsValue.get(id).get
} else {
name
}
} else {
name
}
s"""
| subgraph cluster${id} {
| label="${StringEscapeUtils.escapeJava(name)}";
| label="${StringEscapeUtils.escapeJava(labelStr)}";
| ${nodes.map(_.makeDotNode(metricsValue)).mkString(" \n")}
| }
""".stripMargin
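
As a small illustration of the `makeDotNode` change above: when a value for the pipeline duration metric is available, the GraphViz cluster label becomes the node name followed by the rendered duration; otherwise it stays the plain name. A hedged sketch of just that labeling decision (not the Spark class itself; the example value mirrors the comment in `createTimingMetric`):

```scala
// Illustrative only: the label decision made in SparkPlanGraphCluster.makeDotNode above.
object ClusterLabel {
  def clusterLabel(name: String, durationValue: Option[String]): String =
    durationValue match {
      case Some(value) => name + "\n\n" + value // name plus rendered duration
      case None        => name                  // no recorded value yet: plain name
    }

  def main(args: Array[String]): Unit = {
    // Rendered as the cluster label of the WholeStageCodegen subgraph in the plan graph.
    println(clusterLabel("WholeStageCodegen", Some("5s (800ms, 1s, 2s)")))
  }
}
```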
@@ -309,7 +309,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
// Because "save" will create a new DataFrame internally, we cannot get the real metric id.
// However, we still can check the value.
assert(metricValues.values.toSeq === Seq("2"))
assert(metricValues.values.toSeq.exists(_ === "2"))
}
}

@@ -81,7 +81,16 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {

test("basic") {
def checkAnswer(actual: Map[Long, String], expected: Map[Long, Long]): Unit = {
assert(actual === expected.mapValues(_.toString))
assert(actual.size == expected.size)
expected.foreach { e =>
// The values in actual can be SQL metrics meaning that they contain additional formatting
// when converted to string. Verify that they start with the expected value.
// TODO: this is brittle. There is no requirement that the actual string needs to start
// with the accumulator value.
assert(actual.contains(e._1))
val v = actual.get(e._1).get.trim
assert(v.startsWith(e._2.toString))
}
}

val listener = new SQLListener(sqlContext.sparkContext.conf)
