[SPARK-4109][CORE] Correctly deserialize Task.stageId #2971
@@ -90,19 +90,19 @@ private[spark] object ShuffleMapTask {
  *
  * See [[org.apache.spark.scheduler.Task]] for more information.
  *
- * @param stageId id of the stage this task belongs to
+ * @param _stageId id of the stage this task belongs to
  * @param rdd the final RDD in this stage
  * @param dep the ShuffleDependency
  * @param _partitionId index of the number in the RDD
  * @param locs preferred task execution locations for locality scheduling
  */
 private[spark] class ShuffleMapTask(
-    stageId: Int,
+    _stageId: Int,
     var rdd: RDD[_],
     var dep: ShuffleDependency[_,_],
     _partitionId: Int,
     @transient private var locs: Seq[TaskLocation])
-  extends Task[MapStatus](stageId, _partitionId)
+  extends Task[MapStatus](_stageId, _partitionId)
   with Externalizable
   with Logging {

@@ -128,7 +128,7 @@ private[spark] class ShuffleMapTask(
   }

   override def readExternal(in: ObjectInput) {
As long as you're modifying this code, mind tossing a
I guess I missed this case in my PR.
-    val stageId = in.readInt()
+    stageId = in.readInt()
     val numBytes = in.readInt()
     val bytes = new Array[Byte](numBytes)
     in.readFully(bytes)
Question: does this need to be renamed? It's just a constructor parameter. Isn't the later assignment to stageId already setting the parent value?
You don't actually need to rename this. Scala will handle it.
Renaming is better; otherwise, simply accessing stageId could accidentally promote it to a field.
Scala is smart enough not to do that, actually.
Actually hold on - I tested it again and the compiler is not doing the right thing. Let me verify.
Take a look here: https://gist.github.com/rxin/6be132f46b72c27d8f89
I think if the parent constructor declares the field as val or var, then it is fine. Otherwise it is not. So technically it should be fine not to rename in this case, but it is better to rename explicitly to guard against a future change in compiler behavior, since this might be a corner case.
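For readers following along, here is a minimal sketch of the bug this change fixes. It is not the Spark code: BaseTask, BuggyTask, FixedTask and restore are hypothetical names, and restore only stands in for readExternal. It assumes a parent class that declares stageId as a var. A `val stageId = ...` inside a method introduces a new local and leaves the inherited member untouched, while a plain assignment updates it once the constructor parameter has a distinct name.

```scala
// Hypothetical stand-in for Task: stageId is a mutable member of the parent.
class BaseTask(var stageId: Int)

// Mirrors the old readExternal: `val` declares a fresh local variable,
// so the inherited stageId is never updated.
class BuggyTask(_stageId: Int) extends BaseTask(_stageId) {
  def restore(id: Int): Unit = {
    val stageId = id // local only; shadows the inherited member
    ()
  }
}

// Mirrors the fix: with the parameter renamed to _stageId, the name
// stageId in the class body refers to the inherited var.
class FixedTask(_stageId: Int) extends BaseTask(_stageId) {
  def restore(id: Int): Unit = {
    stageId = id // assigns the parent's var
  }
}

object ShadowingDemo {
  def main(args: Array[String]): Unit = {
    val buggy = new BuggyTask(0)
    buggy.restore(7)
    println(buggy.stageId) // prints 0: the restored value was lost

    val fixed = new FixedTask(0)
    fixed.restore(7)
    println(fixed.stageId) // prints 7
  }
}
```

The sketch does not settle the question debated above of whether an unrenamed parameter gets promoted to its own field; it only shows why the `val` had to go and why a distinct parameter name keeps the assignment unambiguous.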