Skip to content

Commit

Permalink
More merge conflicts
Browse files — browse the repository at this point in the history
  • Loading branch information
pwendell committed Aug 4, 2014
1 parent 1da15e3 commit c5ace9e
Showing 1 changed file with 4 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,48 +130,15 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
<<<<<<< HEAD
val emptyMap = HashMap[Long, AccumulableInfo]()
val accumulables = stageIdToAccumulables.getOrElseUpdate(sid, emptyMap)
for (accumulableInfo <- info.accumulables) {
accumulables(accumulableInfo.id) = accumulableInfo
}

// create executor summary map if necessary
val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
op = new HashMap[String, ExecutorSummary]())
executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary)

val executorSummary = executorSummaryMap.get(info.executorId)
executorSummary match {
case Some(y) => {
// first update failed-task, succeed-task
taskEnd.reason match {
case Success =>
y.succeededTasks += 1
case _ =>
y.failedTasks += 1
}

// update duration
y.taskTime += info.duration

val metrics = taskEnd.taskMetrics
if (metrics != null) {
metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
y.memoryBytesSpilled += metrics.memoryBytesSpilled
y.diskBytesSpilled += metrics.diskBytesSpilled
}
}
case _ => {}
=======
val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
logWarning("Task end for unknown stage " + taskEnd.stageId)
new StageUIData
})

for (accumulableInfo <- info.accumulables) {
stageData.accumulables(accumulableInfo.id) = accumulableInfo
}

val execSummaryMap = stageData.executorSummary
val execSummary = execSummaryMap.getOrElseUpdate(info.executorId, new ExecutorSummary)

Expand All @@ -180,7 +147,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
execSummary.succeededTasks += 1
case _ =>
execSummary.failedTasks += 1
>>>>>>> apache/master
}
execSummary.taskTime += info.duration
stageData.numActiveTasks -= 1
Expand Down

0 comments on commit c5ace9e

Please sign in to comment.