Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More idiomatic scala code #3

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ trait AppAnalyzer {
print time
*/
def pt(x: Long) : String = {
DF.format(new Date(x))
DF.format(new Date(x))
}
/*
print duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ class AppTimelineAnalyzer extends AppAnalyzer {
val jobids = ac.jobMap.keySet.toBuffer.sortWith( _ < _ )
out.append(s"${pt(startTime)} app started \n")
jobids.map( x => (x, ac.jobMap(x)))
.foreach( x => {
val (jobID, jobTimeSpan) = x
.foreach{ case (jobID, jobTimeSpan) =>
if (jobTimeSpan.duration().isDefined) {
out.println(s"${pt(jobTimeSpan.startTime)} JOB ${jobID} started : duration ${pd(jobTimeSpan.duration().get)} ")
printStageTimeLine(out, jobTimeSpan)
Expand All @@ -47,17 +46,17 @@ class AppTimelineAnalyzer extends AppAnalyzer {
if (stageTimeSpan.duration().isDefined) {
out.println(s"${pt(stageTimeSpan.startTime)} Stage ${stageID} started : duration ${pd(stageTimeSpan.duration().get)} ")
out.println(s"${pt(stageTimeSpan.endTime)} Stage ${stageID} ended : maxTaskTime ${maxTaskTime} taskCount ${stageTimeSpan.taskExecutionTimes.length}")
}else {
} else {
out.println(s"${pt(stageTimeSpan.startTime)} Stage ${stageID} - duration not available ")
}
})
out.println(s"${pt(jobTimeSpan.endTime)} JOB ${jobID} ended ")
}else {
out.println(s"${pt(jobTimeSpan.startTime)} JOB ${jobID} - duration not availble")
}
})
}
out.println(s"${pt(endTime)} app ended \n")
out.toString()
out.toString
}


Expand All @@ -71,8 +70,8 @@ class AppTimelineAnalyzer extends AppAnalyzer {
val x = (endTime-startTime)
if (x <= 80) {
1
}else {
x/80.toDouble
} else {
x/80.0
}
}

Expand All @@ -81,15 +80,14 @@ class AppTimelineAnalyzer extends AppAnalyzer {
(x._2.startTime-startTime)/unit, //start position
(x._2.endTime - startTime)/unit)) //end position
.toBuffer.sortWith( (a, b) => a._1 < b._1)
.foreach( x => {
val (stageID, start, end) = x
.foreach{ case (stageID, start, end) =>
out.print(f"[${stageID}%7s ")
out.print(" " * start.toInt)
out.print("|" * (end.toInt - start.toInt))
if (80 > end) {
out.print(" " * (80 - end.toInt))
}
out.println("]")
})
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.mutable
/*
* Created by rohitk on 21/09/17.
*/
class EfficiencyStatisticsAnalyzer extends AppAnalyzer {
class EfficiencyStatisticsAnalyzer extends AppAnalyzer {

def analyze(appContext: AppContext, startTime: Long, endTime: Long): String = {
val ac = appContext.filterByStartAndEndTime(startTime, endTime)
Expand Down
18 changes: 11 additions & 7 deletions src/main/scala/com/qubole/sparklens/common/AppContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@ case class AppContext(appInfo: ApplicationInfo,
appMetrics,
hostMap,
executorMap
.filter(x => x._2.endTime == 0 || //still running
x._2.endTime >= startTime || //ended while app was running
x._2.startTime <= endTime), //started while app was running
.filter{ case (_, timeSpan) => timeSpan.endTime == 0 || //still running
timeSpan.endTime >= startTime || //ended while app was running
timeSpan.startTime <= endTime
}, //started while app was running
jobMap
.filter(x => x._2.startTime >= startTime &&
x._2.endTime <= endTime),
.filter{ case(_, timeSpan) =>
timeSpan.startTime >= startTime &&
timeSpan.endTime <= endTime
},
stageMap
.filter(x => x._2.startTime >= startTime &&
x._2.endTime <= endTime),
.filter{ case (_, timeSpan) => timeSpan.startTime >= startTime &&
timeSpan.endTime <= endTime
},
stageIDToJobID)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,6 @@ class PQParallelStageScheduler(totalCores: Int, taskCountMap: mutable.HashMap[In
wallClock
}
}

override def onStageFinished(stageID: Int): Unit = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ trait TaskScheduler {
def wallClockTime(): Long
def runTillStageCompletion():Int
def isStageComplete(stageID: Int): Boolean
def onStageFinished(stageID: Int): Unit = ???
def onStageFinished(stageID: Int): Unit
}