Skip to content

Commit

Permalink
[SPARK-29444] Add configuration to support JacksonGenerator to keep fi…
Browse files Browse the repository at this point in the history
…elds with null values

### Why are the changes needed?
As mentioned in jira, sometimes we need to be able to support the retention of null columns when writing JSON.
For example, sparkmagic (widely used in Jupyter with Livy) generates SQL query results based on Dataset.toJSON and parses the JSON into a pandas DataFrame for display. If a column is null, columns can go missing or the query result can even appear empty. In particular, losing a null column in the first row may cause parsing exceptions or the loss of that entire column's data.

### Does this PR introduce any user-facing change?
Example in spark-shell.
scala> spark.sql("select null as a, 1 as b").toJSON.collect.foreach(println)
{"b":1}

scala> spark.sql("set spark.sql.jsonGenerator.ignoreNullFields=false")
res2: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("select null as a, 1 as b").toJSON.collect.foreach(println)
{"a":null,"b":1}

### How was this patch tested?
Add new test to JacksonGeneratorSuite

Closes apache#26098 from stczwd/json.

Lead-authored-by: stczwd <[email protected]>
Co-authored-by: Jackey Lee <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
jackylee-ch authored and cloud-fan committed Oct 18, 2019
1 parent ec5d698 commit 78b0cbe
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf

/**
* Options for parsing JSON data into Spark SQL rows.
Expand Down Expand Up @@ -76,6 +77,10 @@ private[sql] class JSONOptions(
// Whether to ignore column of all null values or empty array/struct during schema inference
val dropFieldIfAllNull = parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false)

// Whether to ignore (drop) null fields when generating JSON output.
// An explicit per-write "ignoreNullFields" option takes precedence over the
// session-level SQLConf default; in both cases the raw String value is parsed
// with toBoolean, so an unparseable value throws at option-construction time.
val ignoreNullFields = parameters.getOrElse("ignoreNullFields",
SQLConf.get.jsonGeneratorIgnoreNullFields).toBoolean

// A language tag in IETF BCP 47 format
val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ private[sql] class JacksonGenerator(
if (!row.isNullAt(i)) {
gen.writeFieldName(field.name)
fieldWriters(i).apply(row, i)
} else if (!options.ignoreNullFields) {
gen.writeFieldName(field.name)
gen.writeNull()
}
i += 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

// Session-level default controlling whether the JSON writer drops null struct
// fields (true, the historical behavior) or emits them as explicit JSON nulls.
// NOTE(review): this is a boolean-valued flag declared with .stringConf and a
// string default. .booleanConf / createWithDefault(true) would be type-safe,
// but switching it also requires changing jsonGeneratorIgnoreNullFields (which
// returns String) and the .toBoolean call in JSONOptions in the same change —
// TODO: fix all three together.
val JSON_GENERATOR_IGNORE_NULL_FIELDS =
buildConf("spark.sql.jsonGenerator.ignoreNullFields")
.doc("If false, JacksonGenerator will generate null for null fields in Struct.")
.stringConf
.createWithDefault("true")

val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion")
.internal()
.doc("Whether to delete the expired log files in file stream sink.")
Expand Down Expand Up @@ -2379,6 +2385,8 @@ class SQLConf extends Serializable with Logging {

def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)

// Accessor for JSON_GENERATOR_IGNORE_NULL_FIELDS. Returns String (not Boolean)
// because the entry is declared via stringConf; JSONOptions applies .toBoolean.
def jsonGeneratorIgnoreNullFields: String = getConf(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS)

def parallelFileListingInStatsComputation: Boolean =
getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,33 @@ class JacksonGeneratorSuite extends SparkFunSuite {
assert(writer.toString === """{"a":1}""")
}

// SPARK-29444: with ignoreNullFields disabled, a null top-level field must be
// written out as an explicit JSON null rather than being dropped.
test("SPARK-29444: initial with StructType and write out an empty row " +
    "with ignoreNullFields=false") {
  val schema = StructType(StructField("a", IntegerType) :: Nil)
  val output = new CharArrayWriter()
  val keepNullOptions = new JSONOptions(Map("ignoreNullFields" -> "false"), gmtId)
  val generator = new JacksonGenerator(schema, output, keepNullOptions)
  generator.write(InternalRow(null))
  generator.flush()
  assert(output.toString === """{"a":null}""")
}

// SPARK-29444: a null field nested inside a struct value is likewise emitted
// as an explicit JSON null when ignoreNullFields is disabled.
test("SPARK-29444: initial with StructType field and write out a row " +
    "with ignoreNullFields=false and struct inner null") {
  val innerSchema = StructType(StructField("b", IntegerType) :: Nil)
  val schema = StructType(StructField("a", innerSchema) :: Nil)
  val output = new CharArrayWriter()
  val keepNullOptions = new JSONOptions(Map("ignoreNullFields" -> "false"), gmtId)
  val generator = new JacksonGenerator(schema, output, keepNullOptions)
  generator.write(InternalRow(InternalRow(null)))
  generator.flush()
  assert(output.toString === """{"a":{"b":null}}""")
}

test("initial with StructType and write out rows") {
val dataType = StructType(StructField("a", IntegerType) :: Nil)
val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil)
Expand Down

0 comments on commit 78b0cbe

Please sign in to comment.