Skip to content

Commit

Permalink
Minor style changes and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pwendell committed Jul 7, 2014
1 parent 7a63abc commit 9f18bad
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
poolToActiveStages(stageIdToPool(stageId)).remove(stageId)

val accumulables = stageIdToAccumulables.getOrElseUpdate(stageId, HashMap[String, String]())
stageCompleted.stageInfo.accumulatedValues.foreach { case (name, value) =>
for ((name, value) <- stageCompleted.stageInfo.accumulatedValues) {
accumulables(name) = value
}

Expand Down Expand Up @@ -156,7 +156,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {

if (info != null) {
val accumulables = stageIdToAccumulables.getOrElseUpdate(sid, HashMap[String, String]())
info.accumulableValues.map { case (name, value) =>
for ((name, value) <- info.accumulableValues) {
accumulables(name) = value
}

Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<td sorttable_customkey={gcTime.toString}>
{if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
</td>
<td>{Unparsed(info.accumulableValues.map{ case (k, v) => s"$k: $v" }.mkString("<br/>"))}</td>
<td>
{Unparsed(info.accumulableValues.map{ case (k, v) => s"$k: $v" }.mkString("<br/>"))}
</td>
<!--
TODO: Add this back after we add support to hide certain columns.
<td sorttable_customkey={serializationTime.toString}>
Expand Down
15 changes: 11 additions & 4 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,22 @@ class JsonProtocolSuite extends FunSuite {
testBlockId(StreamBlockId(1, 2L))
}

test("StageInfo.details backward compatibility") {
// StageInfo.details was added after 1.0.0.
test("StageInfo backward compatibility") {
val info = makeStageInfo(1, 2, 3, 4L, 5L)
assert(info.details.nonEmpty)
val newJson = JsonProtocol.stageInfoToJson(info)
val oldJson = newJson.removeField { case (field, _) => field == "Details" }

// Fields added after 1.0.0.
assert(info.details.nonEmpty)
assert(info.accumulatedValues.nonEmpty)
val oldJson = newJson
.removeField { case (field, _) => field == "Details" }
.removeField { case (field, _) => field == "Accumulated Values" }

val newInfo = JsonProtocol.stageInfoFromJson(oldJson)

assert(info.name === newInfo.name)
assert("" === newInfo.details)
assert(0 === newInfo.accumulatedValues.size)
}

test("InputMetrics backward compatibility") {
Expand Down

0 comments on commit 9f18bad

Please sign in to comment.