Commit
Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
witgo committed Jun 9, 2014
2 parents 04c6f7e + 6113ac1 commit 056b8c7
Showing 3 changed files with 29 additions and 20 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1024,7 +1024,7 @@ private[spark] class BlockManager(
if (blockId.isShuffle) {
// Reducer may need to read many local shuffle blocks and will wrap them into Iterators
// at the beginning. The wrapping will cost some memory (compression instance
// initialization, etc.). Reducer read shuffle blocks one by one so we could do the
// initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
// wrapping lazily to save memory.
class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
lazy val proxy = f
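
The comment fix above concerns the lazy wrapping of shuffle-block iterators: the real iterator is only built on first access, so the per-block setup cost (compression instances, etc.) is deferred. A rough Python sketch of that lazy-proxy pattern, with illustrative names rather than Spark's Scala code:

class LazyProxyIterator:
    """Build the underlying iterator only when it is first used."""

    def __init__(self, factory):
        self._factory = factory   # zero-argument callable, like the by-name `f` above
        self._proxy = None        # counterpart of Scala's `lazy val proxy = f`

    def _materialize(self):
        if self._proxy is None:
            self._proxy = self._factory()
        return self._proxy

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._materialize())

# Nothing is allocated until the first next() call.
blocks = LazyProxyIterator(lambda: iter(["block-0", "block-1"]))
print(list(blocks))   # ['block-0', 'block-1']
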
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -168,7 +168,7 @@ private[spark] object UIUtils extends Logging {
<body>
<div class="navbar navbar-static-top">
<div class="navbar-inner">
<a href="/" class="brand">
<a href={prependBaseUri("/")} class="brand">
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
</a>
<ul class="nav">{header}</ul>
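
The one-line fix above routes the brand link through prependBaseUri so it honors a configured UI base path (for example when the web UI sits behind a proxy) instead of hard-coding "/". A hedged Python sketch of what such a helper does, with made-up names; the real implementation is the Scala method in UIUtils:

def prepend_base_uri(resource="", base_path=""):
    """Join an optional base path with a resource path, avoiding duplicate slashes."""
    base = base_path.rstrip("/")
    if not resource:
        return base or "/"
    return base + "/" + resource.lstrip("/")

print(prepend_base_uri("/"))                                  # /
print(prepend_base_uri("/", base_path="/proxy/app-1234"))     # /proxy/app-1234/
print(prepend_base_uri("/static/spark-logo-77x50px-hd.png",
                       base_path="/proxy/app-1234"))          # /proxy/app-1234/static/spark-logo-77x50px-hd.png
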
45 changes: 27 additions & 18 deletions python/pyspark/rdd.py
@@ -250,7 +250,7 @@ def getCheckpointFile(self):
def map(self, f, preservesPartitioning=False):
"""
Return a new RDD by applying a function to each element of this RDD.
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
@@ -312,6 +312,15 @@ def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
"use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
return self.mapPartitionsWithIndex(f, preservesPartitioning)

def getNumPartitions(self):
"""
Returns the number of partitions in RDD
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> rdd.getNumPartitions()
2
"""
return self._jrdd.splits().size()

def filter(self, f):
"""
Return a new RDD containing only the elements that satisfy a predicate.
@@ -413,9 +422,9 @@ def union(self, other):

def intersection(self, other):
"""
Return the intersection of this RDD and another one. The output will not
Return the intersection of this RDD and another one. The output will not
contain any duplicate elements, even if the input RDDs did.
Note that this method performs a shuffle internally.
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
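
The shuffle mentioned in that note comes from grouping the two datasets by value before comparing them. A small plain-Python sketch of the idea (tag each side, group, keep values present on both sides); this is an illustration, not PySpark's actual code path:

def intersection(left, right):
    """Deduplicated intersection via a group-by-value pass, mimicking a keyed co-group."""
    seen = {}
    for v in left:
        seen.setdefault(v, [False, False])[0] = True
    for v in right:
        seen.setdefault(v, [False, False])[1] = True
    return [v for v, (in_left, in_right) in seen.items() if in_left and in_right]

print(sorted(intersection([1, 10, 2, 3, 4, 5], [4, 5, 6, 7])))   # [4, 5]
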
@@ -571,14 +580,14 @@ def foreachPartition(self, f):
"""
Applies a function to each partition of this RDD.
>>> def f(iterator):
... for x in iterator:
... print x
>>> def f(iterator):
... for x in iterator:
... print x
... yield None
>>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
"""
self.mapPartitions(f).collect() # Force evaluation
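
The doctest change (adding `yield None`) follows from the line above: foreachPartition is expressed as self.mapPartitions(f).collect(), so the supplied function must return an iterable, and turning the example into a generator satisfies that. A local illustration in plain Python with illustrative names:

def foreach_partition(partitions, f):
    """Apply f to every partition purely for its side effects."""
    for partition in partitions:
        # f must return an iterable; draining it forces evaluation,
        # much like .collect() does in the real method.
        for _ in f(iter(partition)):
            pass

def show(iterator):
    for x in iterator:
        print(x)
    yield None   # makes show a generator, so it returns an iterator

foreach_partition([[1, 2, 3], [4, 5]], show)   # prints 1 through 5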

def collect(self):
"""
Return a list that contains all of the elements in this RDD.
@@ -673,7 +682,7 @@ def func(iterator):
yield acc

return self.mapPartitions(func).fold(zeroValue, combOp)
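
The two lines above read like the tail of RDD.aggregate: each partition is folded with seqOp starting from zeroValue, and the per-partition accumulators are then combined with combOp. A plain-Python simulation of those semantics (illustrative only, assuming an immutable zeroValue):

def aggregate(partitions, zero_value, seq_op, comb_op):
    """Fold each partition with seq_op from zero_value, then merge the results with comb_op."""
    per_partition = []
    for part in partitions:
        acc = zero_value
        for x in part:
            acc = seq_op(acc, x)
        per_partition.append(acc)          # the `yield acc` step: one value per partition
    result = zero_value
    for acc in per_partition:
        result = comb_op(result, acc)      # the final fold(zeroValue, combOp)
    return result

# Running (sum, count) over two partitions:
print(aggregate([[1, 2], [3, 4]], (0, 0),
                lambda acc, x: (acc[0] + x, acc[1] + 1),
                lambda a, b: (a[0] + b[0], a[1] + b[1])))   # (10, 4)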


def max(self):
"""
@@ -692,7 +701,7 @@ def min(self):
1.0
"""
return self.reduce(min)

def sum(self):
"""
Add up the elements in this RDD.
@@ -786,7 +795,7 @@ def mergeMaps(m1, m2):
m1[k] += v
return m1
return self.mapPartitions(countPartition).reduce(mergeMaps)
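
The mergeMaps helper above belongs to countByValue: each partition builds a value-to-count map, and the maps are then merged pairwise. A self-contained sketch of the same two-phase counting, in plain Python with illustrative names:

from collections import defaultdict

def count_by_value(partitions):
    """Count occurrences per partition, then merge the per-partition maps."""
    def count_partition(part):
        counts = defaultdict(int)
        for x in part:
            counts[x] += 1
        return counts

    def merge_maps(m1, m2):
        for k, v in m2.items():
            m1[k] += v            # same shape as the mergeMaps shown above
        return m1

    result = defaultdict(int)
    for m in (count_partition(p) for p in partitions):
        result = merge_maps(result, m)
    return dict(result)

print(count_by_value([[1, 2, 1], [2, 2, 3]]))   # {1: 2, 2: 3, 3: 1}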

def top(self, num):
"""
Get the top N elements from a RDD.
@@ -814,7 +823,7 @@ def merge(a, b):
def takeOrdered(self, num, key=None):
"""
Get the N elements from a RDD ordered in ascending order or as specified
by the optional key function.
by the optional key function.
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
[1, 2, 3, 4, 5, 6]
@@ -834,7 +843,7 @@ def unKey(x, key_=None):
if key_ != None:
x = [i[1] for i in x]
return x

def merge(a, b):
return next(topNKeyedElems(a + b))
result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
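
The shape of the algorithm in this hunk: each partition keeps its N smallest elements (keyed if a key function is given), and the per-partition survivors are merged and trimmed again. A compact standalone sketch with heapq, illustrative rather than the exact helpers named above:

import heapq

def take_ordered(partitions, num, key=None):
    """N smallest per partition, then N smallest of the merged candidates."""
    candidates = []
    for part in partitions:
        candidates.extend(heapq.nsmallest(num, part, key=key))
    return heapq.nsmallest(num, candidates, key=key)

print(take_ordered([[10, 1, 2], [9, 3, 4], [5, 6, 7]], 6))   # [1, 2, 3, 4, 5, 6]
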
@@ -1169,21 +1178,21 @@ def _mergeCombiners(iterator):
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()
return shuffled.mapPartitions(_mergeCombiners)

def foldByKey(self, zeroValue, func, numPartitions=None):
"""
Merge the values for each key using an associative function "func" and a neutral "zeroValue"
which may be added to the result an arbitrary number of times, and must not change
the result (e.g., 0 for addition, or 1 for multiplication.).
which may be added to the result an arbitrary number of times, and must not change
the result (e.g., 0 for addition, or 1 for multiplication.).
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> from operator import add
>>> rdd.foldByKey(0, add).collect()
[('a', 2), ('b', 1)]
"""
return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
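
As the return statement shows, foldByKey reduces to combineByKey with zeroValue folded into the first value seen for each key. A small per-key simulation of those semantics in plain Python; it assumes a single partition, since zeroValue is reapplied per partition:

from operator import add

def fold_by_key(pairs, zero_value, func):
    """Fold values per key, seeding each key once with zero_value."""
    acc = {}
    for k, v in pairs:
        # createCombiner: func(zeroValue, v); mergeValue: func(acc[k], v)
        acc[k] = func(zero_value, v) if k not in acc else func(acc[k], v)
    return sorted(acc.items())

print(fold_by_key([("a", 1), ("b", 1), ("a", 1)], 0, add))   # [('a', 2), ('b', 1)]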


# TODO: support variant with custom partitioner
def groupByKey(self, numPartitions=None):
"""
@@ -1302,7 +1311,7 @@ def keyBy(self, f):
def repartition(self, numPartitions):
"""
Return a new RDD that has exactly numPartitions partitions.
Can increase or decrease the level of parallelism in this RDD. Internally, this uses
a shuffle to redistribute data.
If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
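
A short usage sketch of the guidance in this docstring: repartition always shuffles and can grow or shrink the partition count, while coalesce can shrink it without a shuffle. It assumes a live SparkContext named sc, as in the doctests above:

rdd = sc.parallelize(range(100), 10)   # 10 partitions

wider = rdd.repartition(20)            # shuffle: 10 -> 20 partitions
print(wider.getNumPartitions())        # 20

narrower = rdd.coalesce(2)             # no shuffle needed when only shrinking
print(narrower.getNumPartitions())     # 2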
