-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-3293] YARN's web UI shows "SUCCEEDED" when the driver throws an exception in yarn-client #2311
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,6 +52,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, | |
.asInstanceOf[YarnConfiguration] | ||
private val isDriver = args.userClass != null | ||
|
||
private var exitCode = 0 | ||
|
||
// Default to numExecutors * 2, with minimum of 3 | ||
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", | ||
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) | ||
|
@@ -95,7 +97,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, | |
if (sc != null) { | ||
logInfo("Invoking sc stop from shutdown hook") | ||
sc.stop() | ||
finish(FinalApplicationStatus.SUCCEEDED) | ||
} | ||
|
||
// Shuts down the AM. | ||
if (!finished) { | ||
finish(finalStatus) | ||
} | ||
|
||
// Cleanup the staging dir after the app is finished, or if it's the last attempt at | ||
|
@@ -123,13 +129,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, | |
} else { | ||
runExecutorLauncher(securityMgr) | ||
} | ||
|
||
if (finalStatus != FinalApplicationStatus.UNDEFINED) { | ||
finish(finalStatus) | ||
0 | ||
} else { | ||
1 | ||
} | ||
exitCode | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Feels like with your changes there's a possibility that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nevermind. I see you added the call to the shutdown hook. Still, the |
||
} | ||
|
||
final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized { | ||
|
@@ -211,7 +211,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, | |
|
||
// In client mode the actor will stop the reporter thread. | ||
reporterThread.join() | ||
finalStatus = FinalApplicationStatus.SUCCEEDED | ||
} | ||
|
||
private def launchReporterThread(): Thread = { | ||
|
@@ -386,31 +385,50 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, | |
private def startUserClass(): Thread = { | ||
logInfo("Starting the user JAR in a separate Thread") | ||
System.setProperty("spark.executor.instances", args.numExecutors.toString) | ||
var stopped = false | ||
val mainMethod = Class.forName(args.userClass, false, | ||
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) | ||
|
||
userClassThread = new Thread { | ||
override def run() { | ||
var status = FinalApplicationStatus.FAILED | ||
finalStatus = FinalApplicationStatus.FAILED | ||
try { | ||
// Copy | ||
val mainArgs = new Array[String](args.userArgs.size) | ||
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) | ||
mainMethod.invoke(null, mainArgs) | ||
// Some apps have "System.exit(0)" at the end. The user thread will stop here unless | ||
// it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED. | ||
status = FinalApplicationStatus.SUCCEEDED | ||
} catch { | ||
case e: InvocationTargetException => | ||
e.getCause match { | ||
case _: InterruptedException => | ||
// Reporter thread can interrupt to stop user class | ||
System.setSecurityManager(new java.lang.SecurityManager() { | ||
override def checkExit(paramInt: Int) { | ||
if (!stopped) { | ||
exitCode = paramInt | ||
if (exitCode == 0) { | ||
finalStatus = FinalApplicationStatus.SUCCEEDED | ||
} | ||
stopped = true | ||
} | ||
} | ||
|
||
override def checkPermission(perm: java.security.Permission): Unit = { | ||
|
||
case e => throw e | ||
} | ||
} finally { | ||
logDebug("Finishing main") | ||
finalStatus = status | ||
}) | ||
} | ||
catch { | ||
case e: SecurityException => | ||
logError("Error in setSecurityManager:", e) | ||
} | ||
|
||
Utils.tryOrExit { | ||
try { | ||
val mainArgs = new Array[String](args.userArgs.size) | ||
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) | ||
mainMethod.invoke(null, mainArgs) | ||
finalStatus = FinalApplicationStatus.SUCCEEDED | ||
} catch { | ||
case e: InvocationTargetException => | ||
e.getCause match { | ||
case _: InterruptedException => | ||
// Reporter thread can interrupt to stop user class | ||
|
||
case e => throw e | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could cause problems with YARN retry logic. We shouldn't be finishing (unregistering with RM) unless we've explicitly succeeded or failed and don't want a retry to happen in those weird cases. Since this is in shutdown hook I'm not sure we explicitly know that.
i guess it comes down to us defining when the spark app is cleanly exiting. This could be failed, killed, or succeeded. Unfortunately I'm not sure spark really has this final reporting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mateiz @pwendell is there anything in spark that we can/should use where we consider the application in a final state (either success or failure) such that we wouldn't want to retry it? On MR there is explicit states for finishing and it also has checks for the committed output. Spark I don't think is quite as straight forward. Do we have any guidance on what an application should do on error and success?
i.e., can we say that if sc.stop() is called then it finished cleanly, or perhaps if System.exit(0) is called? Should we be peeking at the metrics or something to see if any jobs failed within the application? Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to understand, is the YARN retry logic there for resubmitting the AM? Most Spark applications don't support that, since their driver contains a lot of state. (Though I guess applications submitted in yarn-client mode might).
In general, if the user's driver is running within the AM, I would say it's successful if the driver's main() returns 0 or calls System.exit(0). If the driver is running remotely, it should tell the AM when to shut down cleanly when it closes its SparkContext or exits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes the retry logic is for resubmitting the AM in the case it was on machine that went down, slow, network loss, etc..
Ok, so we'll go with this: if it calls System.exit(0) or returns 0 then it succeeded, and everything else for now is a failure and will retry if configured to retry. If we don't want it to retry in certain cases, we would need more information from the driver.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well my feeling is that if the driver program crashed, it's going to crash again the next time you try. Is that not the case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, that depends a lot on the crash. If the host where the AM was running just dies, running it on a different host might succeed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically there are a few cases:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vanzin But this code is in the AM, isn't it? That's what I'm saying above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, unless we want to add more logic about trying to figure out what shouldn't be retried, the easy thing is to just retry on any failure. Obviously if the machine dies this code won't be running; it's more that something weird happens causing it to crash or exit badly.
There are actually some potential issues with rerunning the AM though. One is what we refer to as split brain (one AM losing connection from RM but still running so it starts a second AM) and both write to the same output dir and cause issues with the output data. I filed a jira for this to try to handle in Spark AM.
The second occurs if the first run had committed its output and we rerun it when we shouldn't.
The reason we don't want that to happen is to prevent data corruption. Many times in MR one job will start once another's output is committed, so if that output were changed out from under it by a rerun of the AM, it could lose data. I'm not sure that same kind of check is as easy with Spark.
MR handles both of those cases. Obviously if your MR job is writing to some other service, or using custom file output, or has some other side effects, it's up to the user to guarantee that it can be rerun.
I'm assuming it's the user's responsibility with Spark, since Spark can rerun tasks/stages on failure. Any input on that, Matei?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our
saveAs*File
operations in Spark also avoid running if the output directory already exists, so there is that support in there. But for Spark I'd make this AM rerun configurable. The reason is that some Spark apps might be accessing weird data sources, serving a UI to outside users, or just doing fairly complicated logic inside that is hard to retry. These are things that MR jobs don't do as commonly.