SPARK-3211 .take() is OOM-prone with empty partitions
Instead of jumping straight from 1 partition to all partitions, grow the scan
exponentially: quadruple the number of partitions to attempt on each retry.

Fix proposed by Paul Nepywoda

Author: Andrew Ash <[email protected]>

Closes apache#2117 from ash211/SPARK-3211 and squashes the following commits:

8b2299a [Andrew Ash] Quadruple instead of double for a minor speedup
e5f7e4d [Andrew Ash] Update comment to better reflect what we're doing
09a27f7 [Andrew Ash] Update PySpark to be less OOM-prone as well
3a156b8 [Andrew Ash] SPARK-3211 .take() is OOM-prone with empty partitions
ash211 authored and mateiz committed Sep 6, 2014
1 parent 7ff8c45 commit ba5bcad
Showing 2 changed files with 7 additions and 8 deletions.
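For context, a hypothetical sketch of the failure mode the JIRA title describes (not part of the commit; sc is an assumed SparkContext and the sizes are made up): take() on an RDD whose partitions are almost all empty.

// Hypothetical repro of the SPARK-3211 scenario. Aggressive filtering leaves
// nearly every partition empty, so take()'s first one-partition job finds no
// rows. Before this commit, the second job then scanned all remaining
// partitions at once and buffered their results on the driver (OOM-prone);
// after it, the scan grows by partsScanned * 4 per retry instead.
val huge = sc.parallelize(1 to 100000000, 10000) // 10,000 partitions
val sparse = huge.filter(_ % 97000000 == 0)      // ~1 surviving row overall
sparse.take(5)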
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1064,11 +1064,10 @@ abstract class RDD[T: ClassTag](
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1
       if (partsScanned > 0) {
-        // If we didn't find any rows after the first iteration, just try all partitions next.
-        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
-        // by 50%.
+        // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
+        // interpolate the number of partitions we need to try, but overestimate it by 50%.
         if (buf.size == 0) {
-          numPartsToTry = totalParts - 1
+          numPartsToTry = partsScanned * 4
         } else {
           numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
         }
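To see how the new rule in the hunk above behaves when every scanned partition comes back empty, here is a minimal standalone sketch, assuming a hypothetical 10,000-partition RDD and reusing the hunk's variable names:

// Simulates the per-job partition counts of take() when no rows are found,
// under the new rule numPartsToTry = partsScanned * 4.
object TakeGrowthSketch {
  def main(args: Array[String]): Unit = {
    val totalParts = 10000 // assumed total partition count, for illustration
    var partsScanned = 0
    var numPartsToTry = 1
    val jobs = scala.collection.mutable.ArrayBuffer[Int]()
    while (partsScanned < totalParts) {
      // Each job is capped at the partitions that remain, as runJob caps it.
      val attempt = math.min(numPartsToTry, totalParts - partsScanned)
      jobs += attempt
      partsScanned += attempt
      numPartsToTry = partsScanned * 4 // quadruple what has been scanned so far
    }
    println(jobs.mkString(", ")) // 1, 4, 20, 100, 500, 2500, 6875
  }
}

Seven gradually growing jobs replace the old two-job worst case of 1 followed by totalParts - 1, which is what made the driver buffer everything at once.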
8 changes: 4 additions & 4 deletions python/pyspark/rdd.py
@@ -1089,11 +1089,11 @@ def take(self, num):
             # we actually cap it at totalParts in runJob.
             numPartsToTry = 1
             if partsScanned > 0:
-                # If we didn't find any rows after the first iteration, just
-                # try all partitions next. Otherwise, interpolate the number
-                # of partitions we need to try, but overestimate it by 50%.
+                # If we didn't find any rows after the previous iteration,
+                # quadruple and retry. Otherwise, interpolate the number of
+                # partitions we need to try, but overestimate it by 50%.
                 if len(items) == 0:
-                    numPartsToTry = totalParts - 1
+                    numPartsToTry = partsScanned * 4
                 else:
                     numPartsToTry = int(1.5 * num * partsScanned / len(items))
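The interpolation branch on the last line above is unchanged by this commit, but a worked example clarifies the 50% overestimate. A sketch with made-up counts:

// Hypothetical worked example of the unchanged interpolation branch.
val num = 100         // rows requested by take(num)
val partsScanned = 4  // partitions scanned so far
val bufSize = 10      // rows collected so far (buf.size / len(items))
// Rows arrived at roughly bufSize / partsScanned per partition, so num rows
// should need about num * partsScanned / bufSize partitions in total; the
// 1.5 factor pads that estimate by 50% to avoid one more scheduling round.
val numPartsToTry = (1.5 * num * partsScanned / bufSize).toInt // 60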
