[SPARK-13509][SPARK-13507][SQL] Support for writing CSV with a single function call

https://issues.apache.org/jira/browse/SPARK-13507
https://issues.apache.org/jira/browse/SPARK-13509

## What changes were proposed in this pull request?
This PR adds support for writing CSV data directly to a given path with a single function call.
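For illustration, a minimal pyspark sketch of the change (the DataFrame `df` and the output path are hypothetical; before this patch, the generic `format(...).save(...)` pair was required):

```python
# Before: writing CSV went through the generic format/save pair.
df.write.format('csv').option('header', 'true').save('/tmp/cars-out')

# After: a single function call does the same thing.
df.write.option('header', 'true').csv('/tmp/cars-out')
```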

Several unit tests were added for each piece of functionality.
## How was this patch tested?

This was tested with unit tests and with `dev/run_tests` for coding style.

Author: hyukjinkwon <[email protected]>
Author: Hyukjin Kwon <[email protected]>

Closes apache#11389 from HyukjinKwon/SPARK-13507-13509.
HyukjinKwon authored and roygao94 committed Mar 22, 2016
1 parent d4f6f23 commit 6bec27e
Showing 6 changed files with 80 additions and 10 deletions.
50 changes: 50 additions & 0 deletions python/pyspark/sql/readwriter.py
@@ -233,6 +233,23 @@ def text(self, paths):
            paths = [paths]
        return self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))

    @since(2.0)
    def csv(self, paths):
        """Loads a CSV file and returns the result as a :class:`DataFrame`.

        This function goes through the input once to determine the input schema. To avoid this
        extra pass over the data, specify the schema explicitly using ``schema``.

        :param paths: string, or list of strings, for input path(s).

        >>> df = sqlContext.read.csv('python/test_support/sql/ages.csv')
        >>> df.dtypes
        [('C0', 'string'), ('C1', 'string')]
        """
        if isinstance(paths, basestring):
            paths = [paths]
        return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
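As the docstring advises, supplying an explicit schema skips the inference pass. A hedged sketch (the column names and types are illustrative, not part of this patch):

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical schema for ages.csv; with it, the input is read only once.
schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True),
])
df = sqlContext.read.schema(schema).csv('python/test_support/sql/ages.csv')
```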

    @since(1.5)
    def orc(self, path):
        """Loads an ORC file, returning the result as a :class:`DataFrame`.
@@ -448,6 +465,11 @@ def json(self, path, mode=None):
        * ``ignore``: Silently ignore this operation if data already exists.
        * ``error`` (default case): Throw an exception if data already exists.

        You can set the following JSON-specific option(s) for writing JSON files:
        * ``compression`` (default ``None``): compression codec to use when saving to file.
          This can be one of the known case-insensitive shortened names
          (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).

        >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self.mode(mode)._jwrite.json(path)
@@ -476,11 +498,39 @@ def parquet(self, path, mode=None, partitionBy=None):
    def text(self, path):
        """Saves the content of the DataFrame in a text file at the specified path.

        :param path: the path in any Hadoop supported file system

        The DataFrame must have only one column that is of string type.
        Each row becomes a new line in the output file.

        You can set the following option(s) for writing text files:
        * ``compression`` (default ``None``): compression codec to use when saving to file.
          This can be one of the known case-insensitive shortened names
          (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
        """
        self._jwrite.text(path)
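Since ``text()`` accepts only a single string column, callers typically project first. A hedged sketch (the column name and output path are hypothetical):

```python
# Project the one string column, then write bzip2-compressed text files.
df.select('name').write.option('compression', 'bzip2').text('/tmp/names-txt')
```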

    @since(2.0)
    def csv(self, path, mode=None):
        """Saves the content of the :class:`DataFrame` in CSV format at the specified path.

        :param path: the path in any Hadoop supported file system
        :param mode: specifies the behavior of the save operation when data already exists.

            * ``append``: Append contents of this :class:`DataFrame` to existing data.
            * ``overwrite``: Overwrite existing data.
            * ``ignore``: Silently ignore this operation if data already exists.
            * ``error`` (default case): Throw an exception if data already exists.

        You can set the following CSV-specific option(s) for writing CSV files:
        * ``compression`` (default ``None``): compression codec to use when saving to file.
          This can be one of the known case-insensitive shortened names
          (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).

        >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
        """
        self.mode(mode)._jwrite.csv(path)
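The save mode and compression codec compose with the new call; a hedged sketch with a hypothetical output path:

```python
# Overwrite any existing output and gzip-compress the resulting CSV part files.
df.write.option('compression', 'gzip').csv('/tmp/ages-out', mode='overwrite')
```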

    @since(1.5)
    def orc(self, path, mode=None, partitionBy=None):
        """Saves the content of the :class:`DataFrame` in ORC format at the specified path.
4 changes: 4 additions & 0 deletions python/test_support/sql/ages.csv
@@ -0,0 +1,4 @@
Joe,20
Tom,30
Hyukjin,25

23 changes: 23 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -453,6 +453,10 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   *   format("json").save(path)
   * }}}
   *
   * You can set the following JSON-specific option(s) for writing JSON files:
   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
   * one of the known case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`).</li>
   *
   * @since 1.4.0
   */
  def json(path: String): Unit = format("json").save(path)
@@ -492,10 +496,29 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   *   df.write().text("/path/to/output")
   * }}}
   *
   * You can set the following option(s) for writing text files:
   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
   * one of the known case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`).</li>
   *
   * @since 1.6.0
   */
  def text(path: String): Unit = format("text").save(path)

  /**
   * Saves the content of the [[DataFrame]] in CSV format at the specified path.
   * This is equivalent to:
   * {{{
   *   format("csv").save(path)
   * }}}
   *
   * You can set the following CSV-specific option(s) for writing CSV files:
   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
   * one of the known case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`).</li>
   *
   * @since 2.0.0
   */
  def csv(path: String): Unit = format("csv").save(path)

///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
@@ -48,10 +48,7 @@ private[sql] class JSONOptions(
parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
val allowBackslashEscapingAnyCharacter =
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
val compressionCodec = {
val name = parameters.get("compression").orElse(parameters.get("codec"))
name.map(CompressionCodecs.getCodecClassName)
}
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)

/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
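Note the behavior change folded into this cleanup: the undocumented ``codec`` alias is no longer honored. A minimal Python sketch of the before/after option resolution (the function names here are illustrative, not Spark's API):

```python
def old_codec_name(params):
    # Before: either 'compression' or the undocumented 'codec' key was accepted.
    return params.get('compression') or params.get('codec')

def new_codec_name(params):
    # After: only the documented 'compression' key is honored.
    return params.get('compression')

assert old_codec_name({'codec': 'gzip'}) == 'gzip'
assert new_codec_name({'codec': 'gzip'}) is None
```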
@@ -115,10 +115,7 @@ private[sql] class TextRelation(
   /** Write path. */
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
     val conf = job.getConfiguration
-    val compressionCodec = {
-      val name = parameters.get("compression").orElse(parameters.get("codec"))
-      name.map(CompressionCodecs.getCodecClassName)
-    }
+    val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
     compressionCodec.foreach { codec =>
       CompressionCodecs.setCodecConfiguration(conf, codec)
     }
@@ -268,9 +268,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       .load(testFile(carsFile))

     cars.coalesce(1).write
-      .format("csv")
       .option("header", "true")
-      .save(csvDir)
+      .csv(csvDir)

     val carsCopy = sqlContext.read
       .format("csv")