[SPARK-18251][SQL] the type of Dataset can't be Option of non-flat type #15979

Closed · wants to merge 3 commits
@@ -605,6 +605,19 @@ object ScalaReflection extends ScalaReflection {

}

/**
 * Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that,
 * we also treat [[DefinedByConstructorParams]] as product type.
 */
def optionOfProductType(tpe: `Type`): Boolean = ScalaReflectionLock.synchronized {
  tpe match {
    case t if t <:< localTypeOf[Option[_]] =>
      val TypeRef(_, _, Seq(optType)) = t
      definedByConstructorParams(optType)
    case _ => false
  }
}

/**
* Returns the parameter names and types for the primary constructor of this class.
*
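For intuition about what the new optionOfProductType helper accepts, here is a minimal standalone sketch of the same shape of check against plain Scala reflection; it can be pasted into a Scala 2 REPL. The name isOptionOfProduct is made up for this sketch, and it tests Product directly where Spark's helper uses definedByConstructorParams and ScalaReflectionLock.

```scala
import scala.reflect.runtime.universe._

// Sketch only: like optionOfProductType, but checks `Product` directly instead of
// Spark's definedByConstructorParams, and without the reflection lock.
def isOptionOfProduct(tpe: Type): Boolean = tpe match {
  case t if t <:< typeOf[Option[_]] =>
    val TypeRef(_, _, Seq(optType)) = t   // extract the Option's single type argument
    optType <:< typeOf[Product]
  case _ => false
}

assert(isOptionOfProduct(typeOf[Option[(Int, Int)]]))  // Option of a tuple: rejected by the new encoder check
assert(!isOptionOfProduct(typeOf[Option[Int]]))        // Option of a flat type: still allowed
assert(!isOptionOfProduct(typeOf[(Int, Int)]))         // bare Product: still allowed (guarded by AssertNotNull instead)
```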
@@ -47,16 +47,26 @@ object ExpressionEncoder {
// We convert the not-serializable TypeTag into StructType and ClassTag.
val mirror = typeTag[T].mirror
val tpe = typeTag[T].tpe

if (ScalaReflection.optionOfProductType(tpe)) {
  throw new UnsupportedOperationException(
    "Cannot create encoder for Option of Product type, because Product type is represented " +
      "as a row, and the entire row can not be null in Spark SQL like normal databases. " +
      "You can wrap your type with Tuple1 if you do want top level null Product objects, " +
      "e.g. instead of creating `Dataset[Option[MyClass]]`, you can do something like " +
      "`val ds: Dataset[Tuple1[MyClass]] = Seq(Tuple1(MyClass(...)), Tuple1(null)).toDS`")
}

Review comments on this hunk:

Contributor:
This also means an Aggregator cannot use an Option of a Product type for its intermediate type, e.g. Aggregator[Int, Option[(Int, Int)], Int] is now invalid. But I see no good reason why such an Aggregator shouldn't exist.

@koertkuipers (Contributor), Dec 4, 2016:
This strikes me more as a limitation on Dataset[X] than on Encoder[X].

@koertkuipers (Contributor), Dec 4, 2016:
And now that I think about it more, I also think Dataset[Option[(Int, Int)]] should be valid too, if possible. It should not be represented by a top-level Row object, so the schema should be
StructType(StructField("_1", StructType(StructField("_1", IntegerType, false), StructField("_2", IntegerType, false)), true))
We already do this trick where we nest top-level non-struct types inside a row, so why not do the same thing for Option[X <: Product]?

val cls = mirror.runtimeClass(tpe)
val flat = !ScalaReflection.definedByConstructorParams(tpe)

val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], nullable = true)
val nullSafeInput = if (flat) {
inputObject
} else {
- // For input object of non-flat type, we can't encode it to row if it's null, as Spark SQL
+ // For input object of Product type, we can't encode it to row if it's null, as Spark SQL
// doesn't allow top-level row to be null, only its columns can be null.
- AssertNotNull(inputObject, Seq("top level non-flat input object"))
+ AssertNotNull(inputObject, Seq("top level Product input object"))
}
val serializer = ScalaReflection.serializerFor[T](nullSafeInput)
val deserializer = ScalaReflection.deserializerFor[T]
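The Aggregator concern raised in the review thread above is easy to make concrete: any Aggregator whose buffer type is an Option of a Product now fails when its buffer encoder is created. A sketch under the Spark 2.x Aggregator API follows; the MaxSpread object and its min/max logic are illustrative only, not part of this PR.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Tracks the (min, max) of the inputs seen so far; None until the first input arrives.
object MaxSpread extends Aggregator[Int, Option[(Int, Int)], Int] {
  def zero: Option[(Int, Int)] = None

  def reduce(b: Option[(Int, Int)], a: Int): Option[(Int, Int)] =
    Some(b.fold((a, a)) { case (lo, hi) => (math.min(lo, a), math.max(hi, a)) })

  def merge(b1: Option[(Int, Int)], b2: Option[(Int, Int)]): Option[(Int, Int)] =
    (b1, b2) match {
      case (Some((l1, h1)), Some((l2, h2))) => Some((math.min(l1, l2), math.max(h1, h2)))
      case _                                => b1.orElse(b2)
    }

  def finish(r: Option[(Int, Int)]): Int = r.fold(0) { case (lo, hi) => hi - lo }

  // With this PR, creating this encoder throws UnsupportedOperationException
  // ("Cannot create encoder for Option of Product type, ..."), so the Aggregator can
  // no longer be used, even though it never produces a null top-level buffer row itself.
  def bufferEncoder: Encoder[Option[(Int, Int)]] = Encoders.product[Option[(Int, Int)]]
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
```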
13 changes: 11 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -867,10 +867,10 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
checkDataset(Seq("a", null).toDS(), "a", null)
}

test("Dataset should throw RuntimeException if non-flat input object is null") {
test("Dataset should throw RuntimeException if top-level product input object is null") {
val e = intercept[RuntimeException](Seq(ClassData("a", 1), null).toDS())
assert(e.getMessage.contains("Null value appeared in non-nullable field"))
assert(e.getMessage.contains("top level non-flat input object"))
assert(e.getMessage.contains("top level Product input object"))
}

test("dropDuplicates") {
@@ -1051,6 +1051,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
checkDataset(dsDouble, arrayDouble)
checkDataset(dsString, arrayString)
}

test("SPARK-18251: the type of Dataset can't be Option of Product type") {
checkDataset(Seq(Some(1), None).toDS(), Some(1), None)

val e = intercept[UnsupportedOperationException] {
Seq(Some(1 -> "a"), None).toDS()
}
assert(e.getMessage.contains("Cannot create encoder for Option of Product type"))
}
}

case class Generic[T](id: T, value: Double)
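Roughly what the new SPARK-18251 test above exercises, written as a spark-shell style sketch (assuming the usual spark implicits are in scope):

```scala
import spark.implicits._

Seq(Some(1), None).toDS().collect()
// => Array(Some(1), None): an Option of a flat type still round-trips, None becomes a null column

Seq(Some(1 -> "a"), None).toDS()
// => throws UnsupportedOperationException:
//    "Cannot create encoder for Option of Product type, because Product type is represented as a row, ..."
```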
@@ -143,7 +143,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
}

test("roundtrip in to_json and from_json") {
- val dfOne = Seq(Some(Tuple1(Tuple1(1))), None).toDF("struct")
+ val dfOne = Seq(Tuple1(Tuple1(1)), Tuple1(null)).toDF("struct")
val schemaOne = dfOne.schema(0).dataType.asInstanceOf[StructType]
val readBackOne = dfOne.select(to_json($"struct").as("json"))
.select(from_json($"json", schemaOne).as("struct"))
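The JsonFunctionsSuite change above is this PR's recommended Tuple1 workaround in action: the optional top-level struct is wrapped in Tuple1, whose single column may be null. A self-contained sketch of the same pattern follows; the SparkSession setup and the MyClass case class are assumptions for illustration, echoing the names used in the new error message.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object Tuple1WorkaroundExample {
  case class MyClass(a: Int, b: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SPARK-18251-workaround").getOrCreate()
    import spark.implicits._

    // Dataset[Option[MyClass]] is rejected when its encoder is created, because the
    // top-level row itself can never be null. Wrapping in Tuple1 pushes the optional
    // value down into a nullable struct column instead.
    val ds: Dataset[Tuple1[MyClass]] =
      Seq(Tuple1(MyClass(1, "a")), Tuple1(null)).toDS()

    ds.show()   // one row with struct (1, a), one row with a null struct column

    spark.stop()
  }
}
```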