
[SPARK-23577][SQL] Supports custom line separator for text datasource #20727

Closed
wants to merge 7 commits into master from the linesep-text branch

Conversation

HyukjinKwon
Member

HyukjinKwon commented Mar 3, 2018

What changes were proposed in this pull request?

This PR proposes to add a `lineSep` option for a configurable line separator in the text datasource.

It supports the option by passing the separator to `LineRecordReader`'s constructor.

How was this patch tested?

Manual tests and unit tests were added.
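For illustration, a minimal usage sketch of the proposed option (the `spark` session and the file path here are assumptions, not part of the PR):

  // Read a text file whose records are separated by "|" instead of "\n".
  val df = spark.read
    .option("lineSep", "|")
    .text("/tmp/sample.txt")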

@@ -400,6 +400,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <ul>
    * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
    * considered in every trigger.</li>
+   * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
Member Author

While I am here, I also added some changes that were missed for `wholetext`.

@SparkQA

SparkQA commented Mar 3, 2018

Test build #87929 has finished for PR 20727 at commit 93652b3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean

+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
Contributor

This require looks redundant after a getOrElse call

Member Author

Nope, it's nonEmpty on String, not on an Option - the check still guards against an explicitly set empty separator.
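A minimal standalone sketch (not code from this PR) of why getOrElse doesn't make the check redundant:

  val parameters = Map("lineSep" -> "")                      // user explicitly passes ""
  val lineSeparator = parameters.getOrElse("lineSep", "\n")  // returns "", not the default
  require(lineSeparator.nonEmpty, "'lineSep' cannot be an empty string.")  // throws here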

@cloud-fan
Contributor

looks good, also cc @MaxGekk

  */
 class HadoopFileLinesReader(
-    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+    file: PartitionedFile,
+    lineSeparator: String,
Member

One of LineRecordReader's constructors takes the same parameter, but its type is an array of bytes. I would propose exposing the same type here - Array[Byte]. You also handle a special value, '\n'. If you defined the parameter as lineSeparator: Option[Array[Byte]], None would indicate the default behavior - '\n', '\r' or '\r\n'.

Member Author

I would suggest using String here and controlling the encoding in one place. For the Option suggestion, see #20727 (comment). Will bring it back.

@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
Member

If it has the type Option[Array[Byte]]:

val reader = lineSeparator match {
  case Some(sep) => new LineRecordReader(sep)
  case _ => new LineRecordReader()
}

Member Author

Ditto for the Option suggestion. Will bring it back.

@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
+      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
Member

It would be better not to depend on a particular charset.

Member Author

Would you have a suggestion?

Member

My suggestion is to pass Array[Byte] into the class. If charsets other than UTF-8 are supported in the future, this place will certainly have to change, so you can make this class more tolerant of input charsets right now. For example, the JSON reader (the Jackson JSON parser) is able to read JSON in any standard charset. To fix its per-line mode, we would need to support lineSep in any charset and convert it to an array of bytes before using this class. If you restrict lineSep's charset to UTF-8, you just put up a wall for other datasources.

Member Author

I mean, it initially comes in as a Unicode string via the datasource interface, and we need to convert it to bytes somewhere since the reader takes bytes. Do you mean adding another option for specifying the charset, or did I miss something?

Member

Why do you think this class is responsible for converting the string separator to an array of bytes? The restriction to one particular charset is especially unclear. The purpose of the class is to provide an Iterator interface over records/lines to datasources, and the class doesn't have to know about a datasource's charset. I would not stick to a particular charset here; I would expose the separator parameter as Option[Array[Byte]], just as LineReader provides a constructor taking byte[] recordDelimiter.

Member Author

OK. Let me try to address this one.

    val reader = if (lineSeparator != "\n") {
      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
    } else {
      // This behavior follows Hive. `\n` covers `\r`, `\r\n` and `\n`.
      new LineRecordReader()
    }
Member

The case lineSeparator = '\n' also covering '\r' and '\r\n' doesn't look so good.

Contributor

+1. It seems cleaner to use None as the default, which indicates \r, \r\n and \n.

Member Author

I initially did this but reverted it - #18581 (comment). Let me bring the Option back.

-    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+    file: PartitionedFile,
+    lineSeparator: String,
+    conf: Configuration) extends Iterator[Text] with Closeable {
Member

I cannot predict where this class might be used, but I believe we should keep source compatibility. I mean it would be better to add the new parameter after conf: Configuration, with a default value that preserves the old behavior.

Member Author

That's preserved by def this(file: PartitionedFile, conf: Configuration) = this(file, "\n", conf), I believe. The execution package is meant to be internal anyway.
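A sketch of that compatibility pattern, assuming the String-typed parameter as it stood at this point of the review:

  class HadoopFileLinesReader(
      file: PartitionedFile,
      lineSeparator: String,
      conf: Configuration) extends Iterator[Text] with Closeable {

    // Existing callers that pass only (file, conf) keep the old "\n" behavior.
    def this(file: PartitionedFile, conf: Configuration) = this(file, "\n", conf)
    // ...
  }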

}
}

Seq("|", "^", "::", "!!!@3").foreach { lineSep =>
Member

Please check invisible and control chars as well.

Member Author

Sure, sounds like a good idea.
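A sketch of how such separators might be exercised (assuming a SparkSession named spark; the control characters chosen here are illustrative):

  import java.nio.charset.StandardCharsets
  import java.nio.file.Files

  Seq("\u0001", "\u000B", "\r\n").foreach { lineSep =>
    val dir = Files.createTempDirectory("lineSep")
    // Write three records joined by the candidate separator.
    Files.write(dir.resolve("data.txt"),
      Seq("a", "b", "c").mkString(lineSep).getBytes(StandardCharsets.UTF_8))
    val readBack = spark.read.option("lineSep", lineSep).text(dir.toString)
    assert(readBack.collect().map(_.getString(0)).sorted.sameElements(Array("a", "b", "c")))
  }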

}

private[text] object TextOptions {
val COMPRESSION = "compression"
val WHOLETEXT = "wholetext"
val LINE_SEPARATOR = "lineSep"
Member

Why is it not "lineSeparator"? I would propose another name for the option: recordSeparator. Imagine you have this text file:

id: 123
cmd: ls -l
---
id: 456
cmd: rm -rf

where the separator is ---. If the separator is not a newline, the records don't look like lines, and recordSeparator would be closer to Hadoop's terminology. Besides, we will probably introduce a similar option for other datasources like JSON, and recordSeparator for JSON records (not lines) sounds better from my point of view.

Contributor

+1

Member Author

The name was taken from the Univocity parser's setLineSeparator.

I think Hadoop calls it textinputformat.record.delimiter, but the class names are LineRecordReader and LineReader.
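For reference, a minimal sketch of the plain Hadoop equivalent (the separator value here is illustrative):

  // The same knob in plain Hadoop MapReduce, set on the job configuration.
  val conf = new org.apache.hadoop.conf.Configuration()
  conf.set("textinputformat.record.delimiter", "---")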

Member

The name itself matters less than having the same option name across all supported datasources - text, CSV and JSON - so as not to confuse users.

Member Author

To be clear, the name "lineSep" follows Python, R's sep, and the sep option we already support. There was another discussion here - #18581 (comment).
"line" follows the Univocity CSV parser and seems to make sense in Hadoop too. I think we documented that JSON is read in lines, and the term makes sense for CSV and text as well.

Member Author

One example might sound counterintuitive to you, but the alternative looks less consistent with the other places I usually refer to.

Contributor

I don't really care what we call this; what matters is that we are consistent across datasources. IMO, since we can define anything to be a separator, not only newlines, "record" seems to fit a bit better.

Member

The definition of records was introduced in the classic MapReduce paper: https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf. I don't see any reason to replace it with "line" right now.

Member Author (Mar 16, 2018)

My reason is to follow other, familiar references so that users feel comfortable in practice, which is what I put more importance on. I really don't want to spend time researching why those references used the term "line".

If we think about plain text, CSV or JSON, the term "line" can be correct in a way. We documented http://jsonlines.org/ (even this reference uses the term "line"). I think, for example, a line can be defined by its separator.

Member Author

We already use the term "line" everywhere in the docs. We could just say lines are separated by a given character and minimise the doc changes.

@SparkQA

SparkQA commented Mar 6, 2018

Test build #88001 has finished for PR 20727 at commit 242b0f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 6, 2018

Test build #88002 has finished for PR 20727 at commit e529d0a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


-    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+    file: PartitionedFile,
+    lineSeparator: Option[String],
+    conf: Configuration) extends Iterator[Text] with Closeable {
Member Author

Note that this is an internal API for datasources, and Hadoop's Text already assumes UTF-8. I don't think we should call getBytes with UTF-8 at each call site.

Member

Some methods of Hadoop's Text do make that assumption about UTF-8 encoding. In general, though, a datasource can avoid the restriction by using the Text class as a container of raw bytes and calling methods like getBytes() and getLength().
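For illustration, a sketch of that usage (not code from this PR): treat Text as a byte container and avoid its String-based methods:

  import java.nio.charset.StandardCharsets
  import java.util.Arrays
  import org.apache.hadoop.io.Text

  val text = new Text()
  text.set("abc".getBytes(StandardCharsets.UTF_8))
  // getBytes returns the backing buffer, which may be longer than the content;
  // only the first getLength bytes are valid.
  val raw: Array[Byte] = Arrays.copyOfRange(text.getBytes, 0, text.getLength)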

Member Author (Mar 11, 2018)

Yup, I am sorry if I wasn't clear. I mean that the doc says:

This class stores text using standard UTF8 encoding.

I was wondering whether that's an official way to use Text without the restriction, because it sounds like a rather informal workaround.

@SparkQA

SparkQA commented Mar 11, 2018

Test build #88158 has finished for PR 20727 at commit 97a8422.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 11, 2018

Test build #88159 has finished for PR 20727 at commit d6e9160.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

@cloud-fan and @MaxGekk, I believe this is ready for another look.

MaxGekk (Member) left a comment

LGTM, apart from the option name.

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented Mar 20, 2018

Test build #88408 has finished for PR 20727 at commit d6e9160.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented Mar 20, 2018

Test build #88415 has finished for PR 20727 at commit d6e9160.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*
* @param file A part (i.e. "block") of a single file that should be read line by line.
* @param lineSeparator A line separator that should be used for each line. If the value is `None`,
* it covers `\r`, `\r\n` and `\n`.
Contributor

We should mention that this default rule is not defined by us but by Hadoop.

Member Author

Sure.
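For illustration, one possible rewording of the Scaladoc (an assumption on my part, not necessarily the merged text):

  * @param lineSeparator A line separator that should be used for each line. If the value is
  *                      `None`, it covers `\r`, `\r\n` and `\n`. Note that this default
  *                      behavior is defined by Hadoop's LineRecordReader, not by Spark.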

@cloud-fan
Contributor

LGTM

@SparkQA

SparkQA commented Mar 21, 2018

Test build #88451 has finished for PR 20727 at commit f1c951f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented Mar 21, 2018

Test build #88460 has finished for PR 20727 at commit f1c951f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented Mar 21, 2018

Test build #88467 has finished for PR 20727 at commit f1c951f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

asfgit closed this in 8d79113 on Mar 21, 2018
@HyukjinKwon
Member Author

Thanks for reviewing/merging this @cloud-fan, @MaxGekk and @hvanhovell.

asfgit pushed a commit that referenced this pull request Mar 28, 2018
## What changes were proposed in this pull request?

This PR proposes to add lineSep option for a configurable line separator in text datasource.
It supports this option by using `LineRecordReader`'s functionality with passing it to the constructor.

The approach is similar with #20727; however, one main difference is, it uses text datasource's `lineSep` option to parse line by line in JSON's schema inference.

## How was this patch tested?

Manually tested and unit tests were added.

Author: hyukjinkwon <[email protected]>
Author: hyukjinkwon <[email protected]>

Closes #20877 from HyukjinKwon/linesep-json.
mshtelma pushed a commit to mshtelma/spark that referenced this pull request Apr 5, 2018
HyukjinKwon deleted the linesep-text branch on October 16, 2018.