Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-17393] [SQL] Error Handling when CTAS Against the Same Data Source Table Using Overwrite Mode #14954

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,25 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
failAnalysis(s"Database name ${tblIdent.database.get} is not a valid name for " +
s"metastore. Metastore only accepts table name containing characters, numbers and _.")
}
// When a CTAS query runs in Overwrite mode against a table that already exists, reject it
// if the query also reads from that table.
query match {
  case Some(plan)
      if mode == SaveMode.Overwrite && catalog.tableExists(tableDesc.identifier) =>
    // The SubQuery operator has to be removed before the relation can be matched.
    EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
      // Only do the check if the table is a data source table
      // (the relation is a BaseRelation).
      case LogicalRelation(dest: BaseRelation, _, _) =>
        // Collect every data source relation the query reads from.
        val srcRelations = plan.collect {
          case LogicalRelation(src: BaseRelation, _, _) => src
        }
        if (srcRelations.contains(dest)) {
          failAnalysis(
            s"Cannot overwrite table ${tableDesc.identifier} that is also being read from")
        }
      case _ => // OK: not a data source table.
    }
  case _ => // OK: no query, not Overwrite mode, or the table does not exist yet.
}

case i @ logical.InsertIntoTable(
l @ LogicalRelation(t: InsertableRelation, _, _),
Expand Down Expand Up @@ -357,32 +376,6 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
// The relation in l is not an InsertableRelation.
failAnalysis(s"$l does not allow insertion.")

case CreateTable(tableDesc, mode, Some(query)) =>
  // In Overwrite mode the destination table must not be one of the query's input tables.
  // If it is, an AnalysisException is raised to tell the user this is not allowed.
  val overwritesExistingTable =
    mode == SaveMode.Overwrite && catalog.tableExists(tableDesc.identifier)
  if (overwritesExistingTable) {
    // The SubQuery operator has to be removed before the relation can be matched.
    val destination = EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier))
    destination match {
      // The check applies only to data source tables (relations that are a BaseRelation).
      case LogicalRelation(dest: BaseRelation, _, _) =>
        // Gather every data source relation the query reads from.
        val inputs = query.collect {
          case LogicalRelation(src: BaseRelation, _, _) => src
        }
        if (inputs.exists(_ == dest)) {
          failAnalysis(
            s"Cannot overwrite table ${tableDesc.identifier} that is also being read from.")
        }

      case _ => // OK
    }
  }

case _ => // OK
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,58 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}

// Exercises saveAsTable under every SaveMode when the DataFrame being written is read from
// the very table it is written to.
test("saveAsTable - source and target are the same table") {
  val tableName = "tab1"
  withTable(tableName) {
    // Seed the table with a single row.
    Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)

    // Append: the table's own rows are added to it, so the row appears twice.
    table(tableName).write.mode(SaveMode.Append).saveAsTable(tableName)
    checkAnswer(table(tableName),
      Seq(Row(1, 2), Row(1, 2)))

    // Ignore: the table already exists, so its contents are expected to stay unchanged.
    table(tableName).write.mode(SaveMode.Ignore).saveAsTable(tableName)
    checkAnswer(table(tableName),
      Seq(Row(1, 2), Row(1, 2)))

    // Overwrite: overwriting a table that is also being read must fail analysis.
    val overwriteMessage = intercept[AnalysisException] {
      table(tableName).write.mode(SaveMode.Overwrite).saveAsTable(tableName)
    }.getMessage
    assert(
      overwriteMessage.contains(
        s"Cannot overwrite table `$tableName` that is also being read from"))

    // ErrorIfExists: the table already exists, so the write must be rejected.
    val alreadyExistsMessage = intercept[AnalysisException] {
      table(tableName).write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
    }.getMessage
    assert(alreadyExistsMessage.contains(s"Table `$tableName` already exists"))
  }
}

// Exercises insertInto when the DataFrame being inserted is read from the target table itself.
test("insertInto - source and target are the same table") {
val tableName = "tab1"
withTable(tableName) {
// Seed the table with a single row.
Seq((1, 2)).toDF("i", "j").write.saveAsTable(tableName)

// Append: inserting the table into itself is expected to leave two copies of the row.
table(tableName).write.mode(SaveMode.Append).insertInto(tableName)
checkAnswer(
table(tableName),
Seq(Row(1, 2), Row(1, 2)))

// Ignore: the result of this insert is verified by the checkAnswer that follows it.
table(tableName).write.mode(SaveMode.Ignore).insertInto(tableName)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we issue error messages when the operation is insertInto but the mode is Ignore or ErrorIfExists?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think we should; let's do it in a follow-up PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do it. Thanks!

// The table is expected to hold four copies of the row at this point.
checkAnswer(
table(tableName),
Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))

// ErrorIfExists: the insert is expected to go through, doubling the rows to eight copies.
table(tableName).write.mode(SaveMode.ErrorIfExists).insertInto(tableName)
checkAnswer(
table(tableName),
Seq(Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2), Row(1, 2)))

// Overwrite: inserting into a table while also reading from it must fail analysis.
val e = intercept[AnalysisException] {
table(tableName).write.mode(SaveMode.Overwrite).insertInto(tableName)
}.getMessage
assert(e.contains(s"Cannot overwrite a path that is also being read from"))
}
}

test("saveAsTable[append]: less columns") {
withTable("saveAsTable_less_columns") {
Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_less_columns")
Expand Down