From a8eeef80bf349e3290955d656de37be57a1ca60f Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 3 Mar 2018 20:58:03 +0900 Subject: [PATCH 1/7] Supports custom line separator for text datasource --- python/pyspark/sql/readwriter.py | 14 ++++--- python/pyspark/sql/streaming.py | 8 +++- python/pyspark/sql/tests.py | 24 +++++++++++- .../apache/spark/sql/DataFrameReader.scala | 30 +++++++------- .../apache/spark/sql/DataFrameWriter.scala | 2 + .../datasources/HadoopFileLinesReader.scala | 19 ++++++++- .../datasources/text/TextFileFormat.scala | 16 +++++--- .../datasources/text/TextOptions.scala | 3 ++ .../sql/streaming/DataStreamReader.scala | 12 +++++- .../datasources/text/TextSuite.scala | 39 +++++++++++++++++++ 10 files changed, 136 insertions(+), 31 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 803f561ece67b..926014ad89be5 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -304,16 +304,18 @@ def parquet(self, *paths): @ignore_unicode_prefix @since(1.6) - def text(self, paths, wholetext=False): + def text(self, paths, wholetext=False, lineSep=None): """ Loads text files and returns a :class:`DataFrame` whose schema starts with a string column named "value", and followed by partitioned columns if there are any. - Each line in the text file is a new row in the resulting DataFrame. + By default, each line in the text file is a new row in the resulting DataFrame. :param paths: string, or list of strings, for input path(s). :param wholetext: if true, read each file from input path(s) as a single row. + :param lineSep: defines the line separator that should be used for parsing. If None is + set, it uses ``\\n`` by default, covering ``\\r``, ``\\r\\n`` and ``\\n``. >>> df = spark.read.text('python/test_support/sql/text-test.txt') >>> df.collect() @@ -322,7 +324,7 @@ def text(self, paths, wholetext=False): >>> df.collect() [Row(value=u'hello\\nthis')] """ - self._set_opts(wholetext=wholetext) + self._set_opts(wholetext=wholetext, lineSep=lineSep) if isinstance(paths, basestring): paths = [paths] return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths))) @@ -804,18 +806,20 @@ def parquet(self, path, mode=None, partitionBy=None, compression=None): self._jwrite.parquet(path) @since(1.6) - def text(self, path, compression=None): + def text(self, path, compression=None, lineSep=None): """Saves the content of the DataFrame in a text file at the specified path. :param path: the path in any Hadoop supported file system :param compression: compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate). + :param lineSep: defines the line separator that should be used for writing. If None is + set, it uses the default value, ``\\n``. The DataFrame must have only one column that is of string type. Each row becomes a new line in the output file. 
""" - self._set_opts(compression=compression) + self._set_opts(compression=compression, lineSep=lineSep) self._jwrite.text(path) @since(2.0) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index e8966c20a8f42..34abd33926609 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -531,17 +531,20 @@ def parquet(self, path): @ignore_unicode_prefix @since(2.0) - def text(self, path): + def text(self, path, wholetext=False, lineSep=None): """ Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a string column named "value", and followed by partitioned columns if there are any. - Each line in the text file is a new row in the resulting DataFrame. + By default, each line in the text file is a new row in the resulting DataFrame. .. note:: Evolving. :param paths: string, or list of strings, for input path(s). + :param wholetext: if true, read each file from input path(s) as a single row. + :param lineSep: defines the line separator that should be used for parsing. If None is + set, it uses ``\\n`` by default, covering ``\\r``, ``\\r\\n`` and ``\\n``. >>> text_sdf = spark.readStream.text(tempfile.mkdtemp()) >>> text_sdf.isStreaming @@ -549,6 +552,7 @@ def text(self, path): >>> "value" in str(text_sdf.schema) True """ + self._set_opts(wholetext=wholetext, lineSep=lineSep) if isinstance(path, basestring): return self._df(self._jreader.text(path)) else: diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 480815d27333f..6a3c580e5ba07 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -648,7 +648,29 @@ def test_non_existed_udaf(self): self.assertRaisesRegexp(AnalysisException, "Can not load class non_existed_udaf", lambda: spark.udf.registerJavaUDAF("udaf1", "non_existed_udaf")) - def test_multiLine_json(self): + def test_linesep_text(self): + df = self.spark.read.text("python/test_support/sql/ages_newlines.csv", lineSep=",") + expected = [Row(value=u'Joe'), Row(value=u'20'), Row(value=u'"Hi'), + Row(value=u'\nI am Jeo"\nTom'), Row(value=u'30'), + Row(value=u'"My name is Tom"\nHyukjin'), Row(value=u'25'), + Row(value=u'"I am Hyukjin\n\nI love Spark!"\n')] + self.assertEqual(df.collect(), expected) + + tpath = tempfile.mkdtemp() + shutil.rmtree(tpath) + try: + df.write.text(tpath, lineSep="!") + expected = [Row(value=u'Joe!20!"Hi!'), Row(value=u'I am Jeo"'), + Row(value=u'Tom!30!"My name is Tom"'), + Row(value=u'Hyukjin!25!"I am Hyukjin'), + Row(value=u''), Row(value=u'I love Spark!"'), + Row(value=u'!')] + readback = self.spark.read.text(tpath) + self.assertEqual(readback.collect(), expected) + finally: + shutil.rmtree(tpath) + + def test_multiline_json(self): people1 = self.spark.read.json("python/test_support/sql/people.json") people_array = self.spark.read.json("python/test_support/sql/people_array.json", multiLine=True) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 0139913aaa4e2..ef772fbe89b03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -647,14 +647,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * Loads text files and returns a `DataFrame` whose schema starts with a string column named * "value", and followed by partitioned columns if there are any. 
   *
-   * You can set the following text-specific option(s) for reading text files:
-   * <ul>
-   * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
-   * </li>
-   * </ul>
-   *
-   * By default, each line in the text files is a new row in the resulting DataFrame.
-   *
-   * Usage example:
+   * By default, each line in the text files is a new row in the resulting DataFrame. For example:
   * {{{
   *   // Scala:
   *   spark.read.text("/path/to/spark/README.md")
   *
@@ -663,6 +656,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   *   spark.read().text("/path/to/spark/README.md")
   * }}}
   *
+   * You can set the following text-specific option(s) for reading text files:
+   * <ul>
+   * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
+   * </li>
+   * <li>`lineSep` (default is `\n`, covering `\r`, `\r\n` and `\n`): defines the line separator
+   * that should be used for parsing.</li>
+   * </ul>
+   *
   * @param paths input paths
   * @since 1.6.0
   */
@@ -686,11 +687,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * If the directory structure of the text files contains partitioning information, those are
   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
   *
-   * You can set the following textFile-specific option(s) for reading text files:
-   * <ul>
-   * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
-   * </li>
-   * </ul>
-   *
   * By default, each line in the text files is a new row in the resulting DataFrame. For example:
   * {{{
   *   // Scala:
   *   spark.read.textFile("/path/to/spark/README.md")
   *
@@ -700,6 +696,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   *   spark.read().textFile("/path/to/spark/README.md")
   * }}}
   *
+   * You can set the following textFile-specific option(s) for reading text files:
+   * <ul>
+   * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
+   * </li>
+   * <li>`lineSep` (default is `\n`, covering `\r`, `\r\n` and `\n`): defines the line separator
+   * that should be used for parsing.</li>
+   * </ul>
+   *
   * @param paths input path
   * @since 2.0.0
   */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ed7a9100cc7f1..e3e21d025f0ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -587,6 +587,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   *
   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
   * `snappy` and `deflate`). </li>
+  * <li>`lineSep` (default is `\n`): defines the line separator that should
+  * be used for writing.</li>
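A sketch of the write-side behaviour this option adds; the output path is hypothetical and an active `SparkSession` named `spark` is assumed:

```scala
import spark.implicits._

// The DataFrame must have a single string column. With lineSep "|",
// the part file contains "a|b|c|": every row, including the last,
// is terminated by the separator.
Seq("a", "b", "c").toDF("value")
  .write
  .option("lineSep", "|")
  .text("/tmp/lineSep-write-demo")
```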
  • * * * @since 1.6.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala index 83cf26c63a175..707e3351f67c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala @@ -30,9 +30,19 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl /** * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines * in that file. + * + * @param file A part (i.e. "block") of a single file that should be read line by line. + * @param lineSeparator A line separator that should be used for each line. If the value is `\n`, + * it covers `\r`, `\r\n` and `\n`. + * @param conf Hadoop configuration */ class HadoopFileLinesReader( - file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable { + file: PartitionedFile, + lineSeparator: String, + conf: Configuration) extends Iterator[Text] with Closeable { + + def this(file: PartitionedFile, conf: Configuration) = this(file, "\n", conf) + private val iterator = { val fileSplit = new FileSplit( new Path(new URI(file.filePath)), @@ -42,7 +52,12 @@ class HadoopFileLinesReader( Array.empty) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - val reader = new LineRecordReader() + val reader = if (lineSeparator != "\n") { + new LineRecordReader(lineSeparator.getBytes("UTF-8")) + } else { + // This behavior follows Hive. `\n` covers `\r`, `\r\n` and `\n`. + new LineRecordReader() + } reader.initialize(fileSplit, hadoopAttemptContext) new RecordReaderIterator(reader) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index c661e9bd3b94c..8a6ec5d870383 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.text import java.io.Closeable +import java.nio.charset.StandardCharsets import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} @@ -89,7 +90,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new TextOutputWriter(path, dataSchema, context) + new TextOutputWriter(path, dataSchema, textOptions.lineSeparator, context) } override def getFileExtension(context: TaskAttemptContext): String = { @@ -113,18 +114,18 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions.wholeText) + readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions) } private def readToUnsafeMem( conf: Broadcast[SerializableConfiguration], requiredSchema: StructType, - wholeTextMode: Boolean): (PartitionedFile) => Iterator[UnsafeRow] = { + textOptions: TextOptions): (PartitionedFile) => Iterator[UnsafeRow] = { (file: 
PartitionedFile) => { val confValue = conf.value.value - val reader = if (!wholeTextMode) { - new HadoopFileLinesReader(file, confValue) + val reader = if (!textOptions.wholeText) { + new HadoopFileLinesReader(file, textOptions.lineSeparator, confValue) } else { new HadoopFileWholeTextReader(file, confValue) } @@ -152,9 +153,12 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { class TextOutputWriter( path: String, dataSchema: StructType, + lineSeparator: String, context: TaskAttemptContext) extends OutputWriter { + private val lineSep = lineSeparator.getBytes(StandardCharsets.UTF_8) + private val writer = CodecStreams.createOutputStream(context, new Path(path)) override def write(row: InternalRow): Unit = { @@ -162,7 +166,7 @@ class TextOutputWriter( val utf8string = row.getUTF8String(0) utf8string.writeTo(writer) } - writer.write('\n') + writer.write(lineSep) } override def close(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala index 2a661561ab51e..877580988af3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala @@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti */ val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean + val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n") + require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") } private[text] object TextOptions { val COMPRESSION = "compression" val WHOLETEXT = "wholetext" + val LINE_SEPARATOR = "lineSep" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index c393dcdfdd7e5..e37226c363da2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -387,7 +387,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * Loads text files and returns a `DataFrame` whose schema starts with a string column named * "value", and followed by partitioned columns if there are any. * - * Each line in the text files is a new row in the resulting DataFrame. For example: + * By default, each line in the text files is a new row in the resulting DataFrame. For example: * {{{ * // Scala: * spark.readStream.text("/path/to/directory/") @@ -400,6 +400,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * * * @since 2.0.0 @@ -413,7 +417,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * If the directory structure of the text files contains partitioning information, those are * ignored in the resulting Dataset. To include partitioning information as columns, use `text`. * - * Each line in the text file is a new element in the resulting Dataset. For example: + * By default, each line in the text file is a new element in the resulting Dataset. 
For example: * {{{ * // Scala: * spark.readStream.textFile("/path/to/spark/README.md") @@ -426,6 +430,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * * * @param path input path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 33287044f279e..9c8fa564b8648 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.datasources.text import java.io.File +import java.nio.charset.StandardCharsets +import java.nio.file.Files import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec @@ -172,6 +174,43 @@ class TextSuite extends QueryTest with SharedSQLContext { } } + def testLineSeparator(lineSep: String): Unit = { + test(s"SPARK-23577: Support line separator - lineSep: '$lineSep'") { + // Read + val values = Seq("a", "b", "\nc") + val data = values.mkString(lineSep) + val dataWithTrailingLineSep = s"$data$lineSep" + Seq(data, dataWithTrailingLineSep).foreach { lines => + withTempPath { path => + Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) + val df = spark.read.option("lineSep", lineSep).text(path.getAbsolutePath) + checkAnswer(df, Seq("a", "b", "\nc").toDF()) + } + } + + // Write + withTempPath { path => + values.toDF().coalesce(1) + .write.option("lineSep", lineSep).text(path.getAbsolutePath) + val partFile = Utils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head + val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8) + assert(readBack === s"a${lineSep}b${lineSep}\nc${lineSep}") + } + + // Roundtrip + withTempPath { path => + val df = values.toDF() + df.write.option("lineSep", lineSep).text(path.getAbsolutePath) + val readBack = spark.read.option("lineSep", lineSep).text(path.getAbsolutePath) + checkAnswer(df, readBack) + } + } + } + + Seq("|", "^", "::", "!!!@3").foreach { lineSep => + testLineSeparator(lineSep) + } + private def testFile: String = { Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString } From 84d2cef822556b45d39f56f242677b8bd05e42ad Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 6 Mar 2018 19:59:00 +0900 Subject: [PATCH 2/7] Address comments --- python/pyspark/sql/readwriter.py | 2 +- python/pyspark/sql/streaming.py | 2 +- .../apache/spark/sql/DataFrameReader.scala | 4 ++-- .../datasources/HadoopFileLinesReader.scala | 19 +++++++++++-------- .../datasources/text/TextFileFormat.scala | 4 ++-- .../datasources/text/TextOptions.scala | 6 ++++-- .../sql/streaming/DataStreamReader.scala | 4 ++-- .../datasources/text/TextSuite.scala | 2 +- 8 files changed, 24 insertions(+), 19 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 926014ad89be5..df56bb762863a 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -315,7 +315,7 @@ def text(self, paths, wholetext=False, lineSep=None): :param paths: string, or list of strings, for input path(s). :param wholetext: if true, read each file from input path(s) as a single row. :param lineSep: defines the line separator that should be used for parsing. 
If None is - set, it uses ``\\n`` by default, covering ``\\r``, ``\\r\\n`` and ``\\n``. + set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. >>> df = spark.read.text('python/test_support/sql/text-test.txt') >>> df.collect() diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 34abd33926609..07f9ac1b5aa9e 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -544,7 +544,7 @@ def text(self, path, wholetext=False, lineSep=None): :param paths: string, or list of strings, for input path(s). :param wholetext: if true, read each file from input path(s) as a single row. :param lineSep: defines the line separator that should be used for parsing. If None is - set, it uses ``\\n`` by default, covering ``\\r``, ``\\r\\n`` and ``\\n``. + set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. >>> text_sdf = spark.readStream.text(tempfile.mkdtemp()) >>> text_sdf.isStreaming diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index ef772fbe89b03..1a5e47508c070 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -660,7 +660,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * @@ -700,7 +700,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala index 707e3351f67c3..345245fa75e22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import java.io.Closeable import java.net.URI +import java.nio.charset.StandardCharsets import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -32,16 +33,16 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl * in that file. * * @param file A part (i.e. "block") of a single file that should be read line by line. - * @param lineSeparator A line separator that should be used for each line. If the value is `\n`, + * @param lineSeparator A line separator that should be used for each line. If the value is `None`, * it covers `\r`, `\r\n` and `\n`. * @param conf Hadoop configuration */ class HadoopFileLinesReader( file: PartitionedFile, - lineSeparator: String, + lineSeparator: Option[String], conf: Configuration) extends Iterator[Text] with Closeable { - def this(file: PartitionedFile, conf: Configuration) = this(file, "\n", conf) + def this(file: PartitionedFile, conf: Configuration) = this(file, None, conf) private val iterator = { val fileSplit = new FileSplit( @@ -52,12 +53,14 @@ class HadoopFileLinesReader( Array.empty) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - val reader = if (lineSeparator != "\n") { - new LineRecordReader(lineSeparator.getBytes("UTF-8")) - } else { - // This behavior follows Hive. `\n` covers `\r`, `\r\n` and `\n`. 
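For context on this hunk: Hadoop's `LineRecordReader` applies its universal newline handling (`\r`, `\r\n` and `\n`) only when no custom delimiter is supplied; passing delimiter bytes switches it to exact matching. A standalone sketch of the dispatch this revision introduces, using the `Option[String]` signature of this version of the patch:

```scala
import java.nio.charset.StandardCharsets

import org.apache.hadoop.mapreduce.lib.input.LineRecordReader

// A custom separator is handed to Hadoop as raw bytes; `None` falls back
// to Hadoop's default reader, which splits on `\r`, `\r\n` and `\n`.
def newReader(lineSeparator: Option[String]): LineRecordReader =
  lineSeparator match {
    case Some(sep) => new LineRecordReader(sep.getBytes(StandardCharsets.UTF_8))
    case None => new LineRecordReader()
  }
```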
- new LineRecordReader() + val reader = lineSeparator match { + case Some(sep) => + new LineRecordReader(sep.getBytes(StandardCharsets.UTF_8)) + case _ => + // If the line separator is `None`, it covers `\r`, `\r\n` and `\n`. + new LineRecordReader() } + reader.initialize(fileSplit, hadoopAttemptContext) new RecordReaderIterator(reader) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 8a6ec5d870383..63821aff2e51f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -153,11 +153,11 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { class TextOutputWriter( path: String, dataSchema: StructType, - lineSeparator: String, + lineSeparator: Option[String], context: TaskAttemptContext) extends OutputWriter { - private val lineSep = lineSeparator.getBytes(StandardCharsets.UTF_8) + private val lineSep = lineSeparator.getOrElse("\n").getBytes(StandardCharsets.UTF_8) private val writer = CodecStreams.createOutputStream(context, new Path(path)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala index 877580988af3e..605dbbe5f141a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala @@ -39,8 +39,10 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti */ val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean - val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n") - require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") + val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep => + require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") + sep + } } private[text] object TextOptions { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index e37226c363da2..9b17406a816b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -402,7 +402,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * considered in every trigger. *
   * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
   * </li>
-  * <li>`lineSep` (default is `\n`, covering `\r`, `\r\n` and `\n`): defines the line separator
+  * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
   * that should be used for parsing.</li>
   * </ul>
   *
@@ -432,7 +432,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   * considered in every trigger.</li>
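The `text()` option documented above carries over to streaming reads unchanged; a minimal sketch, with an illustrative input directory and an active `SparkSession` named `spark` assumed:

```scala
// Each record in files arriving under the directory is delimited by "|"
// instead of a newline; `lines` is a streaming DataFrame with a single
// "value" column.
val lines = spark.readStream
  .option("lineSep", "|")
  .text("/tmp/streaming-input")
```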
   * <li>`wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
   * </li>
-  * <li>`lineSep` (default is `\n`, covering `\r`, `\r\n` and `\n`): defines the line separator
+  * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
   * that should be used for parsing.</li>
   * </ul>
   *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 9c8fa564b8648..4bb96cda7de25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -207,7 +207,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  Seq("|", "^", "::", "!!!@3").foreach { lineSep =>
+  Seq("|", "^", "::", "!!!@3", 0x1E.toChar.toString).foreach { lineSep =>
     testLineSeparator(lineSep)
   }
 
From b5c82469b2ed94bdb69dd936ad5b2d98a17d9dd6 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 6 Mar 2018 20:03:10 +0900
Subject: [PATCH 3/7] Fix a nit

---
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index e3e21d025f0ce..bb93889dc55e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -587,7 +587,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   *
   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
   * `snappy` and `deflate`). </li>
-  * <li>`lineSep` (default is `\n`): defines the line separator that should
+  * <li>`lineSep` (default `\n`): defines the line separator that should
   * be used for writing.</li>
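Read and write deliberately end up with different effective defaults: an unset option on read means "let Hadoop split on `\r`, `\r\n` and `\n`", while a write always needs concrete bytes and falls back to `\n`. A sketch of that asymmetry as patch 4 below encodes it in `TextOptions`:

```scala
import java.nio.charset.StandardCharsets

// Mirrors TextOptions in patch 4: the same user-supplied option yields
// different effective defaults on the read and write paths.
def separators(userLineSep: Option[String]): (Option[Array[Byte]], Array[Byte]) = {
  // None on read: Hadoop's LineRecordReader handles \r, \r\n and \n itself.
  val inRead = userLineSep.map(_.getBytes(StandardCharsets.UTF_8))
  // Writes always need a concrete separator; default to "\n".
  val inWrite = inRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
  (inRead, inWrite)
}
```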
  • * * From 6f1d3177fa10e98908950f2c0bddea16f390a99b Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 11 Mar 2018 12:33:13 +0900 Subject: [PATCH 4/7] Address comment --- .../datasources/HadoopFileLinesReader.scala | 11 +++++------ .../datasources/text/TextFileFormat.scala | 14 ++++---------- .../execution/datasources/text/TextOptions.scala | 9 ++++++++- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala index 345245fa75e22..ba46d4980333a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala @@ -39,7 +39,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl */ class HadoopFileLinesReader( file: PartitionedFile, - lineSeparator: Option[String], + lineSeparator: Option[Array[Byte]], conf: Configuration) extends Iterator[Text] with Closeable { def this(file: PartitionedFile, conf: Configuration) = this(file, None, conf) @@ -53,12 +53,11 @@ class HadoopFileLinesReader( Array.empty) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + val reader = lineSeparator match { - case Some(sep) => - new LineRecordReader(sep.getBytes(StandardCharsets.UTF_8)) - case _ => - // If the line separator is `None`, it covers `\r`, `\r\n` and `\n`. - new LineRecordReader() + case Some(sep) => new LineRecordReader(sep) + // If the line separator is `None`, it covers `\r`, `\r\n` and `\n`. + case _ => new LineRecordReader() } reader.initialize(fileSplit, hadoopAttemptContext) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 63821aff2e51f..9647f09867643 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -17,12 +17,8 @@ package org.apache.spark.sql.execution.datasources.text -import java.io.Closeable -import java.nio.charset.StandardCharsets - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.TaskContext @@ -90,7 +86,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new TextOutputWriter(path, dataSchema, textOptions.lineSeparator, context) + new TextOutputWriter(path, dataSchema, textOptions.lineSeparatorInWrite, context) } override def getFileExtension(context: TaskAttemptContext): String = { @@ -125,7 +121,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { (file: PartitionedFile) => { val confValue = conf.value.value val reader = if (!textOptions.wholeText) { - new HadoopFileLinesReader(file, textOptions.lineSeparator, confValue) + new HadoopFileLinesReader(file, textOptions.lineSeparatorInRead, confValue) } else { new HadoopFileWholeTextReader(file, confValue) } @@ -153,12 +149,10 @@ class TextFileFormat extends TextBasedFileFormat with 
DataSourceRegister { class TextOutputWriter( path: String, dataSchema: StructType, - lineSeparator: Option[String], + lineSeparator: Array[Byte], context: TaskAttemptContext) extends OutputWriter { - private val lineSep = lineSeparator.getOrElse("\n").getBytes(StandardCharsets.UTF_8) - private val writer = CodecStreams.createOutputStream(context, new Path(path)) override def write(row: InternalRow): Unit = { @@ -166,7 +160,7 @@ class TextOutputWriter( val utf8string = row.getUTF8String(0) utf8string.writeTo(writer) } - writer.write(lineSep) + writer.write(lineSeparator) } override def close(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala index 605dbbe5f141a..18698df9fd8e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.text +import java.nio.charset.StandardCharsets + import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs} /** @@ -39,10 +41,15 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti */ val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean - val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep => + private val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep => require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") sep } + // Note that the option 'lineSep' uses a different default value in read and write. + val lineSeparatorInRead: Option[Array[Byte]] = + lineSeparator.map(_.getBytes(StandardCharsets.UTF_8)) + val lineSeparatorInWrite: Array[Byte] = + lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8)) } private[text] object TextOptions { From 4835632cfe4d8fe9a3071ba7dd4f0e8e481f4a44 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 11 Mar 2018 12:35:37 +0900 Subject: [PATCH 5/7] Remove unused import --- .../spark/sql/execution/datasources/HadoopFileLinesReader.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala index ba46d4980333a..51d251aa507b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources import java.io.Closeable import java.net.URI -import java.nio.charset.StandardCharsets import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path From d6e91604585b22a27fbd0b7caa0a8e96d3725400 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sun, 11 Mar 2018 13:10:23 +0900 Subject: [PATCH 6/7] Resolve the diff with master (Utils to TestUtils) --- .../spark/sql/execution/datasources/text/TextSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 4bb96cda7de25..e8a5299d6ba9d 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -24,6 +24,7 @@ import java.nio.file.Files import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec +import org.apache.spark.TestUtils import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -192,7 +193,7 @@ class TextSuite extends QueryTest with SharedSQLContext { withTempPath { path => values.toDF().coalesce(1) .write.option("lineSep", lineSep).text(path.getAbsolutePath) - val partFile = Utils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head + val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8) assert(readBack === s"a${lineSep}b${lineSep}\nc${lineSep}") } From f1c951f0c84e334e185a0bcc810c08d48ca726e8 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 21 Mar 2018 14:27:33 +0900 Subject: [PATCH 7/7] Add a comment that the default behaviour of line separator is from Hadoop --- .../sql/execution/datasources/HadoopFileLinesReader.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala index 51d251aa507b1..00a78f7343c59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala @@ -35,6 +35,9 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl * @param lineSeparator A line separator that should be used for each line. If the value is `None`, * it covers `\r`, `\r\n` and `\n`. * @param conf Hadoop configuration + * + * @note The behavior when `lineSeparator` is `None` (covering `\r`, `\r\n` and `\n`) is defined + * by [[LineRecordReader]], not within Spark. */ class HadoopFileLinesReader( file: PartitionedFile,
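Taken together, the series gives the following end-to-end behaviour. A round-trip sketch in the spirit of the new tests; the path is illustrative and an active `SparkSession` named `spark` is assumed:

```scala
import spark.implicits._

val path = "/tmp/lineSep-roundtrip"          // illustrative output path
val df = Seq("a", "b", "\nc").toDF("value")  // the embedded "\n" survives

// Write with a custom separator; the part file contains "a|b|\nc|".
df.write.option("lineSep", "|").text(path)

// Reading back with the same separator restores the original rows.
val readBack = spark.read.option("lineSep", "|").text(path)
// readBack.collect() => Row("a"), Row("b"), Row("\nc")

// Reading without the option splits on newlines instead, so the same
// file comes back as the two rows "a|b|" and "c|".
```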