[SPARK-8093] [SQL] Remove empty structs inferred from JSON documents
Nathan Howell committed Jun 13, 2015
1 parent 19834fa commit 76ac3e8
Showing 3 changed files with 48 additions and 17 deletions.
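
The net effect of this change: after per-row schema inference and merging, the inferred type tree is canonicalized, so NullType (a field that was always null) becomes StringType, and any struct field whose type collapses to a struct with no fields is dropped rather than surfacing as an empty struct. Below is a minimal standalone sketch of that pruning rule; it is not the Spark source, the names PruneEmptyStructsSketch and prune are hypothetical, and it only assumes the org.apache.spark.sql.types classes from spark-sql on the classpath. (The actual patch additionally drops fields with empty names, which this sketch omits.)

import org.apache.spark.sql.types._

object PruneEmptyStructsSketch {
  // Same rule the patch introduces in canonicalizeType: nulls widen to strings,
  // and a struct whose fields all prune away is removed from its parent.
  def prune(dataType: DataType): Option[DataType] = dataType match {
    case at @ ArrayType(elementType, _) =>
      prune(elementType).map(t => at.copy(elementType = t))
    case StructType(fields) =>
      val kept = fields.flatMap(f => prune(f.dataType).map(t => f.copy(dataType = t)))
      if (kept.nonEmpty) Some(StructType(kept.toSeq)) else None
    case NullType => Some(StringType)
    case other => Some(other)
  }

  def main(args: Array[String]): Unit = {
    // A document like {"a": {"b": {}}} infers as struct<a: struct<b: struct<>>>;
    // nothing survives pruning, so the caller falls back to an empty schema.
    val b = StructField("b", StructType(Seq()))   // empty nested struct
    val inferred = StructType(Seq(StructField("a", StructType(Seq(b)))))
    println(prune(inferred))                               // None
    println(prune(inferred).getOrElse(StructType(Seq())))  // empty StructType
  }
}
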
52 changes: 35 additions & 17 deletions sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
@@ -43,7 +43,7 @@ private[sql] object InferSchema {
     }

     // perform schema inference on each row and merge afterwards
-    schemaData.mapPartitions { iter =>
+    val rootType = schemaData.mapPartitions { iter =>
       val factory = new JsonFactory()
       iter.map { row =>
         try {
@@ -55,8 +55,13 @@ private[sql] object InferSchema {
             StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
         }
       }
-    }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType) match {
-      case st: StructType => nullTypeToStringType(st)
+    }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType)
+
+    canonicalizeType(rootType) match {
+      case Some(st: StructType) => st
+      case _ =>
+        // canonicalizeType erases all empty structs, including the only one we want to keep
+        StructType(Seq())
     }
   }

@@ -116,22 +121,35 @@ private[sql] object InferSchema {
     }
   }

-  private def nullTypeToStringType(struct: StructType): StructType = {
-    val fields = struct.fields.map {
-      case StructField(fieldName, dataType, nullable, _) =>
-        val newType = dataType match {
-          case NullType => StringType
-          case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
-          case ArrayType(struct: StructType, containsNull) =>
-            ArrayType(nullTypeToStringType(struct), containsNull)
-          case struct: StructType => nullTypeToStringType(struct)
-          case other: DataType => other
-        }
+  /**
+   * Convert NullType to StringType and remove StructTypes with no fields
+   */
+  private def canonicalizeType: DataType => Option[DataType] = {
+    case at@ArrayType(elementType, _) =>
+      for {
+        canonicalType <- canonicalizeType(elementType)
+      } yield {
+        at.copy(canonicalType)
+      }

-        StructField(fieldName, newType, nullable)
-    }
+    case StructType(fields) =>
+      val canonicalFields = for {
+        field <- fields
+        if field.name.nonEmpty
+        canonicalType <- canonicalizeType(field.dataType)
+      } yield {
+        field.copy(dataType = canonicalType)
+      }
+
+      if (canonicalFields.nonEmpty) {
+        Some(StructType(canonicalFields))
+      } else {
+        // per SPARK-8093: empty structs should be deleted
+        None
+      }

-    StructType(fields)
+    case NullType => Some(StringType)
+    case other => Some(other)
   }

   /**
4 changes: 4 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -1103,4 +1103,8 @@ class JsonSuite extends QueryTest with TestJsonData {
     }
   }

+  test("SPARK-8093 Erase empty structs") {
+    val emptySchema = InferSchema(emptyRecords, 1.0, "")
+    assert(StructType(Seq()) === emptySchema)
+  }
 }
9 changes: 9 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -189,5 +189,14 @@ trait TestJsonData {
         """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
         """]""" :: Nil)

+  def emptyRecords: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{""" ::
+        """""" ::
+        """{"a": {}}""" ::
+        """{"a": {"b": {}}}""" ::
+        """{"b": [{"c": {}}]}""" ::
+        """]""" :: Nil)
+
   def empty: RDD[String] = ctx.sparkContext.parallelize(Seq[String]())
 }
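
As a rough illustration of the user-visible effect, here is a hypothetical spark-shell snippet against the Spark 1.4-era API; it is not part of the commit, and `sc` and `sqlContext` are the values the shell provides.

// Documents whose objects are empty, or contain only empty nested objects,
// no longer surface as zero-field struct columns after this change.
val rdd = sc.parallelize(
  """{"a": {}}""" :: """{"a": {"b": {}}}""" :: """{"b": [{"c": {}}]}""" :: Nil)
val df = sqlContext.read.json(rdd)
df.printSchema()   // no columns remain for these records
println(df.schema) // an empty StructType, matching the expectation in the new JsonSuite test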
