[SPARK-4109][CORE] Correctly deserialize Task.stageId #2971

Closed · wants to merge 1 commit
core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala

@@ -80,7 +80,7 @@ private[spark] object ResultTask {
*
* See [[org.apache.spark.scheduler.Task]] for more information.
*
- * @param stageId id of the stage this task belongs to
+ * @param _stageId id of the stage this task belongs to
* @param rdd input to func
* @param func a function to apply on a partition of the RDD
* @param _partitionId index of the number in the RDD
@@ -89,13 +89,13 @@ private[spark] object ResultTask {
* input RDD's partitions).
*/
private[spark] class ResultTask[T, U](
-    stageId: Int,
+    _stageId: Int,
Member:
Question: does this need to be renamed? It's just a constructor parameter, and the later assignment to stageId already sets the parent value?

Contributor:
You don't actually need to rename this; Scala will handle it.

Contributor:
Renaming is better; otherwise, simply by accessing stageId you could accidentally promote it to a field.

Contributor:
Scala is actually smart enough not to do that.

Contributor:
Actually, hold on: I tested it again and the compiler is not doing the right thing. Let me verify.

Contributor:
Take a look here: https://gist.github.com/rxin/6be132f46b72c27d8f89

I think if the parent constructor declares the field as val or var, then it is fine; otherwise it is not. So technically it would be fine not to rename in this case, but renaming explicitly is better, just to guard against a change in this compiler behavior (it might be a corner case) in the future.
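To make the trade-off concrete, here is a minimal sketch of the pattern under discussion (hypothetical class names, not Spark's actual code); the exact capture behavior is what the gist above investigates:

    abstract class Parent(var id: Int)

    // Renamed parameter: unambiguous. In the class body, `id` can only
    // refer to the inherited var, so the assignment updates the real field.
    class Renamed(_id: Int) extends Parent(_id) {
      def reset(newId: Int): Unit = { id = newId }
    }

    // Unrenamed parameter: in the body, `id` refers to the constructor
    // parameter, which shadows the inherited var, so the same assignment
    // would be rejected ("reassignment to val"). And per the discussion
    // above, if the parent did NOT declare the parameter as val/var,
    // merely reading it in a method would promote it to an extra field.
    // class Unrenamed(id: Int) extends Parent(id) {
    //   def reset(newId: Int): Unit = { id = newId }  // does not compile
    // }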

var rdd: RDD[T],
var func: (TaskContext, Iterator[T]) => U,
_partitionId: Int,
@transient locs: Seq[TaskLocation],
var outputId: Int)
-  extends Task[U](stageId, _partitionId) with Externalizable {
+  extends Task[U](_stageId, _partitionId) with Externalizable {

def this() = this(0, null, null, 0, null, 0)

@@ -134,7 +134,7 @@ private[spark] class ResultTask[T, U](
}

override def readExternal(in: ObjectInput) {
-    val stageId = in.readInt()
+    stageId = in.readInt()
val numBytes = in.readInt()
val bytes = new Array[Byte](numBytes)
in.readFully(bytes)
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

@@ -90,19 +90,19 @@ private[spark] object ShuffleMapTask {
*
* See [[org.apache.spark.scheduler.Task]] for more information.
*
- * @param stageId id of the stage this task belongs to
+ * @param _stageId id of the stage this task belongs to
* @param rdd the final RDD in this stage
* @param dep the ShuffleDependency
* @param _partitionId index of the number in the RDD
* @param locs preferred task execution locations for locality scheduling
*/
private[spark] class ShuffleMapTask(
-    stageId: Int,
+    _stageId: Int,
var rdd: RDD[_],
var dep: ShuffleDependency[_,_],
_partitionId: Int,
@transient private var locs: Seq[TaskLocation])
-  extends Task[MapStatus](stageId, _partitionId)
+  extends Task[MapStatus](_stageId, _partitionId)
with Externalizable
with Logging {

@@ -128,7 +128,7 @@ private[spark] class ShuffleMapTask(
}

override def readExternal(in: ObjectInput) {
Contributor:
As long as you're modifying this code, mind tossing in a Utils.tryOrIOException here so that any errors that occur are reported properly? See #2932 for explanation and context.

Contributor:
I guess I missed this case in my PR.
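For reference, a rough sketch of the suggested helper, reconstructed from its description in #2932 (the real signature in org.apache.spark.util.Utils may differ):

    import java.io.IOException

    object SerializationUtils {
      // Stand-in for Utils.tryOrIOException: rethrows IOExceptions as-is
      // and wraps any other exception in an IOException, so that failures
      // inside readExternal are reported through the serialization
      // machinery instead of surfacing as confusing errors elsewhere.
      def tryOrIOException(block: => Unit): Unit = {
        try {
          block
        } catch {
          case e: IOException => throw e
          case e: Exception   => throw new IOException(e)
        }
      }
    }

    // Applied here, it would wrap the whole method body:
    //   override def readExternal(in: ObjectInput): Unit = tryOrIOException {
    //     stageId = in.readInt()
    //     ...
    //   }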

-    val stageId = in.readInt()
+    stageId = in.readInt()
val numBytes = in.readInt()
val bytes = new Array[Byte](numBytes)
in.readFully(bytes)
core/src/main/scala/org/apache/spark/scheduler/Task.scala (1 addition, 1 deletion)

@@ -40,7 +40,7 @@ import org.apache.spark.util.ByteBufferInputStream
* @param stageId id of the stage this task belongs to
* @param partitionId index of the number in the RDD
*/
-private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
+private[spark] abstract class Task[T](var stageId: Int, var partitionId: Int) extends Serializable {

final def run(attemptId: Long): T = {
context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
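To see why stageId must become a var here, a minimal self-contained sketch (hypothetical names, not Spark's actual classes) of the before/after behavior. Pre-fix, readExternal declared a new local val, so the value read from the stream never reached the field and a deserialized task kept the placeholder 0 from the no-arg constructor:

    class TaskLike(var stageId: Int) {
      def this() = this(0)

      def readExternalOld(read: () => Int): Unit = {
        val stageId = read() // shadows the field; the value read is discarded
      }

      def readExternalFixed(read: () => Int): Unit = {
        stageId = read() // assigns the field (requires it to be a var)
      }
    }

    object Demo extends App {
      val t = new TaskLike()
      t.readExternalOld(() => 42)
      println(t.stageId) // prints 0: the pre-fix bug
      t.readExternalFixed(() => 42)
      println(t.stageId) // prints 42
    }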