Skip to content

Commit

Permalink
[Spark] Relax type check in `SchemaUtils.normalizeColumnNamesInDataTy…
Browse files Browse the repository at this point in the history
…pe` (#4143)

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR modifies the nested field name normalization code. This code is
used when we write to a Delta table. It makes sure that the names of all
the columns in the source data (meaning, the data that we are writing to
the table) match the target table column names in terms of case. For
example, if the source contains column `address.CITY`, while the table
contains column `address.city`, the source column is renamed to match
the table.

If the schemas of the source data and of the table differ enough to
interfere with the name normalization, we log
`delta.assertions.schemaNormalization.nonNestedTypeMismatch`. However,
this check is too restrictive, and as a result, we log this assertion
too often. This PR relaxes the check. We now assume that any two atomic
columns can be matched to each other.

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

New tests in `SchemaUtilsSuite`.

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->

No.
  • Loading branch information
ala authored Feb 13, 2025
1 parent 35e47fd commit 0b818bf
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,10 @@ def normalizeColumnNamesInDataType(
// When schema evolution adds a new column during MERGE, it can be represented with
// a NullType in the schema of the data written by the MERGE.
sourceDataType
case (_: IntegralType, _: IntegralType) =>
// The integral types can be cast to each other later on.
case (_: AtomicType, _: AtomicType) =>
// Some atomic types (e.g. integral types) can be cast to each other later on. For now,
// it's enough to know that there are no nested fields inside the atomic types that might
// require normalization.
sourceDataType
case _ =>
if (DeltaUtils.isTesting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1566,61 +1566,66 @@ class SchemaUtilsSuite extends QueryTest
/////////////////////////////////
// normalizeColumnNamesInDataType
/////////////////////////////////

private def checkNormalizedColumnNamesInDataType(
private def runNormalizeColumnNamesInDataType(
sourceDataType: DataType,
tableDataType: DataType,
expectedDataType: DataType): Unit = {
assert(normalizeColumnNamesInDataType(
tableDataType: DataType): DataType = {
normalizeColumnNamesInDataType(
deltaLog = null,
sourceDataType,
tableDataType,
sourceParentFields = Seq.empty,
tableSchema = new StructType()) == expectedDataType)
tableSchema = new StructType())
}

test("normalize column names in data type - atomic types") {
test("normalize column names in data type - top-level atomic types") {
val source = new StructType()
.add("a", IntegerType)
.add("b", StringType)
.add("c", LongType)
.add("d", DateType)
val table = new StructType()
.add("B", StringType)
.add("A", IntegerType)
.add("A", LongType) // LongType != IntegerType
.add("D", DecimalType(10, 0)) // DecimalType != DateType
.add("C", StringType) // StringType != LongType
val expected = new StructType()
.add("A", IntegerType)
.add("B", StringType)
checkNormalizedColumnNamesInDataType(source, table, expected)
.add("C", LongType)
.add("D", DateType)
assert(runNormalizeColumnNamesInDataType(source, table) == expected)
}

test("normalize column names in data type - incompatible atomic types") {
val source = new StructType()
test("normalize column names in data type - incompatible top-level types") {
val schema1a = new StructType()
.add("a", IntegerType)
.add("b", StringType)
val table = new StructType()
val schema1b = new StructType()
.add("B", StringType)
.add("A", StringType) // StringType != IntegerType
val exception = intercept[AssertionError] {
normalizeColumnNamesInDataType(
deltaLog = null,
source,
table,
sourceParentFields = Seq.empty,
tableSchema = new StructType())
.add("A", new StructType()) // StructType != IntegerType
intercept[AssertionError] {
runNormalizeColumnNamesInDataType(schema1a, schema1b)
}
intercept[AssertionError] {
runNormalizeColumnNamesInDataType(schema1b, schema1a)
}
assert(exception.getMessage.contains("Types without nesting should match"))
}

test("normalize column names in data type - different integral types") {
val source = new StructType()
.add("a", IntegerType)
.add("b", StringType)
val table = new StructType()
.add("B", StringType)
.add("A", LongType) // LongType != IntegerType
val expected = new StructType()
.add("A", IntegerType)
.add("B", StringType)
checkNormalizedColumnNamesInDataType(source, table, expected)
val schema2a = new StructType()
.add("x", StringType)
.add("y", new StructType()
.add("z", IntegerType)
)
val schema2b = new StructType()
.add("x", StringType)
.add("Y", new StructType()
.add("z", ArrayType(IntegerType)) // ArrayType != IntegerType
)
intercept[AssertionError] {
runNormalizeColumnNamesInDataType(schema2a, schema2b)
}
intercept[AssertionError] {
runNormalizeColumnNamesInDataType(schema2b, schema2a)
}
}

test("normalize column names in data type - nested structs") {
Expand Down Expand Up @@ -1669,10 +1674,10 @@ class SchemaUtilsSuite extends QueryTest
.add("D1", IntegerType)
.add("D2", LongType)
)
checkNormalizedColumnNamesInDataType(source, table, expected)
assert(runNormalizeColumnNamesInDataType(source, table) == expected)
}

test("normalize column names in data type - incompatible types in a struct") {
test("normalize column names in data type - different atomic types in a map") {
val source = new StructType()
.add("a", new StructType()
.add("b", new StructType()
Expand All @@ -1685,8 +1690,33 @@ class SchemaUtilsSuite extends QueryTest
.add("A", new StructType()
.add("B", new StructType()
.add("C", MapType(StringType, IntegerType))))
assertThrows[AssertionError] {
checkNormalizedColumnNamesInDataType(source, table, expected)
assert(runNormalizeColumnNamesInDataType(source, table) == expected)
}

// Verifies that normalizing column names raises an AssertionError whenever the two
// schemas disagree on the kind of type (atomic, array, struct) of the innermost field,
// and that normalizing a schema against itself returns it unchanged.
test("normalize column names in data type - incompatible nested types") {
// Innermost field `c` is an atomic type (IntegerType).
val schema1 = new StructType()
.add("a", new StructType()
.add("b", new StructType()
.add("c", IntegerType)))
// Innermost field `C` is an array; field names differ from schema1 only in case.
val schema2 = new StructType()
.add("A", new StructType()
.add("B", new StructType()
.add("C", ArrayType(IntegerType))))
// Innermost field `C` is an empty struct; note the mixed-case path `A.b.C`.
val schema3 = new StructType()
.add("A", new StructType()
.add("b", new StructType()
.add("C", new StructType())))
val schemas = Seq(schema1, schema2, schema3)

// Exercise every ordered (left, right) pair, including each schema paired with itself,
// so that each mismatch is checked in both the source and the table position.
for (left <- schemas; right <- schemas) {
if (left == right) {
// Make sure there's no error when the schemas are the same.
assert(runNormalizeColumnNamesInDataType(left, right) == left)
} else {
// Distinct schemas differ in the innermost field's type and are expected to fail.
intercept[AssertionError] {
runNormalizeColumnNamesInDataType(left, right)
}
}
}
}

Expand All @@ -1713,7 +1743,7 @@ class SchemaUtilsSuite extends QueryTest
ArrayType(new StructType()
.add("Aa", IntegerType)
.add("Bb", StringType)))
checkNormalizedColumnNamesInDataType(source, table, expected)
assert(runNormalizeColumnNamesInDataType(source, table) == expected)
}

test("normalize column names in data type - missing column") {
Expand Down Expand Up @@ -1770,14 +1800,14 @@ class SchemaUtilsSuite extends QueryTest
nullable = true, comment = "comment for b3"),
nullable = false, comment = "comment for a2"
)
checkNormalizedColumnNamesInDataType(source, table, expected)
assert(runNormalizeColumnNamesInDataType(source, table) == expected)
}

test("normalize column names in data type - empty source struct") {
val source = new StructType()
val table = new StructType().add("a", IntegerType)
val expected = new StructType()
checkNormalizedColumnNamesInDataType(source, table, expected)
assert(runNormalizeColumnNamesInDataType(source, table) == expected)
}

////////////////////////////
Expand Down

0 comments on commit 0b818bf

Please sign in to comment.