[SPARK-5683] [SQL] Avoid multiple json generator created #4468

Closed · wants to merge 1 commit
24 changes: 22 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import java.io.CharArrayWriter
+
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 import scala.collection.JavaConversions._

@@ -409,8 +411,26 @@ private[sql] class DataFrameImpl protected[sql](
   override def toJSON: RDD[String] = {
     val rowSchema = this.schema
     this.mapPartitions { iter =>
-      val jsonFactory = new JsonFactory()
-      iter.map(JsonRDD.rowToJSON(rowSchema, jsonFactory))
+      val writer = new CharArrayWriter()
[Review thread on val writer = new CharArrayWriter()]

Contributor: Is it better than StringWriter?

Author: I tried StringWriter first, but it has no reset-like method, so the object couldn't be reused for each of the records. That's why I changed it to CharArrayWriter.
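A minimal standalone sketch (illustrative, not part of the PR) of the reuse pattern in question — reset() clears CharArrayWriter's buffer while keeping the allocated char array, whereas StringWriter exposes no reset() and can only be cleared by reaching into its underlying StringBuffer:

    import java.io.CharArrayWriter

    object WriterReuseDemo extends App {
      // One writer serves every record; no per-record allocation.
      val writer = new CharArrayWriter()
      for (record <- Seq("one", "two", "three")) {
        writer.write(record)
        println(writer.toString) // copy out the current record
        writer.reset()           // reuse the same buffer for the next one
      }
    }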

+      // create the Generator without separator inserted between 2 records
+      val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
[Review thread on setRootValueSeparator(null)]

Contributor: Should we expect any test result change after using setRootValueSeparator(null)?

Author: By default a " " (single space) is inserted between two root-level JSON records in the output string. Previously we created a generator object for each record during serialization, so the default root value separator never appeared in the output. In this PR a single generator serves all of the records, which is why we set the separator to null: nothing is inserted between records.
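A small sketch (illustrative, not from the PR; requires jackson-core on the classpath) showing the separator behavior the author describes:

    import java.io.CharArrayWriter
    import com.fasterxml.jackson.core.JsonFactory

    object SeparatorDemo extends App {
      val writer = new CharArrayWriter()
      // With the default root value separator this would print "{} {}";
      // setting it to null yields "{}{}" — nothing between records.
      val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
      gen.writeStartObject(); gen.writeEndObject() // first root-level value
      gen.writeStartObject(); gen.writeEndObject() // second root-level value
      gen.flush()
      println(writer.toString) // {}{}
      gen.close()
    }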


+
+      new Iterator[String] {
+        override def hasNext() = iter.hasNext
+        override def next(): String = {
+          JsonRDD.rowToJSON(rowSchema, gen)(iter.next())
+          gen.flush()
+
+          val json = writer.toString
+          if (hasNext) {
+            writer.reset()
+          } else {
+            gen.close()
+          }
+
+          json
+        }
+      }
     }
   }
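The change is internal to each partition; the user-visible output of toJSON is unchanged. A hedged usage sketch (sc is an assumed existing SparkContext; jsonRDD and toJSON are the Spark 1.3-era APIs this PR targets):

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    val df = sqlContext.jsonRDD(sc.parallelize(Seq(
      """{"name":"Alice","age":30}""",
      """{"name":"Bob","age":25}""")))

    // Still one JSON string per Row; the PR only reuses the generator
    // and writer within each partition instead of allocating per record.
    df.toJSON.collect().foreach(println)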
13 changes: 3 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -23,8 +23,7 @@ import java.sql.{Date, Timestamp}
 import scala.collection.Map
 import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
 
-import com.fasterxml.jackson.core.JsonProcessingException
-import com.fasterxml.jackson.core.JsonFactory
+import com.fasterxml.jackson.core.{JsonGenerator, JsonProcessingException, JsonFactory}
 import com.fasterxml.jackson.databind.ObjectMapper
 
 import org.apache.spark.rdd.RDD

@@ -430,14 +429,11 @@ private[sql] object JsonRDD extends Logging {

   /** Transforms a single Row to JSON using Jackson
    *
-   * @param jsonFactory a JsonFactory object to construct a JsonGenerator
    * @param rowSchema the schema object used for conversion
+   * @param gen a JsonGenerator object
    * @param row The row to convert
    */
-  private[sql] def rowToJSON(rowSchema: StructType, jsonFactory: JsonFactory)(row: Row): String = {
-    val writer = new StringWriter()
-    val gen = jsonFactory.createGenerator(writer)
-
+  private[sql] def rowToJSON(rowSchema: StructType, gen: JsonGenerator)(row: Row) = {
     def valWriter: (DataType, Any) => Unit = {
       case (_, null) | (NullType, _) => gen.writeNull()
       case (StringType, v: String) => gen.writeString(v)
@@ -479,8 +475,5 @@
     }
 
     valWriter(rowSchema, row)
-    gen.close()
-    writer.toString
   }
 
 }
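After this refactor, rowToJSON no longer owns the generator lifecycle: it writes exactly one row and returns Unit, so the caller must flush to observe output and close when done — exactly what the new Iterator in DataFrameImpl does. A sketch of that contract (hypothetical caller; since rowToJSON is private[sql], real callers live inside org.apache.spark.sql):

    import java.io.CharArrayWriter
    import com.fasterxml.jackson.core.JsonFactory

    def writeAll(schema: StructType, rows: Seq[Row]): String = {
      val writer = new CharArrayWriter()
      val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
      rows.foreach(row => JsonRDD.rowToJSON(schema, gen)(row)) // one root value per row
      gen.close()      // flushes any buffered output into the writer
      writer.toString  // all records, back to back
    }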
sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala

@@ -826,8 +826,8 @@ class JsonSuite extends QueryTest {
     df1.registerTempTable("applySchema1")
     val df2 = df1.toDataFrame
     val result = df2.toJSON.collect()
-    assert(result(0) == "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
-    assert(result(3) == "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
+    assert(result(0) === "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
+    assert(result(3) === "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
 
     val schema2 = StructType(
       StructField("f1", StructType(
@@ -848,8 +848,8 @@
     val df4 = df3.toDataFrame
     val result2 = df4.toJSON.collect()
 
-    assert(result2(1) == "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
-    assert(result2(3) == "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
+    assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
+    assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
 
     val jsonDF = jsonRDD(primitiveFieldAndType)
     val primTable = jsonRDD(jsonDF.toJSON)
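For context on the == to === switch in these asserts: ScalaTest's === reports both operands when an assertion fails, which matters for long JSON strings, while a plain == yields only "assertion failed". A minimal illustration (hypothetical suite and values):

    import org.scalatest.FunSuite

    class TripleEqualsDemo extends FunSuite {
      test("=== shows both sides on failure") {
        val actual = "{\"f1\":1}"
        // assert(actual == "{\"f1\":2}")   // would fail with just "assertion failed"
        assert(actual === "{\"f1\":1}")     // on failure, === reports both values
      }
    }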