Commit 4aae1dc

SPARK-36680: Support Dynamic Table Options for reads in DELETE and UPDATE SQL
shardulm94 committed Jul 11, 2024
1 parent 4d13c22 commit 4aae1dc
Showing 5 changed files with 62 additions and 7 deletions.
@@ -516,8 +516,8 @@ resource
dmlStatementNoWith
: insertInto query #singleInsertQuery
| fromClause multiInsertQueryBody+ #multiInsertQuery
- | DELETE FROM identifierReference tableAlias whereClause? #deleteFromTable
- | UPDATE identifierReference tableAlias setClause whereClause? #updateTable
+ | DELETE FROM identifierReference optionsClause? tableAlias whereClause? #deleteFromTable
+ | UPDATE identifierReference optionsClause? tableAlias setClause whereClause? #updateTable
| MERGE (WITH SCHEMA EVOLUTION)? INTO target=identifierReference targetAlias=tableAlias
USING (source=identifierReference |
LEFT_PAREN sourceQuery=query RIGHT_PAREN) sourceAlias=tableAlias
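
After this change, both statements accept an optional WITH options clause between the table name and its alias. A minimal usage sketch, assuming a hypothetical v2 catalog table testcat.t; the `split-size` key is the illustrative option exercised by the tests in this commit:

    // Sketch only: the table name is hypothetical, the option key comes from the tests below.
    spark.sql("DELETE FROM testcat.t WITH (`split-size` = 5) WHERE id > 1")
    spark.sql("UPDATE testcat.t WITH (`split-size` = 5) SET data = 'x' WHERE id = 1")
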
@@ -466,7 +466,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {

override def visitDeleteFromTable(
ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) {
- val table = createUnresolvedRelation(ctx.identifierReference)
+ val table = createUnresolvedRelation(ctx.identifierReference, Option(ctx.optionsClause))
val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "DELETE")
val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
val predicate = if (ctx.whereClause() != null) {
@@ -478,7 +478,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
}

override def visitUpdateTable(ctx: UpdateTableContext): LogicalPlan = withOrigin(ctx) {
- val table = createUnresolvedRelation(ctx.identifierReference)
+ val table = createUnresolvedRelation(ctx.identifierReference, Option(ctx.optionsClause))
val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "UPDATE")
val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
val assignments = withAssignments(ctx.setClause().assignmentList())
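
Both visitors now pass the optional clause into createUnresolvedRelation, so the relation node carries the options instead of an empty map. A minimal sketch of the equivalent construction, mirroring the parser tests below:

    import scala.jdk.CollectionConverters._
    import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // Roughly what the builder produces for
    // DELETE FROM testcat.ns1.ns2.tbl WITH ('a' = 'b') WHERE ...
    val options = new CaseInsensitiveStringMap(Map("a" -> "b").asJava)
    val relation = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"), options)
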
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.parser

import java.util.Locale

+ import scala.jdk.CollectionConverters._
+
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
@@ -28,6 +30,7 @@ import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransfo
import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{Decimal, IntegerType, LongType, StringType, StructType, TimestampType}
+ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.StorageLevelMapper
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

@@ -1779,6 +1782,14 @@ class DDLParserSuite extends AnalysisTest {
stop = 56))
}

test("delete from table: with options") {
parseCompare("DELETE FROM testcat.ns1.ns2.tbl WITH ('a' = 'b', 'c' = 'd')",
DeleteFromTable(UnresolvedRelation(
Seq("testcat", "ns1", "ns2", "tbl"),
new CaseInsensitiveStringMap(Map("a" -> "b", "c" -> "d").asJava)),
Literal.TrueLiteral))
}

test("update table: basic") {
parseCompare(
"""
@@ -1821,6 +1832,21 @@
stop = 70))
}

test("update table: with options") {
parseCompare(
"""
|UPDATE testcat.ns1.ns2.tbl WITH ('a' = 'b', 'c' = 'd')
|SET a='Robert', b=32
""".stripMargin,
UpdateTable(
UnresolvedRelation(
Seq("testcat", "ns1", "ns2", "tbl"),
new CaseInsensitiveStringMap(Map("a" -> "b", "c" -> "d").asJava)),
Seq(Assignment(UnresolvedAttribute("a"), Literal("Robert")),
Assignment(UnresolvedAttribute("b"), Literal(32))),
None))
}

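Outside the suite's parseCompare helper, the same plans can be reproduced directly with the Catalyst parser; a quick sketch (the comment abbreviates the plan shape):

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

    val plan = CatalystSqlParser.parsePlan(
      "DELETE FROM testcat.ns1.ns2.tbl WITH ('a' = 'b') WHERE id > 1")
    // DeleteFromTable(UnresolvedRelation([testcat, ns1, ns2, tbl], options = {a -> b}), ...)
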
test("merge into table: basic") {
parseCompare(
"""
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
- import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, CommandResult}
import org.apache.spark.sql.catalyst.statsEstimation.StatsEstimationTestBase
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, _}
@@ -41,7 +41,7 @@ import org.apache.spark.sql.errors.QueryErrorsBase
import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation
- import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION}
@@ -3547,6 +3547,34 @@ class DataSourceV2SQLSuiteV1Filter
}
}

test("SPARK-36680: Support Dynamic Table Options in DELETE and UPDATE SQL") {
val t1 = s"testcat_rowlevel.table"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b')")

var df = sql(s"DELETE FROM $t1 WITH (`split-size` = 5) WHERE id > 1")
var commandPlan = df.queryExecution.commandExecuted
.asInstanceOf[CommandResult].commandLogicalPlan
var collected = commandPlan.collect {
case r: DataSourceV2Relation =>
assert(r.options.get("split-size") == "5")
}
assert (collected.size == 1)
checkAnswer(sql(s"SELECT * FROM $t1"), Seq(Row(1, "a")))

df = sql(s"UPDATE $t1 WITH (`split-size` = 5) SET id = 3, data = 'abc'")
commandPlan = df.queryExecution.commandExecuted
.asInstanceOf[CommandResult].commandLogicalPlan
collected = commandPlan.collect {
case r: DataSourceV2Relation =>
assert(r.options.get("split-size") == "5")
}
assert (collected.size == 1)
checkAnswer(sql(s"SELECT * FROM $t1"), Seq(Row(3, "abc")))
}
}

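On the connector side, these options surface the same way DataFrame read options do: the DataSourceV2Relation's options are handed to the table's newScanBuilder when the read feeding the row-level operation is planned. A hypothetical sketch, not part of this commit (SketchTable and the 128 default are illustrative):

    import java.util

    import org.apache.spark.sql.connector.catalog.{SupportsRead, TableCapability}
    import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    class SketchTable extends SupportsRead {
      override def name(): String = "sketch_table"
      override def schema(): StructType =
        new StructType().add("id", "long").add("data", "string")
      override def capabilities(): util.Set[TableCapability] =
        util.EnumSet.of(TableCapability.BATCH_READ)

      // DELETE FROM t WITH (`split-size` = 5) surfaces "split-size" here.
      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
        val splitSize = options.getInt("split-size", 128)
        new ScanBuilder {
          override def build(): Scan =
            throw new UnsupportedOperationException(s"sketch only, split size $splitSize")
        }
      }
    }
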
private def testNotSupportedV2Command(
sqlCommand: String,
sqlParams: String,
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connector
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.QueryTest
- import org.apache.spark.sql.connector.catalog.{CatalogPlugin, InMemoryCatalog, InMemoryPartitionTableCatalog, InMemoryTableWithV2FilterCatalog, StagingInMemoryTableCatalog}
+ import org.apache.spark.sql.connector.catalog.{CatalogPlugin, InMemoryCatalog, InMemoryPartitionTableCatalog, InMemoryRowLevelOperationTableCatalog, InMemoryTableWithV2FilterCatalog, StagingInMemoryTableCatalog}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.test.SharedSparkSession

@@ -41,6 +41,7 @@ trait DatasourceV2SQLBase
registerCatalog("testpart", classOf[InMemoryPartitionTableCatalog])
registerCatalog("testcat_atomic", classOf[StagingInMemoryTableCatalog])
registerCatalog("testcat2", classOf[InMemoryCatalog])
registerCatalog("testcat_rowlevel", classOf[InMemoryRowLevelOperationTableCatalog])
registerCatalog(SESSION_CATALOG_NAME, classOf[InMemoryTableSessionCatalog])

val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
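
registerCatalog is a helper of this shared test base; in a regular session the equivalent wiring is a spark.sql.catalog.<name> config entry. A sketch, assuming the catalyst test classes are on the classpath so the in-memory catalog resolves:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.catalog.testcat_rowlevel",
        "org.apache.spark.sql.connector.catalog.InMemoryRowLevelOperationTableCatalog")
      .getOrCreate()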