diff --git a/examples/src/main/python/streaming/test_oprations.py b/examples/src/main/python/streaming/test_oprations.py
index 084902b6a2f0d..3338a766b9cc3 100644
--- a/examples/src/main/python/streaming/test_oprations.py
+++ b/examples/src/main/python/streaming/test_oprations.py
@@ -6,20 +6,14 @@
 from pyspark.streaming.duration import *
 
 if __name__ == "__main__":
-    if len(sys.argv) != 3:
-        print >> sys.stderr, "Usage: wordcount <hostname> <port>"
-        exit(-1)
     conf = SparkConf()
     conf.setAppName("PythonStreamingNetworkWordCount")
     ssc = StreamingContext(conf=conf, duration=Seconds(1))
 
-    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
-    words = lines.flatMap(lambda line: line.split(" "))
-#    ssc.checkpoint("checkpoint")
-    mapped_words = words.map(lambda word: (word, 1))
-    count = mapped_words.reduceByKey(add)
+    test_input = ssc._testInputStream([1,1,1,1])
+    mapped = test_input.map(lambda x: (x, 1))
+    mapped.pyprint()
 
-    count.pyprint()
     ssc.start()
-    ssc.awaitTermination()
+#    ssc.awaitTermination()
 #    ssc.stop()
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 1e284b58941c4..765c4d5b96c74 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -17,6 +17,7 @@
 
 import sys
 from signal import signal, SIGTERM, SIGINT
+from tempfile import NamedTemporaryFile
 
 from pyspark.conf import SparkConf
 
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index be6da7f2aad68..a1cccac2eed4e 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -156,6 +156,7 @@ def _mergeCombiners(iterator):
                 combiners[k] = v
             else:
                 combiners[k] = mergeCombiners(combiners[k], v)
+            return combiners.iteritems()
 
         return shuffled.mapPartitions(_mergeCombiners)
 
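A note on the `dstream.py` hunk: `mapPartitions` requires the partition function to return an iterable, so before the added `return combiners.iteritems()` the function implicitly returned `None` and the merge step could produce no output. Below is a minimal standalone sketch of the same merge logic on a plain pair RDD; the `SparkContext` setup, app name, and sample data are illustrative assumptions, not part of the patch.

```python
# Minimal sketch, assuming a local Python 2-era PySpark install;
# names and sample data here are illustrative only.
from pyspark import SparkContext

sc = SparkContext("local[2]", "mergeCombinersSketch")

# partitionBy mimics the shuffle that combineByKey performs before
# merging, so all values for a key land in the same partition.
shuffled = sc.parallelize([("a", 1), ("a", 1), ("b", 1)]).partitionBy(2)

def _mergeCombiners(iterator):
    combiners = {}
    for (k, v) in iterator:
        if k not in combiners:
            combiners[k] = v
        else:
            combiners[k] = combiners[k] + v  # stand-in for mergeCombiners(a, b)
    # Without this return the function yields None and mapPartitions
    # has nothing to iterate -- the bug the hunk above fixes.
    return combiners.iteritems()

print shuffled.mapPartitions(_mergeCombiners).collect()
# Expected: [('a', 2), ('b', 1)] (ordering across partitions may vary)
sc.stop()
```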