diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 12af9e0c32611..8008fcd639f14 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -331,7 +331,7 @@ class SessionCatalog(
   def loadPartition(
       name: TableIdentifier,
       loadPath: String,
-      partition: TablePartitionSpec,
+      spec: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
@@ -340,8 +340,9 @@ class SessionCatalog(
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
+    requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.loadPartition(
-      db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
+      db, table, loadPath, spec, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
   }
 
   def defaultTablePath(tableIdent: TableIdentifier): String = {
@@ -693,6 +694,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
   }
 
@@ -711,6 +713,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(specs)
     externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
   }
 
@@ -731,6 +734,8 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(specs, tableMetadata)
     requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
+    requireNonEmptyValueInPartitionSpec(specs)
+    requireNonEmptyValueInPartitionSpec(newSpecs)
     externalCatalog.renamePartitions(db, table, specs, newSpecs)
   }
 
@@ -749,6 +754,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.alterPartitions(db, table, parts)
   }
 
@@ -762,6 +768,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.getPartition(db, table, spec)
   }
 
@@ -781,6 +788,7 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     partialSpec.foreach { spec =>
       requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
     externalCatalog.listPartitionNames(db, table, partialSpec)
   }
@@ -801,6 +809,7 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     partialSpec.foreach { spec =>
       requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
     externalCatalog.listPartitions(db, table, partialSpec)
   }
@@ -819,6 +828,19 @@ class SessionCatalog(
     externalCatalog.listPartitionsByFilter(db, table, predicates)
   }
 
+  /**
+   * Verify if the input partition spec has any empty value.
+   */
+  private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
+    specs.foreach { s =>
+      if (s.values.exists(_.isEmpty)) {
+        val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
+      }
+    }
+  }
+
   /**
    * Verify if the input partition spec exactly matches the existing defined partition spec
    * The columns must be the same but the orders could be different.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 91f464b47aae2..acf3bcfdaa955 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -848,6 +848,8 @@ abstract class CatalogTestUtils {
     CatalogTablePartition(Map("a" -> "5", "b" -> "6", "c" -> "7"), storageFormat)
   lazy val partWithUnknownColumns =
     CatalogTablePartition(Map("a" -> "5", "unknown" -> "6"), storageFormat)
+  lazy val partWithEmptyValue =
+    CatalogTablePartition(Map("a" -> "3", "b" -> ""), storageFormat)
   lazy val funcClass = "org.apache.spark.myFunc"
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index ae93dffbab8d2..7a7de25acb070 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -625,6 +625,13 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithEmptyValue, part1), ignoreIfExists = true)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("drop partitions") {
@@ -722,6 +729,16 @@
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, unknown) must be contained within " +
       "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.dropPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithEmptyValue.spec, part1.spec),
+        ignoreIfNotExists = false,
+        purge = false,
+        retainData = false)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("get partition") {
@@ -767,6 +784,11 @@
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithEmptyValue.spec)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("rename partitions") {
@@ -834,6 +856,13 @@
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.renamePartitions(
+        TableIdentifier("tbl1", Some("db2")),
+        Seq(part1.spec), Seq(partWithEmptyValue.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("alter partitions") {
@@ -893,6 +922,11 @@
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithEmptyValue))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partition names") {
@@ -914,10 +948,24 @@
 
   test("list partition names with invalid partial partition spec") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
+    var e = intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithMoreColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
       catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
-        Some(Map("unknown" -> "unknown")))
+        Some(partWithEmptyValue.spec))
     }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partitions") {
@@ -937,10 +985,22 @@
 
   test("list partitions with invalid partial partition spec") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
-      catalog.listPartitions(
-        TableIdentifier("tbl2", Some("db2")), Some(Map("unknown" -> "unknown")))
+    var e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithMoreColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithEmptyValue.spec))
     }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partitions when database/table does not exist") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 5c0e2f6ec4941..9a6144c5e3cc8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -471,6 +471,7 @@ private[hive] class HiveClientImpl(
     // do the check at first and collect all the matching partitions
     val matchingParts =
       specs.flatMap { s =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         // The provided spec here can be a partial spec, i.e. it will match all partitions
         // whose specs are supersets of this partial spec. E.g. If a table has partitions
         // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
@@ -545,6 +546,7 @@ private[hive] class HiveClientImpl(
           // -1 for result limit means "no limit/return all"
           client.getPartitionNames(table.database, table.identifier.table, -1)
         case Some(s) =>
+          assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
           client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1)
       }
     hivePartitionNames.asScala.sorted
@@ -568,7 +570,9 @@ private[hive] class HiveClientImpl(
     val hiveTable = toHiveTable(table)
     val parts = spec match {
       case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
-      case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
+      case Some(s) =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
+        client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
     }
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 0af331e67b448..040f071ce9a44 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -247,6 +247,16 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-19129: drop partition with an empty string will drop the whole table") {
+    val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
+    df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
+    val e = intercept[AnalysisException] {
+      spark.sql("alter table partitionedTable drop partition(partCol1='')")
+    }.getMessage
+    assert(e.contains("Partition spec is invalid. The spec ([partCol1=]) contains an empty " +
+      "partition column value"))
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>
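Note: the heart of this patch is the new requireNonEmptyValueInPartitionSpec check in SessionCatalog, which closes the SPARK-19129 hole where a spec such as PARTITION (partCol1='') was forwarded to the Hive client and could match, and therefore drop, every partition of the table. The sketch below restates that check as a self-contained Scala program so the behavior can be tried outside of Spark; the local TablePartitionSpec alias and the AnalysisException stand-in are assumptions made only to keep the snippet runnable without a Spark dependency (in Spark they come from org.apache.spark.sql.catalyst.catalog.CatalogTypes and org.apache.spark.sql respectively).

object PartitionSpecCheck {
  // In Spark, TablePartitionSpec is an alias for Map[String, String]
  // (partition column name -> partition value).
  type TablePartitionSpec = Map[String, String]

  // Stand-in for org.apache.spark.sql.AnalysisException so the sketch
  // compiles on its own; the real check throws Spark's exception type.
  class AnalysisException(message: String) extends Exception(message)

  // Same logic as the helper added to SessionCatalog: reject any spec
  // that carries an empty value for a partition column.
  def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
    specs.foreach { s =>
      if (s.values.exists(_.isEmpty)) {
        val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
        throw new AnalysisException(
          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // A well-formed spec passes silently.
    requireNonEmptyValueInPartitionSpec(Seq(Map("a" -> "3", "b" -> "4")))
    // A spec with an empty value fails with the message asserted in the tests:
    // "Partition spec is invalid. The spec ([a=3, b=]) contains an empty
    // partition column value"
    try {
      requireNonEmptyValueInPartitionSpec(Seq(Map("a" -> "3", "b" -> "")))
    } catch {
      case e: AnalysisException => println(e.getMessage)
    }
  }
}

The asserts added in HiveClientImpl are a second line of defense rather than user-facing validation: by the time a spec reaches the Hive client it should already have passed the SessionCatalog check, so an empty value there indicates an internal bug, which is why the patch uses assert instead of throwing AnalysisException at that layer.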