diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst
index 80c6f02a9df41..e03379e521a07 100644
--- a/python/docs/pyspark.sql.rst
+++ b/python/docs/pyspark.sql.rst
@@ -16,3 +16,11 @@ pyspark.sql.types module
     :members:
     :undoc-members:
     :show-inheritance:
+
+
+pyspark.sql.functions module
+----------------------------
+.. automodule:: pyspark.sql.functions
+    :members:
+    :undoc-members:
+    :show-inheritance:
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 0a5ba00393aab..b9ffd6945ea7e 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -34,9 +34,8 @@
 
 from pyspark.sql.context import SQLContext, HiveContext
 from pyspark.sql.types import Row
-from pyspark.sql.dataframe import DataFrame, GroupedData, Column, Dsl, SchemaRDD
+from pyspark.sql.dataframe import DataFrame, GroupedData, Column, SchemaRDD
 
 __all__ = [
     'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row',
-    'Dsl',
 ]
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 082f1b691b196..bb1868e8960d0 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -38,6 +38,25 @@
 __all__ = ["SQLContext", "HiveContext"]
 
 
+def _monkey_patch_RDD(sqlCtx):
+    def toDF(self, schema=None, sampleRatio=None):
+        """
+        Convert current :class:`RDD` into a :class:`DataFrame`
+
+        This is a shorthand for `sqlCtx.createDataFrame(rdd, schema, sampleRatio)`
+
+        :param schema: a StructType or list of names of columns
+        :param sampleRatio: the sample ratio of rows used for inferring
+        :return: a DataFrame
+
+        >>> rdd.toDF().collect()
+        [Row(name=u'Alice', age=1)]
+        """
+        return sqlCtx.createDataFrame(self, schema, sampleRatio)
+
+    RDD.toDF = toDF
+
+
 class SQLContext(object):
 
     """Main entry point for Spark SQL functionality.
@@ -70,6 +89,7 @@ def __init__(self, sparkContext, sqlContext=None):
         self._jsc = self._sc._jsc
         self._jvm = self._sc._jvm
         self._scala_SQLContext = sqlContext
+        _monkey_patch_RDD(self)
 
     @property
     def _ssql_ctx(self):
@@ -800,7 +820,8 @@ def _test():
          Row(field1=2, field2="row2"),
          Row(field1=3, field2="row3")]
     )
-    globs['df'] = sqlCtx.createDataFrame(rdd)
+    _monkey_patch_RDD(sqlCtx)
+    globs['df'] = rdd.toDF()
     jsonStrings = [
         '{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
         '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},'
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index b6f052ee44ae2..f650db8373535 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -21,21 +21,19 @@
 import random
 import os
 from tempfile import NamedTemporaryFile
-from itertools import imap
 
 from py4j.java_collections import ListConverter, MapConverter
 
 from pyspark.context import SparkContext
-from pyspark.rdd import RDD, _prepare_for_python_RDD
-from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \
-    UTF8Deserializer
+from pyspark.rdd import RDD
+from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.traceback_utils import SCCallSiteSync
 from pyspark.sql.types import *
 from pyspark.sql.types import _create_cls, _parse_datatype_json_string
 
 
-__all__ = ["DataFrame", "GroupedData", "Column", "Dsl", "SchemaRDD"]
+__all__ = ["DataFrame", "GroupedData", "Column", "SchemaRDD"]
 
 
 class DataFrame(object):
@@ -310,8 +308,9 @@ def take(self, num):
         return self.limit(num).collect()
 
     def map(self, f):
-        """ Return a new RDD by applying a function to each Row, it's a
-        shorthand for df.rdd.map()
+        """ Return a new RDD by applying a function to each Row
+
+        It's a shorthand for df.rdd.map()
 
         >>> df.map(lambda p: p.name).collect()
         [u'Alice', u'Bob']
@@ -586,8 +585,8 @@ def agg(self, *exprs):
         >>> df.agg({"age": "max"}).collect()
         [Row(MAX(age#0)=5)]
 
-        >>> from pyspark.sql import Dsl
-        >>> df.agg(Dsl.min(df.age)).collect()
+        >>> from pyspark.sql import functions as F
+        >>> df.agg(F.min(df.age)).collect()
         [Row(MIN(age#0)=2)]
         """
         return self.groupBy().agg(*exprs)
@@ -616,18 +615,18 @@ def subtract(self, other):
         """
         return DataFrame(getattr(self._jdf, "except")(other._jdf), self.sql_ctx)
 
-    def addColumn(self, colName, col):
+    def withColumn(self, colName, col):
         """ Return a new :class:`DataFrame` by adding a column.
 
-        >>> df.addColumn('age2', df.age + 2).collect()
+        >>> df.withColumn('age2', df.age + 2).collect()
         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
         """
         return self.select('*', col.alias(colName))
 
-    def renameColumn(self, existing, new):
+    def withColumnRenamed(self, existing, new):
         """ Rename an existing column to a new name
 
-        >>> df.renameColumn('age', 'age2').collect()
+        >>> df.withColumnRenamed('age', 'age2').collect()
         [Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
         """
         cols = [Column(_to_java_column(c), self.sql_ctx).alias(new)
@@ -689,8 +688,9 @@ def agg(self, *exprs):
         >>> gdf = df.groupBy(df.name)
         >>> gdf.agg({"age": "max"}).collect()
         [Row(name=u'Bob', MAX(age#0)=5), Row(name=u'Alice', MAX(age#0)=2)]
-        >>> from pyspark.sql import Dsl
-        >>> gdf.agg(Dsl.min(df.age)).collect()
+
+        >>> from pyspark.sql import functions as F
+        >>> gdf.agg(F.min(df.age)).collect()
         [Row(MIN(age#0)=5), Row(MIN(age#0)=2)]
         """
         assert exprs, "exprs should not be empty"
@@ -742,12 +742,12 @@ def sum(self):
 
 def _create_column_from_literal(literal):
     sc = SparkContext._active_spark_context
-    return sc._jvm.Dsl.lit(literal)
+    return sc._jvm.functions.lit(literal)
 
 
 def _create_column_from_name(name):
     sc = SparkContext._active_spark_context
-    return sc._jvm.Dsl.col(name)
+    return sc._jvm.functions.col(name)
 
 
 def _to_java_column(col):
@@ -767,9 +767,9 @@ def _(self):
     return _
 
 
-def _dsl_op(name, doc=''):
+def _func_op(name, doc=''):
     def _(self):
-        jc = getattr(self._sc._jvm.Dsl, name)(self._jc)
+        jc = getattr(self._sc._jvm.functions, name)(self._jc)
         return Column(jc, self.sql_ctx)
     _.__doc__ = doc
     return _
@@ -818,7 +818,7 @@ def __init__(self, jc, sql_ctx=None):
         super(Column, self).__init__(jc, sql_ctx)
 
     # arithmetic operators
-    __neg__ = _dsl_op("negate")
+    __neg__ = _func_op("negate")
     __add__ = _bin_op("plus")
     __sub__ = _bin_op("minus")
     __mul__ = _bin_op("multiply")
@@ -842,7 +842,7 @@ def __init__(self, jc, sql_ctx=None):
     # so use bitwise operators as boolean operators
     __and__ = _bin_op('and')
     __or__ = _bin_op('or')
-    __invert__ = _dsl_op('not')
+    __invert__ = _func_op('not')
     __rand__ = _bin_op("and")
     __ror__ = _bin_op("or")
 
@@ -934,123 +934,6 @@ def to_pandas(self):
         return pd.Series(data)
 
 
-def _aggregate_func(name, doc=""):
-    """ Create a function for aggregator by name"""
-    def _(col):
-        sc = SparkContext._active_spark_context
-        jc = getattr(sc._jvm.Dsl, name)(_to_java_column(col))
-        return Column(jc)
-    _.__name__ = name
-    _.__doc__ = doc
-    return staticmethod(_)
-
-
-class UserDefinedFunction(object):
-    def __init__(self, func, returnType):
-        self.func = func
-        self.returnType = returnType
-        self._broadcast = None
-        self._judf = self._create_judf()
-
-    def _create_judf(self):
-        f = self.func  # put it in closure `func`
-        func = lambda _, it: imap(lambda x: f(*x), it)
-        ser = AutoBatchedSerializer(PickleSerializer())
-        command = (func, None, ser, ser)
-        sc = SparkContext._active_spark_context
-        pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
-        ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc())
-        jdt = ssql_ctx.parseDataType(self.returnType.json())
-        judf = sc._jvm.UserDefinedPythonFunction(f.__name__, bytearray(pickled_command), env,
-                                                 includes, sc.pythonExec, broadcast_vars,
-                                                 sc._javaAccumulator, jdt)
-        return judf
-
-    def __del__(self):
-        if self._broadcast is not None:
-            self._broadcast.unpersist()
-            self._broadcast = None
-
-    def __call__(self, *cols):
-        sc = SparkContext._active_spark_context
-        jcols = ListConverter().convert([_to_java_column(c) for c in cols],
-                                        sc._gateway._gateway_client)
-        jc = self._judf.apply(sc._jvm.PythonUtils.toSeq(jcols))
-        return Column(jc)
-
-
-class Dsl(object):
-    """
-    A collections of builtin aggregators
-    """
-    DSLS = {
-        'lit': 'Creates a :class:`Column` of literal value.',
-        'col': 'Returns a :class:`Column` based on the given column name.',
-        'column': 'Returns a :class:`Column` based on the given column name.',
-        'upper': 'Converts a string expression to upper case.',
-        'lower': 'Converts a string expression to upper case.',
-        'sqrt': 'Computes the square root of the specified float value.',
-        'abs': 'Computes the absolutle value.',
-
-        'max': 'Aggregate function: returns the maximum value of the expression in a group.',
-        'min': 'Aggregate function: returns the minimum value of the expression in a group.',
-        'first': 'Aggregate function: returns the first value in a group.',
-        'last': 'Aggregate function: returns the last value in a group.',
-        'count': 'Aggregate function: returns the number of items in a group.',
-        'sum': 'Aggregate function: returns the sum of all values in the expression.',
-        'avg': 'Aggregate function: returns the average of the values in a group.',
-        'mean': 'Aggregate function: returns the average of the values in a group.',
-        'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
-    }
-
-    for _name, _doc in DSLS.items():
-        locals()[_name] = _aggregate_func(_name, _doc)
-    del _name, _doc
-
-    @staticmethod
-    def countDistinct(col, *cols):
-        """ Return a new Column for distinct count of (col, *cols)
-
-        >>> from pyspark.sql import Dsl
-        >>> df.agg(Dsl.countDistinct(df.age, df.name).alias('c')).collect()
-        [Row(c=2)]
-
-        >>> df.agg(Dsl.countDistinct("age", "name").alias('c')).collect()
-        [Row(c=2)]
-        """
-        sc = SparkContext._active_spark_context
-        jcols = ListConverter().convert([_to_java_column(c) for c in cols],
-                                        sc._gateway._gateway_client)
-        jc = sc._jvm.Dsl.countDistinct(_to_java_column(col),
-                                       sc._jvm.PythonUtils.toSeq(jcols))
-        return Column(jc)
-
-    @staticmethod
-    def approxCountDistinct(col, rsd=None):
-        """ Return a new Column for approxiate distinct count of (col, *cols)
-
-        >>> from pyspark.sql import Dsl
-        >>> df.agg(Dsl.approxCountDistinct(df.age).alias('c')).collect()
-        [Row(c=2)]
-        """
-        sc = SparkContext._active_spark_context
-        if rsd is None:
-            jc = sc._jvm.Dsl.approxCountDistinct(_to_java_column(col))
-        else:
-            jc = sc._jvm.Dsl.approxCountDistinct(_to_java_column(col), rsd)
-        return Column(jc)
-
-    @staticmethod
-    def udf(f, returnType=StringType()):
-        """Create a user defined function (UDF)
-
-        >>> slen = Dsl.udf(lambda s: len(s), IntegerType())
-        >>> df.select(slen(df.name).alias('slen')).collect()
-        [Row(slen=5), Row(slen=3)]
-        """
-        return UserDefinedFunction(f, returnType)
-
-
 def _test():
     import doctest
     from pyspark.context import SparkContext
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
new file mode 100644
index 0000000000000..231de81d0488a
--- /dev/null
+++ b/python/pyspark/sql/functions.py
@@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A collection of builtin functions
+"""
+
+from itertools import imap
+
+from py4j.java_collections import ListConverter
+
+from pyspark import SparkContext
+from pyspark.rdd import _prepare_for_python_RDD
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.sql.types import StringType, IntegerType
+from pyspark.sql.dataframe import Column, _to_java_column
+
+
+__all__ = ['countDistinct', 'approxCountDistinct', 'udf']
+
+
+def _create_function(name, doc=""):
+    """ Create a function for aggregator by name"""
+    def _(col):
+        sc = SparkContext._active_spark_context
+        jc = getattr(sc._jvm.functions, name)(_to_java_column(col))
+        return Column(jc)
+    _.__name__ = name
+    _.__doc__ = doc
+    return _
+
+
+_functions = {
+    'lit': 'Creates a :class:`Column` of literal value.',
+    'col': 'Returns a :class:`Column` based on the given column name.',
+    'column': 'Returns a :class:`Column` based on the given column name.',
+    'upper': 'Converts a string expression to upper case.',
+    'lower': 'Converts a string expression to lower case.',
+    'sqrt': 'Computes the square root of the specified float value.',
+    'abs': 'Computes the absolute value.',
+
+    'max': 'Aggregate function: returns the maximum value of the expression in a group.',
+    'min': 'Aggregate function: returns the minimum value of the expression in a group.',
+    'first': 'Aggregate function: returns the first value in a group.',
+    'last': 'Aggregate function: returns the last value in a group.',
+    'count': 'Aggregate function: returns the number of items in a group.',
+    'sum': 'Aggregate function: returns the sum of all values in the expression.',
+    'avg': 'Aggregate function: returns the average of the values in a group.',
+    'mean': 'Aggregate function: returns the average of the values in a group.',
+    'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
+}
+
+
+for _name, _doc in _functions.items():
+    globals()[_name] = _create_function(_name, _doc)
+del _name, _doc
+__all__ += _functions.keys()
+
+
+def countDistinct(col, *cols):
+    """ Return a new Column for distinct count of `col` or `cols`
+
+    >>> df.agg(countDistinct(df.age, df.name).alias('c')).collect()
+    [Row(c=2)]
+
+    >>> df.agg(countDistinct("age", "name").alias('c')).collect()
+    [Row(c=2)]
+    """
+    sc = SparkContext._active_spark_context
+    jcols = ListConverter().convert([_to_java_column(c) for c in cols], sc._gateway._gateway_client)
+    jc = sc._jvm.functions.countDistinct(_to_java_column(col), sc._jvm.PythonUtils.toSeq(jcols))
+    return Column(jc)
+
+
+def approxCountDistinct(col, rsd=None):
+    """ Return a new Column for approximate distinct count of `col`
+
+    >>> df.agg(approxCountDistinct(df.age).alias('c')).collect()
+    [Row(c=2)]
+    """
+    sc = SparkContext._active_spark_context
+    if rsd is None:
+        jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col))
+    else:
+        jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col), rsd)
+    return Column(jc)
+
+
+class UserDefinedFunction(object):
+    """
+    User defined function in Python
+    """
+    def __init__(self, func, returnType):
+        self.func = func
+        self.returnType = returnType
+        self._broadcast = None
+        self._judf = self._create_judf()
+
+    def _create_judf(self):
+        f = self.func  # put it in closure `func`
+        func = lambda _, it: imap(lambda x: f(*x), it)
+        ser = AutoBatchedSerializer(PickleSerializer())
+        command = (func, None, ser, ser)
+        sc = SparkContext._active_spark_context
+        pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
+        ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc())
+        jdt = ssql_ctx.parseDataType(self.returnType.json())
+        judf = sc._jvm.UserDefinedPythonFunction(f.__name__, bytearray(pickled_command), env,
+                                                 includes, sc.pythonExec, broadcast_vars,
+                                                 sc._javaAccumulator, jdt)
+        return judf
+
+    def __del__(self):
+        if self._broadcast is not None:
+            self._broadcast.unpersist()
+            self._broadcast = None
+
+    def __call__(self, *cols):
+        sc = SparkContext._active_spark_context
+        jcols = ListConverter().convert([_to_java_column(c) for c in cols],
+                                        sc._gateway._gateway_client)
+        jc = self._judf.apply(sc._jvm.PythonUtils.toSeq(jcols))
+        return Column(jc)
+
+
+def udf(f, returnType=StringType()):
+    """Create a user defined function (UDF)
+
+    >>> slen = udf(lambda s: len(s), IntegerType())
+    >>> df.select(slen(df.name).alias('slen')).collect()
+    [Row(slen=5), Row(slen=3)]
+    """
+    return UserDefinedFunction(f, returnType)
+
+
+def _test():
+    import doctest
+    from pyspark.context import SparkContext
+    from pyspark.sql import Row, SQLContext
+    import pyspark.sql.functions
+    globs = pyspark.sql.functions.__dict__.copy()
+    sc = SparkContext('local[4]', 'PythonTest')
+    globs['sc'] = sc
+    globs['sqlCtx'] = sqlCtx = SQLContext(sc)
+    rdd2 = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)])
+    globs['df'] = sqlCtx.inferSchema(rdd2)
+    (failure_count, test_count) = doctest.testmod(
+        pyspark.sql.functions, globs=globs,
+        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 43e5c3a1b00fa..5b591c8bed1ef 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -96,7 +96,7 @@ def setUpClass(cls):
         cls.sqlCtx = SQLContext(cls.sc)
         cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
         rdd = cls.sc.parallelize(cls.testData)
-        cls.df = cls.sqlCtx.createDataFrame(rdd)
+        cls.df = rdd.toDF()
 
     @classmethod
     def tearDownClass(cls):
@@ -204,8 +203,7 @@ def test_infer_schema(self):
 
     def test_struct_in_map(self):
         d = [Row(m={Row(i=1): Row(s="")})]
-        rdd = self.sc.parallelize(d)
-        df = self.sqlCtx.createDataFrame(rdd)
+        df = self.sc.parallelize(d).toDF()
         k, v = df.head().m.items()[0]
         self.assertEqual(1, k.i)
         self.assertEqual("", v.s)
@@ -213,8 +212,7 @@ def test_convert_row_to_dict(self):
         row = Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")})
         self.assertEqual(1, row.asDict()['l'][0].a)
-        rdd = self.sc.parallelize([row])
-        df = self.sqlCtx.createDataFrame(rdd)
+        df = self.sc.parallelize([row]).toDF()
         df.registerTempTable("test")
         row = self.sqlCtx.sql("select l, d from test").head()
         self.assertEqual(1, row.asDict()["l"][0].a)
@@ -223,8 +221,7 @@ def test_infer_schema_with_udt(self):
         from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
         row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
-        rdd = self.sc.parallelize([row])
-        df = self.sqlCtx.createDataFrame(rdd)
+        df = self.sc.parallelize([row]).toDF()
         schema = df.schema()
         field = [f for f in schema.fields if f.name == "point"][0]
         self.assertEqual(type(field.dataType), ExamplePointUDT)
@@ -238,15 +235,14 @@ def test_apply_schema_with_udt(self):
         rdd = self.sc.parallelize([row])
         schema = StructType([StructField("label", DoubleType(), False),
                              StructField("point", ExamplePointUDT(), False)])
-        df = self.sqlCtx.createDataFrame(rdd, schema)
+        df = rdd.toDF(schema)
         point = df.head().point
         self.assertEquals(point, ExamplePoint(1.0, 2.0))
 
     def test_parquet_with_udt(self):
         from pyspark.sql.tests import ExamplePoint
         row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
-        rdd = self.sc.parallelize([row])
-        df0 = self.sqlCtx.createDataFrame(rdd)
+        df0 = self.sc.parallelize([row]).toDF()
         output_dir = os.path.join(self.tempdir.name, "labeled_point")
         df0.saveAsParquetFile(output_dir)
         df1 = self.sqlCtx.parquetFile(output_dir)
@@ -280,10 +276,11 @@ def test_aggregator(self):
         self.assertEqual([99, 100], sorted(g.agg({'key': 'max', 'value': 'count'}).collect()[0]))
         self.assertEqual([Row(**{"AVG(key#0)": 49.5})], g.mean().collect())
 
-        from pyspark.sql import Dsl
-        self.assertEqual((0, u'99'), tuple(g.agg(Dsl.first(df.key), Dsl.last(df.value)).first()))
-        self.assertTrue(95 < g.agg(Dsl.approxCountDistinct(df.key)).first()[0])
-        self.assertEqual(100, g.agg(Dsl.countDistinct(df.value)).first()[0])
+        from pyspark.sql import functions
+        self.assertEqual((0, u'99'),
+                         tuple(g.agg(functions.first(df.key), functions.last(df.value)).first()))
+        self.assertTrue(95 < g.agg(functions.approxCountDistinct(df.key)).first()[0])
+        self.assertEqual(100, g.agg(functions.countDistinct(df.value)).first()[0])
 
     def test_save_and_load(self):
         df = self.df
diff --git a/python/run-tests b/python/run-tests
index 077ad60d764a3..9c53d56601785 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -67,6 +67,7 @@ function run_sql_tests() {
     run_test "pyspark/sql/types.py"
     run_test "pyspark/sql/context.py"
     run_test "pyspark/sql/dataframe.py"
+    run_test "pyspark/sql/functions.py"
     run_test "pyspark/sql/tests.py"
 }
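
The sketch below (not part of the patch) shows how the user-facing pieces of this change fit together: the `RDD.toDF()` shorthand installed when an `SQLContext` is constructed, the renamed `withColumn`/`withColumnRenamed` helpers, and aggregates imported from the new `pyspark.sql.functions` module instead of the removed `Dsl` class. It assumes the patch is applied on a Spark build of this era with a local master; the app name, data, and column names are illustrative only.

# Usage sketch for the APIs touched by this patch (illustrative, not from the diff).
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql import functions as F

sc = SparkContext("local[2]", "functions-example")
sqlCtx = SQLContext(sc)  # constructing the SQLContext installs the RDD.toDF monkey patch

# RDD -> DataFrame via the new toDF() shorthand instead of sqlCtx.createDataFrame(rdd)
df = sc.parallelize([Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]).toDF()

# withColumn / withColumnRenamed replace the old addColumn / renameColumn names
df2 = df.withColumn('age2', df.age + 2).withColumnRenamed('name', 'who')

# Aggregates now live in pyspark.sql.functions rather than the removed Dsl class
print(df.agg(F.min(df.age), F.countDistinct(df.name).alias('names')).collect())

sc.stop()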
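
Similarly, a minimal sketch of the `udf()` helper that moves from `Dsl.udf` into `pyspark.sql.functions`, assuming the `df` built in the previous sketch (a `name` string column) and an active SparkContext:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Wrap a plain Python callable as a column expression with an explicit return type
slen = udf(lambda s: len(s), IntegerType())
print(df.select(slen(df.name).alias('slen')).collect())  # e.g. [Row(slen=5), Row(slen=3)]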