diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 5d6740893ada5..66bed6bf76d77 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -103,12 +103,12 @@ def start(self):
     def awaitTermination(self, timeout=None):
         """
         Wait for the execution to stop.
-        timeout is milliseconds
+        @param timeout: time to wait in milliseconds
         """
         if timeout is None:
             self._jssc.awaitTermination()
         else:
-            time.sleep(timeout/1000)
+            self._jssc.awaitTermination(timeout)
 
     #TODO: add storageLevel
     def socketTextStream(self, hostname, port):
diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py
index 2964107f2d92e..fb24858dee964 100644
--- a/python/pyspark/streaming_tests.py
+++ b/python/pyspark/streaming_tests.py
@@ -39,7 +39,6 @@ class PySparkStreamingTestCase(unittest.TestCase):
     def setUp(self):
         class_name = self.__class__.__name__
         self.ssc = StreamingContext(appName=class_name, duration=Seconds(1))
-        time.sleep(1)
 
     def tearDown(self):
         # Do not call pyspark.streaming.context.StreamingContext.stop directly because
@@ -52,7 +51,7 @@ def tearDown(self):
 
     @classmethod
     def tearDownClass(cls):
-        time.sleep(5)
+        # Make sure to shut down the callback server
         SparkContext._gateway._shutdown_callback_server()
 
@@ -436,7 +435,8 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
             # Check time out.
             if (current_time - start_time) > self.timeout:
                 break
-            #self.ssc.awaitTermination(50)
+            # StreamingContext.awaitTermination is not used here because
+            # calling the py4j server every 50 milliseconds raises an error
             time.sleep(0.05)
         # Check if the output is the same length of expexted output.
         if len(expected_output) == len(result):
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 75e086134f896..c6782215fc869 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -112,8 +112,6 @@ class PythonForeachDStream(
   this.register()
 }
 
-/*
-This does not work. Ignore this for now. -TD
 class PythonTransformedDStream(
   prev: DStream[Array[Byte]],
   transformFunction: PythonRDDFunction
@@ -131,7 +129,6 @@ class PythonTransformedDStream(
 
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
-*/
 
 /**
  * This is a input stream just for the unitest. This is equivalent to a checkpointable,
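
For context (not part of the patch itself), a minimal sketch of how the revised `awaitTermination` would be exercised from user code. The constructor form and `socketTextStream(hostname, port)` follow the prototype API visible in this diff; the `Seconds` import path, the host/port, and the output step are illustrative assumptions:

```python
# Usage sketch only: names below that do not appear in this diff (the Seconds
# import path, the host/port, the output operation) are assumptions.
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import Seconds  # assumed import path

# Constructor form matches PySparkStreamingTestCase.setUp in this diff.
ssc = StreamingContext(appName="AwaitTerminationDemo", duration=Seconds(1))
lines = ssc.socketTextStream("localhost", 9999)  # assumes a text server here
# ... register an output operation on `lines` before starting ...

ssc.start()
# With this patch, the 5000 ms timeout is forwarded to the JVM-side
# awaitTermination(timeout) rather than emulated with time.sleep(timeout/1000),
# so the call returns as soon as the context actually terminates instead of
# always sleeping for the full interval.
ssc.awaitTermination(5000)
```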