[SPARK-34546][SQL] AlterViewAs.query should be analyzed during the analysis phase, and AlterViewAs should invalidate the cache #31652

Closed · wants to merge 11 commits
```diff
@@ -879,7 +879,7 @@ class Analyzer(override val catalogManager: CatalogManager)

   private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = {
     EliminateSubqueryAliases(plan) match {
-      case v: View if v.isDataFrameTempView => v.child
+      case v: View if v.isTempViewStoringAnalyzedPlan => v.child
```
**Contributor Author:**
I renamed it because this type of view can also be created without a DataFrame, e.g. via `ALTER VIEW ... AS` with `spark.sql.legacy.storeAnalyzedPlanForView` set to `true`; a sketch of that path follows.
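A minimal sketch of that non-DataFrame path (illustrative view name; assumes a spark-shell session where `spark` is predefined):

```scala
// With the legacy flag on, ALTER VIEW ... AS stores the analyzed plan for the
// temp view even though no DataFrame API was involved; hence the rename.
spark.conf.set("spark.sql.legacy.storeAnalyzedPlanForView", "true")
spark.sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS id")
spark.sql("ALTER VIEW tmp_v AS SELECT 2 AS id")
```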

```diff
       case other => other
     }
   }
```
```diff
@@ -393,7 +393,7 @@ object UnsupportedOperationChecker extends Logging {
       case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
           _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
           _: TypedFilter) =>
-      case v: View if v.isDataFrameTempView =>
+      case v: View if v.isTempViewStoringAnalyzedPlan =>
       case node if node.nodeName == "StreamingRelationV2" =>
       case node =>
         throwError(s"Continuous processing does not support ${node.nodeName} operations.")
```
```diff
@@ -31,7 +31,7 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
+import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
@@ -468,7 +468,7 @@ object CatalogTable {
   val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
   val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"

-  val VIEW_CREATED_FROM_DATAFRAME = VIEW_PREFIX + "createdFromDataFrame"
+  val VIEW_STORING_ANALYZED_PLAN = VIEW_PREFIX + "storingAnalyzedPlan"

   def splitLargeTableProp(
       key: String,
@@ -782,14 +782,14 @@ case class UnresolvedCatalogRelation(

 /**
  * A wrapper to store the temporary view info, will be kept in `SessionCatalog`
- * and will be transformed to `View` during analysis. If the temporary view was
- * created from a dataframe, `plan` is set to the analyzed plan for the view.
+ * and will be transformed to `View` during analysis. If the temporary view is
+ * storing an analyzed plan, `plan` is set to the analyzed plan for the view.
  */
 case class TemporaryViewRelation(
     tableMeta: CatalogTable,
     plan: Option[LogicalPlan] = None) extends LeafNode {
   require(plan.isEmpty ||
-    (plan.get.resolved && tableMeta.properties.contains(VIEW_CREATED_FROM_DATAFRAME)))
+    (plan.get.resolved && tableMeta.properties.contains(VIEW_STORING_ANALYZED_PLAN)))

   override lazy val resolved: Boolean = false
   override def output: Seq[Attribute] = Nil
```
```diff
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.AliasIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
+import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
@@ -462,7 +462,7 @@ case class View(
     desc: CatalogTable,
     isTempView: Boolean,
     child: LogicalPlan) extends UnaryNode {
-  require(!isDataFrameTempView || child.resolved)
+  require(!isTempViewStoringAnalyzedPlan || child.resolved)

   override def output: Seq[Attribute] = child.output

@@ -475,8 +475,8 @@ case class View(
     case _ => child.canonicalized
   }

-  def isDataFrameTempView: Boolean =
-    isTempView && desc.properties.contains(VIEW_CREATED_FROM_DATAFRAME)
+  def isTempViewStoringAnalyzedPlan: Boolean =
+    isTempView && desc.properties.contains(VIEW_STORING_ANALYZED_PLAN)

   // When resolving a SQL view, we use an extra Project to add cast and alias to make sure the view
   // output schema doesn't change even if the table referenced by the view is changed after view
```
```diff
@@ -855,7 +855,7 @@ case class AlterViewAs(
     child: LogicalPlan,
     originalText: String,
     query: LogicalPlan) extends Command {
-  override def children: Seq[LogicalPlan] = child :: Nil
+  override def children: Seq[LogicalPlan] = child :: query :: Nil
 }

 /**
```
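Listing `query` among the command's children is the heart of the fix: the analyzer only visits plans reachable through `children`, so `query` now gets resolved during the analysis phase instead of at execution time. A simplified model of that traversal (illustrative only, not the actual Catalyst code):

```scala
// Toy model: a resolution pass only reaches plans that appear in `children`.
case class Node(name: String, resolved: Boolean = false, children: List[Node] = Nil)

def resolveUp(n: Node): Node =
  n.copy(resolved = true, children = n.children.map(resolveUp)) // children first

// Before: AlterViewAs exposed List(child)        -> query was never visited
// After:  AlterViewAs exposes List(child, query) -> query is resolved as well
```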
```diff
@@ -66,7 +66,7 @@ trait AnalysisTest extends PlanTest {
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
       val actualPlan = getAnalyzer.executeAndCheck(inputPlan, new QueryPlanningTracker)
       val transformed = actualPlan transformUp {
-        case v: View if v.isDataFrameTempView => v.child
+        case v: View if v.isTempViewStoringAnalyzedPlan => v.child
       }
       comparePlans(transformed, expectedPlan)
     }
```
```diff
@@ -86,7 +86,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
   }

   private def getTempViewRawPlan(plan: Option[LogicalPlan]): Option[LogicalPlan] = plan match {
-    case Some(v: View) if v.isDataFrameTempView => Some(v.child)
+    case Some(v: View) if v.isTempViewStoringAnalyzedPlan => Some(v.child)
     case other => other
   }
```
```diff
@@ -474,7 +474,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
       AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location)

-    case AlterViewAs(ResolvedView(ident, _), originalText, query) =>
+    case AlterViewAs(ResolvedView(ident, _), originalText, query) if query.resolved =>
       AlterViewAsCommand(
         ident.asTableIdentifier,
         originalText,
```
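Since `query` is now a child of `AlterViewAs`, the analyzer resolves it like any other child plan; the new `query.resolved` guard simply defers this rule until that has happened, so `AlterViewAsCommand` is always constructed with an analyzed plan.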
```diff
@@ -22,8 +22,9 @@ import scala.collection.mutable
 import org.json4s.JsonAST.{JArray, JString}
 import org.json4s.jackson.JsonMethods._

+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, ViewType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
@@ -115,48 +116,27 @@ case class CreateViewCommand(

     if (viewType == LocalTempView) {
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
-      if (replace && needsToUncache(catalog.getRawTempView(name.table), aliasedPlan)) {
-        logInfo(s"Try to uncache ${name.quotedString} before replacing.")
-        checkCyclicViewReference(analyzedPlan, Seq(name), name)
-        CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
-      }
-      // If there is no sql text (e.g. from Dataset API), we will always store the analyzed plan
-      val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
-        TemporaryViewRelation(
-          prepareTemporaryView(
-            name,
-            sparkSession,
-            analyzedPlan,
-            aliasedPlan.schema,
-            originalText))
-      } else {
-        TemporaryViewRelation(
-          prepareTemporaryViewFromDataFrame(name, aliasedPlan),
-          Some(aliasedPlan))
-      }
+      val tableDefinition = createTemporaryViewRelation(
+        name,
+        sparkSession,
+        replace,
+        catalog.getRawTempView,
+        originalText,
+        analyzedPlan,
+        aliasedPlan)
       catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
       val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
       val viewIdent = TableIdentifier(name.table, Option(db))
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
-      if (replace && needsToUncache(catalog.getRawGlobalTempView(name.table), aliasedPlan)) {
-        logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.")
-        checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
-        CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
-      }
-      val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
-        TemporaryViewRelation(
-          prepareTemporaryView(
-            viewIdent,
-            sparkSession,
-            analyzedPlan,
-            aliasedPlan.schema,
-            originalText))
-      } else {
-        TemporaryViewRelation(
-          prepareTemporaryViewFromDataFrame(viewIdent, aliasedPlan),
-          Some(aliasedPlan))
-      }
+      val tableDefinition = createTemporaryViewRelation(
+        viewIdent,
+        sparkSession,
+        replace,
+        catalog.getRawGlobalTempView,
+        originalText,
+        analyzedPlan,
+        aliasedPlan)
       catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace)
     } else if (catalog.tableExists(name)) {
       val tableMetadata = catalog.getTableMetadata(name)
@@ -192,20 +172,6 @@ case class CreateViewCommand(
     Seq.empty[Row]
   }

-  /**
-   * Checks if need to uncache the temp view being replaced.
-   */
-  private def needsToUncache(
-      rawTempView: Option[LogicalPlan],
-      aliasedPlan: LogicalPlan): Boolean = rawTempView match {
-    // The temp view doesn't exist, no need to uncache.
-    case None => false
-    // Do not need to uncache if the to-be-replaced temp view plan and the new plan are the
-    // same-result plans.
-    case Some(TemporaryViewRelation(_, Some(p))) => !p.sameResult(aliasedPlan)
-    case Some(p) => !p.sameResult(aliasedPlan)
-  }
-
   /**
    * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
    * else return the analyzed plan directly.
@@ -274,28 +240,29 @@ case class AlterViewAsCommand(
   override def innerChildren: Seq[QueryPlan[_]] = Seq(query)

   override def run(session: SparkSession): Seq[Row] = {
-    // If the plan cannot be analyzed, throw an exception and don't proceed.
-    val qe = session.sessionState.executePlan(query)
-    qe.assertAnalyzed()
-    val analyzedPlan = qe.analyzed
```
**Contributor Author:**
`query` will already be an analyzed plan here, since `AlterViewAs` is converted to `AlterViewAsCommand` during the analysis phase (in `ResolveSessionCatalog`).

**Contributor:**
Shall we remove the similar code in `CreateViewCommand`?

**Contributor Author:**
We cannot yet, because of the following:

```scala
override lazy val planToCache: LogicalPlan = {
  Dataset.ofRows(sparkSession,
    CreateViewCommand(
      name = TableIdentifier(tempViewName),
      userSpecifiedColumns = Nil,
      comment = None,
      properties = Map.empty,
      originalText = Some(originalText),
      child = query,
      allowExisting = false,
```

Here, `query` is not resolved yet. I will get to this, though.
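The quoted snippet appears to come from the `CACHE TABLE ... AS SELECT` code path (the `planToCache` and `tempViewName` names suggest as much), where `CreateViewCommand` is built around the parsed, not-yet-analyzed query. A statement that would exercise it, assuming a spark-shell session:

```scala
// Assumption: this reaches the planToCache shown above, so the
// CreateViewCommand it builds receives a query that is not yet analyzed.
spark.sql("CACHE TABLE cached_tmp AS SELECT id FROM range(10)")
```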


```diff
     if (session.sessionState.catalog.isTempView(name)) {
-      alterTemporaryView(session, analyzedPlan)
+      alterTemporaryView(session, query)
     } else {
-      alterPermanentView(session, analyzedPlan)
+      alterPermanentView(session, query)
     }
     Seq.empty[Row]
   }

   private def alterTemporaryView(session: SparkSession, analyzedPlan: LogicalPlan): Unit = {
-    val tableDefinition = if (conf.storeAnalyzedPlanForView) {
-      analyzedPlan
+    val catalog = session.sessionState.catalog
+    val getRawTempView: String => Option[LogicalPlan] = if (name.database.isEmpty) {
+      catalog.getRawTempView
     } else {
-      checkCyclicViewReference(analyzedPlan, Seq(name), name)
-      TemporaryViewRelation(
-        prepareTemporaryView(
-          name, session, analyzedPlan, analyzedPlan.schema, Some(originalText)))
+      catalog.getRawGlobalTempView
     }
+    val tableDefinition = createTemporaryViewRelation(
+      name,
+      session,
+      replace = true,
+      getRawTempView,
+      Some(originalText),
+      analyzedPlan,
+      aliasedPlan = analyzedPlan)
     session.sessionState.catalog.alterTempViewDefinition(name, tableDefinition)
   }
```

```diff
@@ -306,6 +273,9 @@ case class AlterViewAsCommand(
     val viewIdent = viewMeta.identifier
     checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)

+    logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
+    CommandUtils.uncacheTableOrView(session, viewIdent.quotedString)
+
     val newProperties = generateViewProperties(
       viewMeta.properties, session, analyzedPlan, analyzedPlan.schema.fieldNames)
```
```diff
@@ -349,7 +319,7 @@ case class ShowViewsCommand(
   }
 }

-object ViewHelper {
+object ViewHelper extends SQLConfHelper with Logging {

   private val configPrefixDenyList = Seq(
     SQLConf.MAX_NESTED_VIEW_DEPTH.key,
@@ -596,19 +566,80 @@ object ViewHelper {
     (collectTempViews(child), collectTempFunctions(child))
   }

+  /**
+   * Returns a [[TemporaryViewRelation]] that contains information about a temporary view
+   * to create, given an analyzed plan of the view. If a temp view is to be replaced and it is
+   * cached, it will be uncached before being replaced.
+   *
+   * @param name the name of the temporary view to create/replace.
+   * @param session the spark session.
+   * @param replace if true and the existing view is cached, it will be uncached.
+   * @param getRawTempView the function that returns an optional raw plan of the local or
+   *                       global temporary view.
+   * @param originalText the original SQL text of this view; can be None if this view is created
+   *                     via the Dataset API or spark.sql.legacy.storeAnalyzedPlanForView is true.
+   * @param analyzedPlan the logical plan that represents the view; this is used to generate the
+   *                     logical plan for the temporary view and the view schema.
+   * @param aliasedPlan the aliased logical plan based on the user specified columns. If there are
+   *                    no user specified columns, this should be the same as `analyzedPlan`.
+   */
+  def createTemporaryViewRelation(
+      name: TableIdentifier,
+      session: SparkSession,
+      replace: Boolean,
+      getRawTempView: String => Option[LogicalPlan],
+      originalText: Option[String],
+      analyzedPlan: LogicalPlan,
+      aliasedPlan: LogicalPlan): TemporaryViewRelation = {
+    val uncache = getRawTempView(name.table).map { r =>
+      needsToUncache(r, aliasedPlan)
+    }.getOrElse(false)
+    if (replace && uncache) {
+      logDebug(s"Try to uncache ${name.quotedString} before replacing.")
+      checkCyclicViewReference(analyzedPlan, Seq(name), name)
+      CommandUtils.uncacheTableOrView(session, name.quotedString)
+    }
+    if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
+      TemporaryViewRelation(
+        prepareTemporaryView(
+          name,
+          session,
+          analyzedPlan,
+          aliasedPlan.schema,
+          originalText.get))
+    } else {
+      TemporaryViewRelation(
+        prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan),
+        Some(aliasedPlan))
+    }
+  }
+
+  /**
+   * Checks if need to uncache the temp view being replaced.
+   */
+  private def needsToUncache(
+      rawTempView: LogicalPlan,
+      aliasedPlan: LogicalPlan): Boolean = rawTempView match {
+    // If TemporaryViewRelation doesn't store the analyzed view, always uncache.
+    case TemporaryViewRelation(_, None) => true
```
**Contributor Author:**
This case is unnecessary, since `case p => !p.sameResult(aliasedPlan)` below would cover it, but I added it explicitly to make the intention clear.

```diff
+    // Do not need to uncache if the to-be-replaced temp view plan and the new plan are the
+    // same-result plans.
+    case TemporaryViewRelation(_, Some(p)) => !p.sameResult(aliasedPlan)
+    case p => !p.sameResult(aliasedPlan)
```
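The whole decision hinges on `LogicalPlan.sameResult`. A standalone sketch of that check (a hypothetical demo, not part of the PR):

```scala
import org.apache.spark.sql.SparkSession

object SameResultDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
  val oldPlan = spark.range(10).queryExecution.analyzed
  val newPlan = spark.range(10).queryExecution.analyzed
  // sameResult is true when two plans are guaranteed to return the same rows,
  // in which case the cache entry for the old view definition is still valid.
  println(oldPlan.sameResult(newPlan)) // true => no need to uncache
  spark.stop()
}
```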
**Contributor:**
Do we still need this last case? `rawTempView` should always be a `TemporaryViewRelation`, right?

**Contributor Author:**
We first need to update CREATE TEMP VIEW USING:

```scala
val viewDefinition = Dataset.ofRows(
  sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan

if (global) {
  catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace)
} else {
  catalog.createTempView(tableIdent.table, viewDefinition, replace)
}
```

which I will get to right after this PR. Once that is done, we can update `createTempView` to take a `TemporaryViewRelation` (https://github.com/apache/spark/pull/31273/files#r580757641), and then I can safely remove this line.

Let me actually create a JIRA to capture these as subtasks.
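For reference, a statement that takes the CREATE TEMP VIEW USING path quoted above (illustrative name and options; until the follow-up lands, the view definition is stored as a raw plan rather than a `TemporaryViewRelation`):

```scala
spark.sql(
  """CREATE TEMPORARY VIEW csv_v
    |USING csv
    |OPTIONS (path '/tmp/data.csv', header 'true')""".stripMargin)
```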

```diff
+  }

   /**
    * Returns a [[CatalogTable]] that contains information for temporary view.
    * Generate the view-specific properties(e.g. view default database, view query output
    * column names) and store them as properties in the CatalogTable, and also creates
    * the proper schema for the view.
    */
-  def prepareTemporaryView(
+  private def prepareTemporaryView(
       viewName: TableIdentifier,
       session: SparkSession,
       analyzedPlan: LogicalPlan,
       viewSchema: StructType,
-      originalText: Option[String]): CatalogTable = {
+      originalText: String): CatalogTable = {

     val catalog = session.sessionState.catalog
     val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, analyzedPlan)
@@ -622,22 +653,22 @@ object ViewHelper {
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
       schema = viewSchema,
-      viewText = originalText,
+      viewText = Some(originalText),
       properties = newProperties)
   }

   /**
-   * Returns a [[CatalogTable]] that contains information for the temporary view created
-   * from a dataframe.
+   * Returns a [[CatalogTable]] that contains information for the temporary view storing
+   * an analyzed plan.
    */
-  def prepareTemporaryViewFromDataFrame(
+  private def prepareTemporaryViewStoringAnalyzedPlan(
       viewName: TableIdentifier,
       analyzedPlan: LogicalPlan): CatalogTable = {
     CatalogTable(
       identifier = viewName,
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
       schema = analyzedPlan.schema,
-      properties = Map((VIEW_CREATED_FROM_DATAFRAME, "true")))
+      properties = Map((VIEW_STORING_ANALYZED_PLAN, "true")))
   }
 }
```