[SPARK-12837][CORE] Do not send the name of internal accumulator to executor side #17596
Conversation
Test build #75672 has finished for PR 17596 at commit
package org.apache.spark.util

/**
 * A simpler version of [[LongAccumulator]], which doesn't track the value count and only for
What does "value count" mean?

BTW, a potentially better way to solve this is to combine all the metrics into a single accumulator.
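Purely to illustrate the "single accumulator" idea in the comment above, here is a hedged sketch (the class name, the (slot, delta) input type, and the slot layout are all made up for illustration; this is not code from the PR):

import org.apache.spark.util.AccumulatorV2

// Illustrative only: every task metric gets a fixed slot in one Array[Long],
// so a task ships a single accumulator update instead of ~20 separate ones.
class CombinedMetricsAccumulator(numSlots: Int)
  extends AccumulatorV2[(Int, Long), Array[Long]] {

  private var slots = new Array[Long](numSlots)

  override def isZero: Boolean = slots.forall(_ == 0L)

  override def copy(): CombinedMetricsAccumulator = {
    val acc = new CombinedMetricsAccumulator(numSlots)
    acc.slots = slots.clone()
    acc
  }

  override def reset(): Unit = slots = new Array[Long](numSlots)

  // add((slotIndex, delta)) bumps a single metric slot.
  override def add(v: (Int, Long)): Unit = slots(v._1) += v._2

  override def merge(other: AccumulatorV2[(Int, Long), Array[Long]]): Unit = other match {
    case o: CombinedMetricsAccumulator =>
      var i = 0
      while (i < numSlots) { slots(i) += o.slots(i); i += 1 }
    case _ =>
      throw new UnsupportedOperationException(s"Cannot merge with ${other.getClass.getName}")
  }

  override def value: Array[Long] = slots.clone()
}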
The approach I took for this was slightly different. For 1.6, this brought the size down from roughly 1.6k bytes on average to 200+ bytes.
 * internal usage.
 */
private[spark] class InternalLongAccumulator extends AccumulatorV2[Long, Long] {
  private[spark] var _value = 0L
Original name `_sum` seems better.
/**
 * A simpler version of [[LongAccumulator]], which doesn't track the value count and only for
 * internal usage.
This may be useful for Spark applications too, if they don't want the count and avg. It could be non-internal.
@mridulm I actually have the same plan. I think it's overkill to implement TaskMetrics with accumulators; we don't need to merge the accumulator updates at the driver side for TaskMetrics accumulators. We should send back TaskMetrics directly with heartbeat, task failure and task finish, then we can just send a bunch of …

One thing I don't 100% agree with you on is the bitmask. According to my experiment, most of the task metrics will not be 0, so the bitmask may not be very useful.
My comments were based on an experimental internal fix in 1.6; a lot of values were actually observed to be 0 in a lot of cases - just a few were not (even here it is relevant - resultSize, gctime, various bytes spilled, etc). The bitmask actually ends up being a single long for the cardinality of metrics we have - which typically gets encoded in a byte or two in reality. In addition, things like input/output/shuffle metrics, accumulator updates, block updates, etc. (present in 1.6 in TaskMetrics) can all be avoided in the serialized stream when not present. IIRC this is relevant not just for the final result, but for heartbeats also - so the serde saving helped a lot more than initially expected.
@mridulm ok, I agree with you, we should have a bitmask. I updated this PR to only avoid sending accumulator names, which is a very simple change but cuts down the size a lot. I'll work on the custom serialization of `TaskMetrics`.
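To make the bitmask idea above concrete, here is a minimal sketch of the kind of serialization it implies (the object and method names are hypothetical; the actual custom serialization had not been written at this point):

import java.io.{DataInputStream, DataOutputStream}

// Hypothetical sketch: metrics occupy fixed slots; a single Long bitmask records
// which slots are non-zero, and only those values are written to the stream.
object MetricsBitmaskSerde {
  def write(out: DataOutputStream, metrics: Array[Long]): Unit = {
    require(metrics.length <= 64, "this sketch assumes at most 64 metric slots")
    var mask = 0L
    for (i <- metrics.indices if metrics(i) != 0L) {
      mask |= 1L << i
    }
    out.writeLong(mask)                         // one Long covers every slot
    for (i <- metrics.indices if metrics(i) != 0L) {
      out.writeLong(metrics(i))                 // only non-zero values hit the wire
    }
  }

  def read(in: DataInputStream, numMetrics: Int): Array[Long] = {
    val mask = in.readLong()
    Array.tabulate(numMetrics) { i =>
      if ((mask & (1L << i)) != 0L) in.readLong() else 0L
    }
  }
}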
Test build #75777 has finished for PR 17596 at commit
7609259 to 36840ee
Test build #75790 has started for PR 17596 at commit
@@ -490,7 +490,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
    }
  }

-  test("Fiters should be pushed down for vectorized Parquet reader at row group level") {
+  ignore("Fiters should be pushed down for vectorized Parquet reader at row group level") {
Since we do not send accumulator names to the executor, this test is not valid anymore. cc @viirya, are there any other ways to test it?
Is it possible we can assign a custom id to an accumulator? Then we could retrieve it back on the executor side with the custom id.
@@ -501,16 +501,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext

    Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) =>
(BTW, let's not forget to change `map` to `foreach` here, and maybe also in "Filter applied on merged Parquet schema with new column should work", if we are fine with that.)
Test build #75789 has finished for PR 17596 at commit
Test build #75803 has finished for PR 17596 at commit
@@ -537,3 +539,27 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
    }
  }
}

class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
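The hunk above only shows the class header; a complete AccumulatorV2[Integer, Integer] along these lines might look like the sketch below (a hedged reconstruction for readability; the field name and error message are assumptions, not necessarily the test's actual code):

import org.apache.spark.util.AccumulatorV2

// Sketch of a row-group counter: the executor-side Parquet reader calls add(),
// and the driver sums the partial counts via merge().
class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
  private var _sum = 0

  override def isZero: Boolean = _sum == 0

  override def copy(): NumRowGroupsAcc = {
    val acc = new NumRowGroupsAcc
    acc._sum = _sum
    acc
  }

  override def reset(): Unit = _sum = 0

  override def add(v: Integer): Unit = _sum += v

  override def merge(other: AccumulatorV2[Integer, Integer]): Unit = other match {
    case a: NumRowGroupsAcc => _sum += a._sum
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: Integer = _sum
}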
oh. This approach looks good.
Seems this breaks the Python accumulator... does anyone know how the Python accumulator works?
      val copyAcc = copy()
      copyAcc
    } else {
      this
I just took a look to help. This seems to be the cause, and it throws an exception as below:
>>> from pyspark.accumulators import INT_ACCUMULATOR_PARAM
>>>
>>> acc1 = sc.accumulator(0, INT_ACCUMULATOR_PARAM)
>>> sc.parallelize(xrange(100), 20).foreach(lambda x: acc1.add(x))
17/04/17 17:10:39 ERROR DAGScheduler: Failed to update accumulators for task 2
java.lang.ClassCastException: org.apache.spark.util.CollectionAccumulator cannot be cast to org.apache.spark.api.python.PythonAccumulatorV2
at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:903)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1105)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1097)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1097)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1716)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1674)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1663)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
It looks like it's because `copy()` here returns a `CollectionAccumulator` from `PythonAccumulatorV2`, and this then fails in `val otherPythonAccumulator = other.asInstanceOf[PythonAccumulatorV2]`.
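One hedged way to address that kind of breakage (a sketch, not necessarily the fix this PR ends up with) is to give PythonAccumulatorV2 a copy that returns its own type, so the cast in merge() keeps working. The element type and the serverHost/serverPort constructor parameters below are assumptions about the class's state:

package org.apache.spark.api.python

import org.apache.spark.util.CollectionAccumulator

// Sketch only: constructor parameters are assumed, and merge() is omitted.
private[spark] class PythonAccumulatorV2(serverHost: String, serverPort: Int)
  extends CollectionAccumulator[Array[Byte]] {

  // Returning our own type here is what keeps merge()'s
  // other.asInstanceOf[PythonAccumulatorV2] from seeing a plain CollectionAccumulator.
  override def copyAndReset(): PythonAccumulatorV2 =
    new PythonAccumulatorV2(serverHost, serverPort)
}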
thanks a lot!
Test build #75896 has finished for PR 17596 at commit
cc @mridulm any more comments on this PR?
        tmAcc.metadata = acc.metadata
        tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
      } else {
        acc.metadata = acc.metadata.copy(name = name)
I think `name` is None here. Do we need to copy it? Or should this be in the branch above?
`name` can be defined if `tm.nameToAccums.contains(name.get)` is false. Right?
Oh right. Another question: since `name` comes from `acc.name`, why do we copy it back to `acc.metadata`?
    for (acc <- accums) {
      val name = AccumulatorContext.get(acc.id).flatMap(_.name)
        tmAcc.metadata = acc.metadata
        tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
This code appears here 4 times. I am wondering if we should remove `name` from the accumulator metadata and just implement `accumulator.name` using this function.
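A tiny sketch of that deduplication (a hypothetical helper, not part of the patch): the four call sites could share one function that resolves the name through AccumulatorContext:

package org.apache.spark.util

// Hypothetical helper: resolve an accumulator's display name from the driver-side
// registry by id, instead of repeating this lookup at every call site.
private[spark] object AccumulatorNames {
  def lookup(acc: AccumulatorV2[_, _]): Option[String] =
    AccumulatorContext.get(acc.id).flatMap(_.name)
}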
Test build #76017 has finished for PR 17596 at commit
Test build #76036 has finished for PR 17596 at commit
Test build #76134 has finished for PR 17596 at commit
Test build #76136 has finished for PR 17596 at commit
retest this please.
Test build #76157 has finished for PR 17596 at commit
Test build #76169 has finished for PR 17596 at commit
retest this please
Test build #76173 has finished for PR 17596 at commit
Test build #76178 has finished for PR 17596 at commit
Test build #76209 has finished for PR 17596 at commit
LGTM - merging to master/2.2. Thanks!
…xecutor side

## What changes were proposed in this pull request?

When sending accumulator updates back to driver, the network overhead is pretty big as there are a lot of accumulators, e.g. `TaskMetrics` will send about 20 accumulators everytime, there may be a lot of `SQLMetric` if the query plan is complicated.

Therefore, it's critical to reduce the size of serialized accumulator. A simple way is to not send the name of internal accumulators to executor side, as it's unnecessary. When executor sends accumulator updates back to driver, we can look up the accumulator name in `AccumulatorContext` easily. Note that, we still need to send names of normal accumulators, as the user code run at executor side may rely on accumulator names.

In the future, we should reimplement `TaskMetrics` to not rely on accumulators and use custom serialization.

Tried on the example in https://issues.apache.org/jira/browse/SPARK-12837, the size of serialized accumulator has been cut down by about 40%.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <[email protected]>

Closes #17596 from cloud-fan/oom.

(cherry picked from commit b90bf52)
Signed-off-by: Herman van Hovell <[email protected]>
…il if accumulator is garbage collected

## What changes were proposed in this pull request?

After #17596, we do not send internal accumulator names to the executor side anymore, and always look up the accumulator name in `AccumulatorContext`. This causes a regression if the accumulator is already garbage collected; this PR fixes it by still sending the accumulator name for `SQLMetrics`.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #17931 from cloud-fan/bug.

(cherry picked from commit e1aaab1)
Signed-off-by: Marcelo Vanzin <[email protected]>
What changes were proposed in this pull request?

When sending accumulator updates back to the driver, the network overhead is pretty big as there are a lot of accumulators: `TaskMetrics` will send about 20 accumulators every time, and there may be a lot of `SQLMetric` if the query plan is complicated.

Therefore, it's critical to reduce the size of the serialized accumulators. A simple way is to not send the name of internal accumulators to the executor side, as it's unnecessary. When the executor sends accumulator updates back to the driver, we can look up the accumulator name in `AccumulatorContext` easily. Note that we still need to send the names of normal accumulators, as user code run at the executor side may rely on accumulator names.

In the future, we should reimplement `TaskMetrics` to not rely on accumulators and use custom serialization.

Tried on the example in https://issues.apache.org/jira/browse/SPARK-12837, the size of the serialized accumulators has been cut down by about 40%.

How was this patch tested?

Existing tests.
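A condensed sketch of the mechanism described above (illustrative only; the helper names and the "internal.metrics." prefix check are assumptions, and the real change lives in the accumulator's serialization path rather than in a standalone object):

package org.apache.spark.util

object InternalAccumulatorNameStripping {
  // Driver side, while preparing the copy of an internal accumulator that gets
  // serialized into the task binary: drop the name, since executors never need it.
  def stripNameOnCopy(original: AccumulatorV2[_, _], copyForExecutor: AccumulatorV2[_, _]): Unit = {
    if (original.name.exists(_.startsWith("internal.metrics."))) {
      copyForExecutor.metadata = original.metadata.copy(name = None)
    } else {
      // Normal (user-visible) accumulators keep their name, since user code on
      // executors may rely on it.
      copyForExecutor.metadata = original.metadata
    }
  }

  // Driver side, when an update comes back from an executor: recover the name by id.
  def nameOnDriver(update: AccumulatorV2[_, _]): Option[String] =
    AccumulatorContext.get(update.id).flatMap(_.name)
}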