Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34720][SQL] MERGE ... UPDATE/INSERT * should do by-name resolution #32192

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1469,14 +1469,18 @@ class Analyzer(override val catalogManager: CatalogManager)
case UpdateAction(updateCondition, assignments) =>
val resolvedUpdateCondition = updateCondition.map(
resolveExpressionByPlanChildren(_, m))
// The update value can access columns from both target and source tables.
UpdateAction(
resolvedUpdateCondition,
resolveAssignments(Some(assignments), m, resolveValuesWithSourceOnly = false))
// The update value can access columns from both target and source tables.
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false))
case UpdateStarAction(updateCondition) =>
val assignments = targetTable.output.map { attr =>
Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
}
UpdateAction(
updateCondition.map(resolveExpressionByPlanChildren(_, m)),
resolveAssignments(assignments = None, m, resolveValuesWithSourceOnly = false))
// For UPDATE *, the value must come from the source table.
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid resolving UpdateAction again with resolveValuesWithSourceOnly = false in the next analysis round, I added resolveMergeExprOrFail to fail earlier if an attribute can't be resolved.

case o => o
}
val newNotMatchedActions = m.notMatchedActions.map {
Expand All @@ -1487,15 +1491,18 @@ class Analyzer(override val catalogManager: CatalogManager)
resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable)))
InsertAction(
resolvedInsertCondition,
resolveAssignments(Some(assignments), m, resolveValuesWithSourceOnly = true))
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
case InsertStarAction(insertCondition) =>
// The insert action is used when not matched, so its condition and value can only
// access columns from the source table.
val resolvedInsertCondition = insertCondition.map(
resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable)))
val assignments = targetTable.output.map { attr =>
Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
}
InsertAction(
resolvedInsertCondition,
resolveAssignments(assignments = None, m, resolveValuesWithSourceOnly = true))
resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
case o => o
}
val resolvedMergeCondition = resolveExpressionByPlanChildren(m.mergeCondition, m)
Expand All @@ -1513,33 +1520,38 @@ class Analyzer(override val catalogManager: CatalogManager)
}

def resolveAssignments(
assignments: Option[Seq[Assignment]],
assignments: Seq[Assignment],
mergeInto: MergeIntoTable,
resolveValuesWithSourceOnly: Boolean): Seq[Assignment] = {
if (assignments.isEmpty) {
val expandedColumns = mergeInto.targetTable.output
val expandedValues = mergeInto.sourceTable.output
expandedColumns.zip(expandedValues).map(kv => Assignment(kv._1, kv._2))
} else {
assignments.get.map { assign =>
val resolvedKey = assign.key match {
case c if !c.resolved =>
resolveExpressionByPlanChildren(c, Project(Nil, mergeInto.targetTable))
case o => o
}
val resolvedValue = assign.value match {
// The update values may contain target and/or source references.
case c if !c.resolved =>
if (resolveValuesWithSourceOnly) {
resolveExpressionByPlanChildren(c, Project(Nil, mergeInto.sourceTable))
} else {
resolveExpressionByPlanChildren(c, mergeInto)
}
case o => o
}
Assignment(resolvedKey, resolvedValue)
assignments.map { assign =>
val resolvedKey = assign.key match {
case c if !c.resolved =>
resolveMergeExprOrFail(c, Project(Nil, mergeInto.targetTable))
case o => o
}
val resolvedValue = assign.value match {
// The update values may contain target and/or source references.
case c if !c.resolved =>
if (resolveValuesWithSourceOnly) {
resolveMergeExprOrFail(c, Project(Nil, mergeInto.sourceTable))
} else {
resolveMergeExprOrFail(c, mergeInto)
}
case o => o
}
Assignment(resolvedKey, resolvedValue)
}
}

/**
 * Resolves `e` via `resolveExpressionByPlanChildren` against `p`, then calls
 * `failAnalysis` on any attribute reference of the result that is still unresolved.
 *
 * Only unresolved attribute references are reported here; other resolution
 * problems (for example mismatched data types) are not checked by this method.
 *
 * @param e the expression to resolve
 * @param p the plan passed to `resolveExpressionByPlanChildren`; its `inputSet`
 *          is listed in the error message as the available columns
 * @return the expression returned by `resolveExpressionByPlanChildren`
 */
private def resolveMergeExprOrFail(e: Expression, p: LogicalPlan): Expression = {
  val resolvedExpr = resolveExpressionByPlanChildren(e, p)
  val unresolvedRefs = resolvedExpr.references.filter(attr => !attr.resolved)
  unresolvedRefs.foreach { attr =>
    // The message lists every input column of `p` so the user can see what was available.
    val availableCols = p.inputSet.toSeq.map(col => col.sql).mkString(", ")
    attr.failAnalysis(
      s"cannot resolve ${attr.sql} in MERGE command given columns [$availableCols]")
  }
  resolvedExpr
}

// This method is used to trim groupByExpressions/selectedGroupByExpressions's top-level
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ class PlanResolutionSuite extends AnalysisTest {
t
}

// Mocked table with schema (s: string, i: int) and no partitioning transforms.
private val table1: Table = {
  val mockedTable = mock(classOf[Table])
  val mockedSchema = new StructType().add("s", "string").add("i", "int")
  when(mockedTable.schema()).thenReturn(mockedSchema)
  when(mockedTable.partitioning()).thenReturn(Array.empty[Transform])
  mockedTable
}

// Mocked table with schema (i: int, x: string) and no partitioning transforms.
private val table2: Table = {
  val mockedTable = mock(classOf[Table])
  val mockedSchema = new StructType().add("i", "int").add("x", "string")
  when(mockedTable.schema()).thenReturn(mockedSchema)
  when(mockedTable.partitioning()).thenReturn(Array.empty[Transform])
  mockedTable
}

private val tableWithAcceptAnySchemaCapability: Table = {
val t = mock(classOf[Table])
when(t.schema()).thenReturn(new StructType().add("i", "int"))
Expand Down Expand Up @@ -91,7 +105,8 @@ class PlanResolutionSuite extends AnalysisTest {
when(newCatalog.loadTable(any())).thenAnswer((invocation: InvocationOnMock) => {
invocation.getArgument[Identifier](0).name match {
case "tab" => table
case "tab1" => table
case "tab1" => table1
case "tab2" => table2
case name => throw new NoSuchTableException(name)
}
})
Expand All @@ -107,7 +122,7 @@ class PlanResolutionSuite extends AnalysisTest {
case "v1Table1" => v1Table
case "v1HiveTable" => v1HiveTable
case "v2Table" => table
case "v2Table1" => table
case "v2Table1" => table1
case "v2TableWithAcceptAnySchemaCapability" => tableWithAcceptAnySchemaCapability
case "view" => view
case name => throw new NoSuchTableException(name)
Expand Down Expand Up @@ -1385,7 +1400,7 @@ class PlanResolutionSuite extends AnalysisTest {
// cte
val sql5 =
s"""
|WITH source(i, s) AS
|WITH source(s, i) AS
| (SELECT * FROM $source)
|MERGE INTO $target AS target
|USING source
Expand All @@ -1405,7 +1420,7 @@ class PlanResolutionSuite extends AnalysisTest {
updateAssigns)),
Seq(InsertAction(Some(EqualTo(il: AttributeReference, StringLiteral("insert"))),
insertAssigns))) =>
assert(source.output.map(_.name) == Seq("i", "s"))
assert(source.output.map(_.name) == Seq("s", "i"))
checkResolution(target, source, mergeCondition, Some(dl), Some(ul), Some(il),
updateAssigns, insertAssigns)

Expand All @@ -1414,8 +1429,7 @@ class PlanResolutionSuite extends AnalysisTest {
}

// no aliases
Seq(("v2Table", "v2Table1"),
("testcat.tab", "testcat.tab1")).foreach { pair =>
Seq(("v2Table", "v2Table1"), ("testcat.tab", "testcat.tab1")).foreach { pair =>

val target = pair._1
val source = pair._2
Expand Down Expand Up @@ -1507,7 +1521,7 @@ class PlanResolutionSuite extends AnalysisTest {
assert(e5.message.contains("Reference 's' is ambiguous"))
}

val sql6 =
val sql1 =
s"""
|MERGE INTO non_exist_target
|USING non_exist_source
Expand All @@ -1516,13 +1530,37 @@ class PlanResolutionSuite extends AnalysisTest {
|WHEN MATCHED THEN UPDATE SET *
|WHEN NOT MATCHED THEN INSERT *
""".stripMargin
val parsed = parseAndResolve(sql6)
val parsed = parseAndResolve(sql1)
parsed match {
case u: MergeIntoTable =>
assert(u.targetTable.isInstanceOf[UnresolvedRelation])
assert(u.sourceTable.isInstanceOf[UnresolvedRelation])
case _ => fail("Expect MergeIntoTable, but got:\n" + parsed.treeString)
}

// UPDATE * with incompatible schema between source and target tables.
val sql2 =
"""
|MERGE INTO testcat.tab
|USING testcat.tab2
|ON 1 = 1
|WHEN MATCHED THEN UPDATE SET *
|""".stripMargin
val e2 = intercept[AnalysisException](parseAndResolve(sql2))
assert(e2.message.contains(
"cannot resolve s in MERGE command given columns [testcat.tab2.i, testcat.tab2.x]"))

// INSERT * with incompatible schema between source and target tables.
val sql3 =
"""
|MERGE INTO testcat.tab
|USING testcat.tab2
|ON 1 = 1
|WHEN NOT MATCHED THEN INSERT *
|""".stripMargin
val e3 = intercept[AnalysisException](parseAndResolve(sql3))
assert(e3.message.contains(
"cannot resolve s in MERGE command given columns [testcat.tab2.i, testcat.tab2.x]"))
}

test("MERGE INTO TABLE - skip resolution on v2 tables that accept any schema") {
Expand Down