Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-28265][SQL] Add renameTable to TableCatalog API #25206

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,24 @@ Table alterTable(
* @return true if a table was deleted, false if no table exists for the identifier
*/
boolean dropTable(Identifier ident);

/**
* Renames a table in the catalog.
* <p>
* If the catalog supports views and contains a view for the old identifier and not a table, this
* throws {@link NoSuchTableException}. Additionally, if the new identifier is a table or a view,
* this throws {@link TableAlreadyExistsException}.
* <p>
* If the catalog does not support table renames between namespaces, it throws
* {@link UnsupportedOperationException}.
*
* @param oldIdent the table identifier of the existing table to rename
* @param newIdent the new table identifier of the table
* @throws NoSuchTableException If the table to rename doesn't exist or is a view
* @throws TableAlreadyExistsException If the new table name already exists or is a view
* @throws UnsupportedOperationException If the namespaces of old and new identiers do not
* match (optional)
*/
void renameTable(Identifier oldIdent, Identifier newIdent)
throws NoSuchTableException, TableAlreadyExistsException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class TableCatalogSuite extends SparkFunSuite {

private val testNs = Array("`", ".")
private val testIdent = Identifier.of(testNs, "test_table")
private val testIdentNew = Identifier.of(testNs, "test_table_new")

test("Catalogs can load the catalog") {
val catalog = newCatalog()
Expand Down Expand Up @@ -656,6 +657,52 @@ class TableCatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(testIdent))
}

test("renameTable") {
val catalog = newCatalog()

assert(!catalog.tableExists(testIdent))
assert(!catalog.tableExists(testIdentNew))

catalog.createTable(testIdent, schema, Array.empty, emptyProps)

assert(catalog.tableExists(testIdent))
catalog.renameTable(testIdent, testIdentNew)

assert(!catalog.tableExists(testIdent))
assert(catalog.tableExists(testIdentNew))
}

test("renameTable: fail if table does not exist") {
val catalog = newCatalog()

val exc = intercept[NoSuchTableException] {
catalog.renameTable(testIdent, testIdentNew)
}

assert(exc.message.contains(testIdent.quoted))
assert(exc.message.contains("not found"))
}

test("renameTable: fail if new table name already exists") {
val catalog = newCatalog()

assert(!catalog.tableExists(testIdent))
assert(!catalog.tableExists(testIdentNew))

catalog.createTable(testIdent, schema, Array.empty, emptyProps)
catalog.createTable(testIdentNew, schema, Array.empty, emptyProps)

assert(catalog.tableExists(testIdent))
assert(catalog.tableExists(testIdentNew))

val exc = intercept[TableAlreadyExistsException] {
catalog.renameTable(testIdent, testIdentNew)
}

assert(exc.message.contains(testIdentNew.quoted))
assert(exc.message.contains("already exists"))
}

test("listNamespaces: list namespaces from metadata") {
val catalog = newCatalog()
catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ class TestTableCatalog extends TableCatalog with SupportsNamespaces {

override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
if (tables.containsKey(newIdent)) {
throw new TableAlreadyExistsException(newIdent)
}

Option(tables.remove(oldIdent)) match {
case Some(table) =>
tables.put(newIdent, InMemoryTable(table.name, table.schema, table.properties))
case _ =>
throw new NoSuchTableException(oldIdent)
}
}

private def allNamespaces: Seq[Seq[String]] = {
(tables.keySet.asScala.map(_.namespace.toSeq) ++ namespaces.keySet.asScala).toSeq.distinct
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
}
}

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add test cases for v2 session catalog?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it'd be nice to see tests for renaming across namespaces (failing), etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just added the tests.

if (Option(loadTable(newIdent)).isDefined) throw new TableAlreadyExistsException(newIdent)

// Load table to make sure the table exists
loadTable(oldIdent)
catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
}

implicit class TableIdentifierHelper(ident: Identifier) {
def asTableIdentifier: TableIdentifier = {
ident.namespace match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ class TestInMemoryTableCatalog extends TableCatalog {
Option(tables.remove(ident)).isDefined
}

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
if (tables.containsKey(newIdent)) {
throw new TableAlreadyExistsException(newIdent)
}

Option(tables.remove(oldIdent)) match {
case Some(table) =>
tables.put(newIdent,
new InMemoryTable(table.name, table.schema, table.partitioning, table.properties))
case _ =>
throw new NoSuchTableException(oldIdent)
}
}

def clearTables(): Unit = {
tables.clear()
}
Expand Down