[SPARK-23577][SQL] Supports custom line separator for text datasource #20727
Changes from 6 commits: a8eeef8, 84d2cef, b5c8246, 6f1d317, 4835632, d6e9160, f1c951f
`HadoopFileLinesReader.scala`:

```diff
@@ -30,9 +30,19 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 /**
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
  * in that file.
+ *
+ * @param file A part (i.e. "block") of a single file that should be read line by line.
+ * @param lineSeparator A line separator that should be used for each line. If the value is `None`,
+ *                      it covers `\r`, `\r\n` and `\n`.
+ * @param conf Hadoop configuration
  */
 class HadoopFileLinesReader(
-    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+    file: PartitionedFile,
+    lineSeparator: Option[Array[Byte]],
+    conf: Configuration) extends Iterator[Text] with Closeable {
+
+  def this(file: PartitionedFile, conf: Configuration) = this(file, None, conf)
+
   private val iterator = {
     val fileSplit = new FileSplit(
       new Path(new URI(file.filePath)),
```

Review thread on the new `lineSeparator` parameter:

- I cannot predict where the class could be used, but I believe we should keep backward compatibility for datasources. I mean, it would be better to add the new parameter after the existing ones.
- That's preserved by the auxiliary constructor `def this(file: PartitionedFile, conf: Configuration)`.
- Note that it's an internal API for datasources, and Hadoop's `Text` already has an assumption of UTF-8. I don't think we should call `getBytes` with UTF-8 at each caller side.
- Some methods of Hadoop's `Text` have such an assumption about UTF-8 encoding. In general, a datasource could eliminate the restriction by using the `Text` class as a container of raw bytes and calling methods like `getBytes()` and `getLength()`.
- Yup, I am sorry if I wasn't clear. I mean the doc describes `Text` as storing text in standard UTF-8 encoding; I was wondering whether using it as a raw-bytes container is an official way to use `Text`.
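As a side note on the raw-bytes idea in the thread above, here is a minimal sketch (not code from this PR) of reading a record out of a Hadoop `Text` without interpreting it as UTF-8; it relies only on `getBytes()` returning the backing array and `getLength()` returning the number of valid bytes:

```scala
import org.apache.hadoop.io.Text

object TextRawBytes {
  // Copy only the valid region of the Text's backing array; the array
  // returned by getBytes may be longer than the record itself.
  def rawBytes(text: Text): Array[Byte] =
    java.util.Arrays.copyOfRange(text.getBytes, 0, text.getLength)

  def main(args: Array[String]): Unit = {
    val t = new Text(Array[Byte]('h'.toByte, 'i'.toByte))
    println(rawBytes(t).length) // prints 2
  }
}
```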
```diff
@@ -42,7 +52,13 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+
+    val reader = lineSeparator match {
+      case Some(sep) => new LineRecordReader(sep)
+      // If the line separator is `None`, it covers `\r`, `\r\n` and `\n`.
+      case _ => new LineRecordReader()
+    }
+
     reader.initialize(fileSplit, hadoopAttemptContext)
     new RecordReaderIterator(reader)
   }
```
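For illustration only, a hypothetical caller of the new constructor might look like the sketch below. Since `HadoopFileLinesReader` and `PartitionedFile` are internal Spark APIs, `partitionedFile` and `hadoopConf` are assumed to already be in scope:

```scala
import java.nio.charset.StandardCharsets

// Records delimited by '\u0001' instead of newlines.
val sep = Some("\u0001".getBytes(StandardCharsets.UTF_8))
val reader = new HadoopFileLinesReader(partitionedFile, sep, hadoopConf)

// Existing call sites keep compiling via the two-argument auxiliary
// constructor, which passes None and keeps the \r / \r\n / \n default.
val legacy = new HadoopFileLinesReader(partitionedFile, hadoopConf)
```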
`TextOptions.scala`:

```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import java.nio.charset.StandardCharsets
+
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs}
 
 /**
```
```diff
@@ -39,9 +41,19 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  private val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep =>
+    require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
+    sep
+  }
+  // Note that the option 'lineSep' uses a different default value in read and write.
+  val lineSeparatorInRead: Option[Array[Byte]] =
+    lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
+  val lineSeparatorInWrite: Array[Byte] =
+    lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
 }
```
Review thread on the option name `lineSep`:

- Why is it not "lineSeparator"? I would propose another name for the option: recordSeparator. Imagine a text file where the separator is something other than a line break; calling it a "line" separator is then misleading.
- +1
- It was taken after the Univocity parser; I think Hadoop calls it a record delimiter.
- The exact name matters less than using the same name for the option across all supported datasources (text, CSV and JSON), so that users are not confused.
- To be clear, the name "lineSep" was taken after similar options in other references, for example in Python and R.
- One example might sound counterintuitive to you, but "recordSeparator" looks less consistent with the other places I usually refer to.
- I don't really care what we are going to call this; I do think it is important that we are consistent across datasources. IMO, since we can define anything to be a separator, not only newlines, the more general term fits.
- The definition of records was introduced in the classical MapReduce paper: https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf. I don't see any reason to replace it with "line" right now.
- My reason is to follow other references so that users feel comfortable in practice, which is what I put more importance on. I really don't want to spend time researching why those references used the term "line". For plain text, CSV or JSON, the term "line" can be correct in a way; we already document http://jsonlines.org/ (even that reference uses the term "line"). A line can, for example, be defined by its separator; see spark/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala, line 1645 at c36fecc.
- We already use the term "line" everywhere in the docs. We could just say lines are separated by a given character and minimise the documentation changes.
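To make the read/write asymmetry encoded by `lineSeparatorInRead` and `lineSeparatorInWrite` concrete, here is a small usage sketch through the public `lineSep` option; the paths and the `\u0001` separator are made up for the example:

```scala
import org.apache.spark.sql.SparkSession

object LineSepDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("lineSep demo")
      .master("local[*]")
      .getOrCreate()

    // Read: no default separator. When lineSep is unset, \r, \r\n and \n
    // are all treated as line boundaries (Hadoop's LineRecordReader default).
    val df = spark.read
      .option("lineSep", "\u0001")
      .text("/tmp/input.txt") // hypothetical path

    // Write: when lineSep is unset, "\n" is used.
    df.write
      .option("lineSep", "\u0001")
      .text("/tmp/output") // hypothetical path

    spark.stop()
  }
}
```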
`DataStreamReader.scala`:
```diff
@@ -387,7 +387,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
  * Loads text files and returns a `DataFrame` whose schema starts with a string column named
  * "value", and followed by partitioned columns if there are any.
  *
- * Each line in the text files is a new row in the resulting DataFrame. For example:
+ * By default, each line in the text files is a new row in the resulting DataFrame. For example:
  * {{{
  *   // Scala:
  *   spark.readStream.text("/path/to/directory/")
```
```diff
@@ -400,6 +400,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
  * <ul>
  * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
  * considered in every trigger.</li>
+ * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
+ * </li>
+ * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
+ * that should be used for parsing.</li>
  * </ul>
  *
  * @since 2.0.0
```

Review comment on this hunk:

- I also added some doc changes that were missed for the `wholetext` option.
```diff
@@ -413,7 +417,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
  * If the directory structure of the text files contains partitioning information, those are
  * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
  *
- * Each line in the text file is a new element in the resulting Dataset. For example:
+ * By default, each line in the text file is a new element in the resulting Dataset. For example:
  * {{{
  *   // Scala:
  *   spark.readStream.textFile("/path/to/spark/README.md")
```
```diff
@@ -426,6 +430,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
  * <ul>
  * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
  * considered in every trigger.</li>
+ * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
+ * </li>
+ * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
+ * that should be used for parsing.</li>
  * </ul>
  *
  * @param path input path
```
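A hedged sketch of the streaming usage these docs describe; the input directory is hypothetical:

```scala
import org.apache.spark.sql.SparkSession

object StreamingLineSep {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("streaming lineSep demo")
      .master("local[*]")
      .getOrCreate()

    // Each "line", as defined by lineSep, becomes a row in the streaming DataFrame.
    val lines = spark.readStream
      .option("lineSep", "\u0001")
      .text("/tmp/stream-input") // hypothetical directory

    val query = lines.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```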
Review thread on the documented `lineSep` default:

- We should mention that this default rule is not defined by us, but by Hadoop.
- Sure.