[SPARK-48413][SQL] ALTER COLUMN with collation #46734

Status: Closed · wants to merge 4 commits
31 changes: 31 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -408,6 +408,37 @@ object DataType {
}
}

/**
* Check if `from` is equal to `to` type except for collations and nullability, which are
* both checked to be compatible so that data of type `from` can be interpreted as of type `to`.
*/
private[sql] def equalsIgnoreCompatibleCollationAndNullability(
Contributor: Let's give this a name that reflects that it returns true if it is allowed to evolve the type; this will be true for more cases than collations in the future.

Contributor (author): Please check #46734 (comment). Part of the effort was to change the name of this method to equalsIgnoreCompatibleCollation.

from: DataType,
to: DataType): Boolean = {
(from, to) match {
case (_: StringType, _: StringType) => true

case (ArrayType(fromElement, fn), ArrayType(toElement, tn)) =>
(tn || !fn) && equalsIgnoreCompatibleCollationAndNullability(fromElement, toElement)
Contributor: Why do we want to allow setting the type nullable?

Contributor (author): Changed the approach: we now only check for a possible collation difference; nullability must remain the same. By checking only for a possible collation difference we scope down the changes, whereas previously we required complete data type equality (only comment changes were allowed in the ALTER COLUMN command).


case (MapType(fromKey, fromValue, fn), MapType(toKey, toValue, tn)) =>
(tn || !fn) &&
// Map keys cannot change collation.
equalsIgnoreCompatibleNullability(fromKey, toKey) &&
equalsIgnoreCompatibleCollationAndNullability(fromValue, toValue)

case (StructType(fromFields), StructType(toFields)) =>
fromFields.length == toFields.length &&
Contributor: Or should we use a map, so we do not depend on the order of fields?

Contributor (author): Please check #46734 (comment).

fromFields.zip(toFields).forall { case (fromField, toField) =>
Contributor: I would rather do a forall on fromFields and look up each field by name in toFields; that way we do not depend on the order.

Contributor (author): Please check #46734 (comment).

fromField.name == toField.name &&
(toField.nullable || !fromField.nullable) &&
equalsIgnoreCompatibleCollationAndNullability(fromField.dataType, toField.dataType)
}

case (fromDataType, toDataType) => fromDataType == toDataType
}
}
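The order-independent alternative the reviewers discussed for the StructType case could look roughly like the following. This is a sketch only, assuming Spark's StructType/StructField API; the names toFieldsByName etc. are illustrative, and the merged code keeps the positional zip. Note that a name-keyed map would silently collapse duplicate field names, which the positional version does not.

```scala
// Sketch: order-independent struct field comparison, as suggested in review.
// Build a name -> field map over `toFields` and resolve each `fromField`
// by name instead of by position.
case (StructType(fromFields), StructType(toFields)) =>
  fromFields.length == toFields.length && {
    val toFieldsByName = toFields.map(f => f.name -> f).toMap
    fromFields.forall { fromField =>
      toFieldsByName.get(fromField.name).exists { toField =>
        (toField.nullable || !fromField.nullable) &&
          equalsIgnoreCompatibleCollationAndNullability(fromField.dataType, toField.dataType)
      }
    }
  }
```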

/**
* Returns true if the two data types share the same "shape", i.e. the types
* are the same, but the field names don't need to be the same.
@@ -689,6 +689,115 @@ class DataTypeSuite extends SparkFunSuite {
false,
caseSensitive = true)

def checkEqualsIgnoreCompatibleCollationAndNullability(
from: DataType,
to: DataType,
expected: Boolean): Unit = {
val testName = s"equalsIgnoreCompatibleCollationAndNullability: (from: $from, to: $to)"

test(testName) {
assert(DataType.equalsIgnoreCompatibleCollationAndNullability(from, to) === expected)
}
}

// Simple types.
checkEqualsIgnoreCompatibleCollationAndNullability(IntegerType, IntegerType, expected = true)
checkEqualsIgnoreCompatibleCollationAndNullability(BooleanType, BooleanType, expected = true)
checkEqualsIgnoreCompatibleCollationAndNullability(StringType, StringType, expected = true)
checkEqualsIgnoreCompatibleCollationAndNullability(IntegerType, BooleanType, expected = false)
checkEqualsIgnoreCompatibleCollationAndNullability(BooleanType, IntegerType, expected = false)
checkEqualsIgnoreCompatibleCollationAndNullability(StringType, BooleanType, expected = false)
checkEqualsIgnoreCompatibleCollationAndNullability(BooleanType, StringType, expected = false)
checkEqualsIgnoreCompatibleCollationAndNullability(StringType, IntegerType, expected = false)
checkEqualsIgnoreCompatibleCollationAndNullability(IntegerType, StringType, expected = false)
// Collated `StringType`.
checkEqualsIgnoreCompatibleCollationAndNullability(StringType, StringType("UTF8_BINARY_LCASE"),
expected = true)
checkEqualsIgnoreCompatibleCollationAndNullability(
StringType("UTF8_BINARY"), StringType("UTF8_BINARY_LCASE"), expected = true)
// Complex types.
checkEqualsIgnoreCompatibleCollationAndNullability(
ArrayType(StringType),
ArrayType(StringType("UTF8_BINARY_LCASE")),
expected = true
)
checkEqualsIgnoreCompatibleCollationAndNullability(
ArrayType(StringType),
ArrayType(ArrayType(StringType("UTF8_BINARY_LCASE"))),
expected = false
)
checkEqualsIgnoreCompatibleCollationAndNullability(
ArrayType(ArrayType(StringType)),
ArrayType(ArrayType(StringType("UTF8_BINARY_LCASE"))),
expected = true
)
checkEqualsIgnoreCompatibleCollationAndNullability(
MapType(StringType, StringType),
MapType(StringType, StringType("UTF8_BINARY_LCASE")),
expected = true
)
checkEqualsIgnoreCompatibleCollationAndNullability(
MapType(StringType("UTF8_BINARY_LCASE"), StringType),
MapType(StringType, StringType),
expected = false
)
checkEqualsIgnoreCompatibleCollationAndNullability(
MapType(StringType("UTF8_BINARY_LCASE"), ArrayType(StringType)),
MapType(StringType("UTF8_BINARY_LCASE"), ArrayType(StringType("UTF8_BINARY_LCASE"))),
expected = true
)
checkEqualsIgnoreCompatibleCollationAndNullability(
MapType(ArrayType(StringType), IntegerType),
MapType(ArrayType(StringType("UTF8_BINARY_LCASE")), IntegerType),
expected = false
)
checkEqualsIgnoreCompatibleCollationAndNullability(
MapType(ArrayType(StringType("UTF8_BINARY_LCASE")), IntegerType),
MapType(ArrayType(StringType("UTF8_BINARY_LCASE")), IntegerType),
expected = true
)
checkEqualsIgnoreCompatibleCollationAndNullability(
StructType(StructField("a", StringType) :: Nil),
StructType(StructField("a", StringType("UTF8_BINARY_LCASE")) :: Nil),
expected = true
)
checkEqualsIgnoreCompatibleCollationAndNullability(
StructType(StructField("a", ArrayType(StringType)) :: Nil),
StructType(StructField("a", ArrayType(StringType("UTF8_BINARY_LCASE"))) :: Nil),
expected = true
)
checkEqualsIgnoreCompatibleCollationAndNullability(
StructType(StructField("a", MapType(StringType, IntegerType)) :: Nil),
StructType(StructField("a", MapType(StringType("UTF8_BINARY_LCASE"), IntegerType)) :: Nil),
expected = false
)
checkEqualsIgnoreCompatibleCollationAndNullability(
StructType(StructField("a", StringType) :: Nil),
StructType(StructField("b", StringType("UTF8_BINARY_LCASE")) :: Nil),
expected = false
)
// Null compatibility checks.
checkEqualsIgnoreCompatibleCollationAndNullability(
ArrayType(StringType, containsNull = true),
ArrayType(StringType, containsNull = false),
expected = false
)
checkEqualsIgnoreCompatibleCollationAndNullability(
ArrayType(StringType, containsNull = true),
ArrayType(StringType("UTF8_BINARY_LCASE"), containsNull = false),
expected = false
)
checkEqualsIgnoreCompatibleCollationAndNullability(
MapType(StringType, StringType, valueContainsNull = true),
MapType(StringType, StringType, valueContainsNull = false),
expected = false
)
checkEqualsIgnoreCompatibleCollationAndNullability(
StructType(StructField("a", StringType) :: Nil),
StructType(StructField("a", StringType, nullable = false) :: Nil),
expected = false
)

test("SPARK-25031: MapType should produce current formatted string for complex types") {
val keyType: DataType = StructType(Seq(
StructField("a", DataTypes.IntegerType),
@@ -396,8 +396,9 @@ case class AlterTableChangeColumnCommand(
val newDataSchema = table.dataSchema.fields.map { field =>
if (field.name == originColumn.name) {
// Create a new column from the origin column with the new comment.
val newField = field.copy(dataType = newColumn.dataType)
Contributor: Can you make it more explicit that we effectively allow the data type to change now, but only type changes that differ by collation? E.g. by splitting columnEqual into checking that names match on one side and that the type change is supported on the other, plus adding a withNewType method.

Contributor (author): Clarified in a comment, split columnEqual into two functions, and added withNewType in StructField; please check.

Contributor: Lines 359-367: the comment needs to be updated to be clear that we now allow changing collation.

Side note: it gives the Hive-style syntax for ALTER COLUMN, but that's only one of the two syntaxes; see:
https://github.com/apache/spark/blob/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4#L130

The other one is the one that is (partially) documented and the more capable/preferred one: https://spark.apache.org/docs/3.5.1/sql-ref-syntax-ddl-alter-table.html#parameters-4

Contributor (author): Updated the comment, please check.

      val withNewComment: StructField =
-       addComment(field, newColumn.getComment())
+       addComment(newField, newColumn.getComment())
// Create a new column from the origin column with the new current default value.
if (newColumn.getCurrentDefaultValue().isDefined) {
if (newColumn.getCurrentDefaultValue().get.nonEmpty) {
@@ -445,7 +446,8 @@ case class AlterTableChangeColumnCommand(
// name(by resolver) and dataType.
private def columnEqual(
field: StructField, other: StructField, resolver: Resolver): Boolean = {
-    resolver(field.name, other.name) && field.dataType == other.dataType
+    resolver(field.name, other.name) &&
+      DataType.equalsIgnoreCompatibleCollationAndNullability(field.dataType, other.dataType)
Contributor: The comment of the command at line 359 is now outdated.

Contributor (author): Fixed, please check.

}
}
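The split suggested in the review above could be sketched as follows. The method names nameEqual and canChangeType are hypothetical, and the snippet assumes Spark's StructField and Resolver types; it is an illustration of the suggested shape, not the merged implementation.

```scala
// Sketch: split the old columnEqual into a name check and a
// type-change check, plus a StructField helper for swapping in
// the new data type.
private def nameEqual(field: StructField, other: StructField, resolver: Resolver): Boolean =
  resolver(field.name, other.name)

private def canChangeType(field: StructField, other: StructField): Boolean =
  DataType.equalsIgnoreCompatibleCollationAndNullability(field.dataType, other.dataType)

// On StructField: return a copy carrying the new data type while
// keeping the name, nullability and metadata unchanged.
def withNewType(newType: DataType): StructField = copy(dataType = newType)
```

Splitting the predicate this way keeps the error path informative: a name mismatch and an unsupported type change can then be reported separately instead of through one combined boolean.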

@@ -2437,6 +2437,77 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
)
}
}

test("Change column collation") {
withTable("t1", "t2", "t3", "t4") {
// Plain `StringType`.
sql("CREATE TABLE t1(col STRING) USING parquet")
sql("INSERT INTO t1 VALUES ('a')")
checkAnswer(sql("SELECT COLLATION(col) FROM t1"), Row("UTF8_BINARY"))
sql("ALTER TABLE t1 ALTER COLUMN col TYPE STRING COLLATE UTF8_BINARY_LCASE")
checkAnswer(sql("SELECT COLLATION(col) FROM t1"), Row("UTF8_BINARY_LCASE"))

// Invalid "ALTER COLUMN" to Integer.
val alterInt = "ALTER TABLE t1 ALTER COLUMN col TYPE INTEGER"
checkError(
exception = intercept[AnalysisException] {
sql(alterInt)
},
errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
parameters = Map(
"originType" -> "\"STRING COLLATE UTF8_BINARY_LCASE\"",
"originName" -> "`col`",
"table" -> "`spark_catalog`.`default`.`t1`",
"newType" -> "\"INT\"",
"newName" -> "`col`"
),
context = ExpectedContext(fragment = alterInt, start = 0, stop = alterInt.length - 1)
)

// `ArrayType` with collation.
sql("CREATE TABLE t2(col ARRAY<STRING>) USING parquet")
sql("INSERT INTO t2 VALUES (ARRAY('a'))")
checkAnswer(sql("SELECT COLLATION(col[0]) FROM t2"), Row("UTF8_BINARY"))
sql("ALTER TABLE t2 ALTER COLUMN col TYPE ARRAY<STRING COLLATE UTF8_BINARY_LCASE>")
checkAnswer(sql("SELECT COLLATION(col[0]) FROM t2"), Row("UTF8_BINARY_LCASE"))

// `MapType` with collation.
sql("CREATE TABLE t3(col MAP<STRING, STRING>) USING parquet")
sql("INSERT INTO t3 VALUES (MAP('k', 'v'))")
checkAnswer(sql("SELECT COLLATION(col['k']) FROM t3"), Row("UTF8_BINARY"))
sql(
"""
|ALTER TABLE t3 ALTER COLUMN col TYPE
|MAP<STRING, STRING COLLATE UTF8_BINARY_LCASE>""".stripMargin)
checkAnswer(sql("SELECT COLLATION(col['k']) FROM t3"), Row("UTF8_BINARY_LCASE"))

// Invalid change of map key collation.
val alterMap =
"ALTER TABLE t3 ALTER COLUMN col TYPE " +
"MAP<STRING COLLATE UTF8_BINARY_LCASE, STRING>"
checkError(
exception = intercept[AnalysisException] {
sql(alterMap)
},
errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
parameters = Map(
"originType" -> "\"MAP<STRING, STRING COLLATE UTF8_BINARY_LCASE>\"",
"originName" -> "`col`",
"table" -> "`spark_catalog`.`default`.`t3`",
"newType" -> "\"MAP<STRING COLLATE UTF8_BINARY_LCASE, STRING>\"",
"newName" -> "`col`"
),
context = ExpectedContext(fragment = alterMap, start = 0, stop = alterMap.length - 1)
)

// `StructType` with collation.
sql("CREATE TABLE t4(col STRUCT<a:STRING>) USING parquet")
sql("INSERT INTO t4 VALUES (NAMED_STRUCT('a', 'value'))")
checkAnswer(sql("SELECT COLLATION(col.a) FROM t4"), Row("UTF8_BINARY"))
sql("ALTER TABLE t4 ALTER COLUMN col TYPE STRUCT<a:STRING COLLATE UTF8_BINARY_LCASE>")
checkAnswer(sql("SELECT COLLATION(col.a) FROM t4"), Row("UTF8_BINARY_LCASE"))
}
}
}

object FakeLocalFsFileSystem {