[SPARK-13923][SPARK-14014][SQL] Session catalog follow-ups

## What changes were proposed in this pull request?

This patch addresses the remaining comments left in apache#11750 and apache#11918 after they were merged. For a full list of changes in this patch, just trace the commits.

## How was this patch tested?

`SessionCatalogSuite` and `CatalogTestCases`

Author: Andrew Or <[email protected]>

Closes apache#12006 from andrewor14/session-catalog-followup.
Andrew Or committed Mar 28, 2016
1 parent 34c0638 commit eebc8c1
Showing 24 changed files with 131 additions and 125 deletions.

@@ -155,7 +155,7 @@ class InMemoryCatalog extends ExternalCatalog {
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
requireDbExists(db)
- val table = tableDefinition.name.table
+ val table = tableDefinition.identifier.table
if (tableExists(db, table)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
@@ -182,14 +182,14 @@ class InMemoryCatalog extends ExternalCatalog {
override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
requireTableExists(db, oldName)
val oldDesc = catalog(db).tables(oldName)
- oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db)))
+ oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
catalog(db).tables.put(newName, oldDesc)
catalog(db).tables.remove(oldName)
}

override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
- requireTableExists(db, tableDefinition.name.table)
- catalog(db).tables(tableDefinition.name.table).table = tableDefinition
+ requireTableExists(db, tableDefinition.identifier.table)
+ catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
}

override def getTable(db: String, table: String): CatalogTable = synchronized {
@@ -296,10 +296,10 @@ class InMemoryCatalog extends ExternalCatalog {

override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
- if (functionExists(db, func.name.funcName)) {
+ if (functionExists(db, func.identifier.funcName)) {
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
} else {
- catalog(db).functions.put(func.name.funcName, func)
+ catalog(db).functions.put(func.identifier.funcName, func)
}
}

@@ -310,14 +310,14 @@ class InMemoryCatalog extends ExternalCatalog {

override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
- val newFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db)))
+ val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
catalog(db).functions.remove(oldName)
catalog(db).functions.put(newName, newFunc)
}

override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
- requireFunctionExists(db, funcDefinition.name.funcName)
- catalog(db).functions.put(funcDefinition.name.funcName, funcDefinition)
+ requireFunctionExists(db, funcDefinition.identifier.funcName)
+ catalog(db).functions.put(funcDefinition.identifier.funcName, funcDefinition)
}

override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
@@ -17,9 +17,7 @@

package org.apache.spark.sql.catalyst.catalog

- import java.util.concurrent.ConcurrentHashMap
-
- import scala.collection.JavaConverters._
+ import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
@@ -31,6 +29,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
* An internal catalog that is used by a Spark Session. This internal catalog serves as a
* proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
* tables and functions of the Spark Session that it belongs to.
+ *
+ * This class is not thread-safe.
*/
class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
import ExternalCatalog._
@@ -39,8 +39,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
this(externalCatalog, new SimpleCatalystConf(true))
}

- protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
- protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
+ protected[this] val tempTables = new mutable.HashMap[String, LogicalPlan]
+ protected[this] val tempFunctions = new mutable.HashMap[String, CatalogFunction]

// Note: we track current database here because certain operations do not explicitly
// specify the database (e.g. DROP TABLE my_table). In these cases we must first
@@ -122,9 +122,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* If no such database is specified, create it in the current database.
*/
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
- val db = tableDefinition.name.database.getOrElse(currentDb)
- val table = formatTableName(tableDefinition.name.table)
- val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
+ val db = tableDefinition.identifier.database.getOrElse(currentDb)
+ val table = formatTableName(tableDefinition.identifier.table)
+ val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
}

@@ -138,9 +138,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* this becomes a no-op.
*/
def alterTable(tableDefinition: CatalogTable): Unit = {
- val db = tableDefinition.name.database.getOrElse(currentDb)
- val table = formatTableName(tableDefinition.name.table)
- val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
+ val db = tableDefinition.identifier.database.getOrElse(currentDb)
+ val table = formatTableName(tableDefinition.identifier.table)
+ val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
externalCatalog.alterTable(db, newTableDefinition)
}

@@ -164,9 +164,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
def createTempTable(
name: String,
tableDefinition: LogicalPlan,
- ignoreIfExists: Boolean): Unit = {
+ overrideIfExists: Boolean): Unit = {
val table = formatTableName(name)
- if (tempTables.containsKey(table) && !ignoreIfExists) {
+ if (tempTables.contains(table) && !overrideIfExists) {
throw new AnalysisException(s"Temporary table '$name' already exists.")
}
tempTables.put(table, tableDefinition)
@@ -188,10 +188,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
val db = oldName.database.getOrElse(currentDb)
val oldTableName = formatTableName(oldName.table)
val newTableName = formatTableName(newName.table)
- if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) {
+ if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
externalCatalog.renameTable(db, oldTableName, newTableName)
} else {
- val table = tempTables.remove(oldTableName)
+ val table = tempTables(oldTableName)
+ tempTables.remove(oldTableName)
tempTables.put(newTableName, table)
}
}
@@ -206,7 +207,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
val db = name.database.getOrElse(currentDb)
val table = formatTableName(name.table)
- if (name.database.isDefined || !tempTables.containsKey(table)) {
+ if (name.database.isDefined || !tempTables.contains(table)) {
externalCatalog.dropTable(db, table, ignoreIfNotExists)
} else {
tempTables.remove(table)
@@ -224,11 +225,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
val db = name.database.getOrElse(currentDb)
val table = formatTableName(name.table)
val relation =
- if (name.database.isDefined || !tempTables.containsKey(table)) {
+ if (name.database.isDefined || !tempTables.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
CatalogRelation(db, metadata, alias)
} else {
- tempTables.get(table)
+ tempTables(table)
}
val qualifiedTable = SubqueryAlias(table, relation)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
@@ -247,7 +248,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
def tableExists(name: TableIdentifier): Boolean = {
val db = name.database.getOrElse(currentDb)
val table = formatTableName(name.table)
- if (name.database.isDefined || !tempTables.containsKey(table)) {
+ if (name.database.isDefined || !tempTables.contains(table)) {
externalCatalog.tableExists(db, table)
} else {
true // it's a temporary table
@@ -266,7 +267,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
val dbTables =
externalCatalog.listTables(db, pattern).map { t => TableIdentifier(t, Some(db)) }
val regex = pattern.replaceAll("\\*", ".*").r
- val _tempTables = tempTables.keys().asScala
+ val _tempTables = tempTables.keys.toSeq
.filter { t => regex.pattern.matcher(t).matches() }
.map { t => TableIdentifier(t) }
dbTables ++ _tempTables
@@ -290,7 +291,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* For testing only.
*/
private[catalog] def getTempTable(name: String): Option[LogicalPlan] = {
- Option(tempTables.get(name))
+ tempTables.get(name)
}

// ----------------------------------------------------------------------------
@@ -399,9 +400,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* If no such database is specified, create it in the current database.
*/
def createFunction(funcDefinition: CatalogFunction): Unit = {
- val db = funcDefinition.name.database.getOrElse(currentDb)
+ val db = funcDefinition.identifier.database.getOrElse(currentDb)
val newFuncDefinition = funcDefinition.copy(
- name = FunctionIdentifier(funcDefinition.name.funcName, Some(db)))
+ identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
externalCatalog.createFunction(db, newFuncDefinition)
}

@@ -424,9 +425,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* this becomes a no-op.
*/
def alterFunction(funcDefinition: CatalogFunction): Unit = {
- val db = funcDefinition.name.database.getOrElse(currentDb)
+ val db = funcDefinition.identifier.database.getOrElse(currentDb)
val newFuncDefinition = funcDefinition.copy(
- name = FunctionIdentifier(funcDefinition.name.funcName, Some(db)))
+ identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
externalCatalog.alterFunction(db, newFuncDefinition)
}

@@ -439,10 +440,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* This assumes no database is specified in `funcDefinition`.
*/
def createTempFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
- require(funcDefinition.name.database.isEmpty,
+ require(funcDefinition.identifier.database.isEmpty,
"attempted to create a temporary function while specifying a database")
- val name = funcDefinition.name.funcName
- if (tempFunctions.containsKey(name) && !ignoreIfExists) {
+ val name = funcDefinition.identifier.funcName
+ if (tempFunctions.contains(name) && !ignoreIfExists) {
throw new AnalysisException(s"Temporary function '$name' already exists.")
}
tempFunctions.put(name, funcDefinition)
@@ -455,7 +456,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
// Hive has DROP FUNCTION and DROP TEMPORARY FUNCTION. We may want to consolidate
// dropFunction and dropTempFunction.
def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = {
- if (!tempFunctions.containsKey(name) && !ignoreIfNotExists) {
+ if (!tempFunctions.contains(name) && !ignoreIfNotExists) {
throw new AnalysisException(
s"Temporary function '$name' cannot be dropped because it does not exist!")
}
@@ -476,11 +477,12 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
throw new AnalysisException("rename does not support moving functions across databases")
}
val db = oldName.database.getOrElse(currentDb)
- if (oldName.database.isDefined || !tempFunctions.containsKey(oldName.funcName)) {
+ if (oldName.database.isDefined || !tempFunctions.contains(oldName.funcName)) {
externalCatalog.renameFunction(db, oldName.funcName, newName.funcName)
} else {
- val func = tempFunctions.remove(oldName.funcName)
- val newFunc = func.copy(name = func.name.copy(funcName = newName.funcName))
+ val func = tempFunctions(oldName.funcName)
+ val newFunc = func.copy(identifier = func.identifier.copy(funcName = newName.funcName))
+ tempFunctions.remove(oldName.funcName)
tempFunctions.put(newName.funcName, newFunc)
}
}
@@ -494,10 +496,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
*/
def getFunction(name: FunctionIdentifier): CatalogFunction = {
val db = name.database.getOrElse(currentDb)
- if (name.database.isDefined || !tempFunctions.containsKey(name.funcName)) {
+ if (name.database.isDefined || !tempFunctions.contains(name.funcName)) {
externalCatalog.getFunction(db, name.funcName)
} else {
- tempFunctions.get(name.funcName)
+ tempFunctions(name.funcName)
}
}

@@ -510,7 +512,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
val dbFunctions =
externalCatalog.listFunctions(db, pattern).map { f => FunctionIdentifier(f, Some(db)) }
val regex = pattern.replaceAll("\\*", ".*").r
- val _tempFunctions = tempFunctions.keys().asScala
+ val _tempFunctions = tempFunctions.keys.toSeq
.filter { f => regex.pattern.matcher(f).matches() }
.map { f => FunctionIdentifier(f) }
dbFunctions ++ _tempFunctions
@@ -520,7 +522,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
* Return a temporary function. For testing only.
*/
private[catalog] def getTempFunction(name: String): Option[CatalogFunction] = {
- Option(tempFunctions.get(name))
+ tempFunctions.get(name)
}

}
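
To ground the `SessionCatalog` changes above, here is a minimal sketch of the temporary-table API after this patch, using only calls visible in the hunks above (the no-conf constructor, `createTempTable`, `tableExists`, `renameTable`, `dropTable`). The object name, the `OneRowRelation` placeholder plan, and the assertions are illustrative assumptions rather than code from this commit; since the class doc now states the catalog is not thread-safe, the sketch assumes single-threaded use.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation

object SessionCatalogSketch {
  def main(args: Array[String]): Unit = {
    // A per-session catalog backed by the in-memory external catalog.
    val catalog = new SessionCatalog(new InMemoryCatalog)

    // The flag is now named `overrideIfExists`: when true, re-registering the same
    // temp table name replaces the previous plan; when false, it throws.
    catalog.createTempTable("my_temp", OneRowRelation, overrideIfExists = true)
    catalog.createTempTable("my_temp", OneRowRelation, overrideIfExists = true)

    // Unqualified names resolve against temporary tables before the external catalog.
    assert(catalog.tableExists(TableIdentifier("my_temp")))

    // Renaming and dropping a temporary table never touches the external catalog.
    catalog.renameTable(TableIdentifier("my_temp"), TableIdentifier("my_temp2"))
    catalog.dropTable(TableIdentifier("my_temp2"), ignoreIfNotExists = false)
  }
}
```

The `AnalysisTest` and `DecimalPrecisionSuite` hunks further down are the in-tree equivalents of the `createTempTable` call here.
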
@@ -169,10 +169,10 @@ abstract class ExternalCatalog {
/**
* A function defined in the catalog.
*
- * @param name name of the function
+ * @param identifier name of the function
* @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
*/
- case class CatalogFunction(name: FunctionIdentifier, className: String)
+ case class CatalogFunction(identifier: FunctionIdentifier, className: String)


/**
@@ -216,7 +216,7 @@ case class CatalogTablePartition(
* future once we have a better understanding of how we want to handle skewed columns.
*/
case class CatalogTable(
- name: TableIdentifier,
+ identifier: TableIdentifier,
tableType: CatalogTableType,
storage: CatalogStorageFormat,
schema: Seq[CatalogColumn],
@@ -230,12 +230,12 @@ case class CatalogTable(
viewText: Option[String] = None) {

/** Return the database this table was specified to belong to, assuming it exists. */
- def database: String = name.database.getOrElse {
- throw new AnalysisException(s"table $name did not specify database")
+ def database: String = identifier.database.getOrElse {
+ throw new AnalysisException(s"table $identifier did not specify database")
}

/** Return the fully qualified name of this table, assuming the database was specified. */
- def qualifiedName: String = name.unquotedString
+ def qualifiedName: String = identifier.unquotedString

/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
@@ -290,6 +290,6 @@ case class CatalogRelation(
// TODO: implement this
override def output: Seq[Attribute] = Seq.empty

- require(metadata.name.database == Some(db),
+ require(metadata.identifier.database == Some(db),
"provided database does not much the one specified in the table definition")
}
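
As a caller-side illustration of the `name` to `identifier` rename on the catalog case classes, here is a small sketch driven through `InMemoryCatalog`, mirroring the `createFunction`/`renameFunction` hunks at the top of this diff. The database, function name, and class name are made up, and the four-field `CatalogDatabase(name, description, locationUri, properties)` constructor is an assumption about the 2.0-era interface, not something shown in this commit.

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogFunction, InMemoryCatalog}

object CatalogIdentifierSketch {
  def main(args: Array[String]): Unit = {
    val external = new InMemoryCatalog

    // Functions are registered per database, so one must exist first
    // (assumed CatalogDatabase signature; see the note above).
    external.createDatabase(
      CatalogDatabase("db1", "", "file:/tmp/db1", Map.empty), ignoreIfExists = false)

    // CatalogFunction's first field is now `identifier` (a FunctionIdentifier), not `name`.
    val func = CatalogFunction(FunctionIdentifier("my_upper", Some("db1")), "com.example.MyUpper")
    external.createFunction("db1", func)

    // renameFunction copies the old definition with
    // identifier = FunctionIdentifier(newName, Some(db)), as in the InMemoryCatalog hunk above.
    external.renameFunction("db1", "my_upper", "my_upper2")
    assert(external.getFunction("db1", "my_upper2").identifier.funcName == "my_upper2")
    assert(external.getFunction("db1", "my_upper2").identifier.database == Some("db1"))
  }
}
```

The same field rename applies to `CatalogTable`, which is why `CatalogTestUtils.newTable` below now passes `identifier = TableIdentifier(name, database)`.
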
@@ -31,7 +31,7 @@ trait AnalysisTest extends PlanTest {
private def makeAnalyzer(caseSensitive: Boolean): Analyzer = {
val conf = new SimpleCatalystConf(caseSensitive)
val catalog = new SessionCatalog(new InMemoryCatalog, conf)
- catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true)
+ catalog.createTempTable("TaBlE", TestRelations.testRelation, overrideIfExists = true)
new Analyzer(catalog, EmptyFunctionRegistry, conf) {
override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
}
@@ -52,7 +52,7 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
private val b: Expression = UnresolvedAttribute("b")

before {
- catalog.createTempTable("table", relation, ignoreIfExists = true)
+ catalog.createTempTable("table", relation, overrideIfExists = true)
}

private def checkType(expression: Expression, expectedType: DataType): Unit = {
@@ -210,7 +210,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
}

test("get table") {
- assert(newBasicCatalog().getTable("db2", "tbl1").name.table == "tbl1")
+ assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
}

test("get table when database/table does not exist") {
@@ -452,7 +452,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
assert(catalog.getFunction("db2", "func1").className == funcClass)
catalog.renameFunction("db2", "func1", newName)
intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
- assert(catalog.getFunction("db2", newName).name.funcName == newName)
+ assert(catalog.getFunction("db2", newName).identifier.funcName == newName)
assert(catalog.getFunction("db2", newName).className == funcClass)
intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
}
@@ -549,7 +549,7 @@ abstract class CatalogTestUtils {

def newTable(name: String, database: Option[String] = None): CatalogTable = {
CatalogTable(
- name = TableIdentifier(name, database),
+ identifier = TableIdentifier(name, database),
tableType = CatalogTableType.EXTERNAL_TABLE,
storage = storageFormat,
schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
(Diffs for the remaining changed files are not shown here.)
