Merge pull request apache#505 from JoshRosen/SPARK-1026
Deprecate mapPartitionsWithSplit in PySpark (SPARK-1026)

This commit deprecates `mapPartitionsWithSplit` in PySpark (see [SPARK-1026](https://spark-project.atlassian.net/browse/SPARK-1026)) and removes the remaining references to it from the docs.
pwendell committed Jan 24, 2014
2 parents 3d6e754 + 4cebb79 commit 05be704
Showing 2 changed files with 23 additions and 6 deletions.
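For PySpark users, the migration is a one-line rename; a minimal before/after sketch (assuming an existing SparkContext bound to `sc`):

    rdd = sc.parallelize([1, 2, 3, 4], 2)

    def f(index, iterator):
        yield index

    rdd.mapPartitionsWithSplit(f).collect()   # old name: still works, but emits a DeprecationWarning
    rdd.mapPartitionsWithIndex(f).collect()   # new name: identical behavior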
4 changes: 2 additions & 2 deletions docs/scala-programming-guide.md
@@ -168,9 +168,9 @@ The following tables list the transformations and actions currently supported (s
   Iterator[T] => Iterator[U] when running on an RDD of type T. </td>
 </tr>
 <tr>
-  <td> <b>mapPartitionsWithSplit</b>(<i>func</i>) </td>
+  <td> <b>mapPartitionsWithIndex</b>(<i>func</i>) </td>
   <td> Similar to mapPartitions, but also provides <i>func</i> with an integer value representing the index of
-  the split, so <i>func</i> must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
+  the partition, so <i>func</i> must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
   </td>
 </tr>
 <tr>
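The same contract holds in PySpark: the function receives the partition index and an iterator, and must return an iterator. A small illustration counting elements per partition (the exact sizes depend on how Spark splits the data):

    data = sc.parallelize(range(10), 3)

    def count_per_partition(index, iterator):
        # must return an iterator; here, a single (index, count) pair
        yield (index, sum(1 for _ in iterator))

    sorted(data.mapPartitionsWithIndex(count_per_partition).collect())
    # e.g. [(0, 3), (1, 3), (2, 4)]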
25 changes: 21 additions & 4 deletions python/pyspark/rdd.py
@@ -27,6 +27,7 @@
 from subprocess import Popen, PIPE
 from tempfile import NamedTemporaryFile
 from threading import Thread
+import warnings

 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, pack_long
@@ -179,7 +180,7 @@ def flatMap(self, f, preservesPartitioning=False):
         [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
         """
         def func(s, iterator): return chain.from_iterable(imap(f, iterator))
-        return self.mapPartitionsWithSplit(func, preservesPartitioning)
+        return self.mapPartitionsWithIndex(func, preservesPartitioning)

     def mapPartitions(self, f, preservesPartitioning=False):
         """
@@ -191,10 +192,24 @@ def mapPartitions(self, f, preservesPartitioning=False):
         [3, 7]
         """
         def func(s, iterator): return f(iterator)
-        return self.mapPartitionsWithSplit(func)
+        return self.mapPartitionsWithIndex(func)
+
+    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+        """
+        Return a new RDD by applying a function to each partition of this RDD,
+        while tracking the index of the original partition.
+
+        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
+        >>> def f(splitIndex, iterator): yield splitIndex
+        >>> rdd.mapPartitionsWithIndex(f).sum()
+        6
+        """
+        return PipelinedRDD(self, f, preservesPartitioning)

     def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
         """
+        Deprecated: use mapPartitionsWithIndex instead.
+
         Return a new RDD by applying a function to each partition of this RDD,
         while tracking the index of the original partition.
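Beyond the doctest, a common use of the new method is tagging each record with the partition it lives in, which helps when debugging skew; a sketch assuming a live `sc`:

    rdd = sc.parallelize(["a", "b", "c", "d"], 2)

    def tag(index, iterator):
        for item in iterator:
            yield (index, item)

    rdd.mapPartitionsWithIndex(tag).collect()
    # e.g. [(0, 'a'), (0, 'b'), (1, 'c'), (1, 'd')]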
@@ -203,7 +218,9 @@ def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
         >>> rdd.mapPartitionsWithSplit(f).sum()
         6
         """
-        return PipelinedRDD(self, f, preservesPartitioning)
+        warnings.warn("mapPartitionsWithSplit is deprecated; "
+            "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
+        return self.mapPartitionsWithIndex(f, preservesPartitioning)

     def filter(self, f):
         """
@@ -235,7 +252,7 @@ def sample(self, withReplacement, fraction, seed):
         >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
         [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
         """
-        return self.mapPartitionsWithSplit(RDDSampler(withReplacement, fraction, seed).func, True)
+        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

     # this is ported from scala/spark/RDD.scala
     def takeSample(self, withReplacement, num, seed):
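sample() needs the partition index, not just the iterator, because each partition must seed its random number generator differently; seeded identically, every partition would draw the same pseudo-random sequence. RDDSampler's internals are not part of this diff, but the idea is roughly this (illustrative helper, not the actual RDDSampler code):

    import random

    def bernoulli_sample(fraction, seed):
        def func(split, iterator):
            # a distinct but reproducible random stream per partition
            rng = random.Random(seed + split)
            return (x for x in iterator if rng.random() < fraction)
        return func

    sampled = sc.parallelize(range(100), 4) \
        .mapPartitionsWithIndex(bernoulli_sample(0.1, 42), True)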
