Skip to content

Commit

Permalink
Implement tests for table partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Mar 16, 2016
1 parent 7947445 commit 3da16fb
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {

test("alter partitions") {
val catalog = newBasicCatalog()
try{
try {
// Note: Before altering table partitions in Hive, you *must* set the current database
// to the one that contains the table of interest. Otherwise you will end up with the
// most helpful error message ever: "Unable to alter partition. alter is not possible."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,4 +370,162 @@ class SessionCatalogSuite extends SparkFunSuite {
intercept[AnalysisException] { catalog.listTables("unknown_db") }
}

// --------------------------------------------------------------------------
// Partitions
// --------------------------------------------------------------------------

test("basic create and list partitions") {
val externalCatalog = newEmptyCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
sessionCatalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false)
sessionCatalog.createPartitions(
"mydb", TableIdentifier("tbl"), Seq(part1, part2), ignoreIfExists = false)
assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2)))
}

test("create partitions when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
catalog.createPartitions(
"does_not_exist", TableIdentifier("tbl1"), Seq(), ignoreIfExists = false)
}
intercept[AnalysisException] {
catalog.createPartitions(
"db2", TableIdentifier("does_not_exist"), Seq(), ignoreIfExists = false)
}
}

test("create partitions that already exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
catalog.createPartitions(
"db2", TableIdentifier("tbl2"), Seq(part1), ignoreIfExists = false)
}
catalog.createPartitions("db2", TableIdentifier("tbl2"), Seq(part1), ignoreIfExists = true)
}

test("drop partitions") {
val externalCatalog1 = newBasicCatalog()
val sessionCatalog1 = new SessionCatalog(externalCatalog1)
assert(catalogPartitionsEqual(externalCatalog1, "db2", "tbl2", Seq(part1, part2)))
sessionCatalog1.dropPartitions(
"db2", TableIdentifier("tbl2"), Seq(part1.spec), ignoreIfNotExists = false)
assert(catalogPartitionsEqual(externalCatalog1, "db2", "tbl2", Seq(part2)))
val externalCatalog2 = newBasicCatalog()
val sessionCatalog2 = new SessionCatalog(externalCatalog2)
assert(catalogPartitionsEqual(externalCatalog2, "db2", "tbl2", Seq(part1, part2)))
sessionCatalog2.dropPartitions(
"db2", TableIdentifier("tbl2"), Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
assert(externalCatalog2.listPartitions("db2", "tbl2").isEmpty)
}

test("drop partitions when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
catalog.dropPartitions(
"does_not_exist", TableIdentifier("tbl1"), Seq(), ignoreIfNotExists = false)
}
intercept[AnalysisException] {
catalog.dropPartitions(
"db2", TableIdentifier("does_not_exist"), Seq(), ignoreIfNotExists = false)
}
}

test("drop partitions that do not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
catalog.dropPartitions(
"db2", TableIdentifier("tbl2"), Seq(part3.spec), ignoreIfNotExists = false)
}
catalog.dropPartitions(
"db2", TableIdentifier("tbl2"), Seq(part3.spec), ignoreIfNotExists = true)
}

test("get partition") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
assert(sessionCatalog.getPartition(
"db2", TableIdentifier("tbl2"), part1.spec).spec == part1.spec)
assert(sessionCatalog.getPartition(
"db2", TableIdentifier("tbl2"), part2.spec).spec == part2.spec)
intercept[AnalysisException] {
sessionCatalog.getPartition("db2", TableIdentifier("tbl1"), part3.spec)
}
}

test("get partition when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
catalog.getPartition("does_not_exist", TableIdentifier("tbl1"), part1.spec)
}
intercept[AnalysisException] {
catalog.getPartition("db2", TableIdentifier("does_not_exist"), part1.spec)
}
}

test("rename partitions") {
val catalog = new SessionCatalog(newBasicCatalog())
val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
val newSpecs = Seq(newPart1.spec, newPart2.spec)
catalog.renamePartitions("db2", TableIdentifier("tbl2"), Seq(part1.spec, part2.spec), newSpecs)
assert(catalog.getPartition(
"db2", TableIdentifier("tbl2"), newPart1.spec).spec === newPart1.spec)
assert(catalog.getPartition(
"db2", TableIdentifier("tbl2"), newPart2.spec).spec === newPart2.spec)
// The old partitions should no longer exist
intercept[AnalysisException] {
catalog.getPartition("db2", TableIdentifier("tbl2"), part1.spec)
}
intercept[AnalysisException] {
catalog.getPartition("db2", TableIdentifier("tbl2"), part2.spec)
}
}

test("rename partitions when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
catalog.renamePartitions(
"does_not_exist", TableIdentifier("tbl1"), Seq(part1.spec), Seq(part2.spec))
}
intercept[AnalysisException] {
catalog.renamePartitions(
"db2", TableIdentifier("does_not_exist"), Seq(part1.spec), Seq(part2.spec))
}
}

test("alter partitions") {
val catalog = new SessionCatalog(newBasicCatalog())
val newLocation = newUriForDatabase()
// alter but keep spec the same
val oldPart1 = catalog.getPartition("db2", TableIdentifier("tbl2"), part1.spec)
val oldPart2 = catalog.getPartition("db2", TableIdentifier("tbl2"), part2.spec)
catalog.alterPartitions("db2", TableIdentifier("tbl2"), Seq(
oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
val newPart1 = catalog.getPartition("db2", TableIdentifier("tbl2"), part1.spec)
val newPart2 = catalog.getPartition("db2", TableIdentifier("tbl2"), part2.spec)
assert(newPart1.storage.locationUri == Some(newLocation))
assert(newPart2.storage.locationUri == Some(newLocation))
assert(oldPart1.storage.locationUri != Some(newLocation))
assert(oldPart2.storage.locationUri != Some(newLocation))
// alter but change spec, should fail because new partition specs do not exist yet
val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
intercept[AnalysisException] {
catalog.alterPartitions("db2", TableIdentifier("tbl2"), Seq(badPart1, badPart2))
}
}

test("alter partitions when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
catalog.alterPartitions("does_not_exist", TableIdentifier("tbl1"), Seq(part1))
}
intercept[AnalysisException] {
catalog.alterPartitions("db2", TableIdentifier("does_not_exist"), Seq(part1))
}
}

}

0 comments on commit 3da16fb

Please sign in to comment.