Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-14014] [SQL] Replace existing catalog with SessionCatalog #11918

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9130563
Squashed commit of the following:
Mar 16, 2016
aa80f9c
Refactor SQLContext etc. to take in ExternalCatalog
Mar 17, 2016
1f1dd00
Attempt to remove old catalog from SessionState
Mar 17, 2016
5daa696
Merge branch 'master' of github.com:apache/spark into use-session-cat…
Mar 17, 2016
71a01e0
Fix style
Mar 17, 2016
9f5154f
Replace all usages of analysis.Catalog
Mar 17, 2016
78cbcbd
Fix tests
Mar 18, 2016
5e16480
Fix tests round 2
Mar 18, 2016
57c8c29
Fix MiMa
Mar 18, 2016
c439280
Merge branch 'master' of github.com:apache/spark into use-session-cat…
Mar 18, 2016
a3c6bf7
Minor fixes
Mar 18, 2016
193d93c
sessionState.sessionCatalog -> sessionState.catalog
Mar 18, 2016
f089e2b
Fix tests round 3 (small round)
Mar 18, 2016
9cd89f8
Merge branch 'master' of github.com:apache/spark into use-session-cat…
Mar 19, 2016
f41346b
Don't bother sessionizing HiveCatalog
Mar 19, 2016
4b37d7a
Fix tests (round 4) - ignored test in CliSuite
Mar 19, 2016
1e72b0a
Merge branch 'master' of github.com:apache/spark into use-session-cat…
Mar 21, 2016
52e0273
Clear temp tables after each suite
Mar 21, 2016
19750d7
Require DB exists before showing tables on them
Mar 21, 2016
561ca3c
Fix tests
Mar 21, 2016
b9de78c
Fix MultiDatabaseSuite
Mar 21, 2016
536cea2
Merge branch 'master' of github.com:apache/spark into use-session-cat…
Mar 22, 2016
4133d3f
Fix HiveUDFSuite + add tests
Mar 22, 2016
159e51c
Fix HiveCompatibilitySuite?
Mar 22, 2016
542283c
Fix CliSuite
Mar 22, 2016
98751cc
Merge branch 'master' of github.com:apache/spark into use-session-cat…
Mar 22, 2016
16a54ba
Fix HiveQuerySuite?
Mar 22, 2016
3439dc2
Ignore new test for now...
Mar 22, 2016
e552558
Fix HiveContextSuite?
Mar 23, 2016
5ea8469
Revert "Fix HiveContextSuite?"
Mar 23, 2016
9519cd8
Merge branch 'master' of github.com:apache/spark into use-session-cat…
Mar 23, 2016
d3c40f7
Use default as the db.
yhuai Mar 23, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,9 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.Logging.initializeLogIfNecessary"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerEvent.logEvent"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.OutputWriterFactory.newInstance")
) ++ Seq(
// [SPARK-14014] Replace existing analysis.Catalog with SessionCatalog
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.this")
) ++ Seq(
// [SPARK-13928] Move org.apache.spark.Logging into org.apache.spark.internal.Logging
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Logging"),
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ def tableNames(self, dbName=None):
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlContext.tableNames()
True
>>> "table1" in sqlContext.tableNames("db")
>>> "table1" in sqlContext.tableNames("default")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the follow-up PR, let's have a test for a non-existent db (we should expect an error).

True
"""
if dbName is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
Expand All @@ -36,23 +37,22 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.types._

/**
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
* when all relations are already filled in and the analyzer needs only to resolve attribute
* references.
* A trivial [[Analyzer]] with an dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
* Used for testing when all relations are already filled in and the analyzer needs only
* to resolve attribute references.
*/
object SimpleAnalyzer
extends Analyzer(
EmptyCatalog,
EmptyFunctionRegistry,
new SimpleCatalystConf(caseSensitiveAnalysis = true))
extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true))
class SimpleAnalyzer(conf: CatalystConf)
extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf)

/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
* [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
* a [[FunctionRegistry]].
* [[UnresolvedRelation]]s into fully typed objects using information in a
* [[SessionCatalog]] and a [[FunctionRegistry]].
*/
class Analyzer(
catalog: Catalog,
catalog: SessionCatalog,
registry: FunctionRegistry,
conf: CatalystConf,
maxIterations: Int = 100)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)

/**
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
* Holds the name of a relation that has yet to be looked up in a catalog.
*/
case class UnresolvedRelation(
tableIdentifier: TableIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,37 +52,34 @@ class InMemoryCatalog extends ExternalCatalog {
names.filter { funcName => regex.pattern.matcher(funcName).matches() }
}

private def existsFunction(db: String, funcName: String): Boolean = {
private def functionExists(db: String, funcName: String): Boolean = {
requireDbExists(db)
catalog(db).functions.contains(funcName)
}

private def existsTable(db: String, table: String): Boolean = {
requireDbExists(db)
catalog(db).tables.contains(table)
}

private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
requireTableExists(db, table)
catalog(db).tables(table).partitions.contains(spec)
}

private def requireFunctionExists(db: String, funcName: String): Unit = {
if (!existsFunction(db, funcName)) {
throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'")
if (!functionExists(db, funcName)) {
throw new AnalysisException(
s"Function not found: '$funcName' does not exist in database '$db'")
}
}

private def requireTableExists(db: String, table: String): Unit = {
if (!existsTable(db, table)) {
throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
if (!tableExists(db, table)) {
throw new AnalysisException(
s"Table not found: '$table' does not exist in database '$db'")
}
}

private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
if (!existsPartition(db, table, spec)) {
if (!partitionExists(db, table, spec)) {
throw new AnalysisException(
s"Partition does not exist in database '$db' table '$table': '$spec'")
s"Partition not found: database '$db' table '$table' does not contain: '$spec'")
}
}

Expand Down Expand Up @@ -159,7 +156,7 @@ class InMemoryCatalog extends ExternalCatalog {
ignoreIfExists: Boolean): Unit = synchronized {
requireDbExists(db)
val table = tableDefinition.name.table
if (existsTable(db, table)) {
if (tableExists(db, table)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
}
Expand All @@ -173,7 +170,7 @@ class InMemoryCatalog extends ExternalCatalog {
table: String,
ignoreIfNotExists: Boolean): Unit = synchronized {
requireDbExists(db)
if (existsTable(db, table)) {
if (tableExists(db, table)) {
catalog(db).tables.remove(table)
} else {
if (!ignoreIfNotExists) {
Expand All @@ -200,13 +197,17 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).tables(table).table
}

override def tableExists(db: String, table: String): Boolean = synchronized {
requireDbExists(db)
catalog(db).tables.contains(table)
}

override def listTables(db: String): Seq[String] = synchronized {
requireDbExists(db)
catalog(db).tables.keySet.toSeq
}

override def listTables(db: String, pattern: String): Seq[String] = synchronized {
requireDbExists(db)
filterPattern(listTables(db), pattern)
}

Expand Down Expand Up @@ -295,7 +296,7 @@ class InMemoryCatalog extends ExternalCatalog {

override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
if (existsFunction(db, func.name.funcName)) {
if (functionExists(db, func.name.funcName)) {
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
} else {
catalog(db).functions.put(func.name.funcName, func)
Expand Down
Loading