From b7dab85701d65e6606b469df9a834136625d29ae Mon Sep 17 00:00:00 2001 From: giwa Date: Wed, 20 Aug 2014 18:17:42 -0700 Subject: [PATCH] improve test case --- python/pyspark/streaming/dstream.py | 2 +- python/pyspark/streaming/duration.py | 4 ++-- python/pyspark/streaming/jtime.py | 7 +++--- python/pyspark/streaming/tests.py | 8 ++++++- .../pyspark/streaming/{utils.py => util.py} | 24 ++++++++++++++++--- python/run-tests | 2 ++ 6 files changed, 36 insertions(+), 11 deletions(-) rename python/pyspark/streaming/{utils.py => util.py} (79%) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index e1fc95db09eea..f91a3b8a355d2 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -25,7 +25,7 @@ from pyspark.rdd import _JavaStackTrace from pyspark.storagelevel import StorageLevel from pyspark.resultiterable import ResultIterable -from pyspark.streaming.utils import rddToFileName, RDDFunction +from pyspark.streaming.util import rddToFileName, RDDFunction from py4j.java_collections import ListConverter, MapConverter diff --git a/python/pyspark/streaming/duration.py b/python/pyspark/streaming/duration.py index a7f1036e4b856..fa03410f3f8e2 100644 --- a/python/pyspark/streaming/duration.py +++ b/python/pyspark/streaming/duration.py @@ -15,7 +15,7 @@ # limitations under the License. # -from pyspark.streaming import utils +from pyspark.streaming import util class Duration(object): @@ -82,7 +82,7 @@ def prettyPrint(self): >>> d_1hour.prettyPrint() '1.00 h' """ - return utils.msDurationToString(self._millis) + return util.msDurationToString(self._millis) def milliseconds(self): """ diff --git a/python/pyspark/streaming/jtime.py b/python/pyspark/streaming/jtime.py index f169228e81868..02e0e0cf2d17e 100644 --- a/python/pyspark/streaming/jtime.py +++ b/python/pyspark/streaming/jtime.py @@ -15,7 +15,6 @@ # limitations under the License. 
# -from pyspark.streaming import utils from pyspark.streaming.duration import Duration """ @@ -87,7 +86,7 @@ def __sub__(self, other): if isinstance(other, Duration): return Time(self._millis - other._millis) elif isinstance(other, Time): - return Duration(self._mills, other._millis) + return Duration(self._millis, other._millis) else: raise TypeError @@ -99,7 +98,7 @@ def __lt__(self, other): def __le__(self, other): """ Time <= Time """ Time._is_time(other) - return self.millis <= other._millis + return self._millis <= other._millis def __eq__(self, other): """ Time == Time """ @@ -121,7 +120,7 @@ def __ge__(self, other): Time._is_time(other) return self._millis >= other._millis - def isMultipbleOf(duration): + def isMultipbleOf(self, duration): """ is multiple by Duration """ Duration._is_duration(duration) return self._millis % duration._millis == 0 diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 067b168e2b528..4af48ee8f86b4 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -28,8 +28,13 @@ """ from itertools import chain import time -import unittest import operator +import sys + +if sys.version_info[:2] <= (2, 6): + import unittest2 as unittest + else: + import unittest from pyspark.context import SparkContext from pyspark.streaming.context import StreamingContext @@ -451,3 +456,4 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None): if __name__ == "__main__": unittest.main() + SparkContext._gateway._shutdown_callback_server() diff --git a/python/pyspark/streaming/utils.py b/python/pyspark/streaming/util.py similarity index 79% rename from python/pyspark/streaming/utils.py rename to python/pyspark/streaming/util.py index 5ba179cae7f9c..651ba363957ea 100644 --- a/python/pyspark/streaming/utils.py +++ b/python/pyspark/streaming/util.py @@ -20,7 +20,8 @@ class RDDFunction(): """ - This class is for py4j callback. This + This class is for py4j callback. 
This class is related to + org.apache.spark.streaming.api.python.PythonRDDFunction. """ def __init__(self, ctx, jrdd_deserializer, func): self.ctx = ctx @@ -41,10 +42,19 @@ class Java: def msDurationToString(ms): - #TODO: add doctest """ Returns a human-readable string representing a duration such as "35ms" + + >>> msDurationToString(10) + '10 ms' + >>> msDurationToString(1000) + '1.0 s' + >>> msDurationToString(60000) + '1.0 m' + >>> msDurationToString(3600000) + '1.00 h' """ + #TODO: add doctest second = 1000 minute = 60 * second hour = 60 * minute @@ -60,7 +70,15 @@ def msDurationToString(ms): def rddToFileName(prefix, suffix, time): - #TODO: add doctest + """ + Return string prefix-time(.suffix) + + >>> rddToFileName("spark", None, 12345678910) + 'spark-12345678910' + >>> rddToFileName("spark", "tmp", 12345678910) + 'spark-12345678910.tmp' + + """ if suffix is None: return prefix + "-" + str(time) else: diff --git a/python/run-tests b/python/run-tests index 3d00727f0ab81..ef4994d4e4b00 100755 --- a/python/run-tests +++ b/python/run-tests @@ -69,6 +69,7 @@ run_test "pyspark/broadcast.py" run_test "pyspark/accumulators.py" run_test "pyspark/serializers.py" run_test "pyspark/streaming/duration.py" +run_test "pyspark/streaming/util.py" unset PYSPARK_DOC_TEST run_test "pyspark/shuffle.py" run_test "pyspark/tests.py" @@ -81,6 +82,7 @@ run_test "pyspark/mllib/recommendation.py" run_test "pyspark/mllib/regression.py" run_test "pyspark/mllib/tests.py" run_test "pyspark/mllib/util.py" +run_test "pyspark/streaming/tests.py" if [[ $FAILED == 0 ]]; then echo -en "\033[32m" # Green