-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-2521] Broadcast RDD object (instead of sending it along with every task). #1452
Conversation
* task gets a different copy of the RDD. This provides stronger isolation between tasks that | ||
* might modify state of objects referenced in their closures. | ||
*/ | ||
@transient private[spark] lazy val broadcasted = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we get the type here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure :)
QA tests have started for PR 1452. This patch merges cleanly. |
*/ | ||
@transient private[spark] lazy val broadcasted = { | ||
val ser = SparkEnv.get.closureSerializer.newInstance() | ||
sc.broadcast(ser.serialize(this).array()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this be compressed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes broadcast compresses it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But the byte array isn't compressed. That's probably okay but it could also be a slight regression compared to the current task caches.
How does this interact with state cleanup, I guess the broadcast var becomes dereferenced when the RDD does? We may want to add some tests for that. |
* Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast | ||
* the serialized copy of the RDD and for each task we will deserialize it, which means each | ||
* task gets a different copy of the RDD. This provides stronger isolation between tasks that | ||
* might modify state of objects referenced in their closures. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should look into removing this restriction sometime but it's probably too risky to do in 1.1.
Also, this doesn't deal with the closure for an action being large. That can be done as a separate JIRA but have you looked at that? |
That was actually my main concern from the beginning with this change. From my initial observation everything does seem work. I intentionally avoided keeping references to the broadcast variables outside of RDDs, so the broadcast variable should have the same scope as the RDD itself, and thus gets cleaned up as long as we properly clean up RDDs and broadcast variables. |
Yes - actions were intentionally not broadcast for now. It makes it more complicated ... let's do that in a separate PR. (there was two TODOs related to this) |
QA tests have started for PR 1452. This patch merges cleanly. |
QA tests have started for PR 1452. This patch merges cleanly. |
* where the JobConf/Configuration object is not thread-safe. | ||
*/ | ||
@transient private[spark] lazy val broadcasted: Broadcast[Array[Byte]] = { | ||
// TODO: Warn users about very large RDDs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to add this in this patch, we can just choose a threshold
This looks good to me. It might be good for @tdas to look over the test. |
I'm interested in this for the JobConf/Configuration threadsafety issue here: https://issues.apache.org/jira/browse/SPARK-2546 but agree that should go in a separate commit after this PR is merged. |
@ash211 I created https://issues.apache.org/jira/browse/SPARK-2585 to track this. |
QA tests have started for PR 1452. This patch merges cleanly. |
Thanks Patrick! On Fri, Jul 18, 2014 at 3:07 PM, Patrick Wendell [email protected]
|
* | ||
* @param stageId id of the stage this task belongs to | ||
* @param rdd input to func | ||
* @param rddBinary broadcast version of of the serialized RDD |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*of - ha!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also past tense -- broadcasted
Only a set of very minor comments, LGTM. |
Thanks for taking a look. I'm merging this one as is, and will submit a small PR to fix the issues. |
Apparently this broke the build. Reverting and will work on a fix. |
Hah, the new Spark QA messages are really confusing! Is there no timeout on the build? |
…very task). Currently (as of Spark 1.0.1), Spark sends RDD object (which contains closures) using Akka along with the task itself to the executors. This is inefficient because all tasks in the same stage use the same RDD object, but we have to send RDD object multiple times to the executors. This is especially bad when a closure references some variable that is very large. The current design led to users having to explicitly broadcast large variables. The patch uses broadcast to send RDD objects and the closures to executors, and use Akka to only send a reference to the broadcast RDD/closure along with the partition specific information for the task. For those of you who know more about the internals, Spark already relies on broadcast to send the Hadoop JobConf every time it uses the Hadoop input, because the JobConf is large. The user-facing impact of the change include: 1. Users won't need to decide what to broadcast anymore, unless they would want to use a large object multiple times in different operations 2. Task size will get smaller, resulting in faster scheduling and higher task dispatch throughput. In addition, the change will simplify some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast JobConf (which also led to a deadlock recently). A simple way to test this: ```scala val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a); sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count ``` Numbers on 3 r3.8xlarge instances on EC2 ``` master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s ``` Author: Reynold Xin <[email protected]> Closes apache#1452 from rxin/broadcast-task and squashes the following commits: 762e0be [Reynold Xin] Warn large broadcasts. ade6eac [Reynold Xin] Log broadcast size. c3b6f11 [Reynold Xin] Added a unit test for clean up. 754085f [Reynold Xin] Explain why broadcasting serialized copy of the task. 04b17f0 [Reynold Xin] [SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task). (cherry picked from commit 7b8cd17) Signed-off-by: Reynold Xin <[email protected]>
…very task). Currently (as of Spark 1.0.1), Spark sends RDD object (which contains closures) using Akka along with the task itself to the executors. This is inefficient because all tasks in the same stage use the same RDD object, but we have to send RDD object multiple times to the executors. This is especially bad when a closure references some variable that is very large. The current design led to users having to explicitly broadcast large variables. The patch uses broadcast to send RDD objects and the closures to executors, and use Akka to only send a reference to the broadcast RDD/closure along with the partition specific information for the task. For those of you who know more about the internals, Spark already relies on broadcast to send the Hadoop JobConf every time it uses the Hadoop input, because the JobConf is large. The user-facing impact of the change include: 1. Users won't need to decide what to broadcast anymore, unless they would want to use a large object multiple times in different operations 2. Task size will get smaller, resulting in faster scheduling and higher task dispatch throughput. In addition, the change will simplify some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast JobConf (which also led to a deadlock recently). A simple way to test this: ```scala val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a); sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count ``` Numbers on 3 r3.8xlarge instances on EC2 ``` master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s ``` Author: Reynold Xin <[email protected]> Closes apache#1452 from rxin/broadcast-task and squashes the following commits: 762e0be [Reynold Xin] Warn large broadcasts. ade6eac [Reynold Xin] Log broadcast size. c3b6f11 [Reynold Xin] Added a unit test for clean up. 754085f [Reynold Xin] Explain why broadcasting serialized copy of the task. 04b17f0 [Reynold Xin] [SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task). (cherry picked from commit 7b8cd17) Signed-off-by: Reynold Xin <[email protected]>
…very task) This is a resubmission of #1452. It was reverted because it broke the build. Currently (as of Spark 1.0.1), Spark sends RDD object (which contains closures) using Akka along with the task itself to the executors. This is inefficient because all tasks in the same stage use the same RDD object, but we have to send RDD object multiple times to the executors. This is especially bad when a closure references some variable that is very large. The current design led to users having to explicitly broadcast large variables. The patch uses broadcast to send RDD objects and the closures to executors, and use Akka to only send a reference to the broadcast RDD/closure along with the partition specific information for the task. For those of you who know more about the internals, Spark already relies on broadcast to send the Hadoop JobConf every time it uses the Hadoop input, because the JobConf is large. The user-facing impact of the change include: 1. Users won't need to decide what to broadcast anymore, unless they would want to use a large object multiple times in different operations 2. Task size will get smaller, resulting in faster scheduling and higher task dispatch throughput. In addition, the change will simplify some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast JobConf (which also led to a deadlock recently). A simple way to test this: ```scala val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a); sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count ``` Numbers on 3 r3.8xlarge instances on EC2 ``` master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s ``` Author: Reynold Xin <[email protected]> Closes #1498 from rxin/broadcast-task and squashes the following commits: f7364db [Reynold Xin] Code review feedback. f8535dc [Reynold Xin] Fixed the style violation. 252238d [Reynold Xin] Serialize the final task closure as well as ShuffleDependency in taskBinary. 111007d [Reynold Xin] Fix broadcast tests. 797c247 [Reynold Xin] Properly send SparkListenerStageSubmitted and SparkListenerStageCompleted. bab1d8b [Reynold Xin] Check for NotSerializableException in submitMissingTasks. cf38450 [Reynold Xin] Use TorrentBroadcastFactory. 991c002 [Reynold Xin] Use HttpBroadcast. de779f8 [Reynold Xin] Fix TaskContextSuite. cc152fc [Reynold Xin] Don't cache the RDD broadcast variable. d256b45 [Reynold Xin] Fixed unit test failures. One more to go. cae0af3 [Reynold Xin] [SPARK-2521] Broadcast RDD object (instead of sending it along with every task).
…very task). Currently (as of Spark 1.0.1), Spark sends RDD object (which contains closures) using Akka along with the task itself to the executors. This is inefficient because all tasks in the same stage use the same RDD object, but we have to send RDD object multiple times to the executors. This is especially bad when a closure references some variable that is very large. The current design led to users having to explicitly broadcast large variables. The patch uses broadcast to send RDD objects and the closures to executors, and use Akka to only send a reference to the broadcast RDD/closure along with the partition specific information for the task. For those of you who know more about the internals, Spark already relies on broadcast to send the Hadoop JobConf every time it uses the Hadoop input, because the JobConf is large. The user-facing impact of the change include: 1. Users won't need to decide what to broadcast anymore, unless they would want to use a large object multiple times in different operations 2. Task size will get smaller, resulting in faster scheduling and higher task dispatch throughput. In addition, the change will simplify some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast JobConf (which also led to a deadlock recently). A simple way to test this: ```scala val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a); sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count ``` Numbers on 3 r3.8xlarge instances on EC2 ``` master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s ``` Author: Reynold Xin <[email protected]> Closes apache#1452 from rxin/broadcast-task and squashes the following commits: 762e0be [Reynold Xin] Warn large broadcasts. ade6eac [Reynold Xin] Log broadcast size. c3b6f11 [Reynold Xin] Added a unit test for clean up. 754085f [Reynold Xin] Explain why broadcasting serialized copy of the task. 04b17f0 [Reynold Xin] [SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task).
…very task) This is a resubmission of apache#1452. It was reverted because it broke the build. Currently (as of Spark 1.0.1), Spark sends RDD object (which contains closures) using Akka along with the task itself to the executors. This is inefficient because all tasks in the same stage use the same RDD object, but we have to send RDD object multiple times to the executors. This is especially bad when a closure references some variable that is very large. The current design led to users having to explicitly broadcast large variables. The patch uses broadcast to send RDD objects and the closures to executors, and use Akka to only send a reference to the broadcast RDD/closure along with the partition specific information for the task. For those of you who know more about the internals, Spark already relies on broadcast to send the Hadoop JobConf every time it uses the Hadoop input, because the JobConf is large. The user-facing impact of the change include: 1. Users won't need to decide what to broadcast anymore, unless they would want to use a large object multiple times in different operations 2. Task size will get smaller, resulting in faster scheduling and higher task dispatch throughput. In addition, the change will simplify some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast JobConf (which also led to a deadlock recently). A simple way to test this: ```scala val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a); sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count ``` Numbers on 3 r3.8xlarge instances on EC2 ``` master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s ``` Author: Reynold Xin <[email protected]> Closes apache#1498 from rxin/broadcast-task and squashes the following commits: f7364db [Reynold Xin] Code review feedback. f8535dc [Reynold Xin] Fixed the style violation. 252238d [Reynold Xin] Serialize the final task closure as well as ShuffleDependency in taskBinary. 111007d [Reynold Xin] Fix broadcast tests. 797c247 [Reynold Xin] Properly send SparkListenerStageSubmitted and SparkListenerStageCompleted. bab1d8b [Reynold Xin] Check for NotSerializableException in submitMissingTasks. cf38450 [Reynold Xin] Use TorrentBroadcastFactory. 991c002 [Reynold Xin] Use HttpBroadcast. de779f8 [Reynold Xin] Fix TaskContextSuite. cc152fc [Reynold Xin] Don't cache the RDD broadcast variable. d256b45 [Reynold Xin] Fixed unit test failures. One more to go. cae0af3 [Reynold Xin] [SPARK-2521] Broadcast RDD object (instead of sending it along with every task).
Currently (as of Spark 1.0.1), Spark sends RDD object (which contains closures) using Akka along with the task itself to the executors. This is inefficient because all tasks in the same stage use the same RDD object, but we have to send RDD object multiple times to the executors. This is especially bad when a closure references some variable that is very large. The current design led to users having to explicitly broadcast large variables.
The patch uses broadcast to send RDD objects and the closures to executors, and use Akka to only send a reference to the broadcast RDD/closure along with the partition specific information for the task. For those of you who know more about the internals, Spark already relies on broadcast to send the Hadoop JobConf every time it uses the Hadoop input, because the JobConf is large.
The user-facing impact of the change include:
In addition, the change will simplify some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast JobConf (which also led to a deadlock recently).
A simple way to test this:
Numbers on 3 r3.8xlarge instances on EC2