diff --git a/bin/spark-submit b/bin/spark-submit
index c557311b4b20e..37e973a50b6fa 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
# paths, library paths, java options and memory early on. Otherwise, it will
# be too late by the time the driver JVM has started.
+<<<<<<< HEAD
if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then
# Parse the properties file only if the special configs exist
@@ -57,6 +58,16 @@ if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FI
if [ -n "$contains_special_configs" ]; then
+# Figure out which Python executable to use
+if [[ -z "$PYSPARK_PYTHON" ]]; then
+if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
+>>>>>>> initial commit for pySparkStreaming
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
diff --git a/core/pom.xml b/core/pom.xml
index 2a81f6df289c0..7eb0b48eaeebd 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,11 @@
+<<<<<<< HEAD
+ 1.0.0
+>>>>>>> initial commit for pySparkStreaming
diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py
index 2426345711086..c19eb74c44ed6 100644
--- a/examples/src/main/python/streaming/wordcount.py
+++ b/examples/src/main/python/streaming/wordcount.py
@@ -1,7 +1,10 @@
import sys
from operator import add
+<<<<<<< HEAD
from pyspark.conf import SparkConf
+>>>>>>> initial commit for pySparkStreaming
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import *
@@ -9,6 +12,7 @@
if len(sys.argv) != 2:
print >> sys.stderr, "Usage: wordcount "
+<<<<<<< HEAD
conf = SparkConf()
@@ -20,5 +24,17 @@
count = mapped_words.reduceByKey(add)
+ ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
+ lines = ssc.textFileStream(sys.argv[1])
+ fm_lines = lines.flatMap(lambda x: x.split(" "))
+ filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
+ mapped_lines = fm_lines.map(lambda x: (x, 1))
+ fm_lines.pyprint()
+ filtered_lines.pyprint()
+ mapped_lines.pyprint()
+>>>>>>> initial commit for pySparkStreaming
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index f3c6d231ab777..9b18696213691 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -108,10 +108,16 @@ def run(self):
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
+<<<<<<< HEAD
java_import(gateway.jvm, "org.apache.spark.streaming.*") # do we need this?
java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*") # do we need this?
+ java_import(gateway.jvm, "org.apache.spark.streaming.*")
+ java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
+ java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
+>>>>>>> initial commit for pySparkStreaming
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py
index e69de29bb2d1d..719592912e80c 100644
--- a/python/pyspark/streaming/__init__.py
+++ b/python/pyspark/streaming/__init__.py
@@ -0,0 +1 @@
+__author__ = 'ktakagiw'
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 3f455a3e06072..e65d2243ca7d5 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -1,3 +1,9 @@
+<<<<<<< HEAD
+__author__ = 'ktakagiw'
+>>>>>>> initial commit for pySparkStreaming
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -15,6 +21,7 @@
# limitations under the License.
+<<<<<<< HEAD
import sys
from signal import signal, SIGTERM, SIGINT
@@ -29,12 +36,43 @@ class StreamingContext(object):
Main entry point for Spark Streaming functionality. A StreamingContext represents the
connection to a Spark cluster, and can be used to create L{DStream}s and
+import os
+import shutil
+import sys
+from threading import Lock
+from tempfile import NamedTemporaryFile
+from pyspark import accumulators
+from pyspark.accumulators import Accumulator
+from pyspark.broadcast import Broadcast
+from pyspark.conf import SparkConf
+from pyspark.files import SparkFiles
+from pyspark.java_gateway import launch_gateway
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
+from pyspark.storagelevel import StorageLevel
+from pyspark.rdd import RDD
+from pyspark.context import SparkContext
+from py4j.java_collections import ListConverter
+from pyspark.streaming.dstream import DStream
+class StreamingContext(object):
+ """
+ Main entry point for Spark functionality. A StreamingContext represents the
+ connection to a Spark cluster, and can be used to create L{RDD}s and
+>>>>>>> initial commit for pySparkStreaming
broadcast variables on that cluster.
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+<<<<<<< HEAD
gateway=None, sparkContext=None, duration=None):
+ gateway=None, duration=None):
+>>>>>>> initial commit for pySparkStreaming
Create a new StreamingContext. At least the master and app name and duration
should be set, either through the named parameters here or through C{conf}.
@@ -55,6 +93,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
@param conf: A L{SparkConf} object setting Spark properties.
@param gateway: Use an existing gateway and JVM, otherwise a new JVM
will be instatiated.
+<<<<<<< HEAD
@param sparkContext: L{SparkContext} object.
@param duration: A L{Duration} object for SparkStreaming.
@@ -73,6 +112,15 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
# is started in StreamingContext.
+ @param duration: A L{Duration} Duration for SparkStreaming
+ """
+ # Create the Python Sparkcontext
+ self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
+ pyFiles=pyFiles, environment=environment, batchSize=batchSize,
+ serializer=serializer, conf=conf, gateway=gateway)
+>>>>>>> initial commit for pySparkStreaming
self._jvm = self._sc._jvm
self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)
@@ -80,6 +128,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
def _initialize_context(self, jspark_context, jduration):
return self._jvm.JavaStreamingContext(jspark_context, jduration)
+<<<<<<< HEAD
def _clean_up_trigger(self):
"""Kill py4j callback server properly using signal lib"""
@@ -156,3 +205,53 @@ def _testInputStream(self, test_inputs, numSlices=None):
jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
return DStream(jinput_stream, self, test_rdd_deserializers[0])
+ def actorStream(self, props, name, storageLevel, supervisorStrategy):
+ raise NotImplementedError
+ def addStreamingListener(self, streamingListener):
+ raise NotImplementedError
+ def awaitTermination(self, timeout=None):
+ if timeout:
+ self._jssc.awaitTermination(timeout)
+ else:
+ self._jssc.awaitTermination()
+ def checkpoint(self, directory):
+ raise NotImplementedError
+ def fileStream(self, directory, filter=None, newFilesOnly=None):
+ raise NotImplementedError
+ def networkStream(self, receiver):
+ raise NotImplementedError
+ def queueStream(self, queue, oneAtATime=True, defaultRDD=None):
+ raise NotImplementedError
+ def rawSocketStream(self, hostname, port, storagelevel):
+ raise NotImplementedError
+ def remember(self, duration):
+ raise NotImplementedError
+ def socketStream(hostname, port, converter,storageLevel):
+ raise NotImplementedError
+ def start(self):
+ self._jssc.start()
+ def stop(self, stopSparkContext=True):
+ raise NotImplementedError
+ def textFileStream(self, directory):
+ return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
+ def transform(self, seq):
+ raise NotImplementedError
+ def union(self, seq):
+ raise NotImplementedError
+>>>>>>> initial commit for pySparkStreaming
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 5fa30f6d89fbd..eeb6b5644d1d3 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -1,3 +1,4 @@
+<<<<<<< HEAD
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -26,18 +27,50 @@
from pyspark.resultiterable import ResultIterable
from pyspark.streaming.utils import rddToFileName
+from base64 import standard_b64encode as b64enc
+import copy
+from collections import defaultdict
+from collections import namedtuple
+from itertools import chain, ifilter, imap
+import operator
+import os
+import sys
+import shlex
+import traceback
+from subprocess import Popen, PIPE
+from tempfile import NamedTemporaryFile
+from threading import Thread
+import warnings
+import heapq
+from random import Random
+from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
+ BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
+from pyspark.join import python_join, python_left_outer_join, \
+ python_right_outer_join, python_cogroup
+from pyspark.statcounter import StatCounter
+from pyspark.rddsampler import RDDSampler
+from pyspark.storagelevel import StorageLevel
+#from pyspark.resultiterable import ResultIterable
+from pyspark.rdd import _JavaStackTrace
+>>>>>>> initial commit for pySparkStreaming
from py4j.java_collections import ListConverter, MapConverter
__all__ = ["DStream"]
+<<<<<<< HEAD
+>>>>>>> initial commit for pySparkStreaming
class DStream(object):
def __init__(self, jdstream, ssc, jrdd_deserializer):
self._jdstream = jdstream
self._ssc = ssc
self.ctx = ssc._sc
self._jrdd_deserializer = jrdd_deserializer
+<<<<<<< HEAD
self.is_cached = False
self.is_checkpointed = False
@@ -73,12 +106,81 @@ def print_(self, label=None):
def filter(self, f):
Return a new DStream containing only the elements that satisfy predicate.
+ def generatedRDDs(self):
+ """
+ // RDDs generated, marked as private[streaming] so that testsuites can access it
+ @transient
+ """
+ pass
+ def print_(self):
+ """
+ """
+ # print is a resrved name of Python. We cannot give print to function name
+ getattr(self._jdstream, "print")()
+ def pyprint(self):
+ """
+ """
+ self._jdstream.pyprint()
+ def cache(self):
+ """
+ """
+ raise NotImplementedError
+ def checkpoint(self):
+ """
+ """
+ raise NotImplementedError
+ def compute(self, time):
+ """
+ """
+ raise NotImplementedError
+ def context(self):
+ """
+ """
+ raise NotImplementedError
+ def count(self):
+ """
+ """
+ raise NotImplementedError
+ def countByValue(self, numPartitions=None):
+ """
+ """
+ raise NotImplementedError
+ def countByValueAndWindow(self, duration, slideDuration=None):
+ """
+ """
+ raise NotImplementedError
+ def countByWindow(self, duration, slideDuration=None):
+ """
+ """
+ raise NotImplementedError
+ def dstream(self):
+ """
+ """
+ raise NotImplementedError
+ def filter(self, f):
+ """
+>>>>>>> initial commit for pySparkStreaming
def func(iterator): return ifilter(f, iterator)
return self.mapPartitions(func)
def flatMap(self, f, preservesPartitioning=False):
+<<<<<<< HEAD
Pass each value in the key-value pair DStream through flatMap function
without changing the keys: this also retains the original RDD's partition.
@@ -137,6 +239,51 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
+ """
+ def func(s, iterator): return chain.from_iterable(imap(f, iterator))
+ return self.mapPartitionsWithIndex(func, preservesPartitioning)
+ def foreachRDD(self, f, time):
+ """
+ """
+ raise NotImplementedError
+ def glom(self):
+ """
+ """
+ raise NotImplementedError
+ def map(self, f, preservesPartitioning=False):
+ """
+ """
+ def func(split, iterator): return imap(f, iterator)
+ return PipelinedDStream(self, func, preservesPartitioning)
+ def mapPartitions(self, f):
+ """
+ """
+ def func(s, iterator): return f(iterator)
+ return self.mapPartitionsWithIndex(func)
+ def perist(self, storageLevel):
+ """
+ """
+ raise NotImplementedError
+ def reduce(self, func, numPartitions=None):
+ """
+ """
+ return self._combineByKey(lambda x:x, func, func, numPartitions)
+ def _combineByKey(self, createCombiner, mergeValue, mergeCombiners,
+ numPartitions = None):
+ """
+ """
+ if numPartitions is None:
+ numPartitions = self.ctx._defaultParallelism()
+>>>>>>> initial commit for pySparkStreaming
def combineLocally(iterator):
combiners = {}
for x in iterator:
@@ -148,7 +295,10 @@ def combineLocally(iterator):
return combiners.iteritems()
locally_combined = self.mapPartitions(combineLocally)
shuffled = locally_combined.partitionBy(numPartitions)
+<<<<<<< HEAD
+>>>>>>> initial commit for pySparkStreaming
def _mergeCombiners(iterator):
combiners = {}
for (k, v) in iterator:
@@ -157,25 +307,43 @@ def _mergeCombiners(iterator):
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()
+<<<<<<< HEAD
return shuffled.mapPartitions(_mergeCombiners)
def partitionBy(self, numPartitions, partitionFunc=None):
Return a copy of the DStream partitioned using the specified partitioner.
+ return shuffled.mapPartitions(_mergeCombiners)
+ def partitionBy(self, numPartitions, partitionFunc=None):
+ """
+ Return a copy of the DStream partitioned using the specified partitioner.
+>>>>>>> initial commit for pySparkStreaming
if numPartitions is None:
numPartitions = self.ctx._defaultReducePartitions()
if partitionFunc is None:
partitionFunc = lambda x: 0 if x is None else hash(x)
+<<<<<<< HEAD
+>>>>>>> initial commit for pySparkStreaming
# Transferring O(n) objects to Java is too expensive. Instead, we'll
# form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.
outputSerializer = self.ctx._unbatched_serializer
+<<<<<<< HEAD
def add_shuffle_key(split, iterator):
+ def add_shuffle_key(split, iterator):
+>>>>>>> initial commit for pySparkStreaming
buckets = defaultdict(list)
for (k, v) in iterator:
@@ -186,16 +354,26 @@ def add_shuffle_key(split, iterator):
keyed = PipelinedDStream(self, add_shuffle_key)
keyed._bypass_serializer = True
with _JavaStackTrace(self.ctx) as st:
+<<<<<<< HEAD
partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
jdstream = self.ctx._jvm.PythonPairwiseDStream(keyed._jdstream.dstream(),
+ #JavaDStream
+ #pairRDD = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairRDD()
+ pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream()
+ partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
+ id(partitionFunc))
+ jdstream = pairDStream.partitionBy(partitioner).values()
+>>>>>>> initial commit for pySparkStreaming
dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
# This is required so that id(partitionFunc) remains unique, even if
# partitionFunc is a lambda:
dstream._partitionFunc = partitionFunc
return dstream
+<<<<<<< HEAD
def _defaultReducePartitions(self):
Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
@@ -432,6 +610,53 @@ def saveAsTextFile(rdd, time):
# TODO: implement join
# TODO: implement leftOuterJoin
# TODO: implemtnt rightOuterJoin
+ def reduceByWindow(self, reduceFunc, windowDuration, slideDuration, inReduceTunc):
+ """
+ """
+ raise NotImplementedError
+ def repartition(self, numPartitions):
+ """
+ """
+ raise NotImplementedError
+ def slice(self, fromTime, toTime):
+ """
+ """
+ raise NotImplementedError
+ def transform(self, transformFunc):
+ """
+ """
+ raise NotImplementedError
+ def transformWith(self, other, transformFunc):
+ """
+ """
+ raise NotImplementedError
+ def union(self, that):
+ """
+ """
+ raise NotImplementedError
+ def window(self, windowDuration, slideDuration=None):
+ """
+ """
+ raise NotImplementedError
+ def wrapRDD(self, rdd):
+ """
+ """
+ raise NotImplementedError
+ def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+ return PipelinedDStream(self, f, preservesPartitioning)
+>>>>>>> initial commit for pySparkStreaming
class PipelinedDStream(DStream):
diff --git a/python/pyspark/streaming/duration.py b/python/pyspark/streaming/duration.py
index a7f1036e4b856..245c137ecfc29 100644
--- a/python/pyspark/streaming/duration.py
+++ b/python/pyspark/streaming/duration.py
@@ -1,3 +1,4 @@
+<<<<<<< HEAD
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -18,6 +19,12 @@
from pyspark.streaming import utils
+__author__ = 'ktakagiw'
+from pyspark.streaming import utils
+>>>>>>> initial commit for pySparkStreaming
class Duration(object):
Duration for Spark Streaming application. Used to set duration
@@ -43,6 +50,7 @@ def __init__(self, millis, _jvm=None):
self._jduration = _jvm.Duration(millis)
def toString(self):
+<<<<<<< HEAD
Return duration as string
@@ -63,11 +71,19 @@ def isZero(self):
>>> d_0.isZero()
+ """ Return duration as string """
+ return str(self._millis) + " ms"
+ def isZero(self):
+ """ Check if millis is zero """
+>>>>>>> initial commit for pySparkStreaming
return self._millis == 0
def prettyPrint(self):
Return a human-readable string representing a duration
+<<<<<<< HEAD
>>> d_10 = Duration(10)
>>> d_10.prettyPrint()
@@ -81,10 +97,13 @@ def prettyPrint(self):
>>> d_1hour = Duration(60 * 60 * 1000)
>>> d_1hour.prettyPrint()
'1.00 h'
+>>>>>>> initial commit for pySparkStreaming
return utils.msDurationToString(self._millis)
def milliseconds(self):
+<<<<<<< HEAD
Return millisecond
@@ -117,6 +136,17 @@ def max(self, other):
100 ms
+ """ Return millisecond """
+ return self._millis
+ def toFormattedString(self):
+ """ Return millisecond """
+ return str(self._millis)
+ def max(self, other):
+ """ Return higher Duration """
+>>>>>>> initial commit for pySparkStreaming
if self > other:
return self
@@ -124,6 +154,7 @@ def max(self, other):
return other
def min(self, other):
+<<<<<<< HEAD
Return lower Durattion
@@ -134,6 +165,9 @@ def min(self, other):
10 ms
+ """ Return lower Durattion """
+>>>>>>> initial commit for pySparkStreaming
if self < other:
return self
@@ -141,6 +175,7 @@ def min(self, other):
return other
def __str__(self):
+<<<<<<< HEAD
>>> d_10 = Duration(10)
>>> str(d_10)
@@ -159,10 +194,17 @@ def __add__(self, other):
>>> print d_110
110 ms
+ return self.toString()
+ def __add__(self, other):
+ """ Add Duration and Duration """
+>>>>>>> initial commit for pySparkStreaming
return Duration(self._millis + other._millis)
def __sub__(self, other):
+<<<<<<< HEAD
Subtract Duration by Duration
@@ -173,10 +215,14 @@ def __sub__(self, other):
90 ms
+ """ Subtract Duration by Duration """
+>>>>>>> initial commit for pySparkStreaming
return Duration(self._millis - other._millis)
def __mul__(self, other):
+<<<<<<< HEAD
Multiple Duration by Duration
@@ -187,6 +233,9 @@ def __mul__(self, other):
1000 ms
+ """ Multiple Duration by Duration """
+>>>>>>> initial commit for pySparkStreaming
return Duration(self._millis * other._millis)
@@ -194,6 +243,7 @@ def __div__(self, other):
Divide Duration by Duration
for Python 2.X
+<<<<<<< HEAD
>>> d_10 = Duration(10)
>>> d_20 = Duration(20)
@@ -201,6 +251,8 @@ def __div__(self, other):
>>> print d_2
2 ms
+>>>>>>> initial commit for pySparkStreaming
return Duration(self._millis / other._millis)
@@ -209,6 +261,7 @@ def __truediv__(self, other):
Divide Duration by Duration
for Python 3.0
+<<<<<<< HEAD
>>> d_10 = Duration(10)
>>> d_20 = Duration(20)
@@ -216,11 +269,14 @@ def __truediv__(self, other):
>>> print d_2
2 ms
+>>>>>>> initial commit for pySparkStreaming
return Duration(self._millis / other._millis)
def __floordiv__(self, other):
+<<<<<<< HEAD
Divide Duration by Duration
@@ -246,10 +302,23 @@ def __lt__(self, other):
+ """ Divide Duration by Duration """
+ Duration._is_duration(other)
+ return Duration(self._millis // other._millis)
+ def __len__(self):
+ """ Length of miilisecond in Duration """
+ return len(self._millis)
+ def __lt__(self, other):
+ """ Duration < Duration """
+>>>>>>> initial commit for pySparkStreaming
return self._millis < other._millis
def __le__(self, other):
+<<<<<<< HEAD
Duration <= Duration
@@ -277,10 +346,19 @@ def __eq__(self, other):
+ """ Duration <= Duration """
+ Duration._is_duration(other)
+ return self.millis <= other._millis
+ def __eq__(self, other):
+ """ Duration == Duration """
+>>>>>>> initial commit for pySparkStreaming
return self._millis == other._millis
def __ne__(self, other):
+<<<<<<< HEAD
Duration != Duration
@@ -293,10 +371,14 @@ def __ne__(self, other):
+ """ Duration != Duration """
+>>>>>>> initial commit for pySparkStreaming
return self._millis != other._millis
def __gt__(self, other):
+<<<<<<< HEAD
Duration > Duration
@@ -308,10 +390,14 @@ def __gt__(self, other):
+ """ Duration > Duration """
+>>>>>>> initial commit for pySparkStreaming
return self._millis > other._millis
def __ge__(self, other):
+<<<<<<< HEAD
Duration >= Duration
@@ -324,6 +410,9 @@ def __ge__(self, other):
+ """ Duration >= Duration """
+>>>>>>> initial commit for pySparkStreaming
return self._millis >= other._millis
@@ -337,12 +426,15 @@ def Milliseconds(milliseconds):
Helper function that creates instance of [[pysparkstreaming.duration]] representing
a given number of milliseconds.
+<<<<<<< HEAD
>>> milliseconds = Milliseconds(1)
>>> d_1 = Duration(1)
>>> milliseconds == d_1
+>>>>>>> initial commit for pySparkStreaming
return Duration(milliseconds)
@@ -350,6 +442,7 @@ def Seconds(seconds):
Helper function that creates instance of [[pysparkstreaming.duration]] representing
a given number of seconds.
+<<<<<<< HEAD
>>> seconds = Seconds(1)
>>> d_1sec = Duration(1000)
@@ -371,3 +464,20 @@ def Minutes(minutes):
return Duration(minutes * 60 * 1000)
+ """
+ return Duration(seconds * 1000)
+def Minites(minites):
+ """
+ Helper function that creates instance of [[pysparkstreaming.duration]] representing
+ a given number of minutes.
+ """
+ return Duration(minutes * 60000)
+if __name__ == "__main__":
+ d = Duration(1)
+ print d
+ print d.milliseconds()
+>>>>>>> initial commit for pySparkStreaming
diff --git a/python/pyspark/streaming/jtime.py b/python/pyspark/streaming/jtime.py
index 32ef741051283..9295c4ee27705 100644
--- a/python/pyspark/streaming/jtime.py
+++ b/python/pyspark/streaming/jtime.py
@@ -1,3 +1,4 @@
+<<<<<<< HEAD
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -14,10 +15,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+__author__ = 'ktakagiw'
+>>>>>>> initial commit for pySparkStreaming
from pyspark.streaming import utils
from pyspark.streaming.duration import Duration
+<<<<<<< HEAD
The name of this file, time is not good naming for python
because if we do import time when we want to use native python time package, it does
@@ -25,6 +30,8 @@
+>>>>>>> initial commit for pySparkStreaming
class Time(object):
Time for Spark Streaming application. Used to set Time
diff --git a/python/pyspark/streaming/pyprint.py b/python/pyspark/streaming/pyprint.py
index 49517b3e5c247..4beb66950d851 100644
--- a/python/pyspark/streaming/pyprint.py
+++ b/python/pyspark/streaming/pyprint.py
@@ -1,3 +1,4 @@
+<<<<<<< HEAD
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -26,17 +27,31 @@ def collect(binary_file_path):
Read pickled file written by SparkStreaming
+import sys
+from itertools import chain
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
+def collect(binary_file_path):
+>>>>>>> initial commit for pySparkStreaming
dse = PickleSerializer()
with open(binary_file_path, 'rb') as tempFile:
for item in dse.load_stream(tempFile):
yield item
+<<<<<<< HEAD
+>>>>>>> initial commit for pySparkStreaming
def main():
binary_file_path = sys.argv[1]
+<<<<<<< HEAD
print "Missed FilePath in argements"
+ print "Missed FilePath in argement"
+>>>>>>> initial commit for pySparkStreaming
if not binary_file_path:
@@ -49,6 +64,9 @@ def main():
print "..."
+<<<<<<< HEAD
+>>>>>>> initial commit for pySparkStreaming
if __name__ =="__main__":
diff --git a/python/pyspark/streaming/utils.py b/python/pyspark/streaming/utils.py
index 9178577743e0b..56bb0ca1e9620 100644
--- a/python/pyspark/streaming/utils.py
+++ b/python/pyspark/streaming/utils.py
@@ -1,3 +1,4 @@
+<<<<<<< HEAD
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -36,6 +37,9 @@ def __str__(self):
class Java:
implements = ['org.apache.spark.streaming.api.python.PythonRDDFunction']
+__author__ = 'ktakagiw'
+>>>>>>> initial commit for pySparkStreaming
def msDurationToString(ms):
@@ -49,6 +53,7 @@ def msDurationToString(ms):
return "%d ms" % ms
elif ms < minute:
return "%.1f s" % (float(ms) / second)
+<<<<<<< HEAD
elif ms < hour:
return "%.1f m" % (float(ms) / minute)
@@ -59,3 +64,9 @@ def rddToFileName(prefix, suffix, time):
return prefix + "-" + str(time) + "." + suffix
return prefix + "-" + str(time)
+ elif ms < hout:
+ return "%.1f m" % (float(ms) / minute)
+ else:
+ return "%.2f h" % (float(ms) / hour)
+>>>>>>> initial commit for pySparkStreaming
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 12f900c91eb98..cb38015c24622 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -21,7 +21,11 @@
+<<<<<<< HEAD
+ 1.0.0
+>>>>>>> initial commit for pySparkStreaming
@@ -77,9 +81,9 @@