From 77a0b241f6f0614bfa1f8e212b3e39422533e33e Mon Sep 17 00:00:00 2001 From: Sayat Satybaldiyev Date: Wed, 28 Mar 2018 17:06:36 +0200 Subject: [PATCH 1/2] More idiomatic Scala code --- .../sparklens/analyzer/AppAnalyzer.scala | 2 +- .../analyzer/AppTimelineAnalyzer.scala | 18 ++++++++---------- .../EfficiencyStatisticsAnalyzer.scala | 2 +- .../qubole/sparklens/common/AppContext.scala | 18 +++++++++++------- .../sparklens/scheduler/TaskScheduler.scala | 2 +- 5 files changed, 22 insertions(+), 20 deletions(-) diff --git a/src/main/scala/com/qubole/sparklens/analyzer/AppAnalyzer.scala b/src/main/scala/com/qubole/sparklens/analyzer/AppAnalyzer.scala index f04357f..48c0388 100644 --- a/src/main/scala/com/qubole/sparklens/analyzer/AppAnalyzer.scala +++ b/src/main/scala/com/qubole/sparklens/analyzer/AppAnalyzer.scala @@ -40,7 +40,7 @@ trait AppAnalyzer { print time */ def pt(x: Long) : String = { - DF.format(new Date(x)) + DF.format(new Date(x)) } /* print duration diff --git a/src/main/scala/com/qubole/sparklens/analyzer/AppTimelineAnalyzer.scala b/src/main/scala/com/qubole/sparklens/analyzer/AppTimelineAnalyzer.scala index 45352df..9e9703e 100644 --- a/src/main/scala/com/qubole/sparklens/analyzer/AppTimelineAnalyzer.scala +++ b/src/main/scala/com/qubole/sparklens/analyzer/AppTimelineAnalyzer.scala @@ -35,8 +35,7 @@ class AppTimelineAnalyzer extends AppAnalyzer { val jobids = ac.jobMap.keySet.toBuffer.sortWith( _ < _ ) out.append(s"${pt(startTime)} app started \n") jobids.map( x => (x, ac.jobMap(x))) - .foreach( x => { - val (jobID, jobTimeSpan) = x + .foreach{ case (jobID, jobTimeSpan) => if (jobTimeSpan.duration().isDefined) { out.println(s"${pt(jobTimeSpan.startTime)} JOB ${jobID} started : duration ${pd(jobTimeSpan.duration().get)} ") printStageTimeLine(out, jobTimeSpan) @@ -47,7 +46,7 @@ class AppTimelineAnalyzer extends AppAnalyzer { if (stageTimeSpan.duration().isDefined) { out.println(s"${pt(stageTimeSpan.startTime)} Stage ${stageID} started : duration 
${pd(stageTimeSpan.duration().get)} ") out.println(s"${pt(stageTimeSpan.endTime)} Stage ${stageID} ended : maxTaskTime ${maxTaskTime} taskCount ${stageTimeSpan.taskExecutionTimes.length}") - }else { + } else { out.println(s"${pt(stageTimeSpan.startTime)} Stage ${stageID} - duration not available ") } }) @@ -55,9 +54,9 @@ class AppTimelineAnalyzer extends AppAnalyzer { }else { out.println(s"${pt(jobTimeSpan.startTime)} JOB ${jobID} - duration not availble") } - }) + } out.println(s"${pt(endTime)} app ended \n") - out.toString() + out.toString } @@ -71,8 +70,8 @@ class AppTimelineAnalyzer extends AppAnalyzer { val x = (endTime-startTime) if (x <= 80) { 1 - }else { - x/80.toDouble + } else { + x/80.0 } } @@ -81,8 +80,7 @@ class AppTimelineAnalyzer extends AppAnalyzer { (x._2.startTime-startTime)/unit, //start position (x._2.endTime - startTime)/unit)) //end position .toBuffer.sortWith( (a, b) => a._1 < b._1) - .foreach( x => { - val (stageID, start, end) = x + .foreach{ case (stageID, start, end) => out.print(f"[${stageID}%7s ") out.print(" " * start.toInt) out.print("|" * (end.toInt - start.toInt)) @@ -90,6 +88,6 @@ class AppTimelineAnalyzer extends AppAnalyzer { out.print(" " * (80 - end.toInt)) } out.println("]") - }) + } } } diff --git a/src/main/scala/com/qubole/sparklens/analyzer/EfficiencyStatisticsAnalyzer.scala b/src/main/scala/com/qubole/sparklens/analyzer/EfficiencyStatisticsAnalyzer.scala index d673d10..c4e48f3 100644 --- a/src/main/scala/com/qubole/sparklens/analyzer/EfficiencyStatisticsAnalyzer.scala +++ b/src/main/scala/com/qubole/sparklens/analyzer/EfficiencyStatisticsAnalyzer.scala @@ -23,7 +23,7 @@ import scala.collection.mutable /* * Created by rohitk on 21/09/17. 
*/ -class EfficiencyStatisticsAnalyzer extends AppAnalyzer { +class EfficiencyStatisticsAnalyzer extends AppAnalyzer { def analyze(appContext: AppContext, startTime: Long, endTime: Long): String = { val ac = appContext.filterByStartAndEndTime(startTime, endTime) diff --git a/src/main/scala/com/qubole/sparklens/common/AppContext.scala b/src/main/scala/com/qubole/sparklens/common/AppContext.scala index dc1cf60..34d2f48 100644 --- a/src/main/scala/com/qubole/sparklens/common/AppContext.scala +++ b/src/main/scala/com/qubole/sparklens/common/AppContext.scala @@ -33,15 +33,19 @@ case class AppContext(appInfo: ApplicationInfo, appMetrics, hostMap, executorMap - .filter(x => x._2.endTime == 0 || //still running - x._2.endTime >= startTime || //ended while app was running - x._2.startTime <= endTime), //started while app was running + .filter{ case (_, timeSpan) => timeSpan.endTime == 0 || //still running + timeSpan.endTime >= startTime || //ended while app was running + timeSpan.startTime <= endTime + }, //started while app was running jobMap - .filter(x => x._2.startTime >= startTime && - x._2.endTime <= endTime), + .filter{ case(_, timeSpan) => + timeSpan.startTime >= startTime && + timeSpan.endTime <= endTime + }, stageMap - .filter(x => x._2.startTime >= startTime && - x._2.endTime <= endTime), + .filter{ case (_, timeSpan) => timeSpan.startTime >= startTime && + timeSpan.endTime <= endTime + }, stageIDToJobID) } diff --git a/src/main/scala/com/qubole/sparklens/scheduler/TaskScheduler.scala b/src/main/scala/com/qubole/sparklens/scheduler/TaskScheduler.scala index 537d6b1..a45d5a8 100644 --- a/src/main/scala/com/qubole/sparklens/scheduler/TaskScheduler.scala +++ b/src/main/scala/com/qubole/sparklens/scheduler/TaskScheduler.scala @@ -24,5 +24,5 @@ trait TaskScheduler { def wallClockTime(): Long def runTillStageCompletion():Int def isStageComplete(stageID: Int): Boolean - def onStageFinished(stageID: Int): Unit = ??? 
+ def onStageFinished(stageID: Int): Unit } From 4d6a09451f049b689b8dfa3aaf6089b33a22dff8 Mon Sep 17 00:00:00 2001 From: Sayat Satybaldiyev Date: Wed, 28 Mar 2018 17:16:33 +0200 Subject: [PATCH 2/2] Add default method to PQParallelStageScheduler --- .../qubole/sparklens/scheduler/PQParallelStageScheduler.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/scala/com/qubole/sparklens/scheduler/PQParallelStageScheduler.scala b/src/main/scala/com/qubole/sparklens/scheduler/PQParallelStageScheduler.scala index 3e329fb..130102d 100644 --- a/src/main/scala/com/qubole/sparklens/scheduler/PQParallelStageScheduler.scala +++ b/src/main/scala/com/qubole/sparklens/scheduler/PQParallelStageScheduler.scala @@ -101,4 +101,6 @@ class PQParallelStageScheduler(totalCores: Int, taskCountMap: mutable.HashMap[In wallClock } } + + override def onStageFinished(stageID: Int): Unit = ??? } \ No newline at end of file