Skip to content

Commit

Permalink
switch resultitr to resultiterable
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Apr 8, 2014
1 parent eb06216 commit 7a092a3
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
6 changes: 3 additions & 3 deletions python/pyspark/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""

from pyspark.resultitr import ResultItr
from pyspark.resultiterable import ResultIterable

def _do_python_join(rdd, other, numPartitions, dispatch):
vs = rdd.map(lambda (k, v): (k, (1, v)))
Expand Down Expand Up @@ -89,5 +89,5 @@ def dispatch(seq):
vbuf.append(v)
elif n == 2:
wbuf.append(v)
return (ResultItr(vbuf), ResultItr(wbuf))
return vs.union(ws).groupByKey(numPartitions).mapValues(lambda x : dispatch(x.__iter__()))
return (ResultIterable(vbuf), ResultIterable(wbuf))
return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch)
4 changes: 2 additions & 2 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel
from pyspark.resultitr import ResultItr
from pyspark.resultiterable import ResultIterable

from py4j.java_collections import ListConverter, MapConverter

Expand Down Expand Up @@ -1134,7 +1134,7 @@ def mergeCombiners(a, b):
return a + b

return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
numPartitions).mapValues(lambda x: ResultItr(x))
numPartitions).mapValues(lambda x: ResultIterable(x))

# TODO: add tests
def flatMapValues(self, f):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
# limitations under the License.
#

__all__ = ["ResultItr"]
__all__ = ["ResultIterable"]

import collections

class ResultItr(collections.Iterable):
class ResultIterable(collections.Iterator):
"""
A special result iterable. This is used because the standard iterator can not be pickled
"""
Expand Down

0 comments on commit 7a092a3

Please sign in to comment.