
[SPARK-12837][CORE] Do not send the name of internal accumulator to executor side #17596

Closed
wants to merge 3 commits into from

Conversation

cloud-fan
Contributor

@cloud-fan cloud-fan commented Apr 10, 2017

What changes were proposed in this pull request?

When sending accumulator updates back to the driver, the network overhead is pretty big, as there are a lot of accumulators: `TaskMetrics` sends about 20 accumulators every time, and there may be many `SQLMetric`s if the query plan is complicated.

Therefore, it's critical to reduce the size of the serialized accumulators. A simple way is to not send the names of internal accumulators to the executor side, as they are unnecessary there. When an executor sends accumulator updates back to the driver, we can easily look up the accumulator name in `AccumulatorContext`. Note that we still need to send the names of normal accumulators, as user code running on the executor side may rely on accumulator names.
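A minimal sketch of the driver-side lookup described here (`AccumulatorContext.get` and `name` are real Spark internals; the wrapper function is illustrative):

```scala
// Sketch only: resolving an internal accumulator's name on the driver.
// The executor ships updates carrying just the accumulator id and value;
// the driver recovers the name from its own registry, since the original
// accumulator was registered on the driver and its metadata is still there.
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}

def resolveName(update: AccumulatorV2[_, _]): Option[String] =
  AccumulatorContext.get(update.id).flatMap(_.name)
```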

In the future, we should reimplement TaskMetrics to not rely on accumulators and use custom serialization.

Tried on the example in https://issues.apache.org/jira/browse/SPARK-12837; the size of the serialized accumulators was cut down by about 40%.

How was this patch tested?

Existing tests.

@cloud-fan
Contributor Author

cc @rxin @davies @andrewor14

@SparkQA

SparkQA commented Apr 10, 2017

Test build #75672 has finished for PR 17596 at commit 67044b3.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

package org.apache.spark.util

/**
* A simpler version of [[LongAccumulator]], which doesn't track the value count and only for
Contributor

What does "value count" mean?

@rxin
Contributor

rxin commented Apr 10, 2017

BTW a potential, better way to solve this is to combine all the metrics into a single accumulator.

@mridulm
Contributor

mridulm commented Apr 11, 2017

The approach I took for this was slightly different.

  • Create a bitmask indicating which accumulators are required in TaskMetrics (that is, have non-zero values), and emit it first.
  • Instead of relying on default serialization, simply do custom serialization for all internal accumulators: directly emit the longs (based on the bitmask for writing/reading).
  • Encode longs/ints so that they take fewer than 8/4 bytes (currently this code is sitting inside graphx IIRC; essentially the same code as Kryo's optimizePositive).

For 1.6, this brought the average size down from around 1.6 KB to a bit over 200 bytes.
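The bitmask step above can be sketched in plain Scala (the helper names and fixed-size layout are illustrative, not the actual internal fix):

```scala
import java.io.{DataInputStream, DataOutputStream}

// Sketch: write only the non-zero metrics, preceded by a bitmask that
// records which positions were written. With ~20 metrics the mask fits
// easily in a single long.
def writeMetrics(out: DataOutputStream, metrics: Array[Long]): Unit = {
  require(metrics.length <= 64)
  var mask = 0L
  for (i <- metrics.indices if metrics(i) != 0L) mask |= (1L << i)
  out.writeLong(mask)
  for (i <- metrics.indices if metrics(i) != 0L) out.writeLong(metrics(i))
}

def readMetrics(in: DataInputStream, n: Int): Array[Long] = {
  val mask = in.readLong()
  Array.tabulate(n)(i => if ((mask & (1L << i)) != 0L) in.readLong() else 0L)
}
```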

* internal usage.
*/
private[spark] class InternalLongAccumulator extends AccumulatorV2[Long, Long] {
private[spark] var _value = 0L
Member

@viirya viirya Apr 11, 2017

The original name `_sum` seems better.


/**
* A simpler version of [[LongAccumulator]], which doesn't track the value count and only for
* internal usage.
Member

@viirya viirya Apr 11, 2017

This may be useful for Spark applications too, if they don't want the count and average. It could be made non-internal.

@cloud-fan cloud-fan changed the title [SPARK-12837][SQL] reduce the serialized size of accumulator [SPARK-12837][CORE] reduce the serialized size of accumulator Apr 11, 2017
@cloud-fan
Contributor Author

@mridulm I actually have the same plan. I think it's overkill to implement TaskMetrics with accumulators; we don't need to merge accumulator updates at the driver side for TaskMetrics accumulators. We should send TaskMetrics back directly with heartbeat, task failure and task finish events, and then we can just send a bunch of longs and compress them.

One thing I don't 100% agree with you on is the bitmask. According to my experiment, most of the task metrics will not be 0, so the bitmask may not be very useful.

@mridulm
Contributor

mridulm commented Apr 11, 2017

My comments were based on an experimental internal fix in 1.6; a lot of values were actually observed to be 0 in a lot of cases, and just a few were not (even here it is relevant: resultSize, GC time, various bytes spilled, etc.). The bitmask actually ends up being a single long for the cardinality of metrics we have, which typically gets encoded in a byte or two in practice.

In addition, things like input/output/shuffle metrics, accumulator updates, block updates, etc. (present in `TaskMetrics` in 1.6) can all be omitted from the serialized stream when not present. When present, I used readExternal/writeExternal on those classes to directly encode the values.

IIRC this is relevant not just for the final result but for heartbeats too, so the serde saving helped a lot more than initially expected.
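The sub-8-byte encoding mentioned above is essentially a varint, similar in spirit to Kryo's optimizePositive writes; a minimal self-contained sketch:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

// Sketch: unsigned LEB128-style varint, 7 bits per byte, high bit = "more".
// Small non-negative values (common for metrics) fit in one or two bytes.
def writeVarLong(out: ByteArrayOutputStream, value: Long): Unit = {
  var v = value
  while ((v & ~0x7FL) != 0L) {
    out.write(((v & 0x7F) | 0x80).toInt)
    v >>>= 7
  }
  out.write(v.toInt)
}

def readVarLong(in: ByteArrayInputStream): Long = {
  var result = 0L
  var shift = 0
  var b = in.read()
  while ((b & 0x80) != 0) {
    result |= (b & 0x7FL) << shift
    shift += 7
    b = in.read()
  }
  result | ((b & 0x7FL) << shift)
}
```

For example, the value 300 encodes to two bytes instead of eight, which is where the 1.6 KB → ~200 byte saving mostly comes from.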

@cloud-fan cloud-fan changed the title [SPARK-12837][CORE] reduce the serialized size of accumulator [SPARK-12837][CORE] Do not send the accumulator name to executor side Apr 13, 2017
@cloud-fan
Contributor Author

@mridulm OK, I agree with you; we should have a bitmask. I updated this PR to only avoid sending accumulator names, which is a very simple change but cuts down the size a lot. I'll work on the custom serialization of TaskMetrics later.

@SparkQA

SparkQA commented Apr 13, 2017

Test build #75777 has finished for PR 17596 at commit ca84b12.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan force-pushed the oom branch 2 times, most recently from 7609259 to 36840ee on April 14, 2017 04:56
@SparkQA

SparkQA commented Apr 14, 2017

Test build #75790 has started for PR 17596 at commit 36840ee.

@@ -490,7 +490,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}

test("Fiters should be pushed down for vectorized Parquet reader at row group level") {
ignore("Fiters should be pushed down for vectorized Parquet reader at row group level") {
Contributor Author

Since we do not send accumulator names to the executor, this test is no longer valid. cc @viirya, is there any other way to test it?

Member

@viirya viirya Apr 14, 2017

Is it possible to assign a custom id to an accumulator? Then we could retrieve it back on the executor side with that custom id.

@@ -501,16 +501,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex

Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) =>
Member

@HyukjinKwon HyukjinKwon Apr 14, 2017

(BTW, let's not forget to change map to foreach here, and maybe in "Filter applied on merged Parquet schema with new column should work" too, if we are fine with that)

@SparkQA

SparkQA commented Apr 14, 2017

Test build #75789 has finished for PR 17596 at commit 7609259.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 14, 2017

Test build #75803 has finished for PR 17596 at commit d1f42e1.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -537,3 +539,27 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
}

class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
Member

oh. This approach looks good.
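For reference, a counter-style `AccumulatorV2` like the `NumRowGroupsAcc` above has roughly this shape (a reconstructed sketch, not necessarily the exact PR code):

```scala
import org.apache.spark.util.AccumulatorV2

// Sketch of a counter accumulator: the executor calls add() once per
// matched row group, and the driver reads value after the job finishes.
// Because the test now asserts on the accumulator object itself, it no
// longer depends on looking the accumulator up by name on the executor.
class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
  private var _sum = 0

  override def isZero: Boolean = _sum == 0

  override def copy(): AccumulatorV2[Integer, Integer] = {
    val acc = new NumRowGroupsAcc
    acc._sum = _sum
    acc
  }

  override def reset(): Unit = _sum = 0

  override def add(v: Integer): Unit = _sum += v

  override def merge(other: AccumulatorV2[Integer, Integer]): Unit =
    _sum += other.value

  override def value: Integer = _sum
}
```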

@cloud-fan
Contributor Author

Seems this breaks the Python accumulator... does anyone know how the Python accumulator works?

val copyAcc = copy()
copyAcc
} else {
this
Member

@HyukjinKwon HyukjinKwon Apr 17, 2017

I just took a look to help. It seems this is the cause, and it throws an exception as below:

>>> from pyspark.accumulators import INT_ACCUMULATOR_PARAM
>>>
>>> acc1 = sc.accumulator(0, INT_ACCUMULATOR_PARAM)
>>> sc.parallelize(xrange(100), 20).foreach(lambda x: acc1.add(x))
17/04/17 17:10:39 ERROR DAGScheduler: Failed to update accumulators for task 2
java.lang.ClassCastException: org.apache.spark.util.CollectionAccumulator cannot be cast to org.apache.spark.api.python.PythonAccumulatorV2
	at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:903)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1105)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1097)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1097)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1173)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1716)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1674)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1663)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

It looks like this is because copy() here returns a CollectionAccumulator from PythonAccumulatorV2, which then fails in

val otherPythonAccumulator = other.asInstanceOf[PythonAccumulatorV2]

Contributor Author

thanks a lot!

@SparkQA

SparkQA commented Apr 18, 2017

Test build #75896 has finished for PR 17596 at commit a3e1d78.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

cc @mridulm any more comments on this PR?

tmAcc.metadata = acc.metadata
tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
} else {
acc.metadata = acc.metadata.copy(name = name)
Member

I think name is None here. Do we need to copy it? Or should this be in the branch above?

Contributor

name can be defined if tm.nameToAccums.contains(name.get) is false. Right?

Member

Oh right. Another question: since name comes from acc.name, why do we copy it back to acc.metadata?

for (acc <- accums) {
val name = AccumulatorContext.get(acc.id).flatMap(_.name)
Contributor

This code appears here 4 times. I am wondering if we should remove name from the accumulator metadata and just implement accumulator.name using this function.
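The refactoring suggested here could look roughly like this (a hedged sketch; `displayName` is an illustrative name, not an existing API):

```scala
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}

// Sketch: centralize the "look up the name on the driver" fallback in one
// helper, so the four call sites don't each repeat the lookup. Prefer a
// locally carried name (normal, user-visible accumulators), and fall back
// to the driver-side registry for internal ones.
def displayName(acc: AccumulatorV2[_, _]): Option[String] =
  acc.name.orElse(AccumulatorContext.get(acc.id).flatMap(_.name))
```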

@SparkQA

SparkQA commented Apr 21, 2017

Test build #76017 has finished for PR 17596 at commit e8633f9.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AccumulatorMetadata(id: Long, countFailedValues: Boolean) extends Serializable

@SparkQA

SparkQA commented Apr 21, 2017

Test build #76036 has finished for PR 17596 at commit 3b0c770.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AccumulatorMetadata(id: Long, countFailedValues: Boolean) extends Serializable

@cloud-fan cloud-fan changed the title [SPARK-12837][CORE] Do not send the accumulator name to executor side [SPARK-12837][CORE] Do not send the name of internal accumulator to executor side Apr 25, 2017
@SparkQA

SparkQA commented Apr 25, 2017

Test build #76134 has finished for PR 17596 at commit 0616716.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 25, 2017

Test build #76136 has finished for PR 17596 at commit 7f41155.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please.

@SparkQA

SparkQA commented Apr 26, 2017

Test build #76157 has finished for PR 17596 at commit 7f41155.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 26, 2017

Test build #76169 has finished for PR 17596 at commit 4df95f2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Apr 26, 2017

Test build #76173 has finished for PR 17596 at commit 4fcee4f.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 26, 2017

Test build #76178 has finished for PR 17596 at commit 4fcee4f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 27, 2017

Test build #76209 has finished for PR 17596 at commit 0f028b1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

hvanhovell commented Apr 28, 2017

LGTM - merging to master/2.2. Thanks!

asfgit pushed a commit that referenced this pull request Apr 28, 2017
…xecutor side


Author: Wenchen Fan <[email protected]>

Closes #17596 from cloud-fan/oom.

(cherry picked from commit b90bf52)
Signed-off-by: Herman van Hovell <[email protected]>
@asfgit asfgit closed this in b90bf52 Apr 28, 2017
asfgit pushed a commit that referenced this pull request May 15, 2017
…il if accumulator is garbage collected

## What changes were proposed in this pull request?

After #17596, we no longer send the names of internal accumulators to the executor side, and always look up the accumulator name in `AccumulatorContext`.

This causes a regression if the accumulator has already been garbage collected; this PR fixes it by still sending the accumulator name for `SQLMetrics`.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #17931 from cloud-fan/bug.

(cherry picked from commit e1aaab1)
Signed-off-by: Marcelo Vanzin <[email protected]>
ghost pushed a commit to dbtsai/spark that referenced this pull request May 15, 2017
…il if accumulator is garbage collected

robert3005 pushed a commit to palantir/spark that referenced this pull request May 19, 2017
…il if accumulator is garbage collected

liyichao pushed a commit to liyichao/spark that referenced this pull request May 24, 2017
…il if accumulator is garbage collected

7 participants