rollback next_limit() again, the performance difference is huge:
Given next_limit = memory_limit
$ time python pyspark/shuffle.py
real    0m20.674s
user    0m6.119s
sys 0m13.993s

Given next_limit = max(memory_limit, used_memory * 1.05)
$ time python pyspark/shuffle.py
real    0m0.583s
user    0m0.488s
sys 0m0.093s
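The docstring restored in shuffle.py below explains the difference: when memory is not released back to the OS after a spill, a fixed `next_limit = memory_limit` keeps every subsequent check above the limit, so the merger dumps on essentially every batch; `max(memory_limit, used_memory * 1.05)` only dumps again once usage actually grows. A rough, self-contained sketch of that effect (hypothetical numbers and names, not the PySpark code):

MEMORY_LIMIT = 512  # MB, standing in for spark.python.worker.memory

def count_spills(next_limit):
    used = 500.0            # simulated get_used_memory(): creeps upward, never shrinks
    limit = MEMORY_LIMIT
    spills = 0
    for _ in range(10000):  # one memory check per batch of records
        used += 0.01        # spilling frees Python objects but not OS-level memory
        if used > limit:
            spills += 1     # this is where the real merger would dump to disk
            limit = next_limit(limit, used)
    return spills

print(count_spills(lambda limit, used: MEMORY_LIMIT))                    # fixed limit: thousands of spills
print(count_spills(lambda limit, used: max(MEMORY_LIMIT, used * 1.05)))  # rolled-back policy: a handful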
davies committed Jul 23, 2014
1 parent e74b785 commit f6bd5d6
Showing 2 changed files with 10 additions and 11 deletions.
6 changes: 3 additions & 3 deletions python/pyspark/rdd.py
@@ -1234,7 +1234,7 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
         outputSerializer = self.ctx._unbatched_serializer
 
         limit = (_parse_memory(self.ctx._conf.get(
-            "spark.python.worker.memory", "512m") / 2)
+            "spark.python.worker.memory", "512m")) / 2)
 
         def add_shuffle_key(split, iterator):

@@ -1315,8 +1315,8 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         serializer = self.ctx.serializer
         spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
                  == 'true')
-        memory = (_parse_memory(self.ctx._conf.get(
-            "spark.python.worker.memory", "512m")
+        memory = _parse_memory(self.ctx._conf.get(
+            "spark.python.worker.memory", "512m"))
         agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
 
         def combineLocally(iterator):
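Both rdd.py hunks fix the same parenthesization slip: the closing parenthesis (and the `/ 2` in partitionBy) has to apply to the parsed memory value, not to the raw config string. A minimal illustration using a hypothetical parse_memory_mb stand-in, since the real _parse_memory's unit handling is not shown here:

# Hypothetical stand-in for pyspark.rdd._parse_memory; assumed to return MiB.
def parse_memory_mb(s):
    units = {"k": 1.0 / 1024, "m": 1, "g": 1024}
    return int(float(s[:-1]) * units[s[-1].lower()])

print(parse_memory_mb("512m") / 2)   # new parenthesization: divide the parsed value -> 256.0
# parse_memory_mb("512m" / 2)        # old parenthesization: TypeError, "512m" / 2 is evaluated first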
15 changes: 7 additions & 8 deletions python/pyspark/shuffle.py
@@ -217,13 +217,12 @@ def _get_spill_dir(self, n):
         return os.path.join(self.localdirs[n % len(self.localdirs)], str(n))
 
     def _next_limit(self):
-        #"""
-        #Return the next memory limit. If the memory is not released
-        #after spilling, it will dump the data only when the used memory
-        #starts to increase.
-        #"""
-        #return max(self.memory_limit, get_used_memory() * 1.05)
-        return self.memory_limit
+        """
+        Return the next memory limit. If the memory is not released
+        after spilling, it will dump the data only when the used memory
+        starts to increase.
+        """
+        return max(self.memory_limit, get_used_memory() * 1.05)
 
     def mergeValues(self, iterator):
         """ Combine the items by creator and combiner """
@@ -363,7 +362,7 @@ def _external_items(self):
                                         False)
 
                     # limit the total partitions
-                    if (self.scale * self.partitions < self.TOTAL_PARTITIONS)
+                    if (self.scale * self.partitions < self.TOTAL_PARTITIONS
                             and j < self.spills - 1
                             and get_used_memory() > hard_limit):
                         self.data.clear()  # will read from disk again
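The shuffle.py change at line 363 is purely syntactic: the old first line closed the parenthesis early, so the condition ended before the `and ...` continuation lines and the statement no longer parsed. A trivial standalone check of the corrected form, with made-up values standing in for the instance attributes:

# Made-up values for self.scale, self.partitions, etc.; only the parenthesization matters here.
scale, partitions, TOTAL_PARTITIONS = 1, 64, 4096
j, spills = 0, 3
used_memory, hard_limit = 600, 512

if (scale * partitions < TOTAL_PARTITIONS
        and j < spills - 1
        and used_memory > hard_limit):
    print("clear the in-memory data and re-read this partition from disk later")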
