Skip to content

Commit

Permalink
clean up codes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Takagiwa authored and giwa committed Sep 20, 2014
1 parent 6197a11 commit 2ad7bd3
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
5 changes: 3 additions & 2 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def _sum(self):
"""
Add up the elements in this DStream.
"""
return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
pass
#return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

def print_(self, label=None):
"""
Expand Down Expand Up @@ -152,7 +153,7 @@ def combineLocally(iterator):
else:
combiners[k] = mergeValue(combiners[k], v)
return combiners.iteritems()
locally_combined = self.mapPartitions(combineLocally)
locally_combined = self._mapPartitions(combineLocally)
shuffled = locally_combined.partitionBy(numPartitions)

def _mergeCombiners(iterator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ abstract class DStream[T: ClassTag] (
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}

//TODO move pyprint to PythonDStream and executed by py4j call back function
//TODO: move pyprint to PythonDStream and executed by py4j call back function
/**
* Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
* operator, so this PythonDStream will be registered as an output stream and there materialized.
Expand All @@ -644,6 +644,7 @@ abstract class DStream[T: ClassTag] (

// pythonExec should be passed from python. Move pyprint to PythonDStream
val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")

val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
// Call python script to deserialize and print result in stdout
val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath)
Expand Down

0 comments on commit 2ad7bd3

Please sign in to comment.