fix code style and change next_limit to memory_limit
davies committed Jul 23, 2014
1 parent 400be01 commit e74b785
Showing 2 changed files with 18 additions and 17 deletions.
python/pyspark/rdd.py: 6 changes (3 additions & 3 deletions)
@@ -1229,7 +1229,7 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
         # Instead, we'll form the hash buckets in Python,
         # transferring O(numPartitions) objects to Java.
         # Each object is a (splitNumber, [objects]) pair.
-        # In order to void too huge objects, the objects are
+        # In order to avoid too huge objects, the objects are
         # grouped into chunks.
         outputSerializer = self.ctx._unbatched_serializer

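The comment in this hunk describes the chunking scheme: instead of shipping one object per record, partitionBy builds (splitNumber, [objects]) pairs in Python and caps each list so that no single object sent to Java becomes huge. A minimal standalone sketch of the idea, assuming the built-in hash as the partition function and an illustrative chunk size (neither is PySpark's actual choice here):

def chunked_buckets(iterator, num_partitions, partition_func=hash,
                    chunk_size=1000):
    """Yield (splitNumber, [objects]) chunks so that no single object
    handed across the Python-Java boundary grows too large."""
    buckets = [[] for _ in range(num_partitions)]
    for kv in iterator:
        split = partition_func(kv[0]) % num_partitions
        buckets[split].append(kv)
        if len(buckets[split]) >= chunk_size:
            yield split, buckets[split]
            buckets[split] = []
    # Flush the remaining partial chunks.
    for split, items in enumerate(buckets):
        if items:
            yield split, items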
@@ -1316,11 +1316,11 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
                  == 'true')
         memory = _parse_memory(self.ctx._conf.get(
-            "spark.python.worker.memory","512m"))
+            "spark.python.worker.memory", "512m"))
         agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

         def combineLocally(iterator):
-            merger = ExternalMerger(agg, memory, serializer) \
+            merger = ExternalMerger(agg, memory * 0.9, serializer) \
                 if spill else InMemoryMerger(agg)
             merger.mergeValues(iterator)
             return merger.iteritems()
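The style fix above touches the call to _parse_memory, which converts a JVM-style size string such as "512m" into megabytes. A plausible reconstruction of such a parser, for readers without the PySpark source at hand (a sketch, not necessarily the exact implementation):

def _parse_memory(s):
    """Parse a JVM-style memory string (e.g. "512m", "2g") and
    return the value in MB."""
    units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024}
    if s[-1].lower() not in units:
        raise ValueError("invalid format: " + s)
    return int(float(s[:-1]) * units[s[-1].lower()])

assert _parse_memory("512m") == 512
assert _parse_memory("2g") == 2048

The second change, memory * 0.9, gives the merger a limit 10% below the configured worker memory, so spilling starts before the worker itself hits the cap.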
python/pyspark/shuffle.py: 29 changes (15 additions & 14 deletions)
@@ -216,13 +216,14 @@ def _get_spill_dir(self, n):
         """ Choose one directory for spill by number n """
         return os.path.join(self.localdirs[n % len(self.localdirs)], str(n))

-    def next_limit(self):
-        """
-        Return the next memory limit. If the memory is not released
-        after spilling, it will dump the data only when the used memory
-        starts to increase.
-        """
-        return max(self.memory_limit, get_used_memory() * 1.05)
+    def _next_limit(self):
+        #"""
+        #Return the next memory limit. If the memory is not released
+        #after spilling, it will dump the data only when the used memory
+        #starts to increase.
+        #"""
+        #return max(self.memory_limit, get_used_memory() * 1.05)
+        return self.memory_limit

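The docstring being commented out records the old policy's rationale: after a spill, freed memory is often not returned to the OS, so get_used_memory() keeps reporting a high value; raising the threshold to 5% above current usage meant data was dumped again only once usage actually started to grow. This commit pins the threshold to the configured limit instead. A toy comparison with made-up numbers:

# Suppose a spill just finished, but the allocator kept the freed pages.
memory_limit = 512   # configured limit, in MB
used_memory = 600    # what get_used_memory() still reports, in MB

old_limit = max(memory_limit, used_memory * 1.05)  # 630.0: wait for real growth
new_limit = memory_limit                           # 512: a stale reading can
                                                   # trigger another spill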
     def mergeValues(self, iterator):
         """ Combine the items by creator and combiner """
@@ -237,7 +238,7 @@ def mergeValues(self, iterator):
             c += 1
             if c % batch == 0 and get_used_memory() > self.memory_limit:
                 self._first_spill()
-                self._partitioned_mergeValues(iterator, self.next_limit())
+                self._partitioned_mergeValues(iterator, self._next_limit())
                 break

     def _partition(self, key):
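The merge paths touched by this commit share one guard pattern: memory is sampled only every batch items (the check itself has a cost), the first overflow triggers _first_spill() to push everything into partitioned files, and the rest of the iterator is handed to the partitioned variant under a fresh limit. A condensed, self-contained sketch of that control flow; get_used_memory here is a simple Unix-only stand-in, and spill is a caller-supplied placeholder:

import resource

def get_used_memory():
    # RSS high-water mark in MB (ru_maxrss is reported in KB on Linux).
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 10

def merge_with_spill(iterator, combine, memory_limit, spill, batch=1000):
    """Condensed shape of mergeValues/mergeCombiners: combine in memory,
    checking usage every `batch` items and spilling on overflow."""
    data, c = {}, 0
    for k, v in iterator:
        data[k] = combine(data[k], v) if k in data else v
        c += 1
        if c % batch == 0 and get_used_memory() > memory_limit:
            spill(data)   # write the current buckets out to disk
            data.clear()
    return data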
@@ -259,7 +260,7 @@ def _partitioned_mergeValues(self, iterator, limit=0):
             c += 1
             if c % batch == 0 and get_used_memory() > limit:
                 self._spill()
-                limit = self.next_limit()
+                limit = self._next_limit()

     def mergeCombiners(self, iterator, check=True):
         """ Merge (K,V) pair by mergeCombiner """
@@ -275,7 +276,7 @@ def mergeCombiners(self, iterator, check=True):
             c += 1
             if c % batch == 0 and get_used_memory() > self.memory_limit:
                 self._first_spill()
-                self._partitioned_mergeCombiners(iterator, self.next_limit())
+                self._partitioned_mergeCombiners(iterator, self._next_limit())
                 break

     def _partitioned_mergeCombiners(self, iterator, limit=0):
@@ -291,7 +292,7 @@ def _partitioned_mergeCombiners(self, iterator, limit=0):
             c += 1
             if c % self.batch == 0 and get_used_memory() > limit:
                 self._spill()
-                limit = self.next_limit()
+                limit = self._next_limit()

     def _first_spill(self):
         """
@@ -349,7 +350,7 @@ def _external_items(self):
         assert not self.data
         if any(self.pdata):
             self._spill()
-        hard_limit = self.next_limit()
+        hard_limit = self._next_limit()

         try:
             for i in range(self.partitions):
@@ -406,7 +407,7 @@ def _recursive_merged_items(self, start):
             m = ExternalMerger(self.agg, self.memory_limit, self.serializer,
                                subdirs, self.scale * self.partitions)
             m.pdata = [{} for _ in range(self.partitions)]
-            limit = self.next_limit()
+            limit = self._next_limit()

             for j in range(self.spills):
                 path = self._get_spill_dir(j)
@@ ... @@ def _recursive_merged_items(self, start):

                 if get_used_memory() > limit:
                     m._spill()
-                    limit = self.next_limit()
+                    limit = self._next_limit()

             for v in m._external_items():
                 yield v
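This final hunk sits in ExternalMerger's recovery path: _recursive_merged_items re-reads one partition's spilled files and, when even that overflows memory, re-partitions the data through a nested ExternalMerger (the m._spill() calls above). A rough sketch of the basic read-back-and-merge step, with an invented file layout and pickle standing in for the configurable serializer:

import os
import pickle

def merged_partition(spill_root, num_spills, partition, merge):
    """Merge one partition's data across all spill rounds; the real
    code recurses into a nested merger if this overflows memory."""
    data = {}
    for j in range(num_spills):
        path = os.path.join(spill_root, str(j), str(partition))
        with open(path, "rb") as f:
            while True:
                try:
                    k, v = pickle.load(f)
                except EOFError:
                    break
                data[k] = merge(data[k], v) if k in data else v
    return data.items()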
