[SPARK-2298] Encode stage attempt in SparkListener & UI. #1545

Closed

rxin wants to merge 7 commits into master from rxin/stage-attempt

Conversation


@rxin (Contributor) commented Jul 23, 2014

Simple way to reproduce this in the UI:

```scala
val f = new java.io.File("/tmp/test")
f.delete()
sc.parallelize(1 to 2, 2).map(x => (x,x )).repartition(3).mapPartitionsWithContext { case (context, iter) =>
  if (context.partitionId == 0) {
    val f = new java.io.File("/tmp/test")
    if (!f.exists) {
      f.mkdir()
      System.exit(0);
    }
  }
  iter
}.count()
```

@lianhuiwang (Contributor)

I think we can also add the jobId to the stage table, because the jobId is very useful when an application has many jobs: it makes it possible to tell each job's stages apart.
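
A rough sketch of that idea (not part of this PR), using stub event types rather than the real SparkListener API, so `JobStart` and its fields here are illustrative only: a listener can record which job each stage belongs to when the job starts, and the stage table can then look the jobId up per row.

```scala
import scala.collection.mutable.HashMap

// Stub standing in for the real job-start listener event; illustrative only.
case class JobStart(jobId: Int, stageIds: Seq[Int])

class StageToJobIndex {
  // stageId -> jobId, filled in as jobs are submitted
  val stageIdToJobId = new HashMap[Int, Int]

  def onJobStart(event: JobStart): Unit =
    event.stageIds.foreach(stageId => stageIdToJobId(stageId) = event.jobId)
}
```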

```diff
@@ -43,13 +43,16 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   // How many stages to remember
   val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)

-  val activeStages = HashMap[Int, StageInfo]()
+  // Map from stageId to StageInfo
+  val activeStages = new HashMap[Int, StageInfo]
```
Why isn't this also indexed by stageId+attemptID?

Oh, because only one attempt will be active at once? If so, maybe add a comment describing that?
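
A minimal sketch of the two keying options under discussion, with a stub `StageInfo` standing in for the real Spark class:

```scala
import scala.collection.mutable.HashMap

// Stub standing in for org.apache.spark.scheduler.StageInfo.
case class StageInfo(stageId: Int, attemptId: Int, name: String)

// Keying by (stageId, attemptId) would distinguish attempts explicitly:
val activeStagesByAttempt = new HashMap[(Int, Int), StageInfo]

// The PR relies on the invariant that only one attempt of a stage is active
// at a time, so stageId alone suffices -- but that invariant is worth a comment:
val activeStages = new HashMap[Int, StageInfo]
```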

SparkQA commented Jul 29, 2014

QA tests have started for PR 1545. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17392/consoleFull


rxin commented Aug 18, 2014

I pushed a new version that merges cleanly with master.

SparkQA commented Aug 18, 2014

QA tests have started for PR 1545 at commit 4e5faa2.

  • This patch merges cleanly.

```diff
@@ -1029,6 +1033,7 @@ class DAGScheduler(
     case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
       // Mark the stage that the reducer was in as unrunnable
       val failedStage = stageIdToStage(task.stageId)
+      listenerBus.post(SparkListenerStageCompleted(failedStage.info))
```

Does it make sense to just call markStageAsFinished here (instead of the two lines above)? I just wonder if doing that will help avoid future bugs along this code path.
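
A hedged sketch of the kind of consolidation being suggested, with stub types in place of the DAGScheduler internals (the real helper's name and signature may differ):

```scala
import scala.collection.mutable

// Minimal stubs; not the real Spark classes.
case class StageInfo(stageId: Int)
case class Stage(id: Int, info: StageInfo)
case class SparkListenerStageCompleted(info: StageInfo)

object SchedulerSketch {
  val runningStages = mutable.Set[Stage]()
  def post(event: Any): Unit = println(event)

  // Funneling every "this stage is done" path through one helper keeps the
  // listener event and the scheduler's bookkeeping from drifting apart on
  // failure paths such as FetchFailed.
  def markStageAsFinished(stage: Stage): Unit = {
    post(SparkListenerStageCompleted(stage.info))
    runningStages -= stage
  }
}
```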

@kayousterhout (Contributor)

It looks like now, one stage can represent multiple stage attempts (in which case Stage.numTasks is wrong for the later attempts), but there's one StageInfo per attempt, and Stage.info is reset based on which attempt is currently running? This seems a bit ugly / error prone, and it also seems problematic in the case we discussed offline where a stage can have multiple active attempts (if this case really does happen).

Did you consider changing the resubmitFailedStages() method in the DAGScheduler to create a new Stage for the failed one (and then adding a copy() method or something to Stage that creates a new one based on the current one)?
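
A minimal sketch of that alternative, with a heavily trimmed `Stage` (fields and names here are illustrative, not the real class):

```scala
// One immutable Stage object per attempt, instead of one mutable Stage
// whose info is rewritten across attempts.
class Stage(val id: Int, val attempt: Int, val numTasks: Int) {
  // resubmitFailedStages would call this to get a fresh Stage for the retry.
  def copyForNewAttempt(numTasks: Int): Stage =
    new Stage(id, attempt + 1, numTasks)
}
```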

@kayousterhout (Contributor)

Also, what are the semantics of accumulables for resubmitted stages? I ask because right now, the way you copy StageInfo, the values of the accumulables get wiped when a stage gets resubmitted... just wondering if that's the desired behavior.
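
An illustration of the concern, with stub classes rather than the real `StageInfo`: a freshly created `StageInfo` starts with an empty accumulables map, so values from the previous attempt vanish from the UI on resubmit.

```scala
import scala.collection.mutable.HashMap

// Stubs trimmed to the relevant fields; illustrative only.
case class AccumulableInfo(id: Long, name: String, value: String)
class StageInfo(val stageId: Int, val attemptId: Int) {
  val accumulables = new HashMap[Long, AccumulableInfo]
}

val attempt0 = new StageInfo(1, 0)
attempt0.accumulables(7L) = AccumulableInfo(7L, "records", "100")

val attempt1 = new StageInfo(1, 1) // fresh StageInfo for the resubmitted stage
assert(attempt1.accumulables.isEmpty) // previous attempt's values are gone
```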

@kayousterhout (Contributor)

Ok, so if you're anxious to get this in, how about this simpler fix to make it a little less ugly (sketched below):
(1) Change the numTasks parameter of Stage so it's not a val and isn't saved as part of the class, since it's incorrect for later attempts. Then change StageInfo.fromStage to always accept a number of tasks. Also update the docstring for Stage to specify that a Stage object is used across multiple stage attempts.
(2) Change the comment above Stage.info to say it's a pointer to the most recent StageInfo, which the DAGScheduler will update for new stage attempts. Maybe also rename it to latestInfo so it's abundantly clear that it can be updated.
(3) Reset the info in resubmitFailedStages rather than where you currently have it. I think that makes it clearer what's going on and why Stage.info needs to be set.
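
A condensed sketch of the suggested shape, with `StageInfo` trimmed to the fields that matter here (so these are not the real Spark classes):

```scala
// Illustrative stub; the real StageInfo carries much more.
case class StageInfo(stageId: Int, attemptId: Int, numTasks: Int)

class Stage(val id: Int) {
  private var nextAttemptId = 0

  // (2) Pointer to the most recent StageInfo, updated by the DAGScheduler
  // each time a new attempt of this stage starts.
  var latestInfo: StageInfo = _

  // (1) numTasks is supplied per attempt rather than stored on the Stage,
  // since the Stage object is reused across attempts.
  def startNewAttempt(numTasks: Int): Unit = {
    latestInfo = StageInfo(id, nextAttemptId, numTasks)
    nextAttemptId += 1
  }
}

val stage = new Stage(3)
stage.startNewAttempt(numTasks = 10) // attempt 0
stage.startNewAttempt(numTasks = 12) // attempt 1 after a retry, possibly with a different task count
```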

SparkQA commented Aug 19, 2014

Tests timed out after a configured wait of 120m.

SparkQA commented Aug 19, 2014

QA tests have started for PR 1545 at commit 6c08b07.

  • This patch merges cleanly.

SparkQA commented Aug 19, 2014

Tests timed out after a configured wait of 120m.

@pwendell (Contributor)

Jenkins, retest this please.

SparkQA commented Aug 19, 2014

QA tests have started for PR 1545 at commit 6c08b07.

  • This patch merges cleanly.

@pwendell (Contributor)

Pulled this from the Jenkins log:

```
14/08/18 22:52:57.452 INFO BlockManager: Found block broadcast_13 locally
14/08/18 22:52:57.453 ERROR Executor: Exception in task 1.0 in stage 13.0 (TID 36)
org.apache.spark.TaskKilledException
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
14/08/18 22:52:57.453 WARN TaskSetManager: Lost task 1.0 in stage 13.0 (TID 36, localhost): org.apache.spark.TaskKilledException:
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
14/08/18 22:52:57.454 INFO TaskSchedulerImpl: Removed TaskSet 13.0, whose tasks have all completed, from pool
14/08/18 22:52:57.456 ERROR DAGSchedulerActorSupervisor: eventProcesserActor failed; shutting down SparkContext
java.util.NoSuchElementException: key not found: 13
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:900)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1378)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/08/18 22:52:57.472 INFO SparkContext: Starting job: first at ChiSqTest.scala:81
```
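
The crash in the log is an unconditional lookup into `stageIdToStage` for a stage that has already been cleaned up. As a sketch of the failure mode only (not necessarily this PR's actual fix), a guarded `get` tolerates such straggler events:

```scala
import scala.collection.mutable.HashMap

// Stubbed; not DAGScheduler code. An apply() on a mutable HashMap throws
// NoSuchElementException for a missing key, which is what killed the
// SparkContext above ("key not found: 13").
val stageIdToStage = new HashMap[Int, String]
val stageId = 13

stageIdToStage.get(stageId) match {
  case Some(stage) => println(s"handle completion for $stage")
  case None        => println(s"stage $stageId already removed; ignoring straggler event")
}
```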

```scala
if (isSuccessful) {
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
} else {
```


spacing seems off here

SparkQA commented Aug 19, 2014

QA tests have started for PR 1545 at commit 0f36075.

  • This patch merges cleanly.

SparkQA commented Aug 19, 2014

QA tests have finished for PR 1545 at commit 0f36075.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
    • case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")
    • case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")
    • case class Params(input: String = "data/mllib/sample_binary_classification_data.txt")

SparkQA commented Aug 19, 2014

QA tests have started for PR 1545 at commit b3e2eed.

  • This patch merges cleanly.

```diff
@@ -56,9 +57,15 @@ private[spark] object StageInfo {
  * shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
  * sequence of narrow dependencies should also be associated with this Stage.
  */
-  def fromStage(stage: Stage): StageInfo = {
+  def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = {
```

This is a nit, but I think this method might be better as an updateStageInfo(numTasks: Int) method in Stage, which would create an appropriate StageInfo and then set latestInfo accordingly (I think that would make the intended usage a little clearer to a reader). Fine if you think it's better this way, though...


rxin commented Aug 20, 2014

Ok, I pushed a new version that should address the hanging JobCancellationSuite test. I also went through all the changes to make sure similar problems can't happen due to races.

SparkQA commented Aug 20, 2014

QA tests have started for PR 1545 at commit c414c36.

  • This patch merges cleanly.

SparkQA commented Aug 20, 2014

QA tests have finished for PR 1545 at commit c414c36.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 20, 2014

Tests timed out after a configured wait of 120m.

@pwendell (Contributor)

Jenkins, test this please.

SparkQA commented Aug 20, 2014

QA tests have started for PR 1545 at commit 40a6bd5.

  • This patch merges cleanly.

SparkQA commented Aug 20, 2014

QA tests have finished for PR 1545 at commit 40a6bd5.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)


rxin commented Aug 20, 2014

Jenkins, retest this please.

SparkQA commented Aug 20, 2014

QA tests have started for PR 1545 at commit 40a6bd5.

  • This patch merges cleanly.

SparkQA commented Aug 20, 2014

QA tests have finished for PR 1545 at commit 40a6bd5.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)

@pwendell (Contributor)

Jenkins retest this please.

@pwendell (Contributor)

Jenkins, test this please.

SparkQA commented Aug 20, 2014

QA tests have started for PR 1545 at commit 40a6bd5.

  • This patch merges cleanly.

SparkQA commented Aug 20, 2014

QA tests have finished for PR 1545 at commit 40a6bd5.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

- Properly report stage failure in FetchFailed.

rxin commented Aug 20, 2014

Jenkins, test this please.

SparkQA commented Aug 20, 2014

QA tests have started for PR 1545 at commit 3ee1d2a.

  • This patch merges cleanly.

SparkQA commented Aug 20, 2014

QA tests have finished for PR 1545 at commit 3ee1d2a.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)

@pwendell (Contributor)

Okay, I did another pass on this. Thanks @kayousterhout and @rxin for taking a lot of time on it; this will be a major usability improvement for complex jobs that hit failures.

asfgit closed this in fb60bec Aug 20, 2014
asfgit pushed a commit that referenced this pull request Aug 20, 2014
Simple way to reproduce this in the UI:

```scala
val f = new java.io.File("/tmp/test")
f.delete()
sc.parallelize(1 to 2, 2).map(x => (x,x )).repartition(3).mapPartitionsWithContext { case (context, iter) =>
  if (context.partitionId == 0) {
    val f = new java.io.File("/tmp/test")
    if (!f.exists) {
      f.mkdir()
      System.exit(0);
    }
  }
  iter
}.count()
```

Author: Reynold Xin <[email protected]>

Closes #1545 from rxin/stage-attempt and squashes the following commits:

3ee1d2a [Reynold Xin] - Rename attempt to retry in UI. - Properly report stage failure in FetchFailed.
40a6bd5 [Reynold Xin] Updated test suites.
c414c36 [Reynold Xin] Fixed the hanging in JobCancellationSuite.
b3e2eed [Reynold Xin] Oops previous code didn't compile.
0f36075 [Reynold Xin] Mark unknown stage attempt with id -1 and drop that in JobProgressListener.
6c08b07 [Reynold Xin] Addressed code review feedback.
4e5faa2 [Reynold Xin] [SPARK-2298] Encode stage attempt in SparkListener & UI.
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014