[SPARK-19129] [SQL] SessionCatalog: Disallow empty part col values in partition spec #16583

Closed

@@ -331,7 +331,7 @@ class SessionCatalog(
   def loadPartition(
       name: TableIdentifier,
       loadPath: String,
-      partition: TablePartitionSpec,
+      spec: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,

@@ -340,8 +340,9 @@ class SessionCatalog(
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
+    requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.loadPartition(
-      db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
+      db, table, loadPath, spec, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
   }

   def defaultTablePath(tableIdent: TableIdentifier): String = {

@@ -693,6 +694,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
   }

@@ -711,6 +713,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(specs)
     externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
   }

@@ -731,6 +734,8 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(specs, tableMetadata)
     requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
+    requireNonEmptyValueInPartitionSpec(specs)
+    requireNonEmptyValueInPartitionSpec(newSpecs)
     externalCatalog.renamePartitions(db, table, specs, newSpecs)
   }

@@ -749,6 +754,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.alterPartitions(db, table, parts)
   }

@@ -762,6 +768,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.getPartition(db, table, spec)
   }

@@ -781,6 +788,7 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     partialSpec.foreach { spec =>
       requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
     externalCatalog.listPartitionNames(db, table, partialSpec)
   }

@@ -801,6 +809,7 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     partialSpec.foreach { spec =>
       requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
     externalCatalog.listPartitions(db, table, partialSpec)
   }

@@ -819,6 +828,19 @@ class SessionCatalog(
     externalCatalog.listPartitionsByFilter(db, table, predicates)
   }

+  /**
+   * Verify if the input partition spec has any empty value.
+   */
+  private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
+    specs.foreach { s =>
+      if (s.values.exists(_.isEmpty)) {
+        val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
+      }
+    }
+  }
+
   /**
    * Verify if the input partition spec exactly matches the existing defined partition spec
    * The columns must be the same but the orders could be different.
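
For readers skimming the diff, here is a minimal, self-contained sketch of the new check, runnable in a plain Scala REPL. It assumes TablePartitionSpec is the Map[String, String] alias from Spark's catalog types, and it substitutes IllegalArgumentException for Spark's AnalysisException so the snippet has no Spark dependency — illustrative only, not the actual SessionCatalog code path.

// Illustrative sketch; mirrors requireNonEmptyValueInPartitionSpec above.
type TablePartitionSpec = Map[String, String]

def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
  specs.foreach { s =>
    if (s.values.exists(_.isEmpty)) {
      // Render the spec as "[k1=v1, k2=v2]" for the error message.
      val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
      // Stand-in for org.apache.spark.sql.AnalysisException.
      throw new IllegalArgumentException(
        s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
    }
  }
}

requireNonEmptyValueInPartitionSpec(Seq(Map("a" -> "3", "b" -> "")))
// java.lang.IllegalArgumentException: Partition spec is invalid.
//   The spec ([a=3, b=]) contains an empty partition column value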

@@ -848,6 +848,8 @@ abstract class CatalogTestUtils {
     CatalogTablePartition(Map("a" -> "5", "b" -> "6", "c" -> "7"), storageFormat)
   lazy val partWithUnknownColumns =
     CatalogTablePartition(Map("a" -> "5", "unknown" -> "6"), storageFormat)
+  lazy val partWithEmptyValue =
+    CatalogTablePartition(Map("a" -> "3", "b" -> ""), storageFormat)
   lazy val funcClass = "org.apache.spark.myFunc"

   /**

@@ -625,6 +625,13 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithEmptyValue, part1), ignoreIfExists = true)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }

   test("drop partitions") {

@@ -722,6 +729,16 @@ class SessionCatalogSuite extends PlanTest {
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, unknown) must be contained within " +
       "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.dropPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithEmptyValue.spec, part1.spec),
+        ignoreIfNotExists = false,
+        purge = false,
+        retainData = false)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }

   test("get partition") {

@@ -767,6 +784,11 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithEmptyValue.spec)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }

   test("rename partitions") {

@@ -834,6 +856,13 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.renamePartitions(
+        TableIdentifier("tbl1", Some("db2")),
+        Seq(part1.spec), Seq(partWithEmptyValue.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }

   test("alter partitions") {

@@ -893,6 +922,11 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithEmptyValue))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }

   test("list partition names") {

@@ -914,10 +948,24 @@ class SessionCatalogSuite extends PlanTest {

   test("list partition names with invalid partial partition spec") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
+    var e = intercept[AnalysisException] {
       catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithMoreColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
-        Some(Map("unknown" -> "unknown")))
+        Some(partWithEmptyValue.spec))
     }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }

   test("list partitions") {

@@ -937,10 +985,22 @@ class SessionCatalogSuite extends PlanTest {

   test("list partitions with invalid partial partition spec") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
-      catalog.listPartitions(
-        TableIdentifier("tbl2", Some("db2")), Some(Map("unknown" -> "unknown")))
+    var e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithMoreColumns.spec))
     }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithEmptyValue.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }

   test("list partitions when database/table does not exist") {

Review thread (attached to the "list partitions with invalid partial partition spec" test):

Member (author): The above one is verifying the catalog.listPartitionNames and this one is verifying catalog.listPartitions. Should we keep them separate?

@@ -471,6 +471,7 @@ private[hive] class HiveClientImpl(
     // do the check at first and collect all the matching partitions
     val matchingParts =
       specs.flatMap { s =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         // The provided spec here can be a partial spec, i.e. it will match all partitions
         // whose specs are supersets of this partial spec. E.g. If a table has partitions
         // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
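
The comment above is the key subtlety: a partial spec matches every partition whose full spec is a superset of it. A tiny REPL sketch of that matching rule, assuming the Map[String, String] spec representation (matchesPartialSpec is a hypothetical helper for illustration, not code from this PR):

// A partition matches a partial spec when every (column, value) pair of the
// partial spec appears in the partition's full spec.
def matchesPartialSpec(full: Map[String, String], partial: Map[String, String]): Boolean =
  partial.forall { case (col, value) => full.get(col).contains(value) }

val parts = Seq(Map("b" -> "1", "c" -> "1"), Map("b" -> "1", "c" -> "2"))
parts.filter(matchesPartialSpec(_, Map("b" -> "1")))  // matches both partitions
parts.filter(matchesPartialSpec(_, Map("c" -> "2")))  // matches only (b=1, c=2)

This is also why an empty value is dangerous on this path: a spec like Map("partCol1" -> "") can end up matching far more than intended once it reaches the metastore, which is the "drop the whole table" failure mode named in the new HiveDDLSuite test below.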

@@ -545,6 +546,7 @@ private[hive] class HiveClientImpl(
         // -1 for result limit means "no limit/return all"
         client.getPartitionNames(table.database, table.identifier.table, -1)
       case Some(s) =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1)
     }
     hivePartitionNames.asScala.sorted

@@ -568,7 +570,9 @@ private[hive] class HiveClientImpl(
     val hiveTable = toHiveTable(table)
     val parts = spec match {
       case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
-      case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
+      case Some(s) =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
+        client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
     }
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts

Review thread (attached to the assert in getPartitions):

Contributor: shall we also add the assert in getPartitionNames?

Member (author): Yeah, it has the same issue.
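
A small note on the two guard styles in this PR: SessionCatalog rejects bad specs via s.values.exists(_.isEmpty) and throws an AnalysisException, while HiveClientImpl asserts s.values.forall(_.nonEmpty) as a second line of defense. The two predicates are De Morgan duals of each other, e.g. in the REPL:

val s = Map("a" -> "3", "b" -> "")
s.values.exists(_.isEmpty)   // true  -> SessionCatalog throws AnalysisException
s.values.forall(_.nonEmpty)  // false -> HiveClientImpl's assert fires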

@@ -247,6 +247,16 @@ class HiveDDLSuite
     }
   }

+  test("SPARK-19129: drop partition with a empty string will drop the whole table") {
+    val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
+    df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
+    val e = intercept[AnalysisException] {
+      spark.sql("alter table partitionedTable drop partition(partCol1='')")
+    }.getMessage
+    assert(e.contains("Partition spec is invalid. The spec ([partCol1=]) contains an empty " +
+      "partition column value"))
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>

Review thread (attached to the ALTER TABLE ... DROP PARTITION line in the new test):

Contributor: what's the behavior of Hive? Does it also throw an exception?

Contributor (@tejasapatil): Hive (v2.1.1) does not throw an exception or error message here.

    ALTER TABLE partitioned_table DROP PARTITION(ds = '') ;
    OK
    Time taken: 0.152 seconds

Given that creating / inserting / querying partitions with an empty string is not allowed, letting DROP PARTITION go through seems inconsistent to me. It might have made sense for supporting regexes, but per the Hive language specification a partition spec has to be a plain string. If there is no way to create a partition with an empty partition column value, allowing DROP seems weird. +1 for throwing an exception, unless the general consensus about Hive compatibility is to match its behavior exactly (including such weirdness).

    INSERT OVERWRITE TABLE partitioned_table PARTITION(ds = '') SELECT key AS user_id, value AS name FROM src;
    FAILED: SemanticException [Error 10006]: Line 1:49 Partition not found ''''

    ALTER TABLE partitioned_table ADD PARTITION(ds = '') ;
    FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. partition spec is invalid; field ds does not exist or is empty

    DESC FORMATTED partitioned_table PARTITION(ds = '') ;
    FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. cannot find field null from [0:user_id, 1:name]

    TRUNCATE TABLE partitioned_table PARTITION(ds = '') ;
    FAILED: SemanticException [Error 10006]: Partition not found {ds=}

Member (author): @tejasapatil Thank you for your research.

So far, we do not completely follow Hive in the partition-related DDL commands; DROP PARTITION is an example. If the user-specified spec does not exist, we throw an exception, whereas Hive silently ignores it (though Hive always reports which partitions were dropped after the command). Maybe we can improve this in a future PR.

This PR follows the same approach of blocking invalid inputs: it throws an exception when the input partition spec is not valid.