Skip to content

Commit

Permalink
Remove "display" variable and assume display = name.isDefined
Browse files Browse the repository at this point in the history
  • Loading branch information
pwendell committed Aug 4, 2014
1 parent 0ec4ac7 commit 0bb0e33
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 14 deletions.
15 changes: 6 additions & 9 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,20 @@ import org.apache.spark.serializer.JavaSerializer
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param _name human-readable name for use in Spark's web UI
* @param display whether to show accumulator values Spark's web UI
* @param name human-readable name for use in Spark's web UI
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] (
@transient initialValue: R,
param: AccumulableParam[R, T],
_name: Option[String],
val display: Boolean)
val name: Option[String])
extends Serializable {

def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
this(initialValue, param, None, true)
this(initialValue, param, None)

val id: Long = Accumulators.newId
val name = _name.getOrElse(s"accumulator_$id")

@transient private var value_ = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
Expand Down Expand Up @@ -228,9 +225,9 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String],
display: Boolean) extends Accumulable[T,T](initialValue, param, name, display) {
def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None, false)
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T,T](initialValue, param, name) {
def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}

/**
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ class SparkContext(config: SparkConf) extends Logging {
* driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
new Accumulator(initialValue, param, Some(name), true)
new Accumulator(initialValue, param, Some(name))
}

/**
Expand All @@ -783,7 +783,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @tparam R type that can be added to the accumulator
*/
def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
new Accumulable(initialValue, param, Some(name), true)
new Accumulable(initialValue, param, Some(name))

/**
* Create an accumulator from a "mutable collection" type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -819,12 +819,12 @@ class DAGScheduler(
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
val name = acc.name
// To avoid UI cruft, ignore cases where value wasn't updated
if (partialValue != acc.zero) {
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = "%s".format(partialValue)
val stringValue = "%s".format(acc.value)
stageToInfos(stage).accumulables(id) = AccumulableInfo(id, acc.name, stringValue)
stageToInfos(stage).accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
Expand Down

0 comments on commit 0bb0e33

Please sign in to comment.