[SPARK-23577][SQL] Supports custom line separator for text datasource #20727
Conversation
@@ -400,6 +400,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 * <ul>
 * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
 * considered in every trigger.</li>
 * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
I also added some changes missed for `wholetext` too while I am here.
Test build #87929 has finished for PR 20727 at commit
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
 */
val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean

val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
This `require` looks redundant after a `getOrElse` call.
Nope, it's `nonEmpty` on `String`.
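A quick plain-Scala check of the point above (a sketch with an illustrative helper, not the actual `TextOptions` code): `getOrElse` only supplies the default when the key is absent, so an explicitly empty user value still reaches the `require`.

```scala
object RequireCheck {
  // Mirrors the pattern under discussion: default via getOrElse,
  // then a non-emptiness check on the resulting String.
  def lineSeparator(params: Map[String, String]): String = {
    val sep = params.getOrElse("lineSep", "\n")
    require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
    sep
  }

  def main(args: Array[String]): Unit = {
    assert(lineSeparator(Map.empty) == "\n") // default applies, require passes
    // An explicitly empty value bypasses the default and trips the require:
    val failed =
      try { lineSeparator(Map("lineSep" -> "")); false }
      catch { case _: IllegalArgumentException => true }
    assert(failed)
    println("ok")
  }
}
```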
looks good, also cc @MaxGekk
*/
class HadoopFileLinesReader(
    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
    file: PartitionedFile,
    lineSeparator: String,
A constructor of `LineRecordReader` has the same param, but its type is an array of bytes. I would propose to expose the same type here: `Array[Byte]`. Also, you handle a special value, `'\n'`. If you define the param as `lineSeparator: Option[Array[Byte]]`, `None` would indicate the default behavior: `'\n'`, `'\r'` or `'\r\n'`.
I would suggest using `String` here and controlling the encoding stuff in one place. For the option thing, see #20727 (comment). Will bring it back.
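As a sketch of the trade-off being discussed (illustrative names only, not the actual Spark internals): a string-typed option can be normalized to bytes once, in one place, with `None` standing for the default newline behavior.

```scala
// Hypothetical helper, assuming the separator arrives as a Unicode string
// from the datasource options; None means "use the default \n/\r/\r\n rule".
object LineSepSketch {
  def toDelimiterBytes(lineSeparator: Option[String]): Option[Array[Byte]] =
    lineSeparator.map(_.getBytes("UTF-8"))

  def main(args: Array[String]): Unit = {
    assert(toDelimiterBytes(None).isEmpty)                      // default behavior
    assert(new String(toDelimiterBytes(Some("::")).get, "UTF-8") == "::")
    println("ok")
  }
}
```

This keeps the charset decision at a single call site, which is the "control the encoding stuff in one place" argument above.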
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
      Array.empty)
    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
    val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
    val reader = new LineRecordReader()
    val reader = if (lineSeparator != "\n") {
If it has type `Option[Array[Byte]]`:

```scala
val reader = lineSeparator match {
  case Some(sep) => new LineRecordReader(sep)
  case _ => new LineRecordReader()
}
```
Ditto for the option thing. Will bring it back.
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
      Array.empty)
    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
    val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
    val reader = new LineRecordReader()
    val reader = if (lineSeparator != "\n") {
      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
It would be better not to depend on a particular charset.
Would you have a suggestion?
My suggestion is to pass `Array[Byte]` into the class. If charsets other than UTF-8 are supported in the future, this place will have to change for sure. You can make this class more tolerant to input charsets right now. Just as an example, the JSON reader (the Jackson JSON parser) is able to read JSON in any standard charset. To fix its per-line mode, we would need to support `lineSep` in any charset and convert `lineSep` to an array of bytes before using the class. If you restrict the charset of `lineSep` to UTF-8, you just put up a wall for other datasources.
I mean, it's initially a Unicode string via the datasource interface, and we need to somehow convert it to bytes once, as the reader takes bytes. Do you mean adding another option for specifying the charset, or did I maybe miss something?
Why do you think this class is responsible for converting the string separator to an array of bytes? In particular, the restriction to one charset is not clear. The purpose of the class is to provide the `Iterator` interface of records/lines to datasources, and this class doesn't have to know about the datasource's charset. I would not stick to a particular charset here, and would expose the separator parameter as `Option[Array[Byte]]`, like `LineReader` provides a constructor with `byte[] recordDelimiter`.
OK. Let me try to address this one.
val reader = if (lineSeparator != "\n") {
  new LineRecordReader(lineSeparator.getBytes("UTF-8"))
} else {
  // This behavior follows Hive. `\n` covers `\r`, `\r\n` and `\n`.
Having the `lineSeparator = '\n'` case also cover `'\r'` and `'\r\n'` does not look so good.
+1. It seems cleaner to use `None` as default, which indicates `\r`, `\r\n` and `\n`.
I initially did this but reverted it back - #18581 (comment). Let me bring the option back.
file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
file: PartitionedFile,
lineSeparator: String,
conf: Configuration) extends Iterator[Text] with Closeable {
I cannot predict where the class could be used, but I believe we should keep source backward compatibility. I mean, it would be better to add the new parameter after `conf: Configuration`, with a default value that preserves the old behavior.
That's preserved by `def this(file: PartitionedFile, conf: Configuration) = this(file, "\n", conf)`, I believe. The `execution` package is meant to be an internal package anyway.
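A minimal sketch of the compatibility point being made, with stand-in types rather than the real `PartitionedFile`/`Configuration`: an auxiliary constructor keeps the old two-argument signature compiling for existing callers.

```scala
// Stand-in types; the real class takes PartitionedFile and Configuration.
final case class FileStub(path: String)
final case class ConfStub(entries: Map[String, String])

class LinesReaderSketch(
    val file: FileStub,
    val lineSeparator: String,
    val conf: ConfStub) {
  // Auxiliary constructor preserving the original (file, conf) signature.
  def this(file: FileStub, conf: ConfStub) = this(file, "\n", conf)
}

object LinesReaderSketch {
  def main(args: Array[String]): Unit = {
    val old = new LinesReaderSketch(FileStub("part-0"), ConfStub(Map.empty))
    assert(old.lineSeparator == "\n") // old callers keep the old behavior
    println("ok")
  }
}
```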
}
}

Seq("|", "^", "::", "!!!@3").foreach { lineSep =>
Please check invisible and control chars as well.
Sure, sounds like a good idea.
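A sketch of what the extended test matrix could look like, using a plain-Scala split/join round trip as a stand-in for the datasource round trip; `\u0001` is an assumed example of a control character, not one from the actual test.

```scala
import java.util.regex.Pattern

object SeparatorMatrixSketch {
  // Visible separators from the test above, plus a control character.
  val separators: Seq[String] = Seq("|", "^", "::", "!!!@3", "\u0001")

  // Join records with the separator, then split them back out.
  def roundTrip(lines: Seq[String], sep: String): Seq[String] =
    lines.mkString(sep).split(Pattern.quote(sep), -1).toSeq

  def main(args: Array[String]): Unit = {
    separators.foreach { sep =>
      assert(roundTrip(Seq("a", "b", "c"), sep) == Seq("a", "b", "c"))
    }
    println("ok")
  }
}
```

`Pattern.quote` matters here because `String.split` takes a regex, and separators like `|` or `^` are regex metacharacters.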
}

private[text] object TextOptions {
  val COMPRESSION = "compression"
  val WHOLETEXT = "wholetext"
  val LINE_SEPARATOR = "lineSep"
Why is it not "lineSeparator"? I would propose another name for the option: `recordSeparator`. Imagine you have the text file:

```
id: 123
cmd: ls -l
---
id: 456
cmd: rm -rf
```

where the separator is `---`. If the separator is not a newline delimiter, the records don't look like lines, and `recordSeparator` would be closer to Hadoop's terminology. Besides that, we will probably introduce a similar option for other datasources like JSON; `recordSeparator` for JSON records (not lines) sounds better from my point of view.
+1
It was taken after the Univocity parser's `setLineSeparator`. I think Hadoop calls it `textinputformat.record.delimiter`, but the class name is `LineRecordReader` or `LineReader`.
The name doesn't matter as much as having the same name for the option across all supported datasources - text, CSV and JSON - so as not to confuse users.
To be clear, the name "lineSep" is taken after Python, R's `sep` and our supported option `sep`. Here was another discussion - #18581 (comment). "Line" is taken after the Univocity CSV parser and seems to make sense in Hadoop too. I think we documented that JSON is in lines, and I think it makes sense for CSV and text as well.
One example might sound counterintuitive to you, but it looks less consistent with the other places I usually refer to.
I don't really care what we are going to call it. I think it is important that we are consistent across datasources. IMO, since we can define anything to be a separator, not only newlines, `records` seems to fit a bit better.
The definition of records was introduced in the classical paper: https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf . I don't see any reason to replace it with "line" right now.
My reason is to refer to other places so that, in practice, other users feel comfortable, which I usually put more importance on. I really don't want to spend time researching why the other references used the term "line".
If we think about plain text, CSV or JSON, the term "line" can be correct in a way. We documented http://jsonlines.org/ (even this reference uses the term "line"). I think, for example, a line can be defined by its separator.
spark/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala, line 1645 in c36fecc: `|LINES TERMINATED BY '\n' |`
We already use the term "line" everywhere in the docs. We could just say that lines are separated by a character, and minimise the doc fix, etc.
Test build #88001 has finished for PR 20727 at commit

Test build #88002 has finished for PR 20727 at commit
file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
file: PartitionedFile,
lineSeparator: Option[String],
conf: Configuration) extends Iterator[Text] with Closeable {
Note that it's an internal API for datasources, and Hadoop's `Text` already has an assumption of UTF-8. I don't think we should call `getBytes` with UTF-8 at each caller side.
Some methods of Hadoop's `Text` have such an assumption about UTF-8 encoding. In general, a datasource could eliminate the restriction by using the `Text` class as a container of raw bytes and calling methods like `getBytes()` and `getLength()`.
Yup, I am sorry if I wasn't clear. I mean the doc describes: "This class stores text using standard UTF8 encoding." I was wondering if that's an official way to use `Text` without the restriction, because that sounds like an informal workaround.
Test build #88158 has finished for PR 20727 at commit

Force-pushed from 97a8422 to d6e9160.

Test build #88159 has finished for PR 20727 at commit
@cloud-fan and @MaxGekk, I believe this is ready for another look.
LGTM besides the option name.
retest this please

Test build #88408 has finished for PR 20727 at commit

retest this please

Test build #88415 has finished for PR 20727 at commit
*
* @param file A part (i.e. "block") of a single file that should be read line by line.
* @param lineSeparator A line separator that should be used for each line. If the value is `None`,
*                      it covers `\r`, `\r\n` and `\n`.
We should mention that this default rule is not defined by us, but by hadoop.
Sure.
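The resolution of this thread (an `Option` parameter whose `None` default documents Hadoop's `\r`/`\r\n`/`\n` rule) can be sketched with stubbed readers, showing only the dispatch; the real code constructs Hadoop's `LineRecordReader` instead of these stand-ins.

```scala
// Stub readers; Hadoop's no-arg LineRecordReader splits on \r, \r\n and \n
// by its own definition (that default rule comes from Hadoop, not Spark).
sealed trait ReaderSketch
final case class CustomDelimiterReader(delim: Array[Byte]) extends ReaderSketch
case object DefaultDelimiterReader extends ReaderSketch

object ReaderDispatch {
  def pick(lineSeparator: Option[Array[Byte]]): ReaderSketch =
    lineSeparator match {
      case Some(sep) => CustomDelimiterReader(sep) // explicit custom delimiter
      case None      => DefaultDelimiterReader     // Hadoop's default rule
    }

  def main(args: Array[String]): Unit = {
    assert(pick(None) == DefaultDelimiterReader)
    assert(pick(Some("::".getBytes("UTF-8"))).isInstanceOf[CustomDelimiterReader])
    println("ok")
  }
}
```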
LGTM

Test build #88451 has finished for PR 20727 at commit

retest this please

Test build #88460 has finished for PR 20727 at commit

retest this please

Test build #88467 has finished for PR 20727 at commit
thanks, merging to master!

Thanks for reviewing/merging this @cloud-fan, @MaxGekk and @hvanhovell.
## What changes were proposed in this pull request?

This PR proposes to add a `lineSep` option for a configurable line separator in the text datasource. It supports this option by using `LineRecordReader`'s functionality, passing the separator to the constructor. The approach is similar to #20727; however, one main difference is that it uses the text datasource's `lineSep` option to parse line by line in JSON's schema inference.

## How was this patch tested?

Manually tested and unit tests were added.

Author: hyukjinkwon <[email protected]>
Author: hyukjinkwon <[email protected]>

Closes #20877 from HyukjinKwon/linesep-json.
What changes were proposed in this pull request?

This PR proposes to add a `lineSep` option for a configurable line separator in the text datasource. It supports this option by using `LineRecordReader`'s functionality, passing the separator to the constructor.

How was this patch tested?

Manual tests and unit tests were added.
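As an end-to-end illustration of what the `lineSep` option enables, here is a plain-Scala stand-in (no SparkSession; file name and helper are illustrative) that treats each chunk between custom separators as one row, the way the text datasource does after this change:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.util.regex.Pattern

object LineSepDemo {
  // Rough stand-in for spark.read.option("lineSep", sep).text(path):
  // returns one "row" per separator-delimited record in the file.
  def readText(path: Path, lineSep: String): Seq[String] = {
    val content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
    content.split(Pattern.quote(lineSep), -1).toSeq
  }

  def main(args: Array[String]): Unit = {
    val tmp = Files.createTempFile("linesep-demo", ".txt")
    Files.write(tmp, "a^b^c".getBytes(StandardCharsets.UTF_8))
    assert(readText(tmp, "^") == Seq("a", "b", "c"))
    Files.deleteIfExists(tmp)
    println("ok")
  }
}
```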