Commit

address comments
cloud-fan committed Nov 30, 2016
1 parent 01b072d commit 876e5c7
Showing 3 changed files with 14 additions and 12 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -606,9 +606,10 @@ object ScalaReflection extends ScalaReflection {
   }
 
   /**
-   * Returns true if the given type is option of non flat type, e.g. `Option[Tuple2]`.
+   * Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that,
+   * we also treat [[DefinedByConstructorParams]] as product type.
    */
-  def optionOfNonFlatType(tpe: `Type`): Boolean = ScalaReflectionLock.synchronized {
+  def optionOfProductType(tpe: `Type`): Boolean = ScalaReflectionLock.synchronized {
     tpe match {
       case t if t <:< localTypeOf[Option[_]] =>
         val TypeRef(_, _, Seq(optType)) = t
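For reference, a minimal standalone sketch of the check this hunk renames. This is an approximation only: the real method also treats [[DefinedByConstructorParams]] as a product type and runs inside ScalaReflectionLock.synchronized.

import scala.reflect.runtime.universe._

// True when the type is Option[T] and T is a Product (tuples, case classes).
def optionOfProductType(tpe: Type): Boolean = tpe match {
  case t if t <:< typeOf[Option[_]] =>
    // Pull out the single type argument of Option[...] and test it against Product.
    val TypeRef(_, _, Seq(optType)) = t
    optType <:< typeOf[Product]
  case _ => false
}

assert(optionOfProductType(typeOf[Option[(Int, String)]])) // a tuple is a Product
assert(!optionOfProductType(typeOf[Option[Int]]))          // Int is a flat type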
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -48,12 +48,13 @@ object ExpressionEncoder {
     val mirror = typeTag[T].mirror
     val tpe = typeTag[T].tpe
 
-    if (ScalaReflection.optionOfNonFlatType(tpe)) {
+    if (ScalaReflection.optionOfProductType(tpe)) {
       throw new UnsupportedOperationException(
-        "Cannot create encoder for Option of non-flat type, as non-flat type is represented " +
+        "Cannot create encoder for Option of Product type, because Product type is represented " +
           "as a row, and the entire row can not be null in Spark SQL like normal databases. " +
-          "You can wrap your type with Tuple1 if you do want top level null objects, e.g. " +
-          "val ds: Dataset[Tuple1[MyClass]] = Seq(Tuple1(MyClass(...)), Tuple1(null)).toDS")
+          "You can wrap your type with Tuple1 if you do want top level null Product objects, " +
+          "e.g. instead of creating `Dataset[Option[MyClass]]`, you can do something like " +
+          "`val ds: Dataset[Tuple1[MyClass]] = Seq(Tuple1(MyClass(...)), Tuple1(null)).toDS`")
     }
 
     val cls = mirror.runtimeClass(tpe)
@@ -63,9 +64,9 @@ object ExpressionEncoder {
     val nullSafeInput = if (flat) {
       inputObject
     } else {
-      // For input object of non-flat type, we can't encode it to row if it's null, as Spark SQL
+      // For input object of Product type, we can't encode it to row if it's null, as Spark SQL
       // doesn't allow top-level row to be null, only its columns can be null.
-      AssertNotNull(inputObject, Seq("top level non-flat input object"))
+      AssertNotNull(inputObject, Seq("top level Product input object"))
     }
     val serializer = ScalaReflection.serializerFor[T](nullSafeInput)
     val deserializer = ScalaReflection.deserializerFor[T]
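The workaround suggested by the new error message, as a hypothetical spark-shell session (assumes a SparkSession named `spark`; `MyClass` is a stand-in case class, not from this patch):

import spark.implicits._

case class MyClass(a: String, b: Int)

// Rejected at encoder creation time: Option of a Product type.
// Seq(Some(MyClass("a", 1)), None).toDS()

// Accepted: the Tuple1 wrapper keeps the top-level row non-null, so the
// nullability moves into a column of the row instead of the row itself.
val ds = Seq(Tuple1(MyClass("a", 1)), Tuple1(null: MyClass)).toDS()
ds.show()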
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -867,10 +867,10 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     checkDataset(Seq("a", null).toDS(), "a", null)
   }
 
-  test("Dataset should throw RuntimeException if non-flat input object is null") {
+  test("Dataset should throw RuntimeException if top-level product input object is null") {
     val e = intercept[RuntimeException](Seq(ClassData("a", 1), null).toDS())
     assert(e.getMessage.contains("Null value appeared in non-nullable field"))
-    assert(e.getMessage.contains("top level non-flat input object"))
+    assert(e.getMessage.contains("top level Product input object"))
   }
 
   test("dropDuplicates") {
@@ -1052,13 +1052,13 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     checkDataset(dsString, arrayString)
   }
 
-  test("SPARK-18251: the type of Dataset can't be Option of non-flat type") {
+  test("SPARK-18251: the type of Dataset can't be Option of Product type") {
     checkDataset(Seq(Some(1), None).toDS(), Some(1), None)
 
     val e = intercept[UnsupportedOperationException] {
       Seq(Some(1 -> "a"), None).toDS()
     }
-    assert(e.getMessage.contains("Cannot create encoder for Option of non-flat type"))
+    assert(e.getMessage.contains("Cannot create encoder for Option of Product type"))
   }
 }
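Why the first line of that test passes while the intercepted call fails: an Option of a flat type maps to a single nullable column, whereas an Option of a Product would need a nullable top-level row. A hypothetical illustration (assumes spark.implicits._ is in scope; the printed schema is indicative):

import spark.implicits._

// One nullable int column; None becomes a NULL cell, not a NULL row.
Seq(Some(1), None).toDS().printSchema()
// root
//  |-- value: integer (nullable = true)

// A Product spans the whole row, so None has no valid encoding.
// Seq(Some(1 -> "a"), None).toDS()  // UnsupportedOperationException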

