
Commit

Updating unit tests
pwendell committed Aug 5, 2014
1 parent c991b1b commit cc43f68
Showing 4 changed files with 44 additions and 18 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -290,6 +290,6 @@ private object Accumulators {
}
}

-  def stringifyPartialValue(partialValue: Any) = "%s".format(value)
+  def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
def stringifyValue(value: Any) = "%s".format(value)
}
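
For context on the one-line fix above: the old helper formatted value rather than its own partialValue argument, so per-task partial accumulator updates were rendered from the wrong variable. A minimal standalone sketch of the corrected behavior (hypothetical object name, not Spark's actual internals):

object StringifyDemo {
  // Mirrors the fixed helpers: each formats the argument it actually receives.
  def stringifyPartialValue(partialValue: Any): String = "%s".format(partialValue)
  def stringifyValue(value: Any): String = "%s".format(value)

  def main(args: Array[String]): Unit = {
    println(stringifyPartialValue(42))       // "42" -- a per-task partial update
    println(stringifyValue(List(1, 2, 3)))   // "List(1, 2, 3)" -- a merged value
  }
}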
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -28,7 +28,15 @@ class AccumulableInfo (
val id: Long,
val name: String,
val update: Option[String], // represents a partial update within a task
-val value: String) { }
+val value: String) {
+
+  override def equals(other: Any): Boolean = other match {
+    case acc: AccumulableInfo =>
+      this.id == acc.id && this.name == acc.name &&
+      this.update == acc.update && this.value == acc.value
+    case _ => false
+  }
+}

object AccumulableInfo {
def apply(id: Long, name: String, update: Option[String], value: String) =
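
For context, the value-based equals added above lets the updated tests compare AccumulableInfo instances field by field instead of by reference. A short standalone sketch of the effect, assuming the class lives in org.apache.spark.scheduler and using the companion apply shown above:

import org.apache.spark.scheduler.AccumulableInfo

object AccumulableInfoEqualsDemo {
  def main(args: Array[String]): Unit = {
    val a = AccumulableInfo(1L, "Accumulable1", Some("delta1"), "val1")
    val b = AccumulableInfo(1L, "Accumulable1", Some("delta1"), "val1")
    // With the new equals, identical field values compare equal even though
    // a and b are distinct object instances.
    assert(a == b)
    assert(a != AccumulableInfo(2L, "Accumulable2", Some("delta2"), "val2"))
  }
}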
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -25,6 +25,8 @@ import scala.collection.Map
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods._


import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
ShuffleWriteMetrics, TaskMetrics}
@@ -538,10 +540,10 @@ private[spark] object JsonProtocol {
}

def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
-val id = (json \ "id").extract[Long]
-val name = (json \ "name").extract[String]
-val update = Utils.jsonOption(json \ "update").map(_.extract[String])
-val value = (json \ "value").extract[String]
+val id = (json \ "ID").extract[Long]
+val name = (json \ "Name").extract[String]
+val update = Utils.jsonOption(json \ "Update").map(_.extract[String])
+val value = (json \ "Value").extract[String]
AccumulableInfo(id, name, update, value)
}

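
The parsing change above switches accumulableInfoFromJson to the capitalized field names used in the expected JSON fixtures below, presumably matching what the writer side emits. A hedged, standalone sketch of the same extraction pattern with json4s, using extractOpt in place of Spark's Utils.jsonOption helper:

import org.json4s._
import org.json4s.jackson.JsonMethods._

object AccumulableInfoJsonDemo {
  implicit val formats: Formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    val json = parse("""{"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}""")
    val id = (json \ "ID").extract[Long]
    val name = (json \ "Name").extract[String]
    // "Update" is optional in the protocol; extractOpt yields None when the field is absent.
    val update = (json \ "Update").extractOpt[String]
    val value = (json \ "Value").extract[String]
    println((id, name, update, value))   // (1,Accumulable1,Some(delta1),val1)
  }
}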
40 changes: 28 additions & 12 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -132,7 +132,7 @@ class JsonProtocolSuite extends FunSuite {
assert(info.accumulables.nonEmpty)
val oldJson = newJson
.removeField { case (field, _) => field == "Details" }
.removeField { case (field, _) => field == "Accumulated Values" }
.removeField { case (field, _) => field == "Accumulables" }

val newInfo = JsonProtocol.stageInfoFromJson(oldJson)

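The removeField calls in the hunk above simulate event logs written by an older Spark version: fields added later are stripped from freshly generated JSON before it is fed back through stageInfoFromJson, checking that the reader tolerates their absence. A standalone json4s sketch of that pattern, with a made-up sample JSON:

import org.json4s._
import org.json4s.jackson.JsonMethods._

object RemoveFieldDemo {
  def main(args: Array[String]): Unit = {
    val newJson = parse("""{"Stage ID":1,"Details":"details","Accumulables":[]}""")
    // Drop the newer fields so the JSON looks like output from an older version.
    val oldJson = newJson
      .removeField { case (field, _) => field == "Details" }
      .removeField { case (field, _) => field == "Accumulables" }
    println(compact(render(oldJson)))   // {"Stage ID":1}
  }
}
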
@@ -486,20 +486,26 @@
private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
val stageInfo = new StageInfo(a, "greetings", b, rddInfos, "details")
stageInfo.accumulables("acc1") = "val1"
stageInfo.accumulables("acc2") = "val2"
val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
stageInfo.accumulables(acc1.id) = acc1
stageInfo.accumulables(acc2.id) = acc2
stageInfo
}

private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL,
speculative)
taskInfo.accumulables += (("acc1", "val1"))
taskInfo.accumulables += (("acc1", "val1"))
taskInfo.accumulables += (("acc2", "val2"))
val (acc1, acc2, acc3) =
(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3))
taskInfo.accumulables += acc1
taskInfo.accumulables += acc2
taskInfo.accumulables += acc3
taskInfo
}

+private def makeAccumulableInfo(id: Int): AccumulableInfo =
+  AccumulableInfo(id, "Accumulable" + id, Some("delta" + id), "val" + id)

/**
* Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
* set to true) or read data from a shuffle otherwise.
@@ -554,7 +560,8 @@ class JsonProtocolSuite extends FunSuite {
"""
{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
"greetings","Number of Tasks":200,"RDD Info":[],"Details":"details",
"AccumulatedValues":{"acc2":"val2","acc1":"val1"}},"Properties":
"Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
{"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]},"Properties":
{"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
"""

@@ -565,15 +572,18 @@
Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
"Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
"Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details",
"AccumulatedValues":{"acc2":"val2","acc1":"val1"}}}
"Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
{"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]}}
"""

private val taskStartJsonString =
"""
|{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
|"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
|"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
|"Failed":false,"AccumulableUpdates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}]}}
|"Failed":false,"Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
|"Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
|{"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]}}
""".stripMargin

private val taskGettingResultJsonString =
@@ -582,7 +592,9 @@
| {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
| "Finish Time":0,"Failed":false,
| "AccumulableUpdates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}]
| "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
| "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
| {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
| }
|}
""".stripMargin
@@ -595,7 +607,9 @@
| "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
| "Getting Result Time":0,"Finish Time":0,"Failed":false,
| "AccumulableUpdates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}]
| "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
| "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
| {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
|},
|"Task Metrics":{
| "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
@@ -635,7 +649,9 @@ class JsonProtocolSuite extends FunSuite {
| "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
| "Getting Result Time":0,"Finish Time":0,"Failed":false,
| "AccumulableUpdates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}]
| "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
| "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
| {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
|},
|"Task Metrics":{
| "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
