From 4c0d8348d0ae02274ed3d3cf0147b8f040391d70 Mon Sep 17 00:00:00 2001 From: Edgar Rodriguez Date: Wed, 17 Jul 2019 21:23:55 -0700 Subject: [PATCH 1/3] [SPARK-28265][SQL] Add renameTable in TableCatalog API --- .../spark/sql/catalog/v2/TableCatalog.java | 20 ++++++++ .../sql/catalog/v2/TableCatalogSuite.scala | 47 +++++++++++++++++++ .../sql/catalog/v2/TestTableCatalog.scala | 13 +++++ .../datasources/v2/V2SessionCatalog.scala | 8 ++++ .../sources/v2/TestInMemoryTableCatalog.scala | 14 ++++++ 5 files changed, 102 insertions(+) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java index 681629d2d5405..4775b58edf049 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java @@ -134,4 +134,24 @@ Table alterTable( * @return true if a table was deleted, false if no table exists for the identifier */ boolean dropTable(Identifier ident); + + /** + * Renames a table in the catalog. + *

+ * If the catalog supports views and contains a view for the old identifier and not a table, this + * throws {@link NoSuchTableException}. Additionally, if the new identifier is a table or a view, + * this throws {@link TableAlreadyExistsException}. + *

+ * If the catalog does not support table renames between namespaces, it throws + * {@link UnsupportedOperationException}. + * + * @param oldIdent the table identifier of the existing table to rename + * @param newIdent the new table identifier of the table + * @throws NoSuchTableException If the table to rename doesn't exist or is a view + * @throws TableAlreadyExistsException If the new table name already exists or is a view + * @throws UnsupportedOperationException If the namespaces of old and new identifiers do not + * match (optional) + */ + void renameTable(Identifier oldIdent, Identifier newIdent) + throws NoSuchTableException, TableAlreadyExistsException; } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala index 089b4c5ed94f9..e4c1b3c297165 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala @@ -45,6 +45,7 @@ class TableCatalogSuite extends SparkFunSuite { private val testNs = Array("`", ".") private val testIdent = Identifier.of(testNs, "test_table") + private val testIdentNew = Identifier.of(testNs, "test_table_new") test("Catalogs can load the catalog") { val catalog = newCatalog() @@ -656,6 +657,52 @@ class TableCatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) } + test("renameTable") { + val catalog = newCatalog() + + assert(!catalog.tableExists(testIdent)) + assert(!catalog.tableExists(testIdentNew)) + + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(catalog.tableExists(testIdent)) + catalog.renameTable(testIdent, testIdentNew) + + assert(!catalog.tableExists(testIdent)) + assert(catalog.tableExists(testIdentNew)) + } + + test("renameTable: fail if table does not exist") { + val catalog = newCatalog() + + val exc = 
intercept[NoSuchTableException] { + catalog.renameTable(testIdent, testIdentNew) + } + + assert(exc.message.contains(testIdent.quoted)) + assert(exc.message.contains("not found")) + } + + test("renameTable: fail if new table name already exists") { + val catalog = newCatalog() + + assert(!catalog.tableExists(testIdent)) + assert(!catalog.tableExists(testIdentNew)) + + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdentNew, schema, Array.empty, emptyProps) + + assert(catalog.tableExists(testIdent)) + assert(catalog.tableExists(testIdentNew)) + + val exc = intercept[TableAlreadyExistsException] { + catalog.renameTable(testIdent, testIdentNew) + } + + assert(exc.message.contains(testIdentNew.quoted)) + assert(exc.message.contains("already exists")) + } + test("listNamespaces: list namespaces from metadata") { val catalog = newCatalog() catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala index 6fdd6e30e1ee4..de7c5c9536fdf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala @@ -93,6 +93,19 @@ class TestTableCatalog extends TableCatalog with SupportsNamespaces { override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { + if (tables.containsKey(newIdent)) { + throw new TableAlreadyExistsException(newIdent) + } + + Option(tables.remove(oldIdent)) match { + case Some(table) => + tables.put(newIdent, InMemoryTable(table.name, table.schema, table.properties)) + case _ => + throw new NoSuchTableException(oldIdent) + } + } + private def allNamespaces: Seq[Seq[String]] = { 
(tables.keySet.asScala.map(_.namespace.toSeq) ++ namespaces.keySet.asScala).toSeq.distinct } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index a3b8f28fc5c39..f6bb0d719415b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -153,6 +153,14 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { } } + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { + if (Option(loadTable(newIdent)).isDefined) throw new TableAlreadyExistsException(newIdent) + + // Load table to make sure the table exists + loadTable(oldIdent) + catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier) + } + implicit class TableIdentifierHelper(ident: Identifier) { def asTableIdentifier: TableIdentifier = { ident.namespace match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index 7c51a29bde905..b7151766cf987 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -104,6 +104,20 @@ class TestInMemoryTableCatalog extends TableCatalog { Option(tables.remove(ident)).isDefined } + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { + if (tables.containsKey(newIdent)) { + throw new TableAlreadyExistsException(newIdent) + } + + Option(tables.remove(oldIdent)) match { + case Some(table) => + tables.put(newIdent, + new InMemoryTable(table.name, table.schema, table.partitioning, table.properties)) + case _ => + throw new 
NoSuchTableException(oldIdent) + } + } + def clearTables(): Unit = { tables.clear() } From e95c603595d1b1bfca6b6a28ed5394417ab6c96b Mon Sep 17 00:00:00 2001 From: Edgar Rodriguez Date: Mon, 12 Aug 2019 14:24:50 -0700 Subject: [PATCH 2/3] Add V2SessionCatalog tests for renameTable --- .../datasources/v2/V2SessionCatalog.scala | 6 +- .../v2/V2SessionCatalogSuite.scala | 74 ++++++++++++++++++- 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index f6bb0d719415b..06aaab00c0977 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -154,7 +154,11 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { } override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { - if (Option(loadTable(newIdent)).isDefined) throw new TableAlreadyExistsException(newIdent) + try { + if (Option(loadTable(newIdent)).isDefined) throw new TableAlreadyExistsException(newIdent) + } catch { + case _: NoSuchTableException => + } // Load table to make sure the table exists loadTable(oldIdent) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 3822882cc91cb..4f14ecc28680d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import org.apache.spark.SparkFunSuite +import 
org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalog.v2.{Catalogs, Identifier, TableCatalog, TableChange} import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser @@ -45,6 +46,7 @@ class V2SessionCatalogSuite override protected def beforeAll(): Unit = { super.beforeAll() spark.sql("""CREATE DATABASE IF NOT EXISTS db""") + spark.sql("""CREATE DATABASE IF NOT EXISTS db2""") spark.sql("""CREATE DATABASE IF NOT EXISTS ns""") spark.sql("""CREATE DATABASE IF NOT EXISTS ns2""") } @@ -52,6 +54,7 @@ class V2SessionCatalogSuite override protected def afterAll(): Unit = { spark.sql("""DROP TABLE IF EXISTS db.test_table""") spark.sql("""DROP DATABASE IF EXISTS db""") + spark.sql("""DROP DATABASE IF EXISTS db2""") spark.sql("""DROP DATABASE IF EXISTS ns""") spark.sql("""DROP DATABASE IF EXISTS ns2""") super.afterAll() @@ -59,6 +62,7 @@ class V2SessionCatalogSuite after { newCatalog().dropTable(testIdent) + newCatalog().dropTable(testIdentNew) } private def newCatalog(): TableCatalog = { @@ -67,7 +71,9 @@ class V2SessionCatalogSuite newCatalog } - private val testIdent = Identifier.of(Array("db"), "test_table") + private val testNs = Array("db") + private val testIdent = Identifier.of(testNs, "test_table") + private val testIdentNew = Identifier.of(testNs, "test_table_new") test("Catalogs can load the catalog") { val catalog = newCatalog() @@ -680,4 +686,70 @@ class V2SessionCatalogSuite assert(!wasDropped) assert(!catalog.tableExists(testIdent)) } + + test("renameTable") { + val catalog = newCatalog() + + assert(!catalog.tableExists(testIdent)) + assert(!catalog.tableExists(testIdentNew)) + + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(catalog.tableExists(testIdent)) + catalog.renameTable(testIdent, testIdentNew) + + assert(!catalog.tableExists(testIdent)) + assert(catalog.tableExists(testIdentNew)) + } + + 
test("renameTable: fail if table does not exist") { + val catalog = newCatalog() + + val exc = intercept[NoSuchTableException] { + catalog.renameTable(testIdent, testIdentNew) + } + + assert(exc.message.contains(testIdent.quoted)) + assert(exc.message.contains("not found")) + } + + test("renameTable: fail if new table name already exists") { + val catalog = newCatalog() + + assert(!catalog.tableExists(testIdent)) + assert(!catalog.tableExists(testIdentNew)) + + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdentNew, schema, Array.empty, emptyProps) + + assert(catalog.tableExists(testIdent)) + assert(catalog.tableExists(testIdentNew)) + + val exc = intercept[TableAlreadyExistsException] { + catalog.renameTable(testIdent, testIdentNew) + } + + assert(exc.message.contains(testIdentNew.quoted)) + assert(exc.message.contains("already exists")) + } + + test("renameTable: fail if db does not match for old and new table names") { + val catalog = newCatalog() + val testIdentNewOtherDb = Identifier.of(Array("db2"), "test_table_new") + + assert(!catalog.tableExists(testIdent)) + assert(!catalog.tableExists(testIdentNewOtherDb)) + + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + assert(catalog.tableExists(testIdent)) + + val exc = intercept[AnalysisException] { + catalog.renameTable(testIdent, testIdentNewOtherDb) + } + + assert(exc.message.contains(testIdent.namespace.quoted)) + assert(exc.message.contains(testIdentNewOtherDb.namespace.quoted)) + assert(exc.message.contains("RENAME TABLE source and destination databases do not match")) + } } From 71d9ad1b497f79fefee9a83d8f40c565288c1c9a Mon Sep 17 00:00:00 2001 From: Edgar Rodriguez Date: Tue, 13 Aug 2019 11:07:17 -0700 Subject: [PATCH 3/3] Use tableExists in check for new table name --- .../sql/execution/datasources/v2/V2SessionCatalog.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 06aaab00c0977..79ea8756721ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -154,10 +154,8 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { } override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { - try { - if (Option(loadTable(newIdent)).isDefined) throw new TableAlreadyExistsException(newIdent) - } catch { - case _: NoSuchTableException => + if (tableExists(newIdent)) { + throw new TableAlreadyExistsException(newIdent) } // Load table to make sure the table exists