[SPARK-3627] - [yarn] - fix exit code and final status reporting to RM #2577
Conversation
QA tests have started for PR 2577 at commit
Also note this does change everything to allow YARN to retry. Previously, when it hit the maximum number of executor failures, it didn't retry the AM. I waffled back and forth on this one. At first the thought was that if that many executors are dying it's probably an issue with the user code, but then again if you have a really long running job I can think of situations where you'd want it to retry. Anyone have a strong opinion on that?
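For context, a minimal sketch of the retry mechanism being discussed, assuming standard YARN behavior: the RM only retries an AM attempt that exits without unregistering, so the decision comes down to whether a failed attempt calls unregister. The helper and parameter names below are illustrative, not the PR's exact code.

```scala
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus

// Sketch only: unregistering tells the RM this is the application's final answer;
// exiting WITHOUT unregistering lets YARN retry the AM up to its configured
// maximum number of attempts.
def maybeUnregister(finalStatus: FinalApplicationStatus,
                    isLastAttempt: Boolean,
                    unregister: (FinalApplicationStatus, String) => Unit): Unit = {
  if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    unregister(finalStatus, "final attempt, reporting status to the RM")
  }
  // otherwise exit with a non-zero code and let the RM schedule another attempt
}
```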
QA tests have finished for PR 2577 at commit
Test PASSed.
@@ -450,6 +539,15 @@ object ApplicationMaster extends Logging {

  val SHUTDOWN_HOOK_PRIORITY: Int = 30

  // exit codes for different causes, no reason behind the values
Can we use this class? ExecutorExitCode
The ApplicationMaster is not an executor, so I chose not to use it. It also doesn't have the same exit reasons, which could be useful if the user has an exit code and wants to know what it matches up to.
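For illustration only, a sketch of what AM-specific exit codes in this style could look like; the names and values below are hypothetical and are not the exact constants added by this PR.

```scala
// Hypothetical AM exit codes, one per failure cause; the values carry no meaning
// beyond being distinct and non-zero.
object ApplicationMasterExitCode {
  val SUCCESS = 0
  val UNCAUGHT_EXCEPTION = 10      // user class threw something unexpected
  val MAX_EXECUTOR_FAILURES = 11   // too many executors died
  val SC_NOT_INITIALIZED = 13      // user class never created a SparkContext
}
```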
Looks ok to me, although the exception handling does feel a little paranoid. :-) Just had a few nits.
val sc = sparkContextRef.get()
if (sc != null) {
  logInfo("Invoking sc stop from finish")
  sc.stop()
I'm feeling a little bit weird about this call.
Feels to me like it would be better to do it after the user thread is interrupted and the user thread stops. And since we already have a shutdown hook that takes care of calling it if the user code doesn't, it's already handled.
Is there a particular case you're thinking about here that is not covered by the current code?
I was thinking it would be nicer (as far as cleanup and such goes) to do the sc.stop() before the interrupt, in case the interrupt didn't end up being handled nicely. Note that under normal exit situations this wouldn't be invoked here. It's when something else goes wrong (like max executor failures, etc.).
Is there some condition where you know it's bad to call it?
I'll do a few more tests on it to see what happens in both cases.
I'm just wondering what will be the side-effects on user code if the context is stopped before the code expects it to. In the end everything will fail anyway, but maybe telling the user code to shut down "nicely" first is better?
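A rough sketch of the ordering being debated, assuming a userClassThread field alongside the sparkContextRef shown above (both treated here as parameters): stopping the context first guards against the interrupt not being handled cleanly by the user code.

```scala
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.SparkContext

// Sketch: stop the SparkContext before interrupting the user thread, so cleanup
// still happens even if the user code ignores the interrupt.
def stopThenInterrupt(sparkContextRef: AtomicReference[SparkContext],
                      userClassThread: Thread): Unit = {
  val sc = sparkContextRef.get()
  if (sc != null) {
    sc.stop()                   // clean shutdown of the context first
  }
  if (userClassThread != null) {
    userClassThread.interrupt() // then ask the user class to wind down
  }
}
```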
Thanks for the review @vanzin. I've updated it.
QA tests have started for PR 2577 at commit
QA tests have finished for PR 2577 at commit
Test PASSed.
LGTM. Thanks!
override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
  if (registered) {
    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
      .asInstanceOf[FinishApplicationMasterRequest]
You probably don't need this cast
This PR didn't change this code, other than wrapping it with an if. It's also going to be deprecated soon, so I don't see a reason to fix it.
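For reference, Records.newRecord is parameterized by its Class argument, so the result is already typed and the cast could simply be dropped; a sketch of the same call without it:

```scala
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.util.Records

// Records.newRecord(classOf[T]) already returns a T, so no asInstanceOf is needed.
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
```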
// spark driver should already be up since it launched us, but we don't want to
// wait forever, so wait 100 seconds max to match the cluster mode setting.
// Leave this config unpublished for now.
val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.client.waitTries", 1000)
This config should use camel case for applicationMaster. Also, there's already a spark.yarn.applicationMaster.waitTries. Does the extra client mean it's for client mode? Do we want a separate setting for client vs deploy modes here?
By the way, there is a mismatch between what is already there (spark.yarn.ApplicationMatser.waitTries) and what we document (spark.yarn.applicationMaster.waitTries). I think this is a bug that we can fix separately.
Yes, the client was tacked on to mean it's used in client mode, because the timing of the loops is different between the modes. It's an internal config right now, so users shouldn't be setting it. The timing is different because in client mode the driver is already up when this is launched, whereas in cluster mode we are launching the user code, which takes some time (tens of seconds).
I'll file a separate JIRA to fix up the mismatch in the doc/config.
Also, it's kind of inconsistent to use applicationMaster.client.waitTries for client mode but applicationMaster.waitTries for cluster mode, and the existing documentation for the latter makes no mention of cluster mode even though it's only used there. It's fine to keep the client config here, but we should make the other one applicationMaster.cluster.waitTries in a future JIRA and deprecate the less specific one.
OK, for this PR I'll leave it as applicationMaster.waitTries to match cluster mode, and I'll file a separate JIRA to clean it up. The documentation doesn't state how long each loop is, for example. I think these would be better changed to a wait time rather than a number of tries, and then they could be used for both modes.
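A minimal sketch of that direction, assuming a hypothetical time-based config (the key name and default below are made up for illustration) instead of a try count:

```scala
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: wait for the SparkContext for a bounded amount of time rather than a
// fixed number of tries, so the same setting can serve client and cluster mode.
def waitForSparkContext(sparkConf: SparkConf,
                        sparkContextRef: AtomicReference[SparkContext]): Unit = {
  val totalWaitMs = sparkConf.getLong("spark.yarn.applicationMaster.waitTime", 100000L)
  val deadline = System.currentTimeMillis() + totalWaitMs
  while (sparkContextRef.get() == null && System.currentTimeMillis() < deadline) {
    Thread.sleep(100)  // poll until the context shows up or the deadline passes
  }
}
```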
Addressed all the review comments.
QA tests have started for PR 2577 at commit
QA tests have finished for PR 2577 at commit
Test FAILed.
QA tests have started for PR 2577 at commit
QA tests have finished for PR 2577 at commit
Test PASSed.
// spark driver should already be up since it launched us, but we don't want to
// wait forever, so wait 100 seconds max to match the cluster mode setting.
// Leave this config unpublished for now.
minor, but can you add SPARK-3779 to the comment so others know we're tracking this issue?
Hey @tgravescs this LGTM pending a few minor comments.
QA tests have started for PR 2577 at commit
QA tests have finished for PR 2577 at commit
Test PASSed.
LGTM, feel free to merge it.
Thanks @andrewor14. I've merged this into 1.2 |
} catch {
  case e: InvocationTargetException =>
    e.getCause match {
      case _: InterruptedException =>
        // Reporter thread can interrupt to stop user class

      case e => throw e
      case e: Exception =>
I'm curious, should this be Throwable? If my application throws an uncaught Error, shouldn't that also result in FAILED, and would it (still) do so with this change? P.S. my Scala is not that strong.
This was changed in a subsequent PR. Check the current code.
See the description and what's handled in the JIRA comment: https://issues.apache.org/jira/browse/SPARK-3627?focusedCommentId=14150013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14150013
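To illustrate the distinction being asked about, here is a hedged sketch (not the current code): matching Throwable in the last case also turns uncaught Errors into a FAILED status, whereas case e: Exception would let an Error escape the handler. The finish callback and exit code 15 below are placeholders.

```scala
import java.lang.reflect.{InvocationTargetException, Method}
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus

// Sketch: `finish` stands in for whatever reports the final status to the RM.
def runUserClass(mainMethod: Method, args: Array[String],
                 finish: (FinalApplicationStatus, Int, String) => Unit): Unit = {
  try {
    mainMethod.invoke(null, args)
    finish(FinalApplicationStatus.SUCCEEDED, 0, "user class exited normally")
  } catch {
    case e: InvocationTargetException =>
      e.getCause match {
        case _: InterruptedException =>
          // Reporter thread can interrupt to stop the user class; not a failure
        case cause: Throwable =>
          // Throwable (not just Exception) so uncaught Errors also report FAILED
          finish(FinalApplicationStatus.FAILED, 15, "User class threw: " + cause)
      }
  }
}
```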
This does not handle yarn client mode reporting of the driver to the AM. I think that should be handled when we make it an unmanaged AM.