[SPARK-23577][SQL] Supports custom line separator for text datasource #20727

Closed
wants to merge 7 commits
Changes from 6 commits
14 changes: 9 additions & 5 deletions python/pyspark/sql/readwriter.py
@@ -304,16 +304,18 @@ def parquet(self, *paths):

@ignore_unicode_prefix
@since(1.6)
def text(self, paths, wholetext=False):
def text(self, paths, wholetext=False, lineSep=None):
"""
Loads text files and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
are any.

Each line in the text file is a new row in the resulting DataFrame.
By default, each line in the text file is a new row in the resulting DataFrame.

:param paths: string, or list of strings, for input path(s).
:param wholetext: if true, read each file from input path(s) as a single row.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.

>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.collect()
@@ -322,7 +324,7 @@ def text(self, paths, wholetext=False):
>>> df.collect()
[Row(value=u'hello\\nthis')]
"""
self._set_opts(wholetext=wholetext)
self._set_opts(wholetext=wholetext, lineSep=lineSep)
if isinstance(paths, basestring):
paths = [paths]
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
@@ -804,18 +806,20 @@ def parquet(self, path, mode=None, partitionBy=None, compression=None):
self._jwrite.parquet(path)

@since(1.6)
def text(self, path, compression=None):
def text(self, path, compression=None, lineSep=None):
"""Saves the content of the DataFrame in a text file at the specified path.

:param path: the path in any Hadoop supported file system
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
:param lineSep: defines the line separator that should be used for writing. If None is
set, it uses the default value, ``\\n``.

The DataFrame must have only one column that is of string type.
Each row becomes a new line in the output file.
"""
self._set_opts(compression=compression)
self._set_opts(compression=compression, lineSep=lineSep)
self._jwrite.text(path)

@since(2.0)
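For context, a hedged usage sketch of the new option (not part of the diff; the paths are hypothetical):

```scala
// Reading: split rows on a custom separator instead of the default \r, \r\n and \n.
val df = spark.read
  .option("lineSep", ",")
  .text("/tmp/ages_newlines.csv") // hypothetical input path

// Writing: join rows with a custom separator instead of the default \n.
df.write
  .option("lineSep", "!")
  .text("/tmp/text-out") // hypothetical output path
```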
8 changes: 6 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -531,24 +531,28 @@ def parquet(self, path):

@ignore_unicode_prefix
@since(2.0)
def text(self, path):
def text(self, path, wholetext=False, lineSep=None):
"""
Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
are any.

Each line in the text file is a new row in the resulting DataFrame.
By default, each line in the text file is a new row in the resulting DataFrame.

.. note:: Evolving.

:param paths: string, or list of strings, for input path(s).
:param wholetext: if true, read each file from input path(s) as a single row.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.

>>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
>>> text_sdf.isStreaming
True
>>> "value" in str(text_sdf.schema)
True
"""
self._set_opts(wholetext=wholetext, lineSep=lineSep)
if isinstance(path, basestring):
return self._df(self._jreader.text(path))
else:
24 changes: 23 additions & 1 deletion python/pyspark/sql/tests.py
@@ -648,7 +648,29 @@ def test_non_existed_udaf(self):
self.assertRaisesRegexp(AnalysisException, "Can not load class non_existed_udaf",
lambda: spark.udf.registerJavaUDAF("udaf1", "non_existed_udaf"))

def test_multiLine_json(self):
def test_linesep_text(self):
df = self.spark.read.text("python/test_support/sql/ages_newlines.csv", lineSep=",")
expected = [Row(value=u'Joe'), Row(value=u'20'), Row(value=u'"Hi'),
Row(value=u'\nI am Jeo"\nTom'), Row(value=u'30'),
Row(value=u'"My name is Tom"\nHyukjin'), Row(value=u'25'),
Row(value=u'"I am Hyukjin\n\nI love Spark!"\n')]
self.assertEqual(df.collect(), expected)

tpath = tempfile.mkdtemp()
shutil.rmtree(tpath)
try:
df.write.text(tpath, lineSep="!")
expected = [Row(value=u'Joe!20!"Hi!'), Row(value=u'I am Jeo"'),
Row(value=u'Tom!30!"My name is Tom"'),
Row(value=u'Hyukjin!25!"I am Hyukjin'),
Row(value=u''), Row(value=u'I love Spark!"'),
Row(value=u'!')]
readback = self.spark.read.text(tpath)
self.assertEqual(readback.collect(), expected)
finally:
shutil.rmtree(tpath)

def test_multiline_json(self):
people1 = self.spark.read.json("python/test_support/sql/people.json")
people_array = self.spark.read.json("python/test_support/sql/people_array.json",
multiLine=True)
30 changes: 17 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -647,14 +647,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* Loads text files and returns a `DataFrame` whose schema starts with a string column named
* "value", and followed by partitioned columns if there are any.
*
* You can set the following text-specific option(s) for reading text files:
* <ul>
* <li>`wholetext` ( default `false`): If true, read a file as a single row and not split by "\n".
* </li>
* </ul>
* By default, each line in the text files is a new row in the resulting DataFrame.
*
* Usage example:
* By default, each line in the text files is a new row in the resulting DataFrame. For example:
* {{{
* // Scala:
* spark.read.text("/path/to/spark/README.md")
@@ -663,6 +656,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* spark.read().text("/path/to/spark/README.md")
* }}}
*
* You can set the following text-specific option(s) for reading text files:
* <ul>
* <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
* </li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing.</li>
* </ul>
*
* @param paths input paths
* @since 1.6.0
*/
@@ -686,11 +687,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* If the directory structure of the text files contains partitioning information, those are
* ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
*
* You can set the following textFile-specific option(s) for reading text files:
* <ul>
* <li>`wholetext` ( default `false`): If true, read a file as a single row and not split by "\n".
* </li>
* </ul>
* By default, each line in the text files is a new row in the resulting DataFrame. For example:
* {{{
* // Scala:
@@ -700,6 +696,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* spark.read().textFile("/path/to/spark/README.md")
* }}}
*
* You can set the following textFile-specific option(s) for reading text files:
* <ul>
* <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
* </li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing.</li>
* </ul>
*
* @param paths input path
* @since 2.0.0
*/
@@ -587,6 +587,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
* <li>`lineSep` (default `\n`): defines the line separator that should
* be used for writing.</li>
* </ul>
*
* @since 1.6.0
@@ -30,9 +30,19 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
/**
* An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
* in that file.
*
* @param file A part (i.e. "block") of a single file that should be read line by line.
* @param lineSeparator A line separator that should be used for each line. If the value is `None`,
* it covers `\r`, `\r\n` and `\n`.
Contributor:
We should mention that this default rule is not defined by us, but by hadoop.

Member Author:
Sure.

* @param conf Hadoop configuration
*/
class HadoopFileLinesReader(
file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
file: PartitionedFile,
lineSeparator: Option[Array[Byte]],
conf: Configuration) extends Iterator[Text] with Closeable {
Member:
I cannot predict where the class could be used, but I believe we should keep backward compatibility for sources. I mean, it would be better to add the new parameter after conf: Configuration with a default value that preserves the old behavior.

Member Author:
That's preserved by def this(file: PartitionedFile, conf: Configuration) = this(file, "\n", conf), I believe. The execution package is meant to be an internal package anyway.

Member Author:
Note that it's an internal API for datasources, and Hadoop's Text already has an assumption of UTF-8. I don't think we should call getBytes with UTF-8 at each caller side.

Member:
Some methods of Hadoop's Text have such an assumption about UTF-8 encoding. In general a datasource could eliminate the restriction by using the Text class as a container of raw bytes and calling methods like getBytes() and getLength().

Member Author (@HyukjinKwon, Mar 11, 2018):
Yup, I am sorry if I wasn't clear. I mean the doc describes:

    This class stores text using standard UTF8 encoding.

I was wondering if that's an official way to use Text without the restriction, because that sounds rather like an informal workaround.
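As a hedged illustration of the suggestion above (an assumption about usage, not code from this PR), a caller could treat Text as a raw byte container like this:

```scala
import org.apache.hadoop.io.Text

// getBytes() returns the backing array, which may be longer than the payload,
// so only the first getLength() bytes are valid.
def validBytes(text: Text): Array[Byte] =
  java.util.Arrays.copyOfRange(text.getBytes, 0, text.getLength)
```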


def this(file: PartitionedFile, conf: Configuration) = this(file, None, conf)

private val iterator = {
val fileSplit = new FileSplit(
new Path(new URI(file.filePath)),
@@ -42,7 +52,13 @@ class HadoopFileLinesReader(
Array.empty)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
val reader = new LineRecordReader()

val reader = lineSeparator match {
case Some(sep) => new LineRecordReader(sep)
// If the line separator is `None`, it covers `\r`, `\r\n` and `\n`.
case _ => new LineRecordReader()
}

reader.initialize(fileSplit, hadoopAttemptContext)
new RecordReaderIterator(reader)
}
@@ -17,11 +17,8 @@

package org.apache.spark.sql.execution.datasources.text

import java.io.Closeable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

import org.apache.spark.TaskContext
@@ -89,7 +86,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
new TextOutputWriter(path, dataSchema, context)
new TextOutputWriter(path, dataSchema, textOptions.lineSeparatorInWrite, context)
}

override def getFileExtension(context: TaskAttemptContext): String = {
@@ -113,18 +110,18 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions.wholeText)
readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions)
}

private def readToUnsafeMem(
conf: Broadcast[SerializableConfiguration],
requiredSchema: StructType,
wholeTextMode: Boolean): (PartitionedFile) => Iterator[UnsafeRow] = {
textOptions: TextOptions): (PartitionedFile) => Iterator[UnsafeRow] = {

(file: PartitionedFile) => {
val confValue = conf.value.value
val reader = if (!wholeTextMode) {
new HadoopFileLinesReader(file, confValue)
val reader = if (!textOptions.wholeText) {
new HadoopFileLinesReader(file, textOptions.lineSeparatorInRead, confValue)
} else {
new HadoopFileWholeTextReader(file, confValue)
}
@@ -152,6 +149,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
class TextOutputWriter(
path: String,
dataSchema: StructType,
lineSeparator: Array[Byte],
context: TaskAttemptContext)
extends OutputWriter {

@@ -162,7 +160,7 @@ class TextOutputWriter(
val utf8string = row.getUTF8String(0)
utf8string.writeTo(writer)
}
writer.write('\n')
writer.write(lineSeparator)
}

override def close(): Unit = {
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources.text

import java.nio.charset.StandardCharsets

import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs}

/**
@@ -39,9 +41,19 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
*/
val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean

private val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep =>
require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
sep
}
// Note that the option 'lineSep' uses a different default value in read and write.
val lineSeparatorInRead: Option[Array[Byte]] =
lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
val lineSeparatorInWrite: Array[Byte] =
lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
}

private[text] object TextOptions {
val COMPRESSION = "compression"
val WHOLETEXT = "wholetext"
val LINE_SEPARATOR = "lineSep"
Member:
Why is it not "lineSeparator"? I would propose another name for the option: recordSeparator. Imagine you have the text file:

    id: 123
    cmd: ls -l
    ---
    id: 456
    cmd: rm -rf

where the separator is ---. If the separator is not a newline delimiter, records don't look like lines. And recordSeparator would be closer to Hadoop's terminology. Besides that, we will probably introduce a similar option for other datasources like JSON, and recordSeparator for JSON records (not lines) sounds better from my point of view.

Contributor:
+1

Member Author:
It was taken after the Univocity parser, setLineSeparator.

I think Hadoop calls it textinputformat.record.delimiter, but the class name is LineRecordReader or LineReader.

Member:
The name does not matter as much as having the same name for the option across all supported datasources (text, CSV and JSON) so we don't confuse users.

Member Author:
To be clear, the name "lineSep" is taken after Python, R's sep and our supported option sep. Here was another discussion - #18581 (comment)

"line" is taken after the Univocity CSV parser and seems to make sense in Hadoop too. I think we documented that JSON is in lines, and I think it makes sense for CSV and text.

Member Author:
One example might sound counterintuitive to you, but it looks less consistent with the other places I usually refer to.

Contributor:
I don't really care what we are going to call this. I think it is important that we are consistent across datasources. IMO, since we can define anything to be a separator, not only newlines, "record" seems to fit a bit better.

Member:
The definition of records was introduced in the classical paper: https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf . I don't see any reason to replace it by "line" right now.

Member Author (@HyukjinKwon, Mar 16, 2018):
My reason is to refer to other places so that users feel comfortable in practice, which I put more importance on. I really don't want to spend time researching why the other references used the term "line".

If we think about plain text, CSV or JSON, the term "line" can be correct in a way. We documented http://jsonlines.org/ (even this reference uses the term "line"). I think, for example, a line can be defined by its separator.

Member Author:
We already use the term "line" everywhere in the doc. We could just say lines are separated by a character and minimise the doc fix, etc.

}
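To ground the naming discussion above, a hedged sketch of reading the record-style file from that example with the option name as implemented (the path is hypothetical):

```scala
// Each chunk between "---" separators becomes one row in the "value" column.
val records = spark.read
  .option("lineSep", "---")
  .text("/tmp/records.txt") // hypothetical path
```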
@@ -387,7 +387,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* Loads text files and returns a `DataFrame` whose schema starts with a string column named
* "value", and followed by partitioned columns if there are any.
*
* Each line in the text files is a new row in the resulting DataFrame. For example:
* By default, each line in the text files is a new row in the resulting DataFrame. For example:
* {{{
* // Scala:
* spark.readStream.text("/path/to/directory/")
@@ -400,6 +400,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <ul>
* <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
* considered in every trigger.</li>
* <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
Member Author:
I also added some changes missed for wholetext too while I am here.

* </li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing.</li>
* </ul>
*
* @since 2.0.0
@@ -413,7 +417,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* If the directory structure of the text files contains partitioning information, those are
* ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
*
* Each line in the text file is a new element in the resulting Dataset. For example:
* By default, each line in the text file is a new element in the resulting Dataset. For example:
* {{{
* // Scala:
* spark.readStream.textFile("/path/to/spark/README.md")
@@ -426,6 +430,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <ul>
* <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
* considered in every trigger.</li>
* <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
* </li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing.</li>
* </ul>
*
* @param path input path
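A hedged sketch of combining the documented streaming options (the directory is hypothetical):

```scala
// Stream text files from a directory, picking up at most one new file per trigger
// and splitting rows on Windows-style line endings.
val lines = spark.readStream
  .option("maxFilesPerTrigger", "1")
  .option("lineSep", "\r\n")
  .text("/tmp/stream-input/") // hypothetical directory
```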