From d05871e912ee4828a4ac68a6a0ceed0454e44722 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 2 Oct 2014 12:46:50 -0700
Subject: [PATCH] remove reuse of PythonRDD

---
 python/pyspark/streaming/dstream.py           | 28 ++++++-------
 python/pyspark/streaming/tests.py             |  4 +-
 .../streaming/api/python/PythonDStream.scala  | 39 ++++---------------
 3 files changed, 20 insertions(+), 51 deletions(-)

diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index a77e8f505e147..fddfd757b8674 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -286,13 +286,11 @@ def transform(self, func):
         `func` can have one argument of `rdd`, or have two arguments of
         (`time`, `rdd`)
         """
-        resue = False
         if func.func_code.co_argcount == 1:
-            reuse = True
            oldfunc = func
            func = lambda t, rdd: oldfunc(rdd)
         assert func.func_code.co_argcount == 2, "func should take one or two arguments"
-        return TransformedDStream(self, func, reuse)
+        return TransformedDStream(self, func)
 
     def transformWith(self, func, other, keepSerializer=False):
         """
@@ -597,26 +595,23 @@ class TransformedDStream(DStream):
     Multiple continuous transformations of DStream can be combined into
     one transformation.
     """
-    def __init__(self, prev, func, reuse=False):
+    def __init__(self, prev, func):
         ssc = prev._ssc
         self._ssc = ssc
         self.ctx = ssc._sc
         self._jrdd_deserializer = self.ctx.serializer
         self.is_cached = False
         self.is_checkpointed = False
+        self._jdstream_val = None
 
         if (isinstance(prev, TransformedDStream) and
                 not prev.is_cached and not prev.is_checkpointed):
             prev_func = prev.func
-            old_func = func
-            func = lambda t, rdd: old_func(t, prev_func(t, rdd))
-            reuse = reuse and prev.reuse
-            prev = prev.prev
-
-        self.prev = prev
-        self.func = func
-        self.reuse = reuse
-        self._jdstream_val = None
+            self.func = lambda t, rdd: func(t, prev_func(t, rdd))
+            self.prev = prev.prev
+        else:
+            self.prev = prev
+            self.func = func
 
     @property
     def _jdstream(self):
@@ -624,7 +619,6 @@ def _jdstream(self):
             return self._jdstream_val
 
         jfunc = TransformFunction(self.ctx, self.func, self.prev._jrdd_deserializer)
-        jdstream = self.ctx._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(),
-                                                          jfunc, self.reuse).asJavaDStream()
-        self._jdstream_val = jdstream
-        return jdstream
+        dstream = self.ctx._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc)
+        self._jdstream_val = dstream.asJavaDStream()
+        return self._jdstream_val
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 6a7dfd574701d..a839faecf9a16 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -504,7 +504,7 @@ def setup():
             return ssc
 
         cpd = tempfile.mkdtemp("test_streaming_cps")
-        ssc = StreamingContext.getOrCreate(cpd, setup)
+        self.ssc = ssc = StreamingContext.getOrCreate(cpd, setup)
         ssc.start()
 
         def check_output(n):
@@ -539,7 +539,7 @@ def check_output(n):
 
         ssc.stop(True, True)
         time.sleep(1)
-        ssc = StreamingContext.getOrCreate(cpd, setup)
+        self.ssc = ssc = StreamingContext.getOrCreate(cpd, setup)
         ssc.start()
         check_output(3)
 
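The rewritten TransformedDStream.__init__ above folds a chain of transform() calls into one (time, rdd) function, so each batch costs a single Python callback no matter how long the chain is. A minimal standalone sketch of that fusion in plain Python; fuse and the sample functions are illustrative stand-ins, not pyspark API:

# Standalone sketch of the fusion done in TransformedDStream.__init__ above.
# `fuse` is a hypothetical helper, not a pyspark function; plain lists stand
# in for RDDs so the snippet runs without Spark.
def fuse(prev_func, func):
    # Compose the parent's (time, rdd) function with the new one: the parent
    # runs first, then the outer function, all inside one callback.
    return lambda t, rdd: func(t, prev_func(t, rdd))

double = lambda t, rdd: [x * 2 for x in rdd]
incr = lambda t, rdd: [x + 1 for x in rdd]
fused = fuse(double, incr)
print(fused(0, [1, 2, 3]))  # -> [3, 5, 7]

Because the composition happens once, when the DStream is built, the per-batch path in PythonTransformedDStream.compute below stays a single func(rdd, validTime) call.
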
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 59552bb0a2205..96b84b45b2ebf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -157,43 +157,18 @@ private[python] abstract class PythonDStream(
 
 /**
  * Transformed DStream in Python.
- *
- * If `reuse` is true and the result of the `func` is an PythonRDD, then it will cache it
- * as an template for future use, this can reduce the Python callbacks.
  */
 private[python] class PythonTransformedDStream (
     parent: DStream[_],
-    @transient pfunc: PythonTransformFunction,
-    var reuse: Boolean = false)
+    @transient pfunc: PythonTransformFunction)
   extends PythonDStream(parent, pfunc) {
 
-  // rdd returned by func
-  var lastResult: PythonRDD = _
-
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
     val rdd = parent.getOrCompute(validTime)
-    if (rdd.isEmpty) {
-      return None
-    }
-    if (reuse && lastResult != null) {
-      // use the previous result as the template to generate new RDD
-      Some(lastResult.copyTo(rdd.get))
+    if (rdd.isDefined) {
+      func(rdd, validTime)
     } else {
-      val r = func(rdd, validTime)
-      if (reuse && r.isDefined && lastResult == null) {
-        // try to use the result as a template
-        r.get match {
-          case pyrdd: PythonRDD =>
-            if (pyrdd.firstParent == rdd) {
-              // only one PythonRDD
-              lastResult = pyrdd
-            } else {
-              // maybe have multiple stages, don't check it anymore
-              reuse = false
-            }
-        }
-      }
-      r
+      None
     }
   }
 }
@@ -209,10 +184,10 @@ private[python] class PythonTransformed2DStream(
 
   val func = new TransformFunction(pfunc)
 
-  override def slideDuration: Duration = parent.slideDuration
-
   override def dependencies = List(parent, parent2)
 
+  override def slideDuration: Duration = parent.slideDuration
+
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
     val empty: RDD[_] = ssc.sparkContext.emptyRDD
     val rdd1 = parent.getOrCompute(validTime).getOrElse(empty)
@@ -220,7 +195,7 @@ private[python] class PythonTransformed2DStream(
     val rdd2 = parent2.getOrCompute(validTime).getOrElse(empty)
     func(Some(rdd1), Some(rdd2), validTime)
   }
-    val asJavaDStream = JavaDStream.fromDStream(this)
+  val asJavaDStream = JavaDStream.fromDStream(this)
 }
 
 /**
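For context, a hedged usage sketch of the transform() API whose argument handling the dstream.py hunk normalizes. The socket source, host, port, and app name are placeholders chosen for this example, not anything the patch prescribes:

# Both call forms accepted by DStream.transform: func(rdd) and func(time, rdd).
# The one-argument form is wrapped into lambda t, rdd: func(rdd) internally,
# so TransformedDStream only ever sees the (time, rdd) shape.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "transform-demo")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

lines = ssc.socketTextStream("localhost", 9999)  # placeholder source

# One-argument form: only the RDD is passed through.
words = lines.transform(lambda rdd: rdd.flatMap(lambda line: line.split(" ")))

# Two-argument form: the batch time comes along as well.
stamped = lines.transform(lambda t, rdd: rdd.map(lambda line: (t, line)))

words.pprint()
stamped.pprint()
ssc.start()
ssc.awaitTermination()

Either way, TransformedDStream receives a two-argument function; with the reuse flag gone, that fused function is the only state it has to carry.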