Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34700][SQL] SessionCatalog's temporary view related APIs should take/return more concrete types #31906

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable

import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.errors.QueryCompilationErrors

Expand All @@ -40,12 +39,12 @@ class GlobalTempViewManager(val database: String) {

/** List of view definitions, mapping from view name to logical plan. */
@GuardedBy("this")
private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]
private val viewDefinitions = new mutable.HashMap[String, TemporaryViewRelation]

/**
* Returns the global view definition which matches the given name, or None if not found.
*/
def get(name: String): Option[LogicalPlan] = synchronized {
def get(name: String): Option[TemporaryViewRelation] = synchronized {
viewDefinitions.get(name)
}

Expand All @@ -55,7 +54,7 @@ class GlobalTempViewManager(val database: String) {
*/
def create(
name: String,
viewDefinition: LogicalPlan,
viewDefinition: TemporaryViewRelation,
overrideIfExists: Boolean): Unit = synchronized {
if (!overrideIfExists && viewDefinitions.contains(name)) {
throw new TempTableAlreadyExistsException(name)
Expand All @@ -68,7 +67,7 @@ class GlobalTempViewManager(val database: String) {
*/
def update(
name: String,
viewDefinition: LogicalPlan): Boolean = synchronized {
viewDefinition: TemporaryViewRelation): Boolean = synchronized {
if (viewDefinitions.contains(name)) {
viewDefinitions.put(name, viewDefinition)
true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class SessionCatalog(

/** List of temporary views, mapping from table name to their logical plan. */
@GuardedBy("this")
protected val tempViews = new mutable.HashMap[String, LogicalPlan]
protected val tempViews = new mutable.HashMap[String, TemporaryViewRelation]

// Note: we track current database here because certain operations do not explicitly
// specify the database (e.g. DROP TABLE my_table). In these cases we must first
Expand Down Expand Up @@ -573,21 +573,21 @@ class SessionCatalog(
*/
def createTempView(
name: String,
tableDefinition: LogicalPlan,
viewDefinition: TemporaryViewRelation,
overrideIfExists: Boolean): Unit = synchronized {
val table = formatTableName(name)
if (tempViews.contains(table) && !overrideIfExists) {
throw new TempTableAlreadyExistsException(name)
}
tempViews.put(table, tableDefinition)
tempViews.put(table, viewDefinition)
}

/**
* Create a global temporary view.
*/
def createGlobalTempView(
name: String,
viewDefinition: LogicalPlan,
viewDefinition: TemporaryViewRelation,
overrideIfExists: Boolean): Unit = {
globalTempViewManager.create(formatTableName(name), viewDefinition, overrideIfExists)
}
Expand All @@ -598,7 +598,7 @@ class SessionCatalog(
*/
def alterTempViewDefinition(
name: TableIdentifier,
viewDefinition: LogicalPlan): Boolean = synchronized {
viewDefinition: TemporaryViewRelation): Boolean = synchronized {
val viewName = formatTableName(name.table)
if (name.database.isEmpty) {
if (tempViews.contains(viewName)) {
Expand All @@ -617,14 +617,14 @@ class SessionCatalog(
/**
* Return a local temporary view exactly as it was stored.
*/
def getRawTempView(name: String): Option[LogicalPlan] = synchronized {
def getRawTempView(name: String): Option[TemporaryViewRelation] = synchronized {
tempViews.get(formatTableName(name))
}

/**
* Generate a [[View]] operator from the temporary view stored.
*/
def getTempView(name: String): Option[LogicalPlan] = synchronized {
def getTempView(name: String): Option[View] = synchronized {
getRawTempView(name).map(getTempViewPlan)
}

Expand All @@ -635,14 +635,14 @@ class SessionCatalog(
/**
* Return a global temporary view exactly as it was stored.
*/
def getRawGlobalTempView(name: String): Option[LogicalPlan] = {
def getRawGlobalTempView(name: String): Option[TemporaryViewRelation] = {
globalTempViewManager.get(formatTableName(name))
}

/**
* Generate a [[View]] operator from the global temporary view stored.
*/
def getGlobalTempView(name: String): Option[LogicalPlan] = {
def getGlobalTempView(name: String): Option[View] = {
getRawGlobalTempView(name).map(getTempViewPlan)
}

Expand Down Expand Up @@ -680,25 +680,10 @@ class SessionCatalog(
def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
val table = formatTableName(name.table)
if (name.database.isEmpty) {
tempViews.get(table).map {
case TemporaryViewRelation(metadata, _) => metadata
case plan =>
CatalogTable(
identifier = TableIdentifier(table),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = plan.output.toStructType)
}.getOrElse(getTableMetadata(name))
tempViews.get(table).map(_.tableMeta).getOrElse(getTableMetadata(name))
} else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
globalTempViewManager.get(table).map {
case TemporaryViewRelation(metadata, _) => metadata
case plan =>
CatalogTable(
identifier = TableIdentifier(table, Some(globalTempViewManager.database)),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = plan.output.toStructType)
}.getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table))
globalTempViewManager.get(table).map(_.tableMeta)
.getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table))
} else {
getTableMetadata(name)
}
Expand Down Expand Up @@ -834,21 +819,9 @@ class SessionCatalog(
}
}

private def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
plan match {
case TemporaryViewRelation(tableMeta, None) =>
fromCatalogTable(tableMeta, isTempView = true)
case TemporaryViewRelation(tableMeta, Some(plan)) =>
View(desc = tableMeta, isTempView = true, child = plan)
case other => other
}
}

def getTempViewSchema(plan: LogicalPlan): StructType = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only called from SparkGetColumnsOperation.scala and now that a raw temp plan always has CatalogTable, the caller can just directly get the schema from the raw plan, and I don't see a reason to expose this as a def. Check SparkGetColumnsOperation.scala changes below.

plan match {
case viewInfo: TemporaryViewRelation => viewInfo.tableMeta.schema
case v => v.schema
}
private def getTempViewPlan(viewInfo: TemporaryViewRelation): View = viewInfo.plan match {
case Some(p) => View(desc = viewInfo.tableMeta, isTempView = true, child = p)
case None => fromCatalogTable(viewInfo.tableMeta, isTempView = true)
}

private def fromCatalogTable(metadata: CatalogTable, isTempView: Boolean): View = {
Expand Down Expand Up @@ -909,7 +882,7 @@ class SessionCatalog(
isTempView(nameParts.asTableIdentifier)
}

def lookupTempView(name: TableIdentifier): Option[LogicalPlan] = {
def lookupTempView(name: TableIdentifier): Option[View] = {
val tableName = formatTableName(name.table)
if (name.database.isEmpty) {
tempViews.get(tableName).map(getTempViewPlan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,64 @@ import java.net.URI
import java.util.Locale

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.{QueryPlanningTracker, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.StructType

trait AnalysisTest extends PlanTest {

protected def extendedAnalysisRules: Seq[Rule[LogicalPlan]] = Nil

protected def createTempView(
catalog: SessionCatalog,
name: String,
plan: LogicalPlan,
overrideIfExists: Boolean): Unit = {
val identifier = TableIdentifier(name)
val metadata = createTempViewMetadata(identifier, plan.schema)
val viewDefinition = TemporaryViewRelation(metadata, Some(plan))
catalog.createTempView(name, viewDefinition, overrideIfExists)
}

protected def createGlobalTempView(
catalog: SessionCatalog,
name: String,
plan: LogicalPlan,
overrideIfExists: Boolean): Unit = {
val globalDb = Some(SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE))
val identifier = TableIdentifier(name, globalDb)
val metadata = createTempViewMetadata(identifier, plan.schema)
val viewDefinition = TemporaryViewRelation(metadata, Some(plan))
catalog.createGlobalTempView(name, viewDefinition, overrideIfExists)
}

private def createTempViewMetadata(
identifier: TableIdentifier,
schema: StructType): CatalogTable = {
CatalogTable(
identifier = identifier,
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = schema,
properties = Map((VIEW_STORING_ANALYZED_PLAN, "true")))
}

protected def getAnalyzer: Analyzer = {
val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin)
catalog.createDatabase(
CatalogDatabase("default", "", new URI("loc"), Map.empty),
ignoreIfExists = false)
catalog.createTempView("TaBlE", TestRelations.testRelation, overrideIfExists = true)
catalog.createTempView("TaBlE2", TestRelations.testRelation2, overrideIfExists = true)
catalog.createTempView("TaBlE3", TestRelations.testRelation3, overrideIfExists = true)
catalog.createGlobalTempView("TaBlE4", TestRelations.testRelation4, overrideIfExists = true)
catalog.createGlobalTempView("TaBlE5", TestRelations.testRelation5, overrideIfExists = true)
createTempView(catalog, "TaBlE", TestRelations.testRelation, overrideIfExists = true)
createTempView(catalog, "TaBlE2", TestRelations.testRelation2, overrideIfExists = true)
createTempView(catalog, "TaBlE3", TestRelations.testRelation3, overrideIfExists = true)
createGlobalTempView(catalog, "TaBlE4", TestRelations.testRelation4, overrideIfExists = true)
createGlobalTempView(catalog, "TaBlE5", TestRelations.testRelation5, overrideIfExists = true)
new Analyzer(catalog) {
override val extendedResolutionRules = EliminateSubqueryAliases +: extendedAnalysisRules
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ class DecimalPrecisionSuite extends AnalysisTest with BeforeAndAfter {
private val f: Expression = UnresolvedAttribute("f")
private val b: Expression = UnresolvedAttribute("b")

before {
catalog.createTempView("table", relation, overrideIfExists = true)
}

private def checkType(expression: Expression, expectedType: DataType): Unit = {
val plan = Project(Seq(Alias(expression, "c")()), relation)
assert(analyzer.execute(plan).schema.fields(0).dataType === expectedType)
Expand Down
Loading