From 64d2ce1efee3aa5a8166c5fe108932b2279217fc Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 18 Feb 2014 18:29:21 -0800
Subject: [PATCH] Fix BlockManagerUI bug by introducing new event

Previously, the storage information for persisted RDDs continued to rely on
the old SparkContext, which is no longer accessible if the UI is rendered
from disk. This fix addresses the problem by introducing a new event,
SparkListenerGetRDDInfo, which captures this information.

Per discussion with Patrick, an alternative is to encapsulate this
information within SparkListenerTaskEnd. That would avoid creating a new
event, but would also require a non-trivial refactor of BlockManager /
BlockStore.
---
 .../spark/scheduler/SparkListener.scala        | 45 ++++++++++++++-----
 .../spark/scheduler/SparkListenerBus.scala     |  2 +
 .../apache/spark/storage/StorageUtils.scala    | 41 ++++++++++++++---
 .../scala/org/apache/spark/ui/SparkUI.scala    |  4 +-
 .../org/apache/spark/ui/UISparkListener.scala  |  9 ++--
 .../scala/org/apache/spark/ui/UIUtils.scala    |  6 +--
 .../apache/spark/ui/env/EnvironmentUI.scala    |  3 +-
 .../apache/spark/ui/exec/ExecutorsUI.scala     |  2 +-
 .../org/apache/spark/ui/jobs/IndexPage.scala   |  2 +-
 .../org/apache/spark/ui/jobs/PoolPage.scala    |  2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala   |  5 ++-
 .../spark/ui/storage/BlockManagerUI.scala      | 28 +++++++++++-
 .../apache/spark/ui/storage/IndexPage.scala    | 11 +++--
 .../org/apache/spark/ui/storage/RDDPage.scala  |  6 +--
 .../spark/util/TimeStampedHashMap.scala        |  1 -
 15 files changed, 125 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index b1745c67e7a34..0ac7462ee6aeb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import org.apache.spark.util.{Utils, Distribution}
 import org.apache.spark.{Logging, TaskEndReason}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.storage.StorageStatus
+import org.apache.spark.storage.{RDDInfo, StorageStatus}
 
 import net.liftweb.json.JsonDSL._
 import net.liftweb.json.JsonAST._
@@ -123,7 +123,7 @@ private[spark] case class SparkListenerLoadEnvironment(
   }
 }
 
-/** An event used in the ExecutorUI to fetch storage status from SparkEnv */
+/** An event used in the ExecutorsUI and BlockManagerUI to fetch storage status from SparkEnv */
 private[spark] case class SparkListenerStorageStatusFetch(storageStatusList: Seq[StorageStatus])
   extends SparkListenerEvent {
   override def toJson = {
@@ -133,6 +133,16 @@ private[spark] case class SparkListenerStorageStatusFetch(storageStatusList: Seq
   }
 }
 
+/** An event used in the BlockManagerUI to query information of persisted RDDs */
+private[spark] case class SparkListenerGetRDDInfo(rddInfoList: Seq[RDDInfo])
+  extends SparkListenerEvent {
+  override def toJson = {
+    val rddInfoListJson = JArray(rddInfoList.map(_.toJson).toList)
+    super.toJson ~
+    ("RDD Info List" -> rddInfoListJson)
+  }
+}
+
 /** An event used in the listener to shutdown the listener daemon thread.
*/ private[scheduler] case object SparkListenerShutdown extends SparkListenerEvent @@ -151,6 +161,7 @@ object SparkListenerEvent { val jobEnd = Utils.getFormattedClassName(SparkListenerJobEnd) val loadEnvironment = Utils.getFormattedClassName(SparkListenerLoadEnvironment) val storageStatusFetch = Utils.getFormattedClassName(SparkListenerStorageStatusFetch) + val getRDDInfo = Utils.getFormattedClassName(SparkListenerGetRDDInfo) val shutdown = Utils.getFormattedClassName(SparkListenerShutdown) (json \ "Event").extract[String] match { @@ -163,32 +174,33 @@ object SparkListenerEvent { case `jobEnd` => jobEndFromJson(json) case `loadEnvironment` => loadEnvironmentFromJson(json) case `storageStatusFetch` => storageStatusFetchFromJson(json) + case `getRDDInfo` => getRDDInfoFromJson(json) case `shutdown` => SparkListenerShutdown } } - private def stageSubmittedFromJson(json: JValue) = { + private def stageSubmittedFromJson(json: JValue): SparkListenerEvent = { new SparkListenerStageSubmitted( StageInfo.fromJson(json \ "Stage Info"), Utils.propertiesFromJson(json \ "Properties")) } - private def stageCompletedFromJson(json: JValue) = { + private def stageCompletedFromJson(json: JValue): SparkListenerEvent = { new SparkListenerStageCompleted(StageInfo.fromJson(json \ "Stage Info")) } - private def taskStartFromJson(json: JValue) = { + private def taskStartFromJson(json: JValue): SparkListenerEvent = { implicit val format = DefaultFormats new SparkListenerTaskStart( (json \ "Stage ID").extract[Int], TaskInfo.fromJson(json \ "Task Info")) } - private def taskGettingResultFromJson(json: JValue) = { + private def taskGettingResultFromJson(json: JValue): SparkListenerEvent = { new SparkListenerTaskGettingResult(TaskInfo.fromJson(json \ "Task Info")) } - private def taskEndFromJson(json: JValue) = { + private def taskEndFromJson(json: JValue): SparkListenerEvent = { implicit val format = DefaultFormats new SparkListenerTaskEnd( (json \ "Stage ID").extract[Int], @@ -198,7 +210,7 @@ object SparkListenerEvent { TaskMetrics.fromJson(json \ "Task Metrics")) } - private def jobStartFromJson(json: JValue) = { + private def jobStartFromJson(json: JValue): SparkListenerEvent = { implicit val format = DefaultFormats val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int]) new SparkListenerJobStart( @@ -207,14 +219,14 @@ object SparkListenerEvent { Utils.propertiesFromJson(json \ "Properties")) } - private def jobEndFromJson(json: JValue) = { + private def jobEndFromJson(json: JValue): SparkListenerEvent = { implicit val format = DefaultFormats new SparkListenerJobEnd( (json \ "Job ID").extract[Int], JobResult.fromJson(json \ "Job Result")) } - private def loadEnvironmentFromJson(json: JValue) = { + private def loadEnvironmentFromJson(json: JValue): SparkListenerEvent = { implicit val format = DefaultFormats new SparkListenerLoadEnvironment( Utils.mapFromJson(json \ "JVM Information").toSeq, @@ -223,12 +235,19 @@ object SparkListenerEvent { Utils.mapFromJson(json \ "Classpath Entries").toSeq) } - private def storageStatusFetchFromJson(json: JValue) = { + private def storageStatusFetchFromJson(json: JValue): SparkListenerEvent = { implicit val format = DefaultFormats val storageStatusList = (json \ "Storage Status List").extract[List[JValue]].map(StorageStatus.fromJson) new SparkListenerStorageStatusFetch(storageStatusList) } + + private def getRDDInfoFromJson(json: JValue): SparkListenerEvent = { + implicit val format = DefaultFormats + val rddInfoList = + (json \ "RDD Info 
List").extract[List[JValue]].map(RDDInfo.fromJson) + new SparkListenerGetRDDInfo(rddInfoList) + } } @@ -282,6 +301,10 @@ trait SparkListener { */ def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) { } + /** + * Called when Spark queries statuses of persisted RDD's + */ + def onGetRDDInfo(getRDDInfo: SparkListenerGetRDDInfo) { } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index fa3c3d03e2616..9f1a22b8378ee 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -73,6 +73,8 @@ private[spark] class SparkListenerBus extends Logging { listeners.foreach(_.onLoadEnvironment(loadEnvironment)) case storageStatusFetch: SparkListenerStorageStatusFetch => listeners.foreach(_.onStorageStatusFetch(storageStatusFetch)) + case getRDDInfo: SparkListenerGetRDDInfo => + listeners.foreach(_.onGetRDDInfo(getRDDInfo)) case SparkListenerShutdown => return true case _ => diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index af32b2eafe568..6931c13bd6976 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -76,19 +76,48 @@ case object StorageStatus { } } -case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, - numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) - extends Ordered[RDDInfo] { +case class RDDInfo( + id: Int, + name: String, + storageLevel: StorageLevel, + numCachedPartitions: Int, + numPartitions: Int, + memSize: Long, + diskSize: Long) + extends JsonSerializable with Ordered[RDDInfo] { override def toString = { - import Utils.bytesToString ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " + "DiskSize: %s").format(name, id, storageLevel.toString, numCachedPartitions, - numPartitions, bytesToString(memSize), bytesToString(diskSize)) + numPartitions, Utils.bytesToString(memSize), Utils.bytesToString(diskSize)) } override def compare(that: RDDInfo) = { this.id - that.id } + + override def toJson = { + ("RDD ID" -> id) ~ + ("Name" -> name) ~ + ("Storage Level" -> storageLevel.toJson) ~ + ("Number of Cached Partitions" -> numCachedPartitions) ~ + ("Number of Partitions" -> numPartitions) ~ + ("Memory Size" -> memSize) ~ + ("Disk Size" -> diskSize) + } +} + +case object RDDInfo { + def fromJson(json: JValue): RDDInfo = { + implicit val format = DefaultFormats + new RDDInfo( + (json \ "RDD ID").extract[Int], + (json \ "Name").extract[String], + StorageLevel.fromJson(json \ "Storage Level"), + (json \ "Number of Cached Partitions").extract[Int], + (json \ "Number of Partitions").extract[Int], + (json \ "Memory Size").extract[Long], + (json \ "Disk Size").extract[Long]) + } } /* Helper methods for storage-related objects */ @@ -114,7 +143,7 @@ object StorageUtils { sc: SparkContext) : Array[RDDInfo] = { // Group by rddId, ignore the partition name - val groupedRddBlocks = infos.groupBy { case(k, v) => k.rddId }.mapValues(_.values.toArray) + val groupedRddBlocks = infos.groupBy { case (k, v) => k.rddId }.mapValues(_.values.toArray) // For each RDD, generate an RDDInfo object val rddInfos = groupedRddBlocks.map { case (rddId, rddBlocks) => diff --git 
a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index af1fc32653617..ca5e101a1b03c 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -129,7 +129,9 @@ private[spark] class SparkUI(val sc: SparkContext, live: Boolean = true) extends .format(dirPath)) return false } - val logFiles = logDir.listFiles.filter(_.isFile) + // Maintaining the order of log files is important because information of one job is + // dependent on that of another + val logFiles = logDir.listFiles.filter(_.isFile).sortBy(_.getName) if (logFiles.size == 0) { logWarning("No logs found in given directory %s when rendering persisted Spark Web UI!" .format(dirPath)) diff --git a/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala b/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala index 3d234a4defbdf..9bf65637bcb59 100644 --- a/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/UISparkListener.scala @@ -111,6 +111,12 @@ private[spark] class GatewayUISparkListener(live: Boolean) extends SparkListener logEvent(storageStatusFetch) logger.foreach(_.flush()) } + + override def onGetRDDInfo(getRDDInfo: SparkListenerGetRDDInfo) { + listeners.foreach(_.onGetRDDInfo(getRDDInfo)) + logEvent(getRDDInfo) + logger.foreach(_.flush()) + } } /** @@ -140,9 +146,6 @@ private[spark] class StorageStatusFetchSparkListener( } } - /** - * Update local state with fetch result, and log the appropriate event - */ override def onStorageStatusFetch(storageStatusFetch: SparkListenerStorageStatusFetch) { storageStatusList = storageStatusFetch.storageStatusList } diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 547a194d58a5c..2732f18423bff 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -32,7 +32,7 @@ private[spark] object UIUtils { def prependBaseUri(resource: String = "") = uiRoot + resource /** Returns a spark page with correctly formatted headers */ - def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value) + def headerSparkPage(content: => Seq[Node], appName: String, title: String, page: Page.Value) : Seq[Node] = { val jobs = page match { case Stages =>
         [page-navigation markup ("Stages", ...) elided]
@@ -60,7 +60,7 @@ private[spark] object UIUtils {
           type="text/css" />
-        {sc.appName} - {title}
+        {appName} - {title}
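The UIUtils change above is what lets every page below drop its SparkContext dependency: headerSparkPage now needs only an appName string, which is available both from a live context and from logged events. A minimal illustrative sketch (the object and method names are hypothetical, not part of the patch):

import scala.xml.Node

// Hypothetical sketch: a header built from a plain application name renders the
// same way for a live SparkContext (sc.appName) and for a UI replayed from logs,
// where no SparkContext exists.
object AppNameHeaderSketch {
  def header(appName: String, title: String): Seq[Node] =
    <head><title>{appName} - {title}</title></head>
}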
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index 924f2c03356e8..4819ce14b34ae 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -64,7 +64,7 @@ private[spark] class EnvironmentUI(parent: SparkUI, live: Boolean) {
         Classpath Entries
         {classpathEntriesTable}
-    UIUtils.headerSparkPage(content, sc, "Environment", Environment)
+    UIUtils.headerSparkPage(content, sc.appName, "Environment", Environment)
   }
 
   private def propertyHeader = Seq("Name", "Value")
@@ -120,7 +120,6 @@ private[spark] class EnvironmentListener(
     }
   }
 
-  /** Prepare environment information for UI to render */
   override def onLoadEnvironment(loadEnvironment: SparkListenerLoadEnvironment) {
     jvmInformation = loadEnvironment.jvmInformation
     sparkProperties = loadEnvironment.sparkProperties
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index 9dfcb50d41849..8ae10d14d64b3 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -72,7 +72,7 @@ private[spark] class ExecutorsUI(parent: SparkUI, live: Boolean) {
       ;
 
-    UIUtils.headerSparkPage(content, sc, "Executors (" + execInfo.size + ")", Executors)
+    UIUtils.headerSparkPage(content, sc.appName, "Executors (" + execInfo.size + ")", Executors)
   }
 
   /** Header fields for the executors table */
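ExecutorsUI above, like the storage pages later in the patch, does not query SparkEnv at render time; it reads whatever its listener last cached from events on the listener bus. A simplified sketch of that caching pattern, with stand-in types since the real SparkListener interface carries richer event classes and many more callbacks:

// Stand-in event type; the real event carries Seq[StorageStatus].
case class StorageStatusFetchSketch(storageStatusList: Seq[String])

// A page-backing listener only caches the latest snapshot; render() reads the cache.
class StorageStatusCacheSketch {
  @volatile var storageStatusList: Seq[String] = Seq.empty

  def onStorageStatusFetch(fetch: StorageStatusFetchSketch): Unit = {
    storageStatusList = fetch.storageStatusList
  }
}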
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index abc1114e01162..7aff13f8134e7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -80,7 +80,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
         Failed Stages ({failedStages.size})
         ++ failedStagesTable.toNodeSeq
-    UIUtils.headerSparkPage(content, sc, "Spark Stages", Stages)
+    UIUtils.headerSparkPage(content, sc.appName, "Spark Stages", Stages)
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index cce54fca6e168..27d688a092232 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -45,7 +45,7 @@ private[spark] class PoolPage(parent: JobProgressUI) {
     val content =
         Summary
         ++ poolTable.toNodeSeq ++
         {activeStages.size} Active Stages
         ++ activeStagesTable.toNodeSeq
-    UIUtils.headerSparkPage(content, sc, "Fair Scheduler Pool: " + poolName, Stages)
+    UIUtils.headerSparkPage(content, sc.appName, "Fair Scheduler Pool: " + poolName, Stages)
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 09be0c323f696..b856ed0b12019 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -43,7 +43,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
         Summary Metrics
         No tasks have started yet
         Tasks
         No tasks have started yet
-      return UIUtils.headerSparkPage(content, sc, "Details for Stage %s".format(stageId), Stages)
+      return UIUtils.headerSparkPage(
+        content, sc.appName, "Details for Stage %s".format(stageId), Stages)
     }
 
     val tasks = listener.stageIdToTaskInfos(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
@@ -202,7 +203,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
         Aggregated Metrics by Executor
         ++ executorTable.toNodeSeq ++
         Tasks
         ++ taskTable
-    UIUtils.headerSparkPage(content, sc, "Details for Stage %d".format(stageId), Stages)
+    UIUtils.headerSparkPage(content, sc.appName, "Details for Stage %d".format(stageId), Stages)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
index 2cc4d44fb78c6..ce1b7887286ce 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -22,6 +22,8 @@ import javax.servlet.http.HttpServletRequest
 import org.eclipse.jetty.server.Handler
 
 import org.apache.spark.SparkContext
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.{StorageUtils, RDDInfo}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.{GatewayUISparkListener, SparkUI, StorageStatusFetchSparkListener}
@@ -52,4 +54,28 @@ private[spark] class BlockManagerListener(
     sc: SparkContext,
     gateway: GatewayUISparkListener,
     live: Boolean)
-  extends StorageStatusFetchSparkListener(sc, gateway, live)
+  extends StorageStatusFetchSparkListener(sc, gateway, live) {
+  var rddInfoList: Seq[RDDInfo] = Seq()
+
+  def getRDDInfo() {
+    if (live) {
+      val rddInfo = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+      val getRDDInfo = new SparkListenerGetRDDInfo(rddInfo)
+      gateway.onGetRDDInfo(getRDDInfo)
+    }
+  }
+
+  override def onGetRDDInfo(getRDDInfo: SparkListenerGetRDDInfo) {
+    rddInfoList = getRDDInfo.rddInfoList
+  }
+
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
+    super.onStageSubmitted(stageSubmitted)
+    getRDDInfo()
+  }
+
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = {
+    super.onStageCompleted(stageCompleted)
+    getRDDInfo()
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
index 7ad1b3909c506..4667e4cbeb049 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.spark.storage.{RDDInfo, StorageUtils}
+import org.apache.spark.storage.RDDInfo
 import org.apache.spark.ui.Page._
 import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
@@ -32,13 +32,12 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
   private def listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    listener.fetchStorageStatus()
-    val storageStatusList = listener.storageStatusList
-    // Calculate macro-level statistics
-    val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+    listener.fetchStorageStatus()
+    listener.getRDDInfo()
+    val rdds = listener.rddInfoList
     val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
-    UIUtils.headerSparkPage(content, sc, "Storage ", Storage)
+    UIUtils.headerSparkPage(content, sc.appName, "Storage ", Storage)
   }
 
   /** Header fields for the RDD table */
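The BlockManagerListener above is the core of the fix: on a live UI it computes RDD info from the current storage status and publishes it through the gateway as a SparkListenerGetRDDInfo event, while on a UI rendered from disk the same onGetRDDInfo callback is fed by the logged events, so rddInfoList is populated either way and no SparkContext is needed at render time. A stripped-down sketch of that flow, with simplified stand-in types rather than the patch's real classes:

// Simplified stand-ins for RDDInfo and SparkListenerGetRDDInfo.
case class RDDInfoSketch(id: Int, name: String, memSize: Long)
case class GetRDDInfoEventSketch(rddInfoList: Seq[RDDInfoSketch])

class BlockManagerListenerSketch(
    live: Boolean,
    computeLive: () => Seq[RDDInfoSketch],   // live mode: derive from storage status
    post: GetRDDInfoEventSketch => Unit) {   // gateway: dispatch and log the event

  var rddInfoList: Seq[RDDInfoSketch] = Seq.empty

  // Live mode publishes an event, so the same data also lands in the event log.
  def getRDDInfo(): Unit = if (live) post(GetRDDInfoEventSketch(computeLive()))

  // Both live and replayed UIs update the cache only through this callback.
  def onGetRDDInfo(event: GetRDDInfoEventSketch): Unit = {
    rddInfoList = event.rddInfoList
  }
}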
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 32db30edad08a..6c2e5c239e0e8 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -27,7 +27,6 @@ import org.apache.spark.ui.Page._
 import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 
-
 /** Page showing storage details for a given RDD */
 private[spark] class RDDPage(parent: BlockManagerUI) {
   private val sc = parent.sc
@@ -35,11 +34,12 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
 
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.fetchStorageStatus()
+    listener.getRDDInfo()
     val storageStatusList = listener.storageStatusList
     val id = request.getParameter("id").toInt
     val filteredStorageStatusList =
       StorageUtils.filterStorageStatusByRDD(storageStatusList.toArray, id)
-    val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+    val rddInfo = listener.rddInfoList.filter(_.id == id).head
 
     // Worker table
     val workers = filteredStorageStatusList.map((id, _))
@@ -96,7 +96,7 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
       ;
 
-    UIUtils.headerSparkPage(content, sc, "RDD Storage Info for " + rddInfo.name, Storage)
+    UIUtils.headerSparkPage(content, sc.appName, "RDD Storage Info for " + rddInfo.name, Storage)
   }
 
   /** Header fields for the worker table */
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index 8e07a0f29addf..9451dee3c4c40 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConversions
 import scala.collection.mutable.Map
 import scala.collection.immutable
-import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.Logging
 
 /**
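For completeness: each event, including the new one, carries a toJson representation and a matching fromJson branch in SparkListenerEvent because a persisted UI is rebuilt purely by replaying logged events in order, which is also why the SparkUI change earlier in this patch sorts log files by name before reading them. A rough sketch of such a replay loop, using a (name, payload) pair in place of the lift-json values the patch actually uses:

object ReplaySketch {
  // Stand-in for a deserialized event: (event name, serialized payload).
  type LoggedEvent = (String, String)

  // Read logged events file by file, in name order, and re-dispatch them to the
  // same listeners a live application would use. Ordering matters because the
  // information in one log can depend on events from an earlier one.
  def replay(
      files: Seq[String],
      readEvents: String => Seq[LoggedEvent],
      dispatch: LoggedEvent => Unit): Unit = {
    files.sorted.foreach { file =>
      readEvents(file).foreach(dispatch)
    }
  }
}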