From 171edebc73c76858705bdc4ec2a5d9f5dc930a2e Mon Sep 17 00:00:00 2001
From: giwa
Date: Wed, 20 Aug 2014 16:07:42 -0700
Subject: [PATCH] clean up

---
 python/pyspark/streaming/context.py           | 10 ++---
 python/pyspark/streaming/dstream.py           | 28 +++++--------
 python/pyspark/streaming/jtime.py             |  3 +-
 python/pyspark/streaming/utils.py             | 12 ++++--
 python/pyspark/streaming_tests.py             | 15 +++----
 .../streaming/api/python/PythonDStream.scala  | 31 ++++----------
 .../api/python/PythonRDDFunction.java         |  4 ++
 .../api/python/PythonTransformedDStream.scala | 42 -------------------
 8 files changed, 46 insertions(+), 99 deletions(-)
 delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala

diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 66bed6bf76d77..f7e356319ecac 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -16,7 +16,6 @@
 #
 
 import sys
-import time
 from signal import signal, SIGTERM, SIGINT
 
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
@@ -143,9 +142,9 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
 
     def _testInputStream(self, test_inputs, numSlices=None):
         """
-        This function is only for test.
-        This implementation is inspired by QueStream implementation.
-        Give list of RDD to generate DStream which contains the RDD.
+        This function is only for unit tests.
+        It takes a sequence of inputs and returns the i-th element of that
+        sequence as the RDD for the i-th batch under the manual clock.
         """
         test_rdds = list()
         test_rdd_deserializers = list()
@@ -153,7 +152,8 @@ def _testInputStream(self, test_inputs, numSlices=None):
             test_rdd = self._sc.parallelize(test_input, numSlices)
             test_rdds.append(test_rdd._jrdd)
             test_rdd_deserializers.append(test_rdd._jrdd_deserializer)
-
+        # All deserializers have to be the same.
+        # TODO: add deserializer validation
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
 
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 7834809f1cbee..caf4378a9b1b9 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -276,23 +276,6 @@ def func(iterator):
             yield list(iterator)
         return self.mapPartitions(func)
 
-    #def transform(self, func): - TD
-    #    from utils import RDDFunction
-    #    wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
-    #    jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
-    #    return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW
-
-    def _test_output(self, result):
-        """
-        This function is only for test case.
-        Store data in a DStream to result to verify the result in test case
-        """
-        def get_output(rdd, time):
-            taken = rdd.collect()
-            result.append(taken)
-
-        self.foreachRDD(get_output)
-
     def cache(self):
         """
         Persist this DStream with the default storage level (C{MEMORY_ONLY_SER}).
@@ -398,6 +381,17 @@ def saveAsPickleFile(rdd, time):
 
         return self.foreachRDD(saveAsPickleFile)
 
+    def _test_output(self, result):
+        """
+        This function is only for test cases.
+        Store the data from a DStream into result to verify the output in test cases.
+        """
+        def get_output(rdd, time):
+            collected = rdd.collect()
+            result.append(collected)
+
+        self.foreachRDD(get_output)
+
 # TODO: implement updateStateByKey
 # TODO: implement slice
 
diff --git a/python/pyspark/streaming/jtime.py b/python/pyspark/streaming/jtime.py
index 32ef741051283..f169228e81868 100644
--- a/python/pyspark/streaming/jtime.py
+++ b/python/pyspark/streaming/jtime.py
@@ -19,10 +19,11 @@
 from pyspark.streaming.duration import Duration
 
 """
-The name of this file, time is not good naming for python
+The name of this file, time, is not a good name for Python
 because if we do import time when we want to use native python time package,
 it does not import python time package.
 """
+# TODO: add doctest
 
 
 class Time(object):
 
diff --git a/python/pyspark/streaming/utils.py b/python/pyspark/streaming/utils.py
index 9178577743e0b..5ba179cae7f9c 100644
--- a/python/pyspark/streaming/utils.py
+++ b/python/pyspark/streaming/utils.py
@@ -19,6 +19,9 @@
 
 
 class RDDFunction():
+    """
+    This class is a wrapper for the py4j callback function.
+    """
     def __init__(self, ctx, jrdd_deserializer, func):
         self.ctx = ctx
         self.deserializer = jrdd_deserializer
@@ -38,6 +41,7 @@ class Java:
 
 
 def msDurationToString(ms):
+    # TODO: add doctest
     """
     Returns a human-readable string representing a duration such as "35ms"
     """
@@ -54,8 +58,10 @@ def msDurationToString(ms):
     else:
         return "%.2f h" % (float(ms) / hour)
 
+
 def rddToFileName(prefix, suffix, time):
-    if suffix is not None:
-        return prefix + "-" + str(time) + "." + suffix
-    else:
+    # TODO: add doctest
+    if suffix is None:
         return prefix + "-" + str(time)
+    else:
+        return prefix + "-" + str(time) + "." + suffix
 
diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py
index fb24858dee964..f2ef45ab23ccc 100644
--- a/python/pyspark/streaming_tests.py
+++ b/python/pyspark/streaming_tests.py
@@ -20,9 +20,10 @@ individual modules.
 
 This file would be merged to tests.py after all functions are ready.
-But for now, this file is separated due to focusing to streaming test case.
+Since the Python API for streaming is still in beta, this file is kept separate.
 
-Callback server seems like unstable sometimes, which cause error in test case.
+The callback server is sometimes unstable, which can cause errors in test cases.
+But this is a very rare case.
 """
 
 from itertools import chain
@@ -58,15 +59,14 @@ def tearDownClass(cls):
 
 class TestBasicOperationsSuite(PySparkStreamingTestCase):
     """
     2 tests for each function for batach deserializer and unbatch deserilizer because
-    we cannot change the deserializer after streaming process starts.
+    the deserializer cannot be changed dynamically after the streaming process starts.
     Default numInputPartitions is 2.
     If the number of input element is over 3, that DStream use batach deserializer.
     If not, that DStream use unbatch deserializer.
-    Most of the operation uses UTF8 deserializer to get value from Scala.
-    I am wondering if these test are enough or not.
-    All tests input should have list of lists. This represents stream.
+    Every test input should be a list of lists. This list represents the stream.
     Every batch interval, the first object of list are chosen to make DStream.
+    e.g. the first list in the list is the input of the first batch.
     Please see the BasicTestSuits in Scala which is close to this implementation.
""" def setUp(self): @@ -412,7 +412,7 @@ def _sort_result_based_on_key(self, outputs): def _run_stream(self, test_input, test_func, expected_output, numSlices=None): """ - Start stream and return the output. + Start stream and return the result. @param test_input: dataset for the test. This should be list of lists. @param test_func: wrapped test_function. This function should return PythonDstream object. @param expexted_output: expected output for this testcase. @@ -444,6 +444,7 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None): return result + class TestSaveAsFilesSuite(PySparkStreamingTestCase): def setUp(self): PySparkStreamingTestCase.setUp(self) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index c6782215fc869..37df73717c65d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -18,10 +18,8 @@ package org.apache.spark.streaming.api.python import java.io._ -import java.io.{ObjectInputStream, IOException} -import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections} +import java.util.{List => JList, ArrayList => JArrayList, Map => JMap} -import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import scala.collection.JavaConversions._ @@ -56,7 +54,9 @@ class PythonDStream[T: ClassTag]( override def compute(validTime: Time): Option[RDD[Array[Byte]]] = { parent.getOrCompute(validTime) match{ case Some(rdd) => - val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator) + // create PythonRDD to compute Python functions. + val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, + preservePartitoning, pythonExec, broadcastVars, accumulator) Some(pythonRDD.asJavaRDD.rdd) case None => None } @@ -81,8 +81,8 @@ DStream[Array[Byte]](prev.ssc){ case Some(rdd)=>Some(rdd) val pairwiseRDD = new PairwiseRDD(rdd) /* - * Since python operation is executed by Scala after StreamingContext.start. - * What PythonPairwiseDStream does is equivalent to python code in pySpark. + * Since python function is executed by Scala after StreamingContext.start. + * What PythonPairwiseDStream does is equivalent to python code in pyspark. * * with _JavaStackTrace(self.context) as st: * pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD() @@ -99,6 +99,7 @@ DStream[Array[Byte]](prev.ssc){ val asJavaDStream = JavaDStream.fromDStream(this) } + class PythonForeachDStream( prev: DStream[Array[Byte]], foreachFunction: PythonRDDFunction @@ -112,29 +113,11 @@ class PythonForeachDStream( this.register() } -class PythonTransformedDStream( - prev: DStream[Array[Byte]], - transformFunction: PythonRDDFunction - ) extends DStream[Array[Byte]](prev.ssc) { - - override def dependencies = List(prev) - - override def slideDuration: Duration = prev.slideDuration - - override def compute(validTime: Time): Option[RDD[Array[Byte]]] = { - prev.getOrCompute(validTime).map(rdd => { - transformFunction.call(rdd.toJavaRDD(), validTime.milliseconds).rdd - }) - } - - val asJavaDStream = JavaDStream.fromDStream(this) -} /** * This is a input stream just for the unitest. This is equivalent to a checkpointable, * replayable, reliable message queue like Kafka. 
 * It requires a sequence as input, and
 * returns the i_th element at the i_th batch under manual clock.
 * This implementation is inspired by QueStream
 */
 class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java
index 88f7036c3a05b..b46a644dacb7c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java
@@ -3,6 +3,10 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.streaming.Time;
 
+/*
+ * Interface for the py4j callback function.
+ * This function is called by pyspark.streaming.dstream.DStream.foreachRDD.
+ */
 public interface PythonRDDFunction {
   JavaRDD call(JavaRDD rdd, long time);
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala
deleted file mode 100644
index bc07e09ec6d03..0000000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-
-package org.apache.spark.streaming.api.python
-
-import org.apache.spark.Accumulator
-import org.apache.spark.api.python.PythonRDD
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.api.java.JavaDStream
-import org.apache.spark.streaming.{Time, Duration}
-import org.apache.spark.streaming.dstream.DStream
-
-import scala.reflect.ClassTag
-
-class PythonTransformedDStream[T: ClassTag](
-    parent: DStream[T],
-    command: Array[Byte],
-    envVars: JMap[String, String],
-    pythonIncludes: JList[String],
-    preservePartitoning: Boolean,
-    pythonExec: String,
-    broadcastVars: JList[Broadcast[Array[Byte]]],
-    accumulator: Accumulator[JList[Array[Byte]]]
-  ) extends DStream[Array[Byte]](parent.ssc) {
-
-  override def dependencies = List(parent)
-
-  override def slideDuration: Duration = parent.slideDuration
-
-  //pythonDStream compute
-  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-
-//    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
-//    parents.map(_.getOrCompute(validTime).orNull).to
-//    parent = parents.head.asInstanceOf[RDD]
-//    Some()
-  }
-
-  val asJavaDStream = JavaDStream.fromDStream(this)
-}
-
-*/