From 0b903446adea16b54f239bd0c2e07c64a2060c0a Mon Sep 17 00:00:00 2001 From: Jackey Lee Date: Sat, 12 Oct 2019 10:10:37 +0800 Subject: [PATCH 1/6] Add configuration to support JacksonGenrator to keep fields with null values --- .../apache/spark/sql/catalyst/json/JSONOptions.scala | 3 +++ .../spark/sql/catalyst/json/JacksonGenerator.scala | 3 +++ .../org/apache/spark/sql/internal/SQLConf.scala | 8 ++++++++ .../sql/catalyst/json/JacksonGeneratorSuite.scala | 12 ++++++++++++ .../main/scala/org/apache/spark/sql/Dataset.scala | 3 ++- .../execution/datasources/json/JsonFileFormat.scala | 5 ++++- 6 files changed, 32 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index dc26a28c74f11..1ed2a2dab8674 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -76,6 +76,9 @@ private[sql] class JSONOptions( // Whether to ignore column of all null values or empty array/struct during schema inference val dropFieldIfAllNull = parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false) + // Whether to ignore column of all null during json generating + val structIngoreNull = parameters.getOrElse("structIngoreNull", "true").toBoolean + // A language tag in IETF BCP 47 format val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 3ee7e484690d5..736147f3c4bb6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -184,6 +184,9 @@ private[sql] class JacksonGenerator( if (!row.isNullAt(i)) { gen.writeFieldName(field.name) fieldWriters(i).apply(row, i) + } else if (!options.structIngoreNull) { + gen.writeFieldName(field.name) + gen.writeNull() } i += 1 } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0ec661fc16c88..6b3fe83e2834e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1153,6 +1153,12 @@ object SQLConf { .booleanConf .createWithDefault(true) + val JSON_GENERATOR_STRUCT_IGNORE_NULL = + buildConf("spark.sql.jsonGenerator.struct.ignore.null") + .doc("If false, JacksonGenerator will generate null for null value in StructType.") + .booleanConf + .createWithDefault(true) + val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion") .internal() .doc("Whether to delete the expired log files in file stream sink.") @@ -2323,6 +2329,8 @@ class SQLConf extends Serializable with Logging { def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE) + def jsonGeneratorStructIngoreNull: Boolean = getConf(SQLConf.JSON_GENERATOR_STRUCT_IGNORE_NULL) + def parallelFileListingInStatsComputation: Boolean = getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala index 9b27490ed0e35..e65e434393b94 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala @@ -39,6 +39,18 @@ class JacksonGeneratorSuite extends SparkFunSuite { assert(writer.toString === """{"a":1}""") } + test("initial with StructType and write out an empty row with allowStructIncludeNull=true") { + val dataType = StructType(StructField("a", IntegerType) :: Nil) + val input = InternalRow(null) + val writer = new CharArrayWriter() + val allowNullOption = + new JSONOptions(Map("structIngoreNull" -> "false"), gmtId) + val gen = new JacksonGenerator(dataType, writer, allowNullOption) + gen.write(input) + gen.flush() + assert(writer.toString === """{"a":null}""") + } + test("initial with StructType and write out rows") { val dataType = StructType(StructField("a", IntegerType) :: Nil) val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 9a2d80030ee66..ef8b18a86d006 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3170,11 +3170,12 @@ class Dataset[T] private[sql]( def toJSON: Dataset[String] = { val rowSchema = this.schema val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone + val structIngoreNull = sparkSession.sessionState.conf.jsonGeneratorStructIngoreNull.toString mapPartitions { iter => val writer = new CharArrayWriter() // create the Generator without separator inserted between 2 records val gen = new JacksonGenerator(rowSchema, writer, - new JSONOptions(Map.empty[String, String], sessionLocalTimeZone)) + new JSONOptions(Map("structIngoreNull" -> structIngoreNull), sessionLocalTimeZone)) new Iterator[String] { override def hasNext: Boolean = iter.hasNext diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 95a63c3d1e2d9..579ee9c0c3827 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -67,8 +67,11 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { val conf = job.getConfiguration + val optionsFromConf = + Map("structIngoreNull" -> + sparkSession.sessionState.conf.jsonGeneratorStructIngoreNull.toString) val parsedOptions = new JSONOptions( - options, + optionsFromConf ++ options, sparkSession.sessionState.conf.sessionLocalTimeZone, sparkSession.sessionState.conf.columnNameOfCorruptRecord) parsedOptions.compressionCodec.foreach { codec => From 4f3c83f4463c984db6691fba34b5cac7a022d676 Mon Sep 17 00:00:00 2001 From: Jackey Lee Date: Sun, 13 Oct 2019 16:24:33 +0800 Subject: [PATCH 2/6] [SPARK-29444] add spark jira id in test --- .../apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala index e65e434393b94..7d8bb2ac79251 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala @@ -39,7 +39,7 @@ class JacksonGeneratorSuite extends SparkFunSuite { assert(writer.toString === """{"a":1}""") } - test("initial with StructType and write out an empty row with allowStructIncludeNull=true") { + test("SPARK-29444: initial with StructType and write out an empty row with allowStructIncludeNull=true") { val dataType = StructType(StructField("a", IntegerType) :: Nil) val input = InternalRow(null) val writer = new CharArrayWriter() From 143b9e7b29d712ea2a440a2b2414071ef6a5824a Mon Sep 17 00:00:00 2001 From: stczwd Date: Sun, 13 Oct 2019 18:04:58 +0800 Subject: [PATCH 3/6] [SPARK-29444] Fix suite tests --- .../spark/sql/catalyst/json/JacksonGeneratorSuite.scala | 3 ++- .../sql/execution/datasources/json/JsonFileFormat.scala | 5 +---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala index 7d8bb2ac79251..d0f790abf5747 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala @@ -39,7 +39,8 @@ class JacksonGeneratorSuite extends SparkFunSuite { assert(writer.toString === """{"a":1}""") } - test("SPARK-29444: initial with StructType and write out an empty row with allowStructIncludeNull=true") { + test("SPARK-29444: initial with StructType and write out an empty row " + + "with allowStructIncludeNull=true") { val dataType = StructType(StructField("a", IntegerType) :: Nil) val input = InternalRow(null) val writer = new CharArrayWriter() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 579ee9c0c3827..95a63c3d1e2d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -67,11 +67,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { val conf = job.getConfiguration - val optionsFromConf = - Map("structIngoreNull" -> - sparkSession.sessionState.conf.jsonGeneratorStructIngoreNull.toString) val parsedOptions = new JSONOptions( - optionsFromConf ++ options, + options, sparkSession.sessionState.conf.sessionLocalTimeZone, sparkSession.sessionState.conf.columnNameOfCorruptRecord) parsedOptions.compressionCodec.foreach { codec => From f57bea21e2761811782beaed19aff043364dc06d Mon Sep 17 00:00:00 2001 From: stczwd Date: Wed, 16 Oct 2019 14:45:22 +0800 Subject: [PATCH 4/6] [SPARK-29444] add a suite test for struct inner null --- .../catalyst/json/JacksonGeneratorSuite.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala index d0f790abf5747..e1c2550bdbd60 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala @@ -40,7 +40,7 @@ class JacksonGeneratorSuite extends SparkFunSuite { } test("SPARK-29444: initial with StructType and write out an empty row " + - "with allowStructIncludeNull=true") { + "with structIngoreNull=false") { val dataType = StructType(StructField("a", IntegerType) :: Nil) val input = InternalRow(null) val writer = new CharArrayWriter() @@ -52,6 +52,20 @@ class JacksonGeneratorSuite extends SparkFunSuite { assert(writer.toString === """{"a":null}""") } + test("SPARK-29444: initial with StructType field and write out a row " + + "with structIngoreNull=false and struct inner null") { + val fieldType = StructType(StructField("b", IntegerType) :: Nil) + val dataType = StructType(StructField("a", fieldType) :: Nil) + val input = InternalRow(InternalRow(null)) + val writer = new CharArrayWriter() + val allowNullOption = + new JSONOptions(Map("structIngoreNull" -> "false"), gmtId) + val gen = new JacksonGenerator(dataType, writer, allowNullOption) + gen.write(input) + gen.flush() + assert(writer.toString === """{"a":{"b":null}}""") + } + test("initial with StructType and write out rows") { val dataType = StructType(StructField("a", IntegerType) :: Nil) val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil) From eb123bf9e3fc7d70c72235782469a1664665cc93 Mon Sep 17 00:00:00 2001 From: stczwd Date: Wed, 16 Oct 2019 21:15:20 +0800 Subject: [PATCH 5/6] [SPARK-29444] change config from structIgnoreNull to ignoreNullFields --- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 4 ++-- .../apache/spark/sql/catalyst/json/JacksonGenerator.scala | 2 +- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 8 ++++---- .../spark/sql/catalyst/json/JacksonGeneratorSuite.scala | 8 ++++---- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 4 ++-- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 1ed2a2dab8674..b953bc19f99cd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -76,8 +76,8 @@ private[sql] class JSONOptions( // Whether to ignore column of all null values or empty array/struct during schema inference val dropFieldIfAllNull = parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false) - // Whether to ignore column of all null during json generating - val structIngoreNull = parameters.getOrElse("structIngoreNull", "true").toBoolean + // Whether to ignore null fields during json generating + val ignoreNullFields = parameters.getOrElse("ignoreNullFields", "true").toBoolean // A language tag in IETF BCP 47 format val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 736147f3c4bb6..aaf2ecf7923ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -184,7 +184,7 @@ private[sql] class JacksonGenerator( if (!row.isNullAt(i)) { gen.writeFieldName(field.name) fieldWriters(i).apply(row, i) - } else if (!options.structIngoreNull) { + } else if (!options.ignoreNullFields) { gen.writeFieldName(field.name) gen.writeNull() } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6b3fe83e2834e..a07f5e514e1f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1153,9 +1153,9 @@ object SQLConf { .booleanConf .createWithDefault(true) - val JSON_GENERATOR_STRUCT_IGNORE_NULL = - buildConf("spark.sql.jsonGenerator.struct.ignore.null") - .doc("If false, JacksonGenerator will generate null for null value in StructType.") + val JSON_GENERATOR_IGNORE_NULL_FIELDS = + buildConf("spark.sql.jsonGenerator.nullFields.ignore") + .doc("If false, JacksonGenerator will generate null for null fields in Struct.") .booleanConf .createWithDefault(true) @@ -2329,7 +2329,7 @@ class SQLConf extends Serializable with Logging { def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE) - def jsonGeneratorStructIngoreNull: Boolean = getConf(SQLConf.JSON_GENERATOR_STRUCT_IGNORE_NULL) + def jsonGeneratorIgnoreNullFields: Boolean = getConf(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS) def parallelFileListingInStatsComputation: Boolean = getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala index e1c2550bdbd60..2bb948ec24fb3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala @@ -40,12 +40,12 @@ class JacksonGeneratorSuite extends SparkFunSuite { } test("SPARK-29444: initial with StructType and write out an empty row " + - "with structIngoreNull=false") { + "with ignoreNullFields=false") { val dataType = StructType(StructField("a", IntegerType) :: Nil) val input = InternalRow(null) val writer = new CharArrayWriter() val allowNullOption = - new JSONOptions(Map("structIngoreNull" -> "false"), gmtId) + new JSONOptions(Map("ignoreNullFields" -> "false"), gmtId) val gen = new JacksonGenerator(dataType, writer, allowNullOption) gen.write(input) gen.flush() @@ -53,13 +53,13 @@ class JacksonGeneratorSuite extends SparkFunSuite { } test("SPARK-29444: initial with StructType field and write out a row " + - "with structIngoreNull=false and struct inner null") { + "with ignoreNullFields=false and struct inner null") { val fieldType = StructType(StructField("b", IntegerType) :: Nil) val dataType = StructType(StructField("a", fieldType) :: Nil) val input = InternalRow(InternalRow(null)) val writer = new CharArrayWriter() val allowNullOption = - new JSONOptions(Map("structIngoreNull" -> "false"), gmtId) + new JSONOptions(Map("ignoreNullFields" -> "false"), gmtId) val gen = new JacksonGenerator(dataType, writer, allowNullOption) gen.write(input) gen.flush() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index ef8b18a86d006..683799be0ce1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3170,12 +3170,12 @@ class Dataset[T] private[sql]( def toJSON: Dataset[String] = { val rowSchema = this.schema val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone - val structIngoreNull = sparkSession.sessionState.conf.jsonGeneratorStructIngoreNull.toString + val ignoreNullFields = sparkSession.sessionState.conf.jsonGeneratorIgnoreNullFields.toString mapPartitions { iter => val writer = new CharArrayWriter() // create the Generator without separator inserted between 2 records val gen = new JacksonGenerator(rowSchema, writer, - new JSONOptions(Map("structIngoreNull" -> structIngoreNull), sessionLocalTimeZone)) + new JSONOptions(Map("ignoreNullFields" -> ignoreNullFields), sessionLocalTimeZone)) new Iterator[String] { override def hasNext: Boolean = iter.hasNext From f8aea252e3b3d2888b044d13d98d5422aa0e1079 Mon Sep 17 00:00:00 2001 From: stczwd Date: Thu, 17 Oct 2019 13:46:56 +0800 Subject: [PATCH 6/6] [SPARK-29444] add SQLConf to JSONOptions --- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 4 +++- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 8 ++++---- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 3 +-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index b953bc19f99cd..e7bfb77e46c26 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.internal.SQLConf /** * Options for parsing JSON data into Spark SQL rows. @@ -77,7 +78,8 @@ private[sql] class JSONOptions( val dropFieldIfAllNull = parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false) // Whether to ignore null fields during json generating - val ignoreNullFields = parameters.getOrElse("ignoreNullFields", "true").toBoolean + val ignoreNullFields = parameters.getOrElse("ignoreNullFields", + SQLConf.get.jsonGeneratorIgnoreNullFields).toBoolean // A language tag in IETF BCP 47 format val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a07f5e514e1f9..69d5a552c611c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1154,10 +1154,10 @@ object SQLConf { .createWithDefault(true) val JSON_GENERATOR_IGNORE_NULL_FIELDS = - buildConf("spark.sql.jsonGenerator.nullFields.ignore") + buildConf("spark.sql.jsonGenerator.ignoreNullFields") .doc("If false, JacksonGenerator will generate null for null fields in Struct.") - .booleanConf - .createWithDefault(true) + .stringConf + .createWithDefault("true") val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion") .internal() @@ -2329,7 +2329,7 @@ class SQLConf extends Serializable with Logging { def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE) - def jsonGeneratorIgnoreNullFields: Boolean = getConf(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS) + def jsonGeneratorIgnoreNullFields: String = getConf(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS) def parallelFileListingInStatsComputation: Boolean = getConf(SQLConf.PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 683799be0ce1c..9a2d80030ee66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3170,12 +3170,11 @@ class Dataset[T] private[sql]( def toJSON: Dataset[String] = { val rowSchema = this.schema val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone - val ignoreNullFields = sparkSession.sessionState.conf.jsonGeneratorIgnoreNullFields.toString mapPartitions { iter => val writer = new CharArrayWriter() // create the Generator without separator inserted between 2 records val gen = new JacksonGenerator(rowSchema, writer, - new JSONOptions(Map("ignoreNullFields" -> ignoreNullFields), sessionLocalTimeZone)) + new JSONOptions(Map.empty[String, String], sessionLocalTimeZone)) new Iterator[String] { override def hasNext: Boolean = iter.hasNext