Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql (fix): Resolve UNNEST earlier to prevent cyclic references #3312

Merged
merged 2 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ object TypeResolver extends LogSupport {
resolveTableRef ::
resolveJoinUsing ::
resolveSubquery ::
resolveJoinUnnest ::
resolveRegularRelation ::
resolveColumns ::
resolveAggregationIndexes ::
Expand Down Expand Up @@ -231,6 +232,22 @@ object TypeResolver extends LogSupport {
}
}

object resolveJoinUnnest extends RewriteRule {
override def apply(context: AnalyzerContext): PlanRewriter = {
case j @ Join(_, _, a @ AliasedRelation(u: Unnest, _, _, _), _, _) =>
j.copy(right = a.copy(child = resolveUnnest(u, j, context)))
case j @ Join(_, a @ AliasedRelation(u: Unnest, _, _, _), _, _, _) =>
j.copy(left = a.copy(child = resolveUnnest(u, j, context)))
}

private def resolveUnnest(u: Unnest, j: Join, context: AnalyzerContext): Unnest = {
val resolvedAttributes = j.outputAttributes.filter(_.resolved)
Copy link
Member Author

@takezoe takezoe Jan 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excluding unresolved attributes is a workaround to prevent cyclic references easily.

u.transformUpExpressions { case x: Identifier =>
resolveExpression(context, x, resolvedAttributes)
}.asInstanceOf[Unnest]
}
}

object resolveJoinUsing extends RewriteRule {
def apply(context: AnalyzerContext): PlanRewriter = {
case j @ Join(joinType, left, right, u @ JoinUsing(joinKeys, _), _) =>
Expand Down Expand Up @@ -375,8 +392,8 @@ object TypeResolver extends LogSupport {
} else {
a.copy(expr = resolved)
}
case SingleColumn(a: Attribute, qualifier, _, _) if a.resolved =>
a
case SingleColumn(a: Attribute, qualifier, tableAlias, _) if a.resolved =>
a.withQualifier(qualifier).withTableAlias(tableAlias)
Comment on lines +395 to +396
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following case cannot be resolved without this fix:

select t.id from (select id from A) as t(id)

case m: MultiSourceColumn =>
var changed = false
val resolvedInputs = m.inputs.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,34 @@ class TypeResolverTest extends AirSpec with ResolverTestHelper {
ResolvedAttribute("value", DataType.LongType, Some("t"), None, None, None)
)
}

test("un3: resolve UNNEST array column without CROSS JOIN") {
val p = analyze("SELECT id, n FROM A, UNNEST (name) AS t (n)")
p.outputAttributes shouldMatch { case List(c1: Attribute, c2: Attribute) =>
c1.fullName shouldBe "id"
c2.fullName shouldBe "n"
}
}

test("un4: resolve UNNEST array column with the same output name") {
val p = analyze("SELECT id, t.name FROM A CROSS JOIN UNNEST (name) AS t (name)")
p.outputAttributes shouldMatch { case List(c1: Attribute, c2: Attribute) =>
c1.fullName shouldBe "id"
c1.sourceColumns.map(_.column) shouldBe Seq(a1)
c2.fullName shouldBe "t.name"
c2.sourceColumns.map(_.column) shouldBe Seq(a2)
}
}

test("un5: resolve UNNEST array column with qualifier") {
val p = analyze("SELECT A.id, t.name FROM A INNER JOIN B ON A.id = B.id CROSS JOIN UNNEST (A.name) AS t (name)")
p.outputAttributes shouldMatch { case List(c1: Attribute, c2: Attribute) =>
c1.fullName shouldBe "A.id"
c1.sourceColumns.map(_.column) shouldBe Seq(a1)
c2.fullName shouldBe "t.name"
c2.sourceColumns.map(_.column) shouldBe Seq(a2)
}
}
}

test("resolve select from values") {
Expand Down Expand Up @@ -1149,4 +1177,19 @@ class TypeResolverTest extends AirSpec with ResolverTestHelper {
}
}
}

test("Resolve identifier refers to column in AliasedRelation") {
// with column alias (id -> t.xid)
val p1 = analyze("select t.xid from (select id from A) as t(xid)")
p1.outputAttributes shouldMatch { case List(col: ResolvedAttribute) =>
col.fullName shouldBe "t.xid"
col.sourceColumn.map(_.column) shouldBe Some(a1)
}
// without column alias (id -> t.td)
val p2 = analyze("select t.id from (select id from A) as t(id)")
p2.outputAttributes shouldMatch { case List(col: ResolvedAttribute) =>
col.fullName shouldBe "t.id"
col.sourceColumn.map(_.column) shouldBe Some(a1)
}
}
}
Loading