Skip to content

Commit

Permalink
[SPARK-21247][SQL] Type comparison should respect case-sensitive SQL …
Browse files Browse the repository at this point in the history
…conf

## What changes were proposed in this pull request?

This is an effort to reduce the difference between Hive and Spark. Spark supports case-sensitivity in columns. Especially, for Struct types, with `spark.sql.caseSensitive=true`, the following is supported.

```scala
scala> sql("select named_struct('a', 1, 'A', 2).a").show
+--------------------------+
|named_struct(a, 1, A, 2).a|
+--------------------------+
|                         1|
+--------------------------+

scala> sql("select named_struct('a', 1, 'A', 2).A").show
+--------------------------+
|named_struct(a, 1, A, 2).A|
+--------------------------+
|                         2|
+--------------------------+
```

And vice versa, with `spark.sql.caseSensitive=false`, the following is supported.
```scala
scala> sql("select named_struct('a', 1).A, named_struct('A', 1).a").show
+--------------------+--------------------+
|named_struct(a, 1).A|named_struct(A, 1).a|
+--------------------+--------------------+
|                   1|                   1|
+--------------------+--------------------+
```

However, types are considered different. For example, SET operations fail.
```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<A:int> <> struct<a:int> at the first column of the second table;;
'Union
:- Project [named_struct(a, 1) AS named_struct(a, 1)apache#57]
:  +- OneRowRelation$
+- Project [named_struct(A, 2) AS named_struct(A, 2)apache#58]
   +- OneRowRelation$
```

This PR aims to support case-insensitive type equality. For example, in Set operation, the above operation succeed when `spark.sql.caseSensitive=false`.

```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
+------------------+
|named_struct(a, 1)|
+------------------+
|               [1]|
|               [2]|
+------------------+
```

## How was this patch tested?

Pass the Jenkins with a newly add test case.

Author: Dongjoon Hyun <[email protected]>

Closes apache#18460 from dongjoon-hyun/SPARK-21247.
  • Loading branch information
dongjoon-hyun authored and cloud-fan committed Oct 13, 2017
1 parent e6e3600 commit 6412ea1
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ object TypeCoercion {
case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
Some(TimestampType)

case (t1 @ StructType(fields1), t2 @ StructType(fields2)) if t1.sameType(t2) =>
Some(StructType(fields1.zip(fields2).map { case (f1, f2) =>
// Since `t1.sameType(t2)` is true, two StructTypes have the same DataType
// except `name` (in case of `spark.sql.caseSensitive=false`) and `nullable`.
// - Different names: use f1.name
// - Different nullabilities: `nullable` is true iff one of them is nullable.
val dataType = findTightestCommonType(f1.dataType, f2.dataType).get
StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable)
}))

case _ => None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -80,7 +81,11 @@ abstract class DataType extends AbstractDataType {
* (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
*/
private[spark] def sameType(other: DataType): Boolean =
DataType.equalsIgnoreNullability(this, other)
if (SQLConf.get.caseSensitiveAnalysis) {
DataType.equalsIgnoreNullability(this, other)
} else {
DataType.equalsIgnoreCaseAndNullability(this, other)
}

/**
* Returns the same data type but set all nullability fields are true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,17 @@ class TypeCoercionSuite extends AnalysisTest {
widenFunc: (DataType, DataType) => Option[DataType],
t1: DataType,
t2: DataType,
expected: Option[DataType]): Unit = {
expected: Option[DataType],
isSymmetric: Boolean = true): Unit = {
var found = widenFunc(t1, t2)
assert(found == expected,
s"Expected $expected as wider common type for $t1 and $t2, found $found")
// Test both directions to make sure the widening is symmetric.
found = widenFunc(t2, t1)
assert(found == expected,
s"Expected $expected as wider common type for $t2 and $t1, found $found")
if (isSymmetric) {
found = widenFunc(t2, t1)
assert(found == expected,
s"Expected $expected as wider common type for $t2 and $t1, found $found")
}
}

test("implicit type cast - ByteType") {
Expand Down Expand Up @@ -385,6 +388,47 @@ class TypeCoercionSuite extends AnalysisTest {
widenTest(NullType, StructType(Seq()), Some(StructType(Seq())))
widenTest(StringType, MapType(IntegerType, StringType, true), None)
widenTest(ArrayType(IntegerType), StructType(Seq()), None)

widenTest(
StructType(Seq(StructField("a", IntegerType))),
StructType(Seq(StructField("b", IntegerType))),
None)
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = false))),
StructType(Seq(StructField("a", DoubleType, nullable = false))),
None)

widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = false))),
StructType(Seq(StructField("a", IntegerType, nullable = false))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = false)))))
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = false))),
StructType(Seq(StructField("a", IntegerType, nullable = true))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = true))),
StructType(Seq(StructField("a", IntegerType, nullable = false))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = true))),
StructType(Seq(StructField("a", IntegerType, nullable = true))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
widenTest(
StructType(Seq(StructField("a", IntegerType))),
StructType(Seq(StructField("A", IntegerType))),
None)
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
checkWidenType(
TypeCoercion.findTightestCommonType,
StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType))),
StructType(Seq(StructField("A", IntegerType), StructField("b", IntegerType))),
Some(StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType)))),
isSymmetric = false)
}
}

test("wider common type for decimal and array") {
Expand Down
38 changes: 38 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2646,6 +2646,44 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("SPARK-21247: Allow case-insensitive type equality in Set operation") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))")
sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))")

withTable("t", "S") {
sql("CREATE TABLE t(c struct<f:int>) USING parquet")
sql("CREATE TABLE S(C struct<F:int>) USING parquet")
Seq(("c", "C"), ("C", "c"), ("c.f", "C.F"), ("C.F", "c.f")).foreach {
case (left, right) =>
checkAnswer(sql(s"SELECT * FROM t, S WHERE t.$left = S.$right"), Seq.empty)
}
}
}

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val m1 = intercept[AnalysisException] {
sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))")
}.message
assert(m1.contains("Union can only be performed on tables with the compatible column types"))

val m2 = intercept[AnalysisException] {
sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))")
}.message
assert(m2.contains("Except can only be performed on tables with the compatible column types"))

withTable("t", "S") {
sql("CREATE TABLE t(c struct<f:int>) USING parquet")
sql("CREATE TABLE S(C struct<F:int>) USING parquet")
checkAnswer(sql("SELECT * FROM t, S WHERE t.c.f = S.C.F"), Seq.empty)
val m = intercept[AnalysisException] {
sql("SELECT * FROM t, S WHERE c = C")
}.message
assert(m.contains("cannot resolve '(t.`c` = S.`C`)' due to data type mismatch"))
}
}
}

test("SPARK-21335: support un-aliased subquery") {
withTempView("v") {
Seq(1 -> "a").toDF("i", "j").createOrReplaceTempView("v")
Expand Down

0 comments on commit 6412ea1

Please sign in to comment.