Commit f081d47

More documentation updates.

rxin committed Jun 3, 2015
1 parent c9902fa commit f081d47
Showing 5 changed files with 77 additions and 140 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -82,3 +82,4 @@ local-1426633911242/*
local-1430917381534/*
DESCRIPTION
NAMESPACE
test_support/*
1 change: 1 addition & 0 deletions python/pyspark/sql/__init__.py
@@ -71,4 +71,5 @@ def deco(f):
__all__ = [
'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row',
'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec',
'DataFrameReader', 'DataFrameWriter'
]
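
With DataFrameReader and DataFrameWriter added to __all__, both classes become part of the public pyspark.sql namespace. A minimal sketch of what that exposes, assuming the corresponding imports already exist in pyspark/sql/__init__.py and a live SparkContext named sc (both assumptions, not shown in this diff):

# Sketch only: Spark 1.4-era pyspark API; sc is an assumed pre-existing SparkContext.
from pyspark.sql import SQLContext, DataFrameReader, DataFrameWriter

sqlContext = SQLContext(sc)

# sqlContext.read hands back a DataFrameReader; DataFrame.write hands back a DataFrameWriter.
assert isinstance(sqlContext.read, DataFrameReader)
df = sqlContext.createDataFrame([("Alice", 1)], ["name", "age"])
assert isinstance(df.write, DataFrameWriter)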
83 changes: 30 additions & 53 deletions python/pyspark/sql/context.py
@@ -124,7 +124,10 @@ def getConf(self, key, defaultValue):
@property
@since("1.3.1")
def udf(self):
"""Returns a :class:`UDFRegistration` for UDF registration."""
"""Returns a :class:`UDFRegistration` for UDF registration.
:return: :class:`UDFRegistration`
"""
return UDFRegistration(self)

@since(1.4)
@@ -138,7 +141,7 @@ def range(self, start, end, step=1, numPartitions=None):
:param end: the end value (exclusive)
:param step: the incremental step (default: 1)
:param numPartitions: the number of partitions of the DataFrame
:return: A new DataFrame
:return: :class:`DataFrame`
>>> sqlContext.range(1, 7, 2).collect()
[Row(id=1), Row(id=3), Row(id=5)]
@@ -196,7 +199,7 @@ def _inferSchema(self, rdd, samplingRatio=None):
"can not infer schema")
if type(first) is dict:
warnings.warn("Using RDD of dict to inferSchema is deprecated,"
"please use pyspark.sql.Row instead")
"please use pyspark.sql.Row instead", DeprecationWarning)

if samplingRatio is None:
schema = _infer_schema(first)
@@ -219,7 +222,8 @@ def inferSchema(self, rdd, samplingRatio=None):
"""
.. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
"""
warnings.warn("inferSchema is deprecated, please use createDataFrame instead")
warnings.warn(
"inferSchema is deprecated, please use createDataFrame instead.", DeprecationWarning)

if isinstance(rdd, DataFrame):
raise TypeError("Cannot apply schema to DataFrame")
@@ -231,7 +235,8 @@ def applySchema(self, rdd, schema):
"""
.. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
"""
warnings.warn("applySchema is deprecated, please use createDataFrame instead")
warnings.warn(
"applySchema is deprecated, please use createDataFrame instead", DeprecationWarning)

if isinstance(rdd, DataFrame):
raise TypeError("Cannot apply schema to DataFrame")
@@ -262,6 +267,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None):
:class:`list`, or :class:`pandas.DataFrame`.
:param schema: a :class:`StructType` or list of column names. default None.
:param samplingRatio: the sample ratio of rows used for inferring
:return: :class:`DataFrame`
>>> l = [('Alice', 1)]
>>> sqlContext.createDataFrame(l).collect()
@@ -359,58 +365,25 @@ def registerDataFrameAsTable(self, df, tableName):
else:
raise ValueError("Can only register DataFrame as table")

@since(1.0)
def parquetFile(self, *paths):
"""Loads a Parquet file, returning the result as a :class:`DataFrame`.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> df.saveAsParquetFile(parquetFile)
>>> df2 = sqlContext.parquetFile(parquetFile)
>>> sorted(df.collect()) == sorted(df2.collect())
True
.. note:: Deprecated in 1.4, use :func:`DataFrameReader.parquet` instead.
"""
warnings.warn("parquetFile is deprecated. Use read.parquet() instead.", DeprecationWarning)
gateway = self._sc._gateway
jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths))
for i in range(0, len(paths)):
jpaths[i] = paths[i]
jdf = self._ssql_ctx.parquetFile(jpaths)
return DataFrame(jdf, self)

@since(1.0)
def jsonFile(self, path, schema=None, samplingRatio=1.0):
"""Loads a text file storing one JSON object per line as a :class:`DataFrame`.
If the schema is provided, applies the given schema to this JSON dataset.
Otherwise, it samples the dataset with ratio ``samplingRatio`` to determine the schema.
>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
>>> shutil.rmtree(jsonFile)
>>> with open(jsonFile, 'w') as f:
... f.writelines(jsonStrings)
>>> df1 = sqlContext.jsonFile(jsonFile)
>>> df1.printSchema()
root
|-- field1: long (nullable = true)
|-- field2: string (nullable = true)
|-- field3: struct (nullable = true)
| |-- field4: long (nullable = true)
>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("field2", StringType()),
... StructField("field3",
... StructType([StructField("field5", ArrayType(IntegerType()))]))])
>>> df2 = sqlContext.jsonFile(jsonFile, schema)
>>> df2.printSchema()
root
|-- field2: string (nullable = true)
|-- field3: struct (nullable = true)
| |-- field5: array (nullable = true)
| | |-- element: integer (containsNull = true)
.. note:: Deprecated in 1.4, use :func:`DataFrameReader.json` instead.
"""
warnings.warn("jsonFile is deprecated. Use read.json() instead.", DeprecationWarning)
if schema is None:
df = self._ssql_ctx.jsonFile(path, samplingRatio)
else:
@@ -462,21 +435,16 @@ def func(iterator):
df = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype)
return DataFrame(df, self)

@since(1.3)
def load(self, path=None, source=None, schema=None, **options):
"""Returns the dataset in a data source as a :class:`DataFrame`.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
Optionally, a schema can be provided as the schema of the returned DataFrame.
.. note:: Deprecated in 1.4, use :func:`DataFrameReader.load` instead.
"""
warnings.warn("load is deprecated. Use read.load() instead.", DeprecationWarning)
return self.read.load(path, source, schema, **options)

@since(1.3)
def createExternalTable(self, tableName, path=None, source=None,
schema=None, **options):
def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
"""Creates an external table based on the dataset in a data source.
It returns the DataFrame associated with the external table.
@@ -487,6 +455,8 @@ def createExternalTable(self, tableName, path=None, source=None,
Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
created external table.
:return: :class:`DataFrame`
"""
if path is not None:
options["path"] = path
@@ -508,6 +478,8 @@ def createExternalTable(self, tableName, path=None, source=None,
def sql(self, sqlQuery):
"""Returns a :class:`DataFrame` representing the result of the given query.
:return: :class:`DataFrame`
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
@@ -519,6 +491,8 @@ def sql(self, sqlQuery):
def table(self, tableName):
"""Returns the specified table as a :class:`DataFrame`.
:return: :class:`DataFrame`
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
@@ -536,6 +510,9 @@ def tables(self, dbName=None):
The returned DataFrame has two columns: ``tableName`` and ``isTemporary``
(a column with :class:`BooleanType` indicating if a table is a temporary one or not).
:param dbName: string, name of the database to use.
:return: :class:`DataFrame`
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.tables()
>>> df2.filter("tableName = 'table1'").first()
@@ -550,7 +527,8 @@ def tables(self, dbName=None):
def tableNames(self, dbName=None):
"""Returns a list of names of tables in the database ``dbName``.
If ``dbName`` is not specified, the current database will be used.
:param dbName: string, name of the database to use. Default to the current database.
:return: list of table names, in string
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlContext.tableNames()
@@ -585,8 +563,7 @@ def read(self):
Returns a :class:`DataFrameReader` that can be used to read data
in as a :class:`DataFrame`.
>>> sqlContext.read
<pyspark.sql.readwriter.DataFrameReader object at ...>
:return: :class:`DataFrameReader`
"""
return DataFrameReader(self)

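For reference, the deprecated SQLContext entry points touched above (parquetFile, jsonFile, load) all forward to the DataFrameReader returned by sqlContext.read. A minimal sketch of the replacement calls under stated assumptions: a Spark 1.4-era deployment, a hypothetical app name, and placeholder paths.

# Sketch of the DataFrameReader API that supersedes the deprecated SQLContext methods.
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="reader_example")  # hypothetical app name
sqlContext = SQLContext(sc)

# Replaces sqlContext.parquetFile(path), sqlContext.jsonFile(path), sqlContext.load(path, source):
df_parquet = sqlContext.read.parquet("/tmp/events.parquet")
df_json = sqlContext.read.json("/tmp/events.json")
df_generic = sqlContext.read.load("/tmp/events.parquet", format="parquet")
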
69 changes: 20 additions & 49 deletions python/pyspark/sql/dataframe.py
@@ -44,7 +44,7 @@ class DataFrame(object):
A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
and can be created using various functions in :class:`SQLContext`::
people = sqlContext.parquetFile("...")
people = sqlContext.read.parquet("...")
Once created, it can be manipulated using the various domain-specific-language
(DSL) functions defined in: :class:`DataFrame`, :class:`Column`.
@@ -56,8 +56,8 @@ class DataFrame(object):
A more concrete example::
# To create DataFrame using SQLContext
people = sqlContext.parquetFile("...")
department = sqlContext.parquetFile("...")
people = sqlContext.read.parquet("...")
department = sqlContext.read.parquet("...")
people.filter(people.age > 30).join(department, people.deptId == department.id) \
.groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
@@ -120,21 +120,13 @@ def toJSON(self, use_unicode=True):
rdd = self._jdf.toJSON()
return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))

@since(1.3)
def saveAsParquetFile(self, path):
"""Saves the contents as a Parquet file, preserving the schema.
Files that are written out using this method can be read back in as
a :class:`DataFrame` using :func:`SQLContext.parquetFile`.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> df.saveAsParquetFile(parquetFile)
>>> df2 = sqlContext.parquetFile(parquetFile)
>>> sorted(df2.collect()) == sorted(df.collect())
True
.. note:: Deprecated in 1.4, use :func:`DataFrameWriter.parquet` instead.
"""
warnings.warn(
"saveAsParquetFile is deprecated. Use write.parquet() instead.", DeprecationWarning)
self._jdf.saveAsParquetFile(path)

@since(1.3)
@@ -151,69 +143,48 @@ def registerTempTable(self, name):
"""
self._jdf.registerTempTable(name)

@since(1.3)
def registerAsTable(self, name):
"""DEPRECATED: use :func:`registerTempTable` instead"""
"""
.. note:: Deprecated in 1.4, use :func:`registerTempTable` instead.
"""
warnings.warn("Use registerTempTable instead of registerAsTable.", DeprecationWarning)
self.registerTempTable(name)

@since(1.3)
def insertInto(self, tableName, overwrite=False):
"""Inserts the contents of this :class:`DataFrame` into the specified table.
Optionally overwriting any existing data.
.. note:: Deprecated in 1.4, use :func:`DataFrameWriter.insertInto` instead.
"""
warnings.warn(
"insertInto is deprecated. Use write.insertInto() instead.", DeprecationWarning)
self.write.insertInto(tableName, overwrite)

@since(1.3)
def saveAsTable(self, tableName, source=None, mode="error", **options):
"""Saves the contents of this :class:`DataFrame` to a data source as a table.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
Additionally, mode is used to specify the behavior of the saveAsTable operation when
table already exists in the data source. There are four modes:
* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
.. note:: Deprecated in 1.4, use :func:`DataFrameWriter.saveAsTable` instead.
"""
warnings.warn(
"insertInto is deprecated. Use write.saveAsTable() instead.", DeprecationWarning)
self.write.saveAsTable(tableName, source, mode, **options)

@since(1.3)
def save(self, path=None, source=None, mode="error", **options):
"""Saves the contents of the :class:`DataFrame` to a data source.
The data source is specified by the ``source`` and a set of ``options``.
If ``source`` is not specified, the default data source configured by
``spark.sql.sources.default`` will be used.
Additionally, mode is used to specify the behavior of the save operation when
data already exists in the data source. There are four modes:
* `append`: Append contents of this :class:`DataFrame` to existing data.
* `overwrite`: Overwrite existing data.
* `error`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
.. note:: Deprecated in 1.4, use :func:`DataFrameWriter.save` instead.
"""
warnings.warn(
"insertInto is deprecated. Use write.save() instead.", DeprecationWarning)
return self.write.save(path, source, mode, **options)

@property
@since(1.4)
def write(self):
"""
Interface for saving the content of the :class:`DataFrame` out
into external storage.
:return :class:`DataFrameWriter`
.. note:: Experimental
Interface for saving the content of the :class:`DataFrame` out into external storage.
>>> df.write
<pyspark.sql.readwriter.DataFrameWriter object at ...>
:return: :class:`DataFrameWriter`
"""
return DataFrameWriter(self)

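For reference, the writer-side counterparts of the DataFrame methods deprecated above (saveAsParquetFile, insertInto, saveAsTable, save) live on the DataFrameWriter returned by df.write. A minimal sketch under the same assumptions as the reader example (existing sqlContext; placeholder paths and table names):

# Sketch of the DataFrameWriter API that supersedes the deprecated DataFrame methods.
# sqlContext is the SQLContext from the previous sketch; paths/table names are hypothetical.
df = sqlContext.createDataFrame([("Alice", 1)], ["name", "age"])

df.write.parquet("/tmp/people.parquet")                                 # replaces df.saveAsParquetFile(path)
df.write.mode("overwrite").format("parquet").save("/tmp/people_copy")  # replaces df.save(path, source, mode)
# df.write.saveAsTable("people")    # replaces df.saveAsTable(...); persistent tables need a Hive-enabled context
# df.write.insertInto("people")     # replaces df.insertInto(...); the target table must already exist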