From 0bb0e33ff1dbe3230e5f7d0073f34cb57fe383af Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 3 Aug 2014 23:58:50 -0700 Subject: [PATCH] Remove "display" variable and assume display = name.isDefined --- .../scala/org/apache/spark/Accumulators.scala | 15 ++++++--------- .../scala/org/apache/spark/SparkContext.scala | 4 ++-- .../org/apache/spark/scheduler/DAGScheduler.scala | 6 +++--- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 92fc037d4d049..39c00c9d3267f 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -36,23 +36,20 @@ import org.apache.spark.serializer.JavaSerializer * * @param initialValue initial value of accumulator * @param param helper object defining how to add elements of type `R` and `T` - * @param _name human-readable name for use in Spark's web UI - * @param display whether to show accumulator values Spark's web UI + * @param name human-readable name for use in Spark's web UI * @tparam R the full accumulated data (result type) * @tparam T partial data that can be added in */ class Accumulable[R, T] ( @transient initialValue: R, param: AccumulableParam[R, T], - _name: Option[String], - val display: Boolean) + val name: Option[String]) extends Serializable { def this(@transient initialValue: R, param: AccumulableParam[R, T]) = - this(initialValue, param, None, true) + this(initialValue, param, None) val id: Long = Accumulators.newId - val name = _name.getOrElse(s"accumulator_$id") @transient private var value_ = initialValue // Current value on master val zero = param.zero(initialValue) // Zero value to be passed to workers @@ -228,9 +225,9 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa * @param param helper object defining how to add elements of type `T` * @tparam T result type */ -class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String], - display: Boolean) extends Accumulable[T,T](initialValue, param, name, display) { - def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None, false) +class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String]) + extends Accumulable[T,T](initialValue, param, name) { + def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None) } /** diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 61392507ac32c..09736be47fa65 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -763,7 +763,7 @@ class SparkContext(config: SparkConf) extends Logging { * driver can access the accumulator's `value`. */ def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = { - new Accumulator(initialValue, param, Some(name), true) + new Accumulator(initialValue, param, Some(name)) } /** @@ -783,7 +783,7 @@ class SparkContext(config: SparkConf) extends Logging { * @tparam R type that can be added to the accumulator */ def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) = - new Accumulable(initialValue, param, Some(name), true) + new Accumulable(initialValue, param, Some(name)) /** * Create an accumulator from a "mutable collection" type. diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 03a8c98c0a95a..b6e2aab31ee24 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -819,12 +819,12 @@ class DAGScheduler( Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted event.accumUpdates.foreach { case (id, partialValue) => val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]] - val name = acc.name // To avoid UI cruft, ignore cases where value wasn't updated - if (partialValue != acc.zero) { + if (acc.name.isDefined && partialValue != acc.zero) { + val name = acc.name.get val stringPartialValue = "%s".format(partialValue) val stringValue = "%s".format(acc.value) - stageToInfos(stage).accumulables(id) = AccumulableInfo(id, acc.name, stringValue) + stageToInfos(stage).accumulables(id) = AccumulableInfo(id, name, stringValue) event.taskInfo.accumulables += AccumulableInfo(id, name, Some(stringPartialValue), stringValue) }