From 4c46f4b97b062cba06d5b9bb7987b3606b0dd4dc Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 16 Mar 2016 15:34:29 +0900
Subject: [PATCH 01/12] Parse modes in JSON data source

---
 python/pyspark/sql/readwriter.py              |  3 ++
 .../apache/spark/sql/DataFrameReader.scala    |  3 ++
 .../execution/datasources/ParseModes.scala    | 41 +++++++++++++++++++
 .../datasources/csv/CSVOptions.scala          | 27 +-----------
 .../datasources/json/JSONOptions.scala        | 15 ++++++-
 .../datasources/json/JacksonParser.scala      | 22 ++++++----
 .../datasources/json/JsonSuite.scala          | 20 ++++++++-
 7 files changed, 96 insertions(+), 35 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 438662bb157f0..fd51a77ed3807 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -162,6 +162,9 @@ def json(self, path, schema=None):
                 (e.g. 00012)
             * ``allowBackslashEscapingAnyCharacter`` (default ``false``): allows accepting quoting \
                 of all characters using backslash quoting mechanism
+            * ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
+                during parsing. When it fails to parse, ``PERMISSIVE`` mode sets ``null``, \
+                ``DROPMALFORMED`` drops the record and ``FAILFAST`` throws an exception.

         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ef85f1db895cd..751680996097d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -288,6 +288,9 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
 *
 *
  • `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers
 * (e.g. 00012)
+  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
+ * during parsing. When it fails to parse, `PERMISSIVE` mode sets `null`, `DROPMALFORMED` drops the
+ * record and `FAILFAST` throws an exception.
 *
 * @since 1.4.0
 */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
new file mode 100644
index 0000000000000..468228053c964
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+private[datasources] object ParseModes {
+  val PERMISSIVE_MODE = "PERMISSIVE"
+  val DROP_MALFORMED_MODE = "DROPMALFORMED"
+  val FAIL_FAST_MODE = "FAILFAST"
+
+  val DEFAULT = PERMISSIVE_MODE
+
+  def isValidMode(mode: String): Boolean = {
+    mode.toUpperCase match {
+      case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
+      case _ => false
+    }
+  }
+
+  def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
+  def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
+  def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
+    mode.toUpperCase == PERMISSIVE_MODE
+  } else {
+    true // We default to permissive if the mode string is not valid
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 6a0290c11228f..cb7fe7948e2ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.csv
 import java.nio.charset.StandardCharsets

 import org.apache.spark.Logging
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}

 private[sql] class CSVOptions(
     @transient private val parameters: Map[String, String])
@@ -62,7 +62,7 @@ private[sql] class CSVOptions(
   val delimiter = CSVTypeCast.toChar(
     parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
-  val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+  private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
   val charset = parameters.getOrElse("encoding",
     parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
@@ -101,26 +101,3 @@ private[sql] class CSVOptions(

   val rowSeparator = "\n"
 }
-
-private[csv] object ParseModes {
-  val PERMISSIVE_MODE = "PERMISSIVE"
-  val DROP_MALFORMED_MODE = "DROPMALFORMED"
-  val FAIL_FAST_MODE = "FAILFAST"
-
-  val DEFAULT = PERMISSIVE_MODE
-
-  def isValidMode(mode: String): Boolean = {
-    mode.toUpperCase match {
-      case PERMISSIVE_MODE |
DROP_MALFORMED_MODE | FAIL_FAST_MODE => true - case _ => false - } - } - - def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE - def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE - def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) { - mode.toUpperCase == PERMISSIVE_MODE - } else { - true // We default to permissive is the mode string is not valid - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala index e59dbd6b3d438..2b83a3f98d09b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources.json import com.fasterxml.jackson.core.{JsonFactory, JsonParser} -import org.apache.spark.sql.execution.datasources.CompressionCodecs +import org.apache.spark.Logging +import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes} /** * Options for the JSON data source. @@ -28,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.CompressionCodecs */ private[sql] class JSONOptions( @transient private val parameters: Map[String, String]) - extends Serializable { + extends Logging with Serializable { val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) @@ -49,6 +50,16 @@ private[sql] class JSONOptions( val allowBackslashEscapingAnyCharacter = parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false) val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName) + private val parseMode = parameters.getOrElse("mode", "PERMISSIVE") + + // Parse mode flags + if (!ParseModes.isValidMode(parseMode)) { + logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.") + } + + val failFast = ParseModes.isFailFastMode(parseMode) + val dropMalformed = ParseModes.isDropMalformedMode(parseMode) + val permissive = ParseModes.isPermissiveMode(parseMode) /** Sets config options on a Jackson [[JsonFactory]]. 
*/ def setJacksonOptions(factory: JsonFactory): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index b2f5c1e96421d..f7ecd894e6090 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import com.fasterxml.jackson.core._ import org.apache.spark.rdd.RDD +import org.apache.spark.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util._ @@ -34,7 +35,7 @@ import org.apache.spark.util.Utils private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg) -object JacksonParser { +object JacksonParser extends Logging { def parse( input: RDD[String], @@ -244,13 +245,20 @@ object JacksonParser { def failedRecord(record: String): Seq[InternalRow] = { // create a row even if no corrupt record column is present - val row = new GenericMutableRow(schema.length) - for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) { - require(schema(corruptIndex).dataType == StringType) - row.update(corruptIndex, UTF8String.fromString(record)) + if (configOptions.failFast) { + throw new RuntimeException(s"Malformed line in FAILFAST mode: $record") + } + if (configOptions.dropMalformed) { + logWarning(s"Dropping malformed line: $record") + Nil + } else { + val row = new GenericMutableRow(schema.length) + for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) { + require(schema(corruptIndex).dataType == StringType) + row.update(corruptIndex, UTF8String.fromString(record)) + } + Seq(row) } - - Seq(row) } val factory = new JsonFactory() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 4671b2dca9940..27e60d192662e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -27,8 +27,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec -import org.scalactic.Tolerance._ +import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -969,6 +969,14 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { withTempTable("jsonTable") { val jsonDF = sqlContext.read.json(corruptRecords) jsonDF.registerTempTable("jsonTable") + val jsonDFWithDropMalformed = + sqlContext.read.option("mode", "DROPMALFORMED").json(corruptRecords) + jsonDFWithDropMalformed.registerTempTable("jsonTableWithDropMalformed") + val exception = intercept[SparkException]{ + sqlContext.read.option("mode", "FAILFAST").json(corruptRecords).collect() + } + assert(exception.getMessage.contains("Malformed line in FAILFAST mode: {")) + val schema = StructType( StructField("_unparsed", StringType, true) :: StructField("a", StringType, true) :: @@ -991,6 +999,16 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData { Row(null, null, null, "]") :: Nil ) + // Check if corrupt records are dropped. + checkAnswer( + sql( + """ + |SELECT a, b, c, _unparsed + |FROM jsonTableWithDropMalformed + """.stripMargin), + Row("str_a_4", "str_b_4", "str_c_4", null) :: Nil + ) + checkAnswer( sql( """ From 3675faee2450677d56720afb8fb744d7482a99dc Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 16 Mar 2016 17:21:36 +0900 Subject: [PATCH 02/12] Separate tests for parse modes --- .../datasources/json/JsonSuite.scala | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 27e60d192662e..d9a4a66356a3a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -963,20 +963,37 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ) } + test("SPARK-13764 Parse modes in JSON data source") { + withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { + // `FAILFAST` mode should throw an exception for corrupt records. + val exception = intercept[SparkException] { + sqlContext.read.option("mode", "FAILFAST").json(corruptRecords).collect() + } + assert(exception.getMessage.contains("Malformed line in FAILFAST mode: {")) + + // `DROPMALFORMED` mode should skip corrupt records + // For `PERMISSIVE` mode, it is tested in "Corrupt records" test. + val jsonDF = sqlContext.read.option("mode", "DROPMALFORMED").json(corruptRecords) + val schema = StructType( + StructField("_unparsed", StringType, true) :: + StructField("a", StringType, true) :: + StructField("b", StringType, true) :: + StructField("c", StringType, true) :: Nil) + assert(schema === jsonDF.schema) + + checkAnswer( + jsonDF, + Row(null, "str_a_4", "str_b_4", "str_c_4") :: Nil + ) + } + } + test("Corrupt records") { // Test if we can query corrupt records. withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { withTempTable("jsonTable") { val jsonDF = sqlContext.read.json(corruptRecords) jsonDF.registerTempTable("jsonTable") - val jsonDFWithDropMalformed = - sqlContext.read.option("mode", "DROPMALFORMED").json(corruptRecords) - jsonDFWithDropMalformed.registerTempTable("jsonTableWithDropMalformed") - val exception = intercept[SparkException]{ - sqlContext.read.option("mode", "FAILFAST").json(corruptRecords).collect() - } - assert(exception.getMessage.contains("Malformed line in FAILFAST mode: {")) - val schema = StructType( StructField("_unparsed", StringType, true) :: StructField("a", StringType, true) :: @@ -999,16 +1016,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { Row(null, null, null, "]") :: Nil ) - // Check if corrupt records are dropped. 
- checkAnswer( - sql( - """ - |SELECT a, b, c, _unparsed - |FROM jsonTableWithDropMalformed - """.stripMargin), - Row("str_a_4", "str_b_4", "str_c_4", null) :: Nil - ) - checkAnswer( sql( """ From 87c3251fe379b8f2020334fee121518ac62b69a1 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 16 Mar 2016 18:33:22 +0900 Subject: [PATCH 03/12] Do not infer _corrupt_record when the mode is DROPMALFORMED --- .../datasources/json/InferSchema.scala | 26 ++++++++----- .../datasources/json/JsonSuite.scala | 37 +++++++++---------- 2 files changed, 33 insertions(+), 30 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala index 0937a213c984f..72740b27b9993 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala @@ -40,6 +40,7 @@ private[sql] object InferSchema { configOptions: JSONOptions): StructType = { require(configOptions.samplingRatio > 0, s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0") + val shouldHandleCorruptRecord = !configOptions.dropMalformed val schemaData = if (configOptions.samplingRatio > 0.99) { json } else { @@ -50,21 +51,23 @@ private[sql] object InferSchema { val rootType = schemaData.mapPartitions { iter => val factory = new JsonFactory() configOptions.setJacksonOptions(factory) - iter.map { row => + iter.flatMap { row => try { Utils.tryWithResource(factory.createParser(row)) { parser => parser.nextToken() - inferField(parser, configOptions) + Some(inferField(parser, configOptions)) } } catch { + case _: JsonParseException if shouldHandleCorruptRecord => + Some(StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))) case _: JsonParseException => - StructType(Seq(StructField(columnNameOfCorruptRecords, StringType))) + None } } }.treeAggregate[DataType]( StructType(Seq()))( - compatibleRootType(columnNameOfCorruptRecords), - compatibleRootType(columnNameOfCorruptRecords)) + compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord), + compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)) canonicalizeType(rootType) match { case Some(st: StructType) => st @@ -194,18 +197,21 @@ private[sql] object InferSchema { * Remove top-level ArrayType wrappers and merge the remaining schemas */ private def compatibleRootType( - columnNameOfCorruptRecords: String): (DataType, DataType) => DataType = { + columnNameOfCorruptRecords: String, + shouldHandleCorruptRecord: Boolean): (DataType, DataType) => DataType = { // Since we support array of json objects at the top level, // we need to check the element type and find the root level data type. - case (ArrayType(ty1, _), ty2) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2) - case (ty1, ArrayType(ty2, _)) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2) + case (ArrayType(ty1, _), ty2) => + compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2) + case (ty1, ArrayType(ty2, _)) => + compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2) // If we see any other data type at the root level, we get records that cannot be // parsed. So, we use the struct as the data type and add the corrupt field to the schema. 
case (struct: StructType, NullType) => struct case (NullType, struct: StructType) => struct - case (struct: StructType, o) if !o.isInstanceOf[StructType] => + case (struct: StructType, o) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord => withCorruptField(struct, columnNameOfCorruptRecords) - case (o, struct: StructType) if !o.isInstanceOf[StructType] => + case (o, struct: StructType) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord => withCorruptField(struct, columnNameOfCorruptRecords) // If we get anything else, we call compatibleType. // Usually, when we reach here, ty1 and ty2 are two StructTypes. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index d9a4a66356a3a..f80ae4a848441 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -964,28 +964,25 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-13764 Parse modes in JSON data source") { - withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { - // `FAILFAST` mode should throw an exception for corrupt records. - val exception = intercept[SparkException] { - sqlContext.read.option("mode", "FAILFAST").json(corruptRecords).collect() - } - assert(exception.getMessage.contains("Malformed line in FAILFAST mode: {")) + // `FAILFAST` mode should throw an exception for corrupt records. + val exception = intercept[SparkException] { + sqlContext.read.option("mode", "FAILFAST").json(corruptRecords).collect() + } + assert(exception.getMessage.contains("Malformed line in FAILFAST mode: {")) - // `DROPMALFORMED` mode should skip corrupt records - // For `PERMISSIVE` mode, it is tested in "Corrupt records" test. - val jsonDF = sqlContext.read.option("mode", "DROPMALFORMED").json(corruptRecords) - val schema = StructType( - StructField("_unparsed", StringType, true) :: - StructField("a", StringType, true) :: - StructField("b", StringType, true) :: - StructField("c", StringType, true) :: Nil) - assert(schema === jsonDF.schema) + // `DROPMALFORMED` mode should skip corrupt records + // For `PERMISSIVE` mode, it is tested in "Corrupt records" test. + val jsonDF = sqlContext.read.option("mode", "DROPMALFORMED").json(corruptRecords) + val schema = StructType( + StructField("a", StringType, true) :: + StructField("b", StringType, true) :: + StructField("c", StringType, true) :: Nil) + assert(schema === jsonDF.schema) - checkAnswer( - jsonDF, - Row(null, "str_a_4", "str_b_4", "str_c_4") :: Nil - ) - } + checkAnswer( + jsonDF, + Row("str_a_4", "str_b_4", "str_c_4") :: Nil + ) } test("Corrupt records") { From 4440a5556309bdfeec52f00ef151897bcdb04336 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 16 Mar 2016 19:00:44 +0900 Subject: [PATCH 04/12] Update comments for parse modes. 
---
 python/pyspark/sql/readwriter.py                |  9 +++++++--
 .../org/apache/spark/sql/DataFrameReader.scala  | 17 +++++++++++++++--
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index fd51a77ed3807..43a570683b954 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -163,8 +163,13 @@ def json(self, path, schema=None):
             * ``allowBackslashEscapingAnyCharacter`` (default ``false``): allows accepting quoting \
                 of all characters using backslash quoting mechanism
             * ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
-                during parsing. When it fails to parse, ``PERMISSIVE`` mode sets ``null``, \
-                ``DROPMALFORMED`` drops the record and ``FAILFAST`` throws an exception.
+                during parsing.
+            * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+                record and puts the malformed string into a new field configured by \
+                ``spark.sql.columnNameOfCorruptRecord``. When a schema is set by user, it sets \
+                ``null`` for extra fields.
+            * ``DROPMALFORMED`` : ignores the whole corrupted records and append.
+            * ``FAILFAST`` throws an exception when it meets corrupted records.

         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 751680996097d..6729d1cbb1de1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -289,8 +289,14 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
 *
  • `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers
 * (e.g. 00012)
  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
- * during parsing. When it fails to parse, `PERMISSIVE` mode sets `null`, `DROPMALFORMED` drops the
- * record and `FAILFAST` throws an exception.
+ * during parsing.
+ *
+  • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
+ * malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When
+ * a schema is set by user, it sets `null` for extra fields.
+  • `DROPMALFORMED` : ignores the whole corrupted records and append.
+  • `FAILFAST` throws an exception when it meets corrupted records.
+ *
 *
 * @since 1.4.0
 */
@@ -315,6 +321,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
 * (e.g. 00012)
 *
  • `allowBackslashEscapingAnyCharacter` (default `false`): allows accepting quoting of all
 * characters using backslash quoting mechanism
+ *
+  • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
+ * malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When
+ * a schema is set by user, it sets `null` for extra fields.
+  • `DROPMALFORMED` : ignores the whole corrupted records and append.
+  • `FAILFAST` throws an exception when it meets corrupted records.
+ *
 *
 * @since 1.6.0
 */

From 32ae8b2b2f7eeff8233218edd281338923106948 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 16 Mar 2016 19:03:32 +0900
Subject: [PATCH 05/12] Add the missing description

---
 .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 6729d1cbb1de1..bb6bbf02d589b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -321,6 +321,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
 * (e.g. 00012)
 *
  • `allowBackslashEscapingAnyCharacter` (default `false`): allows accepting quoting of all
 * characters using backslash quoting mechanism
+  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
+ * during parsing.
 *
 *
  • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
 * malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When
 * a schema is set by user, it sets `null` for extra fields.

From de8d291c393fc5dd6182c2cf9e2675889c3cd796 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 17 Mar 2016 09:30:15 +0900
Subject: [PATCH 06/12] Update comments and add some more tests.

---
 python/pyspark/sql/readwriter.py              |  2 +-
 .../apache/spark/sql/DataFrameReader.scala    |  4 +-
 .../datasources/json/InferSchema.scala        |  2 +-
 .../datasources/json/JsonSuite.scala          | 44 ++++++++++++++-----
 4 files changed, 37 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 43a570683b954..c0e51bd2d2354 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -168,7 +168,7 @@ def json(self, path, schema=None):
                 record and puts the malformed string into a new field configured by \
                 ``spark.sql.columnNameOfCorruptRecord``. When a schema is set by user, it sets \
                 ``null`` for extra fields.
-            * ``DROPMALFORMED`` : ignores the whole corrupted records and append.
+            * ``DROPMALFORMED`` : ignores the whole corrupted records.
             * ``FAILFAST`` throws an exception when it meets corrupted records.

         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index bb6bbf02d589b..70fb58444fba8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -294,7 +294,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
 *
  • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
 * malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When
 * a schema is set by user, it sets `null` for extra fields.
-  • `DROPMALFORMED` : ignores the whole corrupted records and append.
+  • `DROPMALFORMED` : ignores the whole corrupted records.
  • `FAILFAST` throws an exception when it meets corrupted records.
    * @@ -327,7 +327,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { *
-  • `DROPMALFORMED` : ignores the whole corrupted records and append.
+  • `DROPMALFORMED` : ignores the whole corrupted records.
  • `FAILFAST` throws an exception when it meets corrupted records.
  • * * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala index 72740b27b9993..945ed2c2113d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala @@ -40,7 +40,7 @@ private[sql] object InferSchema { configOptions: JSONOptions): StructType = { require(configOptions.samplingRatio > 0, s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0") - val shouldHandleCorruptRecord = !configOptions.dropMalformed + val shouldHandleCorruptRecord = configOptions.permissive val schemaData = if (configOptions.samplingRatio > 0.99) { json } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index f80ae4a848441..334b76cf4e265 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -964,25 +964,47 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-13764 Parse modes in JSON data source") { - // `FAILFAST` mode should throw an exception for corrupt records. - val exception = intercept[SparkException] { - sqlContext.read.option("mode", "FAILFAST").json(corruptRecords).collect() - } - assert(exception.getMessage.contains("Malformed line in FAILFAST mode: {")) - - // `DROPMALFORMED` mode should skip corrupt records - // For `PERMISSIVE` mode, it is tested in "Corrupt records" test. - val jsonDF = sqlContext.read.option("mode", "DROPMALFORMED").json(corruptRecords) val schema = StructType( StructField("a", StringType, true) :: StructField("b", StringType, true) :: StructField("c", StringType, true) :: Nil) - assert(schema === jsonDF.schema) + val malformedSchema = StructType( + StructField("a", StringType, true) :: Nil) + + // `FAILFAST` mode should throw an exception for corrupt records. + val exceptionOne = intercept[SparkException] { + sqlContext.read + .option("mode", "FAILFAST") + .json(corruptRecords) + .collect() + } + assert(exceptionOne.getMessage.contains("Malformed line in FAILFAST mode: {")) + val exceptionTwo = intercept[SparkException] { + sqlContext.read + .option("mode", "FAILFAST") + .schema(malformedSchema) + .json(corruptRecords) + .collect() + } + assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode: {")) + + // `DROPMALFORMED` mode should skip corrupt records + // For `PERMISSIVE` mode, it is tested in "Corrupt records" test. 
+ val jsonDFOne = sqlContext.read + .option("mode", "DROPMALFORMED") + .json(corruptRecords) checkAnswer( - jsonDF, + jsonDFOne, Row("str_a_4", "str_b_4", "str_c_4") :: Nil ) + val jsonDFTwo = sqlContext.read + .option("mode", "DROPMALFORMED") + .schema(malformedSchema) + .json(corruptRecords) + checkAnswer( + jsonDFTwo, + Row("str_a_4") :: Nil) } test("Corrupt records") { From a563534a958c5f3e3faf31ba95da822c30d284c6 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 17 Mar 2016 10:15:03 +0900 Subject: [PATCH 07/12] Correct missing ":" for comments and add missing schema comparison tests --- python/pyspark/sql/readwriter.py | 2 +- .../sql/execution/datasources/json/JsonSuite.scala | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index c0e51bd2d2354..e3f883f89a938 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -169,7 +169,7 @@ def json(self, path, schema=None): ``spark.sql.columnNameOfCorruptRecord``. When a schema is set by user, it sets \ ``null`` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. - * ``FAILFAST`` throws an exception when it meets corrupted records. + * ``FAILFAST``: throws an exception when it meets corrupted records. >>> df1 = sqlContext.read.json('python/test_support/sql/people.json') >>> df1.dtypes diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 334b76cf4e265..e0558b3bad623 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -964,12 +964,12 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-13764 Parse modes in JSON data source") { - val schema = StructType( + val schemaOne = StructType( StructField("a", StringType, true) :: StructField("b", StringType, true) :: StructField("c", StringType, true) :: Nil) - val malformedSchema = StructType( + val schemaTwo = StructType( StructField("a", StringType, true) :: Nil) // `FAILFAST` mode should throw an exception for corrupt records. 
@@ -983,7 +983,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val exceptionTwo = intercept[SparkException] { sqlContext.read .option("mode", "FAILFAST") - .schema(malformedSchema) + .schema(schemaTwo) .json(corruptRecords) .collect() } @@ -998,13 +998,16 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { jsonDFOne, Row("str_a_4", "str_b_4", "str_c_4") :: Nil ) + assert(jsonDFOne.schema === schemaOne) + val jsonDFTwo = sqlContext.read .option("mode", "DROPMALFORMED") - .schema(malformedSchema) + .schema(schemaTwo) .json(corruptRecords) checkAnswer( jsonDFTwo, Row("str_a_4") :: Nil) + assert(jsonDFTwo.schema === schemaTwo) } test("Corrupt records") { From 29a8f68cc5b3ba686f56ffc664766920eb3c2824 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 17 Mar 2016 10:16:03 +0900 Subject: [PATCH 08/12] Extra space in comments --- python/pyspark/sql/readwriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index e3f883f89a938..bae9e69df8e2b 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -169,7 +169,7 @@ def json(self, path, schema=None): ``spark.sql.columnNameOfCorruptRecord``. When a schema is set by user, it sets \ ``null`` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. - * ``FAILFAST``: throws an exception when it meets corrupted records. + * ``FAILFAST`` : throws an exception when it meets corrupted records. >>> df1 = sqlContext.read.json('python/test_support/sql/people.json') >>> df1.dtypes From 551593a96edacc731f4e76e1fc3c2ec9327220f2 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 17 Mar 2016 10:17:12 +0900 Subject: [PATCH 09/12] Add extra ":" for scala one as well --- .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 70fb58444fba8..cb2b76ba2c4cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -295,7 +295,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { * malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When * a schema is set by user, it sets `null` for extra fields. *
  • `DROPMALFORMED` : ignores the whole corrupted records.
-  • `FAILFAST` throws an exception when it meets corrupted records.
+  • `FAILFAST` : throws an exception when it meets corrupted records.
  • * * * @since 1.4.0 @@ -328,7 +328,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { * malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When * a schema is set by user, it sets `null` for extra fields. *
  • `DROPMALFORMED` : ignores the whole corrupted records.
-  • `FAILFAST` throws an exception when it meets corrupted records.
+  • `FAILFAST` : throws an exception when it meets corrupted records.
  • * * * @since 1.6.0 From 59e72142dd801fb5c8266785153498ae4c67d5e6 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 18 Mar 2016 14:31:00 +0900 Subject: [PATCH 10/12] Change Logging to internal.Logging --- .../spark/sql/execution/datasources/json/JSONOptions.scala | 2 +- .../spark/sql/execution/datasources/json/JacksonParser.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala index 2b83a3f98d09b..93c3d47c1dcf4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.json import com.fasterxml.jackson.core.{JsonFactory, JsonParser} -import org.apache.spark.Logging +import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes} /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index f7c5aed84a23e..dc12480a1cd0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer import com.fasterxml.jackson.core._ import org.apache.spark.rdd.RDD -import org.apache.spark.Logging +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util._ From 3ff900ec904991e79bf6267c16ee38dfc15660be Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 18 Mar 2016 14:37:46 +0900 Subject: [PATCH 11/12] Reorder imports --- .../spark/sql/execution/datasources/json/JacksonParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index dc12480a1cd0f..00c14adf0704b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -23,8 +23,8 @@ import scala.collection.mutable.ArrayBuffer import com.fasterxml.jackson.core._ -import org.apache.spark.rdd.RDD import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util._ From dec3d8167c862385a489b999feb7af6c03316cfa Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 21 Mar 2016 15:00:34 +0900 Subject: [PATCH 12/12] Separate tests for each mode --- .../datasources/json/JsonSuite.scala | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 0e8080fbbbe31..0a5699b99cf0e 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -964,15 +964,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ) } - test("SPARK-13764 Parse modes in JSON data source") { - val schemaOne = StructType( - StructField("a", StringType, true) :: - StructField("b", StringType, true) :: - StructField("c", StringType, true) :: Nil) - - val schemaTwo = StructType( + test("Corrupt records: FAILFAST mode") { + val schema = StructType( StructField("a", StringType, true) :: Nil) - // `FAILFAST` mode should throw an exception for corrupt records. val exceptionOne = intercept[SparkException] { sqlContext.read @@ -981,17 +975,25 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .collect() } assert(exceptionOne.getMessage.contains("Malformed line in FAILFAST mode: {")) + val exceptionTwo = intercept[SparkException] { sqlContext.read .option("mode", "FAILFAST") - .schema(schemaTwo) + .schema(schema) .json(corruptRecords) .collect() } assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode: {")) + } + test("Corrupt records: DROPMALFORMED mode") { + val schemaOne = StructType( + StructField("a", StringType, true) :: + StructField("b", StringType, true) :: + StructField("c", StringType, true) :: Nil) + val schemaTwo = StructType( + StructField("a", StringType, true) :: Nil) // `DROPMALFORMED` mode should skip corrupt records - // For `PERMISSIVE` mode, it is tested in "Corrupt records" test. val jsonDFOne = sqlContext.read .option("mode", "DROPMALFORMED") .json(corruptRecords) @@ -1011,7 +1013,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(jsonDFTwo.schema === schemaTwo) } - test("Corrupt records") { + test("Corrupt records: PERMISSIVE mode") { // Test if we can query corrupt records. withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { withTempTable("jsonTable") {
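
Taken together, the series leaves the JSON data source with the same three parse modes the CSV source already had. The sketch below illustrates the intended end-state behavior against the 1.6-era SQLContext reader API; the input path and its contents are hypothetical, and the corrupt-record column name assumes the default value of spark.sql.columnNameOfCorruptRecord:

    // Hypothetical input /tmp/corrupt.json, one JSON document per line:
    //   {"a": "str_a_1"}
    //   {"a":              <- malformed line

    // PERMISSIVE (default): a malformed line yields a row of nulls and, when the
    // schema is inferred, its raw text is kept in the column configured by
    // spark.sql.columnNameOfCorruptRecord.
    val permissiveDF = sqlContext.read.json("/tmp/corrupt.json")

    // DROPMALFORMED: malformed lines are logged ("Dropping malformed line: ...")
    // and skipped, and no corrupt-record column is inferred.
    val droppedDF = sqlContext.read
      .option("mode", "DROPMALFORMED")
      .json("/tmp/corrupt.json")

    // FAILFAST: the first malformed line fails the job with a SparkException
    // whose message starts with "Malformed line in FAILFAST mode:".
    val failFastRows = sqlContext.read
      .option("mode", "FAILFAST")
      .json("/tmp/corrupt.json")
      .collect()

An unrecognized mode string logs a warning and falls back to PERMISSIVE, per ParseModes.isPermissiveMode.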