From c5ace9ebf3792b806411ccf271c938efefadebb4 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 4 Aug 2014 12:38:01 -0700 Subject: [PATCH] More merge conflicts --- .../spark/ui/jobs/JobProgressListener.scala | 42 ++----------------- 1 file changed, 4 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 6a5cfbe7906ec..8606ef6434445 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -130,48 +130,15 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val info = taskEnd.taskInfo if (info != null) { -<<<<<<< HEAD - val emptyMap = HashMap[Long, AccumulableInfo]() - val accumulables = stageIdToAccumulables.getOrElseUpdate(sid, emptyMap) - for (accumulableInfo <- info.accumulables) { - accumulables(accumulableInfo.id) = accumulableInfo - } - - // create executor summary map if necessary - val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid, - op = new HashMap[String, ExecutorSummary]()) - executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary) - - val executorSummary = executorSummaryMap.get(info.executorId) - executorSummary match { - case Some(y) => { - // first update failed-task, succeed-task - taskEnd.reason match { - case Success => - y.succeededTasks += 1 - case _ => - y.failedTasks += 1 - } - - // update duration - y.taskTime += info.duration - - val metrics = taskEnd.taskMetrics - if (metrics != null) { - metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead } - metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead } - metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten } - y.memoryBytesSpilled += metrics.memoryBytesSpilled - y.diskBytesSpilled += metrics.diskBytesSpilled - } - } - case _ => {} -======= val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, { logWarning("Task end for unknown stage " + taskEnd.stageId) new StageUIData }) + for (accumulableInfo <- info.accumulables) { + stageData.accumulables(accumulableInfo.id) = accumulableInfo + } + val execSummaryMap = stageData.executorSummary val execSummary = execSummaryMap.getOrElseUpdate(info.executorId, new ExecutorSummary) @@ -180,7 +147,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { execSummary.succeededTasks += 1 case _ => execSummary.failedTasks += 1 ->>>>>>> apache/master } execSummary.taskTime += info.duration stageData.numActiveTasks -= 1