[SPARK-18173][SQL] data source tables should support truncating partition

## What changes were proposed in this pull request?

Previously, `TRUNCATE TABLE ... PARTITION` would always truncate the whole table for data source tables. This PR fixes that and also improves `InMemoryCatalog` so that the command works with it.
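
A minimal sketch of the fixed behaviour, mirroring the new tests below (assumes an active `SparkSession` named `spark`; the table name `partTable` is illustrative):

```scala
import spark.implicits._

// A small partitioned data source table, written the same way as in the new tests.
(1 to 10).map(i => (i % 3, i % 5, i)).toDF("width", "length", "height")
  .write.partitionBy("width", "length").saveAsTable("partTable")

// Before this patch this statement truncated the whole data source table.
// With this patch only the data under matching partitions is removed; a partial
// spec like (width=1) clears every partition whose width is 1, and nothing else.
spark.sql("TRUNCATE TABLE partTable PARTITION (width=1)")
```
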
## How was this patch tested?

existing tests

Author: Wenchen Fan <[email protected]>

Closes apache#15688 from cloud-fan/truncate.
cloud-fan authored and uzadude committed Jan 27, 2017
1 parent b917944 commit d2d4296
Showing 5 changed files with 146 additions and 17 deletions.
@@ -487,11 +487,26 @@ class InMemoryCatalog(
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = synchronized {
requireTableExists(db, table)
if (partialSpec.nonEmpty) {
throw new UnsupportedOperationException(
"listPartition with partial partition spec is not implemented")

partialSpec match {
case None => catalog(db).tables(table).partitions.values.toSeq
case Some(partial) =>
catalog(db).tables(table).partitions.toSeq.collect {
case (spec, partition) if isPartialPartitionSpec(partial, spec) => partition
}
}
}

/**
* Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a
* partial partition spec w.r.t. PARTITION (a=1,b=2).
*/
private def isPartialPartitionSpec(
spec1: TablePartitionSpec,
spec2: TablePartitionSpec): Boolean = {
spec1.forall {
case (partitionColumn, value) => spec2(partitionColumn) == value
}
catalog(db).tables(table).partitions.values.toSeq
}

override def listPartitionsByFilter(
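For reference, a self-contained sketch of the partial-spec matching rule introduced above (a slightly defensive variant that uses `Map.get`, so a column missing from `spec2` simply fails the match instead of throwing):

```scala
type TablePartitionSpec = Map[String, String]

// spec1 is a partial spec w.r.t. spec2 when every (column -> value) pair in spec1
// also appears in spec2.
def isPartialPartitionSpec(spec1: TablePartitionSpec, spec2: TablePartitionSpec): Boolean =
  spec1.forall { case (partitionColumn, value) => spec2.get(partitionColumn).contains(value) }

// PARTITION (a=1) is a partial spec w.r.t. PARTITION (a=1, b=2) ...
assert(isPartialPartitionSpec(Map("a" -> "1"), Map("a" -> "1", "b" -> "2")))
// ... but PARTITION (a=2) and PARTITION (c=1) are not.
assert(!isPartialPartitionSpec(Map("a" -> "2"), Map("a" -> "1", "b" -> "2")))
assert(!isPartialPartitionSpec(Map("c" -> "1"), Map("a" -> "1", "b" -> "2")))
```
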
@@ -320,6 +320,17 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
}

test("list partitions with partial partition spec") {
val catalog = newBasicCatalog()
val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1")))
assert(parts.length == 1)
assert(parts.head.spec == part1.spec)

// if no partition is matched for the given partition spec, an empty list should be returned.
assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown", "b" -> "1"))).isEmpty)
assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown"))).isEmpty)
}

test("drop partitions") {
val catalog = newBasicCatalog()
assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
@@ -343,13 +343,19 @@ case class TruncateTableCommand(
DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
}
val locations =
// TODO: The `InMemoryCatalog` doesn't support listPartition with partial partition spec.
if (spark.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") {
Seq(table.storage.locationUri)
} else if (table.partitionColumnNames.isEmpty) {
if (table.partitionColumnNames.isEmpty) {
Seq(table.storage.locationUri)
} else {
catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri)
// Here we diverge from Hive when the given partition spec contains all partition columns
// but no partition is matched: Hive will throw an exception and we just do nothing.
val normalizedSpec = partitionSpec.map { spec =>
PartitioningUtils.normalizePartitionSpec(
spec,
table.partitionColumnNames,
table.identifier.quotedString,
spark.sessionState.conf.resolver)
}
catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri)
}
val hadoopConf = spark.sessionState.newHadoopConf()
locations.foreach { location =>
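A rough standalone sketch of what the normalization step above achieves (the real logic lives in `PartitioningUtils.normalizePartitionSpec`; the resolver default and the exception type here are simplifications):

```scala
def normalizePartitionSpec(
    spec: Map[String, String],
    partitionColumnNames: Seq[String],
    resolver: (String, String) => Boolean = _ equalsIgnoreCase _): Map[String, String] = {
  spec.map { case (key, value) =>
    // Resolve the user-supplied column name against the table's partition columns,
    // e.g. "WIDTH" -> "width" under the default case-insensitive resolver.
    val normalizedKey = partitionColumnNames
      .find(col => resolver(col, key))
      .getOrElse(throw new IllegalArgumentException(
        s"$key is not a valid partition column"))
    normalizedKey -> value
  }
}

// normalizePartitionSpec(Map("WIDTH" -> "1"), Seq("width", "length")) returns
// Map("width" -> "1"); an unknown column such as "unknown" throws, matching the
// error asserted in the new tests.
```
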
@@ -1628,29 +1628,62 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {

test("truncate table - datasource table") {
import testImplicits._
val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")

val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
// Test both a Hive compatible and incompatible code path.
Seq("json", "parquet").foreach { format =>
withTable("rectangles") {
data.write.format(format).saveAsTable("rectangles")
assume(spark.table("rectangles").collect().nonEmpty,
"bad test; table was empty to begin with")

sql("TRUNCATE TABLE rectangles")
assert(spark.table("rectangles").collect().isEmpty)

// not supported since the table is not partitioned
assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
}
}
}

withTable("rectangles", "rectangles2") {
data.write.saveAsTable("rectangles")
data.write.partitionBy("length").saveAsTable("rectangles2")
test("truncate partitioned table - datasource table") {
import testImplicits._

// not supported since the table is not partitioned
assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height")

withTable("partTable") {
data.write.partitionBy("width", "length").saveAsTable("partTable")
// supported since partitions are stored in the metastore
sql("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
assert(spark.table("rectangles2").collect().isEmpty)
sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)")
assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty)
assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty)
}

withTable("partTable") {
data.write.partitionBy("width", "length").saveAsTable("partTable")
// support partial partition spec
sql("TRUNCATE TABLE partTable PARTITION (width=1)")
assert(spark.table("partTable").collect().nonEmpty)
assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty)
}

withTable("partTable") {
data.write.partitionBy("width", "length").saveAsTable("partTable")
// do nothing if no partition is matched for the given partial partition spec
sql("TRUNCATE TABLE partTable PARTITION (width=100)")
assert(spark.table("partTable").count() == data.count())

// do nothing if no partition is matched for the given non-partial partition spec
// TODO: This behaviour is different from Hive, we should decide whether we need to follow
// Hive's behaviour or stick with our existing behaviour later.
sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
assert(spark.table("partTable").count() == data.count())

// throw exception if the column in partition spec is not a partition column.
val e = intercept[AnalysisException] {
sql("TRUNCATE TABLE partTable PARTITION (unknown=1)")
}
assert(e.message.contains("unknown is not a valid partition column"))
}
}

@@ -1098,4 +1098,68 @@ class HiveDDLSuite
}
}
}

test("truncate table - datasource table") {
import testImplicits._

val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
// Test both a Hive compatible and incompatible code path.
Seq("json", "parquet").foreach { format =>
withTable("rectangles") {
data.write.format(format).saveAsTable("rectangles")
assume(spark.table("rectangles").collect().nonEmpty,
"bad test; table was empty to begin with")

sql("TRUNCATE TABLE rectangles")
assert(spark.table("rectangles").collect().isEmpty)

// not supported since the table is not partitioned
val e = intercept[AnalysisException] {
sql("TRUNCATE TABLE rectangles PARTITION (width=1)")
}
assert(e.message.contains("Operation not allowed"))
}
}
}

test("truncate partitioned table - datasource table") {
import testImplicits._

val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height")

withTable("partTable") {
data.write.partitionBy("width", "length").saveAsTable("partTable")
// supported since partitions are stored in the metastore
sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)")
assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty)
assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty)
}

withTable("partTable") {
data.write.partitionBy("width", "length").saveAsTable("partTable")
// support partial partition spec
sql("TRUNCATE TABLE partTable PARTITION (width=1)")
assert(spark.table("partTable").collect().nonEmpty)
assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty)
}

withTable("partTable") {
data.write.partitionBy("width", "length").saveAsTable("partTable")
// do nothing if no partition is matched for the given partial partition spec
sql("TRUNCATE TABLE partTable PARTITION (width=100)")
assert(spark.table("partTable").count() == data.count())

// do nothing if no partition is matched for the given non-partial partition spec
// TODO: This behaviour is different from Hive, we should decide whether we need to follow
// Hive's behaviour or stick with our existing behaviour later.
sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
assert(spark.table("partTable").count() == data.count())

// throw exception if the column in partition spec is not a partition column.
val e = intercept[AnalysisException] {
sql("TRUNCATE TABLE partTable PARTITION (unknown=1)")
}
assert(e.message.contains("unknown is not a valid partition column"))
}
}
}
