Skip to content

Commit

Permalink
[SPARK-13923][SQL] Implement SessionCatalog
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

As part of the effort to merge `SQLContext` and `HiveContext`, this patch implements an internal catalog called `SessionCatalog` that handles temporary functions and tables and delegates metastore operations to `ExternalCatalog`. Currently, this is still dead code, but in the future it will be part of `SessionState` and will replace `o.a.s.sql.catalyst.analysis.Catalog`.

A recent patch #11573 parses Hive commands ourselves in Spark, but still passes the entire query text to Hive. In a future patch, we will use `SessionCatalog` to implement the parsed commands.

## How was this patch tested?

800+ lines of tests in `SessionCatalogSuite`.

Author: Andrew Or <[email protected]>

Closes #11750 from andrewor14/temp-catalog.
  • Loading branch information
Andrew Or authored and yhuai committed Mar 17, 2016
1 parent 92b7057 commit ca9ef86
Show file tree
Hide file tree
Showing 19 changed files with 1,604 additions and 202 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}


/**
Expand Down Expand Up @@ -68,19 +69,20 @@ class InMemoryCatalog extends ExternalCatalog {

private def requireFunctionExists(db: String, funcName: String): Unit = {
if (!existsFunction(db, funcName)) {
throw new AnalysisException(s"Function $funcName does not exist in $db database")
throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'")
}
}

private def requireTableExists(db: String, table: String): Unit = {
if (!existsTable(db, table)) {
throw new AnalysisException(s"Table $table does not exist in $db database")
throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
}
}

private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
if (!existsPartition(db, table, spec)) {
throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
throw new AnalysisException(
s"Partition does not exist in database '$db' table '$table': '$spec'")
}
}

Expand All @@ -93,7 +95,7 @@ class InMemoryCatalog extends ExternalCatalog {
ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
throw new AnalysisException(s"Database '${dbDefinition.name}' already exists.")
}
} else {
catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
Expand All @@ -108,17 +110,17 @@ class InMemoryCatalog extends ExternalCatalog {
if (!cascade) {
// If cascade is false, make sure the database is empty.
if (catalog(db).tables.nonEmpty) {
throw new AnalysisException(s"Database $db is not empty. One or more tables exist.")
throw new AnalysisException(s"Database '$db' is not empty. One or more tables exist.")
}
if (catalog(db).functions.nonEmpty) {
throw new AnalysisException(s"Database $db is not empty. One or more functions exist.")
throw new AnalysisException(s"Database '$db' is not empty. One or more functions exist.")
}
}
// Remove the database.
catalog.remove(db)
} else {
if (!ignoreIfNotExists) {
throw new AnalysisException(s"Database $db does not exist")
throw new AnalysisException(s"Database '$db' does not exist")
}
}
}
Expand Down Expand Up @@ -156,12 +158,13 @@ class InMemoryCatalog extends ExternalCatalog {
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
requireDbExists(db)
if (existsTable(db, tableDefinition.name)) {
val table = tableDefinition.name.table
if (existsTable(db, table)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
}
} else {
catalog(db).tables.put(tableDefinition.name, new TableDesc(tableDefinition))
catalog(db).tables.put(table, new TableDesc(tableDefinition))
}
}

Expand All @@ -174,22 +177,22 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).tables.remove(table)
} else {
if (!ignoreIfNotExists) {
throw new AnalysisException(s"Table $table does not exist in $db database")
throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
}
}
}

override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
requireTableExists(db, oldName)
val oldDesc = catalog(db).tables(oldName)
oldDesc.table = oldDesc.table.copy(name = newName)
oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db)))
catalog(db).tables.put(newName, oldDesc)
catalog(db).tables.remove(oldName)
}

override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
requireTableExists(db, tableDefinition.name)
catalog(db).tables(tableDefinition.name).table = tableDefinition
requireTableExists(db, tableDefinition.name.table)
catalog(db).tables(tableDefinition.name.table).table = tableDefinition
}

override def getTable(db: String, table: String): CatalogTable = synchronized {
Expand Down Expand Up @@ -222,8 +225,8 @@ class InMemoryCatalog extends ExternalCatalog {
val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
if (dupSpecs.nonEmpty) {
val dupSpecsStr = dupSpecs.mkString("\n===\n")
throw new AnalysisException(
s"The following partitions already exist in database $db table $table:\n$dupSpecsStr")
throw new AnalysisException("The following partitions already exist in database " +
s"'$db' table '$table':\n$dupSpecsStr")
}
}
parts.foreach { p => existingParts.put(p.spec, p) }
Expand All @@ -240,8 +243,8 @@ class InMemoryCatalog extends ExternalCatalog {
val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
if (missingSpecs.nonEmpty) {
val missingSpecsStr = missingSpecs.mkString("\n===\n")
throw new AnalysisException(
s"The following partitions do not exist in database $db table $table:\n$missingSpecsStr")
throw new AnalysisException("The following partitions do not exist in database " +
s"'$db' table '$table':\n$missingSpecsStr")
}
}
partSpecs.foreach(existingParts.remove)
Expand Down Expand Up @@ -292,10 +295,10 @@ class InMemoryCatalog extends ExternalCatalog {

override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
if (existsFunction(db, func.name)) {
throw new AnalysisException(s"Function $func already exists in $db database")
if (existsFunction(db, func.name.funcName)) {
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
} else {
catalog(db).functions.put(func.name, func)
catalog(db).functions.put(func.name.funcName, func)
}
}

Expand All @@ -306,14 +309,14 @@ class InMemoryCatalog extends ExternalCatalog {

override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
val newFunc = getFunction(db, oldName).copy(name = newName)
val newFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db)))
catalog(db).functions.remove(oldName)
catalog(db).functions.put(newName, newFunc)
}

override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
requireFunctionExists(db, funcDefinition.name)
catalog(db).functions.put(funcDefinition.name, funcDefinition)
requireFunctionExists(db, funcDefinition.name.funcName)
catalog(db).functions.put(funcDefinition.name.funcName, funcDefinition)
}

override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
Expand Down
Loading

0 comments on commit ca9ef86

Please sign in to comment.