From 98c2a00250e826ac45eec7f17f820db9b6106f1d Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Fri, 18 Jul 2014 17:58:58 -0700 Subject: [PATCH] added count operation, but this implementation needs a double check --- python/pyspark/streaming/dstream.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 5a56a3d958254..18415fcebe771 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -139,6 +139,12 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, def combineLocally(iterator): combiners = {} for x in iterator: + + # TODO: for the count operation, double-check this implementation. + # This is different from what pyspark does. + if isinstance(x, int): + x = ("", x) + (k, v) = x if k not in combiners: combiners[k] = createCombiner(v)