diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index c8407bbcb780b..99924bf67099e 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -22,7 +22,7 @@ import java.util.{Properties, UUID} import org.apache.spark.scheduler.cluster.ExecutorInfo import scala.collection.JavaConverters._ -import scala.collection.Map +import scala.collection.{immutable, Map} import org.json4s.DefaultFormats import org.json4s.JsonDSL._ @@ -793,7 +793,7 @@ private[spark] object JsonProtocol { def executorInfoFromJson(json: JValue): ExecutorInfo = { val executorHost = (json \ "Host").extract[String] val totalCores = (json \ "Total Cores").extract[Int] - val logUrls = mapFromJson(json \ "Log Urls").toMap + val logUrls = Utils.jsonOption(json \ "Log Urls").map(mapFromJson).getOrElse(Map.empty) new ExecutorInfo(executorHost, totalCores, logUrls) } @@ -801,7 +801,7 @@ private[spark] object JsonProtocol { * Util JSON deserialization methods | * --------------------------------- */ - def mapFromJson(json: JValue): Map[String, String] = { + def mapFromJson(json: JValue): immutable.Map[String, String] = { val jsonFields = json.asInstanceOf[JObject].obj jsonFields.map { case JField(k, JString(v)) => (k, v) }.toMap } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 842f54529baf0..113553cb5c633 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -22,7 +22,7 @@ import java.util.Properties import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.shuffle.MetadataFetchFailedException -import scala.collection.Map +import scala.collection.{immutable, Map} import org.json4s.jackson.JsonMethods._ import org.scalatest.FunSuite @@ -280,6 +280,16 @@ class JsonProtocolSuite extends FunSuite { assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent)) } + test("ExecutorInfo backward compatibility") { + // Prior to Spark 1.3.0, ExecutorInfo did not have a "Log Urls" property + val executorInfo = + new ExecutorInfo("hostname", totalCores = 10, immutable.Map("stderr" -> "somePath")) + val oldExecutorInfoJson = JsonProtocol.executorInfoToJson(executorInfo) + .removeField{ _._1 == "Log Urls" } + val expectedExecutorInfo = new ExecutorInfo("hostname", totalCores = 10, immutable.Map.empty) + assertEquals(expectedExecutorInfo, JsonProtocol.executorInfoFromJson(oldExecutorInfoJson)) + } + /** -------------------------- * | Helper test running methods | * --------------------------- */