Skip to content

Commit

Permalink
Moving some code into the Accumulators class
Browse files Browse the repository at this point in the history
  • Loading branch information
pwendell committed Aug 4, 2014
1 parent 9a9ba3c commit c991b1b
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 4 deletions.
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,7 @@ private object Accumulators {
}
}
}

def stringifyPartialValue(partialValue: Any) = "%s".format(value)
def stringifyValue(value: Any) = "%s".format(value)
}
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,6 @@ class DAGScheduler(
listenerBus.post(SparkListenerStageCompleted(stage.info))
runningStages -= stage
}

event.reason match {
case Success =>
if (event.accumUpdates != null) {
Expand All @@ -918,8 +917,8 @@ class DAGScheduler(
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = "%s".format(partialValue)
val stringValue = "%s".format(acc.value)
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
stage.info.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class TaskInfo(
/**
* Intermediate updates to accumulables during this task. Note that it is valid for the same
* accumulable to be updated multiple times in a single task or for two accumulables with the
* same name but different ID's to exist in a task.
* same name but different IDs to exist in a task.
*/
val accumulables = ListBuffer[AccumulableInfo]()

Expand Down

0 comments on commit c991b1b

Please sign in to comment.