
[SPARK-13764][SQL] Parse modes in JSON data source #11756

Closed
wants to merge 13 commits
8 changes: 8 additions & 0 deletions python/pyspark/sql/readwriter.py
@@ -162,6 +162,14 @@ def json(self, path, schema=None):
(e.g. 00012)
* ``allowBackslashEscapingAnyCharacter`` (default ``false``): allows accepting quoting \
of all characters using backslash quoting mechanism
* ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
during parsing.
* ``PERMISSIVE`` : sets other fields to ``null`` when it encounters a corrupted \
record and puts the malformed string into a new field configured by \
``spark.sql.columnNameOfCorruptRecord``. When a schema is set by the user, it sets \
``null`` for extra fields.
* ``DROPMALFORMED`` : ignores whole corrupted records.
* ``FAILFAST`` : throws an exception when it encounters corrupted records.

>>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
18 changes: 18 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -289,6 +289,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* </li>
* <li>`allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers
* (e.g. 00012)</li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it encounters a corrupted record, and
* puts the malformed string into a new field configured by
* `spark.sql.columnNameOfCorruptRecord`. When a schema is set by the user, it sets `null`
* for extra fields.</li>
* <li>`DROPMALFORMED` : ignores whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it encounters corrupted records.</li>
* </ul>
* </li>
*
* @since 1.4.0
*/
@@ -313,6 +322,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* (e.g. 00012)</li>
* <li>`allowBackslashEscapingAnyCharacter` (default `false`): allows accepting quoting of all
* characters using backslash quoting mechanism</li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it encounters a corrupted record, and
* puts the malformed string into a new field configured by
* `spark.sql.columnNameOfCorruptRecord`. When a schema is set by the user, it sets `null`
* for extra fields.</li>
* <li>`DROPMALFORMED` : ignores whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it encounters corrupted records.</li>
* </ul>
* </li>
*
* @since 1.6.0
*/
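For context, a minimal sketch of the option as exercised through the reader API documented above; the input path and its contents are hypothetical, not part of this diff:

// Sketch of the three parse modes from the Scala reader API (Spark 1.6-era).
val permissiveDF = sqlContext.read
  .option("mode", "PERMISSIVE")     // default: corrupt text is preserved in the
  .json("data/maybe-corrupt.json")  // column named by columnNameOfCorruptRecord

val droppedDF = sqlContext.read
  .option("mode", "DROPMALFORMED")  // corrupt records are skipped entirely
  .json("data/maybe-corrupt.json")

val strictDF = sqlContext.read
  .option("mode", "FAILFAST")       // the first corrupt record raises an exception
  .json("data/maybe-corrupt.json")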
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

private[datasources] object ParseModes {
val PERMISSIVE_MODE = "PERMISSIVE"
val DROP_MALFORMED_MODE = "DROPMALFORMED"
val FAIL_FAST_MODE = "FAILFAST"

val DEFAULT = PERMISSIVE_MODE

def isValidMode(mode: String): Boolean = {
mode.toUpperCase match {
case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
case _ => false
}
}

def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
mode.toUpperCase == PERMISSIVE_MODE
} else {
true // We default to permissive if the mode string is not valid
Contributor: should we log a warning for this case?

}
}
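The helpers are case-insensitive and fall back to permissive for unrecognized strings; a quick illustration of the expected behavior (ParseModes is private[datasources], so this is a sketch rather than user-facing code):

ParseModes.isValidMode("permissive")      // true: matching goes through toUpperCase
ParseModes.isValidMode("nonsense")        // false
ParseModes.isFailFastMode("FailFast")     // true
ParseModes.isPermissiveMode("nonsense")   // true: invalid modes default to permissive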
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.StandardCharsets

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.CompressionCodecs
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}

private[sql] class CSVOptions(
@transient private val parameters: Map[String, String])
@@ -62,7 +62,7 @@ private[sql] class CSVOptions(

val delimiter = CSVTypeCast.toChar(
parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val charset = parameters.getOrElse("encoding",
parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))

@@ -101,26 +101,3 @@ private[sql] class CSVOptions(

val rowSeparator = "\n"
}

private[csv] object ParseModes {
val PERMISSIVE_MODE = "PERMISSIVE"
val DROP_MALFORMED_MODE = "DROPMALFORMED"
val FAIL_FAST_MODE = "FAILFAST"

val DEFAULT = PERMISSIVE_MODE

def isValidMode(mode: String): Boolean = {
mode.toUpperCase match {
case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
case _ => false
}
}

def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
mode.toUpperCase == PERMISSIVE_MODE
} else {
true // We default to permissive if the mode string is not valid
}
}
@@ -40,6 +40,7 @@ private[sql] object InferSchema {
configOptions: JSONOptions): StructType = {
require(configOptions.samplingRatio > 0,
s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0")
val shouldHandleCorruptRecord = configOptions.permissive
val schemaData = if (configOptions.samplingRatio > 0.99) {
json
} else {
@@ -50,21 +51,23 @@ private[sql] object InferSchema {
val rootType = schemaData.mapPartitions { iter =>
val factory = new JsonFactory()
configOptions.setJacksonOptions(factory)
iter.map { row =>
iter.flatMap { row =>
try {
Utils.tryWithResource(factory.createParser(row)) { parser =>
parser.nextToken()
inferField(parser, configOptions)
Some(inferField(parser, configOptions))
}
} catch {
case _: JsonParseException if shouldHandleCorruptRecord =>
Some(StructType(Seq(StructField(columnNameOfCorruptRecords, StringType))))
case _: JsonParseException =>
StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
None
}
}
}.treeAggregate[DataType](
StructType(Seq()))(
compatibleRootType(columnNameOfCorruptRecords),
compatibleRootType(columnNameOfCorruptRecords))
compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord),
compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord))

canonicalizeType(rootType) match {
case Some(st: StructType) => st
@@ -194,18 +197,21 @@ private[sql] object InferSchema {
* Remove top-level ArrayType wrappers and merge the remaining schemas
*/
private def compatibleRootType(
columnNameOfCorruptRecords: String): (DataType, DataType) => DataType = {
columnNameOfCorruptRecords: String,
shouldHandleCorruptRecord: Boolean): (DataType, DataType) => DataType = {
// Since we support array of json objects at the top level,
// we need to check the element type and find the root level data type.
case (ArrayType(ty1, _), ty2) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
case (ty1, ArrayType(ty2, _)) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
case (ArrayType(ty1, _), ty2) =>
compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2)
case (ty1, ArrayType(ty2, _)) =>
compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2)
// If we see any other data type at the root level, we get records that cannot be
// parsed. So, we use the struct as the data type and add the corrupt field to the schema.
case (struct: StructType, NullType) => struct
case (NullType, struct: StructType) => struct
case (struct: StructType, o) if !o.isInstanceOf[StructType] =>
case (struct: StructType, o) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord =>
withCorruptField(struct, columnNameOfCorruptRecords)
case (o, struct: StructType) if !o.isInstanceOf[StructType] =>
case (o, struct: StructType) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord =>
withCorruptField(struct, columnNameOfCorruptRecords)
// If we get anything else, we call compatibleType.
// Usually, when we reach here, ty1 and ty2 are two StructTypes.
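The net effect on schema inference, sketched over a hypothetical two-line input (exact field order may vary):

val rdd = sqlContext.sparkContext.parallelize(
  """{"a": 1}""" :: """{"a":""" :: Nil)  // the second line is malformed

// PERMISSIVE (default): the malformed line contributes the corrupt-record column.
sqlContext.read.json(rdd).printSchema()
// root
//  |-- _corrupt_record: string (nullable = true)
//  |-- a: long (nullable = true)

// DROPMALFORMED: the malformed line is skipped during inference (the None above).
sqlContext.read.option("mode", "DROPMALFORMED").json(rdd).printSchema()
// root
//  |-- a: long (nullable = true)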
@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources.json

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

import org.apache.spark.sql.execution.datasources.CompressionCodecs
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}

/**
* Options for the JSON data source.
@@ -28,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.CompressionCodecs
*/
private[sql] class JSONOptions(
@transient private val parameters: Map[String, String])
extends Serializable {
extends Logging with Serializable {

val samplingRatio =
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
Expand All @@ -49,6 +50,16 @@ private[sql] class JSONOptions(
val allowBackslashEscapingAnyCharacter =
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")

// Parse mode flags
if (!ParseModes.isValidMode(parseMode)) {
logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
Member Author: #11756 (comment), that warning should be dealt with here.

}

val failFast = ParseModes.isFailFastMode(parseMode)
val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
val permissive = ParseModes.isPermissiveMode(parseMode)

/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
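With this change an invalid mode string no longer changes behavior silently: it logs a warning and falls back to PERMISSIVE. A sketch of that fallback (JSONOptions is private[sql]; shown for illustration only):

val opts = new JSONOptions(Map("mode" -> "PERMISIVE"))  // note the typo
// logs: "PERMISIVE is not a valid parse mode. Using PERMISSIVE."
assert(opts.permissive && !opts.failFast && !opts.dropMalformed)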
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer

import com.fasterxml.jackson.core._

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -34,7 +35,7 @@ import org.apache.spark.util.Utils

private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)

object JacksonParser {
object JacksonParser extends Logging {

def parse(
input: RDD[String],
@@ -257,13 +258,20 @@ object JacksonParser {

def failedRecord(record: String): Seq[InternalRow] = {
// create a row even if no corrupt record column is present
val row = new GenericMutableRow(schema.length)
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
require(schema(corruptIndex).dataType == StringType)
row.update(corruptIndex, UTF8String.fromString(record))
if (configOptions.failFast) {
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
}
if (configOptions.dropMalformed) {
logWarning(s"Dropping malformed line: $record")
Nil
} else {
val row = new GenericMutableRow(schema.length)
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
require(schema(corruptIndex).dataType == StringType)
row.update(corruptIndex, UTF8String.fromString(record))
}
Seq(row)
}

Seq(row)
}

val factory = new JsonFactory()
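Taken together, failedRecord gives each mode the following row-level behavior; a sketch over a hypothetical two-line input:

val lines = sqlContext.sparkContext.parallelize(
  """{"a": "ok"}""" :: """{"a":""" :: Nil)

// FAILFAST: collect() fails with "Malformed line in FAILFAST mode: ...".
// sqlContext.read.option("mode", "FAILFAST").json(lines).collect()

// DROPMALFORMED: the malformed line is logged and dropped.
sqlContext.read.option("mode", "DROPMALFORMED").json(lines).count()  // 1

// PERMISSIVE: the malformed line still yields a row, with null fields and the
// raw text kept in the corrupt-record column when that column is in the schema.
sqlContext.read.json(lines).collect()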
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec

import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -963,7 +964,56 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
)
}

test("Corrupt records") {
test("Corrupt records: FAILFAST mode") {
val schema = StructType(
StructField("a", StringType, true) :: Nil)
// `FAILFAST` mode should throw an exception for corrupt records.
val exceptionOne = intercept[SparkException] {
sqlContext.read
.option("mode", "FAILFAST")
.json(corruptRecords)
.collect()
}
assert(exceptionOne.getMessage.contains("Malformed line in FAILFAST mode: {"))

val exceptionTwo = intercept[SparkException] {
sqlContext.read
.option("mode", "FAILFAST")
.schema(schema)
.json(corruptRecords)
.collect()
}
assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode: {"))
}

test("Corrupt records: DROPMALFORMED mode") {
val schemaOne = StructType(
StructField("a", StringType, true) ::
StructField("b", StringType, true) ::
StructField("c", StringType, true) :: Nil)
val schemaTwo = StructType(
StructField("a", StringType, true) :: Nil)
// `DROPMALFORMED` mode should skip corrupt records
val jsonDFOne = sqlContext.read
.option("mode", "DROPMALFORMED")
.json(corruptRecords)
checkAnswer(
jsonDFOne,
Row("str_a_4", "str_b_4", "str_c_4") :: Nil
)
assert(jsonDFOne.schema === schemaOne)

val jsonDFTwo = sqlContext.read
.option("mode", "DROPMALFORMED")
.schema(schemaTwo)
.json(corruptRecords)
checkAnswer(
jsonDFTwo,
Row("str_a_4") :: Nil)
assert(jsonDFTwo.schema === schemaTwo)
}
Contributor: Can we add more test cases with a user-specified schema?
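A hypothetical test along the lines the reviewer suggests (not part of this diff), using a user-specified schema that names the corrupt column explicitly:

test("Corrupt records: PERMISSIVE mode with user-specified schema") {
  withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
    val schema = StructType(
      StructField("_unparsed", StringType, true) ::
      StructField("a", StringType, true) :: Nil)
    val jsonDF = sqlContext.read.schema(schema).json(corruptRecords)
    assert(jsonDF.schema === schema)
    // Malformed lines should surface in `_unparsed`; well-formed ones leave it null.
    assert(jsonDF.where(jsonDF("_unparsed").isNotNull).count() > 0)
  }
}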


test("Corrupt records: PERMISSIVE mode") {
// Test if we can query corrupt records.
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
withTempTable("jsonTable") {