[SPARK-2298] Encode stage attempt in SparkListener & UI. #1545
Conversation
I think we should add the job ID to the stage table, because the job ID is very useful when an application has many jobs: it lets you distinguish each job's stages.
```diff
@@ -43,13 +43,16 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   // How many stages to remember
   val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)

-  val activeStages = HashMap[Int, StageInfo]()
+  // Map from stageId to StageInfo
+  val activeStages = new HashMap[Int, StageInfo]
```
Why isn't this also indexed by stageId+attemptID?
Oh, because only one attempt will be active at once? If so, maybe add a comment describing that?
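For illustration only, here is a rough sketch of what keying the map by both IDs could look like if multiple attempts of a stage ever were active at once. The `attemptId` field and `StageInfo` shape here are simplified assumptions, not the actual Spark classes:

```scala
import scala.collection.mutable.HashMap

// Simplified stand-in for Spark's StageInfo, for illustration only.
case class StageInfo(stageId: Int, attemptId: Int, name: String)

// Hypothetical alternative: key active stages by (stageId, attemptId)
// so two concurrent attempts of the same stage cannot collide.
val activeStages = new HashMap[(Int, Int), StageInfo]

def onStageSubmitted(info: StageInfo): Unit = {
  activeStages((info.stageId, info.attemptId)) = info
}

def onStageCompleted(info: StageInfo): Unit = {
  activeStages.remove((info.stageId, info.attemptId))
}
```

Under the invariant the PR assumes (at most one active attempt per stage), keying by `stageId` alone is sufficient, which is why a clarifying comment rather than a code change was suggested.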
QA tests have started for PR 1545. This patch merges cleanly.
I pushed a new version that merges cleanly with master.
QA tests have started for PR 1545 at commit
```diff
@@ -1029,6 +1033,7 @@ class DAGScheduler(
     case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
       // Mark the stage that the reducer was in as unrunnable
       val failedStage = stageIdToStage(task.stageId)
+      listenerBus.post(SparkListenerStageCompleted(failedStage.info))
```
Does it make sense to just call markStageAsFinished here (instead of the two lines above)? I just wonder if doing that will help avoid future bugs along this code path.
It looks like now, one stage can represent multiple stage attempts (in which case Stage.numTasks is wrong for the later attempts), but there's one StageInfo per attempt, and Stage.info is reset based on which attempt is currently running. This seems a bit ugly and error prone, and it also seems problematic in the case we discussed offline where a stage can have multiple active attempts (if that case really does happen). Did you consider changing the resubmitFailedStages() method in the DAGScheduler to create a new Stage for the failed one (and then adding a copy() method or something to Stage that creates a new one based on the current one)?
Also, what are the semantics of accumulables for resubmitted stages? I ask because right now, the way you copy StageInfo, the values of accumulables get wiped when a stage gets resubmitted... just wondering if that's the desired behavior.
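To make the copy() suggestion concrete, here is a rough sketch of one StageInfo per attempt that carries accumulable values forward instead of wiping them. All field names here are simplified assumptions for illustration, not the real Stage/StageInfo classes:

```scala
// Sketch only: a fresh StageInfo per attempt, preserving accumulable
// values across resubmission rather than resetting them.
case class StageInfo(
    stageId: Int,
    attemptId: Int,
    numTasks: Int,
    accumulables: Map[Long, String]) {

  // Build the info for the next attempt: new attempt ID and task count,
  // but the accumulable values observed so far are carried forward.
  def nextAttempt(newNumTasks: Int): StageInfo =
    copy(attemptId = attemptId + 1, numTasks = newNumTasks)
}
```

Whether accumulables should survive resubmission is exactly the open question raised above; this sketch just shows that `copy()` makes either choice a one-line decision.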
Ok, so if you're anxious to get this in, how about this simpler fix to make it a little less ugly:
Tests timed out after a configured wait of
QA tests have started for PR 1545 at commit
Tests timed out after a configured wait of
Jenkins, retest this please.
QA tests have started for PR 1545 at commit
Pulled this from the jenkins log
```scala
if (isSuccessful) {
  logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
} else {
```
spacing seems off here
QA tests have started for PR 1545 at commit
QA tests have finished for PR 1545 at commit
QA tests have started for PR 1545 at commit
```diff
@@ -56,9 +57,15 @@ private[spark] object StageInfo {
    * shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
    * sequence of narrow dependencies should also be associated with this Stage.
    */
-  def fromStage(stage: Stage): StageInfo = {
+  def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = {
```
This is a nit, but I think this method might be better as an updateStageInfo(numTasks: Int) method on Stage, one that creates an appropriate StageInfo and then sets latestInfo accordingly, since that would make the usage a little clearer to a reader. Fine if you think it's better this way, though.
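Roughly what that suggestion might look like. This is a simplified sketch with assumed field names, not the actual Stage class, which has many more fields:

```scala
// Simplified stand-in for Spark's StageInfo, for illustration only.
case class StageInfo(stageId: Int, attemptId: Int, numTasks: Int)

// Sketch of the suggested helper: Stage builds its own StageInfo and
// records it as the latest attempt's info in one place, so callers
// never construct a StageInfo for a stage themselves.
class Stage(val id: Int, val defaultNumTasks: Int) {
  private var attemptId: Int = -1
  var latestInfo: StageInfo = _

  def updateStageInfo(numTasks: Int = defaultNumTasks): Unit = {
    attemptId += 1
    latestInfo = StageInfo(id, attemptId, numTasks)
  }
}
```

The design point is locality: with the factory-style `fromStage(stage, numTasks)`, the caller must remember to assign the result to the stage's info field; with `updateStageInfo`, that bookkeeping cannot be forgotten.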
Ok, I pushed a new version that should address the hanging JobCancellationSuite test. I also went through all the changes to make sure similar problems wouldn't happen due to racing.
QA tests have started for PR 1545 at commit
QA tests have finished for PR 1545 at commit
Tests timed out after a configured wait of
Jenkins, test this please.
QA tests have started for PR 1545 at commit
QA tests have finished for PR 1545 at commit
Jenkins, retest this please.
QA tests have started for PR 1545 at commit
QA tests have finished for PR 1545 at commit
Jenkins, retest this please.
Jenkins, test this please.
QA tests have started for PR 1545 at commit
QA tests have finished for PR 1545 at commit
- Properly report stage failure in FetchFailed.
Jenkins, test this please.
QA tests have started for PR 1545 at commit
QA tests have finished for PR 1545 at commit
Okay, I did another pass on this - thanks @kayousterhout and @rxin for taking a lot of time on this. This will be a major usability improvement in the case of complex jobs that have failures.
Simple way to reproduce this in the UI:

```scala
val f = new java.io.File("/tmp/test")
f.delete()
sc.parallelize(1 to 2, 2).map(x => (x, x)).repartition(3).mapPartitionsWithContext {
  case (context, iter) =>
    if (context.partitionId == 0) {
      val f = new java.io.File("/tmp/test")
      if (!f.exists) {
        f.mkdir()
        System.exit(0)
      }
    }
    iter
}.count()
```

Author: Reynold Xin <[email protected]>

Closes #1545 from rxin/stage-attempt and squashes the following commits:

3ee1d2a [Reynold Xin] - Rename attempt to retry in UI. - Properly report stage failure in FetchFailed.
40a6bd5 [Reynold Xin] Updated test suites.
c414c36 [Reynold Xin] Fixed the hanging in JobCancellationSuite.
b3e2eed [Reynold Xin] Oops previous code didn't compile.
0f36075 [Reynold Xin] Mark unknown stage attempt with id -1 and drop that in JobProgressListener.
6c08b07 [Reynold Xin] Addressed code review feedback.
4e5faa2 [Reynold Xin] [SPARK-2298] Encode stage attempt in SparkListener & UI.