
[SPARK-34546][SQL] AlterViewAs.query should be analyzed during the analysis phase, and AlterViewAs should invalidate the cache #31652

Closed · 11 commits
@@ -879,7 +879,7 @@ class Analyzer(override val catalogManager: CatalogManager)

private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = {
EliminateSubqueryAliases(plan) match {
- case v: View if v.isDataFrameTempView => v.child
+ case v: View if v.isTempViewStoringAnalyzedPlan => v.child
Contributor Author:

I renamed it because this type of view can also be created without a DataFrame - e.g., via ALTER VIEW ... AS with spark.sql.legacy.storeAnalyzedPlanForView set to true.
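For illustration, a minimal sketch of creating such a view without a DataFrame (assuming a local SparkSession; the object and app names are illustrative only):

    // Sketch: with the legacy flag enabled, ALTER VIEW ... AS stores the
    // analyzed plan for the temp view, even though the view was never
    // created through Dataset.createOrReplaceTempView.
    import org.apache.spark.sql.SparkSession

    object StoreAnalyzedPlanDemo extends App {
      val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
      spark.sql("CREATE TEMPORARY VIEW tv AS SELECT 1 AS id")
      spark.conf.set("spark.sql.legacy.storeAnalyzedPlanForView", "true")
      spark.sql("ALTER VIEW tv AS SELECT 2 AS id") // tv now stores an analyzed plan
      spark.stop()
    }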

case other => other
}
}
@@ -393,7 +393,7 @@ object UnsupportedOperationChecker extends Logging {
case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
_: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
_: TypedFilter) =>
- case v: View if v.isDataFrameTempView =>
+ case v: View if v.isTempViewStoringAnalyzedPlan =>
case node if node.nodeName == "StreamingRelationV2" =>
case node =>
throwError(s"Continuous processing does not support ${node.nodeName} operations.")
@@ -31,7 +31,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
- import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
+ import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
@@ -468,7 +468,7 @@ object CatalogTable {
val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"

- val VIEW_CREATED_FROM_DATAFRAME = VIEW_PREFIX + "createdFromDataFrame"
+ val VIEW_STORING_ANALYZED_PLAN = VIEW_PREFIX + "storingAnalyzedPlan"

def splitLargeTableProp(
key: String,
@@ -782,14 +782,14 @@ case class UnresolvedCatalogRelation(

/**
* A wrapper to store the temporary view info, will be kept in `SessionCatalog`
- * and will be transformed to `View` during analysis. If the temporary view was
- * created from a dataframe, `plan` is set to the analyzed plan for the view.
+ * and will be transformed to `View` during analysis. If the temporary view is
+ * storing an analyzed plan, `plan` is set to the analyzed plan for the view.
*/
case class TemporaryViewRelation(
tableMeta: CatalogTable,
plan: Option[LogicalPlan] = None) extends LeafNode {
require(plan.isEmpty ||
- (plan.get.resolved && tableMeta.properties.contains(VIEW_CREATED_FROM_DATAFRAME)))
+ (plan.get.resolved && tableMeta.properties.contains(VIEW_STORING_ANALYZED_PLAN)))

override lazy val resolved: Boolean = false
override def output: Seq[Attribute] = Nil
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.AliasIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
- import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
+ import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
@@ -462,7 +462,7 @@ case class View(
desc: CatalogTable,
isTempView: Boolean,
child: LogicalPlan) extends UnaryNode {
- require(!isDataFrameTempView || child.resolved)
+ require(!isTempViewStoringAnalyzedPlan || child.resolved)

override def output: Seq[Attribute] = child.output

@@ -475,8 +475,8 @@
case _ => child.canonicalized
}

- def isDataFrameTempView: Boolean =
-   isTempView && desc.properties.contains(VIEW_CREATED_FROM_DATAFRAME)
+ def isTempViewStoringAnalyzedPlan: Boolean =
+   isTempView && desc.properties.contains(VIEW_STORING_ANALYZED_PLAN)

// When resolving a SQL view, we use an extra Project to add cast and alias to make sure the view
// output schema doesn't change even if the table referenced by the view is changed after view
@@ -855,7 +855,7 @@ case class AlterViewAs(
child: LogicalPlan,
originalText: String,
query: LogicalPlan) extends Command {
- override def children: Seq[LogicalPlan] = child :: Nil
+ override def children: Seq[LogicalPlan] = child :: query :: Nil
}
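With query listed among the command's children, the analyzer resolves it like any other plan, so an invalid view definition now fails during analysis rather than at execution. A minimal sketch of the observable behavior, assuming a local SparkSession (the object name is illustrative only):

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object AlterViewAnalysisDemo extends App {
      val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
      spark.sql("CREATE TEMPORARY VIEW tv AS SELECT 1 AS id")
      try {
        // Rejected while AlterViewAs is analyzed, before AlterViewAsCommand runs.
        spark.sql("ALTER VIEW tv AS SELECT no_such_column")
      } catch {
        case e: AnalysisException => println(s"Caught at analysis time: ${e.getMessage}")
      }
      spark.stop()
    }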

@@ -66,7 +66,7 @@ trait AnalysisTest extends PlanTest {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
val actualPlan = getAnalyzer.executeAndCheck(inputPlan, new QueryPlanningTracker)
val transformed = actualPlan transformUp {
- case v: View if v.isDataFrameTempView => v.child
+ case v: View if v.isTempViewStoringAnalyzedPlan => v.child
}
comparePlans(transformed, expectedPlan)
}
@@ -86,7 +86,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
}

private def getTempViewRawPlan(plan: Option[LogicalPlan]): Option[LogicalPlan] = plan match {
- case Some(v: View) if v.isDataFrameTempView => Some(v.child)
+ case Some(v: View) if v.isTempViewStoringAnalyzedPlan => Some(v.child)
case other => other
}

@@ -474,7 +474,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location)

- case AlterViewAs(ResolvedView(ident, _), originalText, query) =>
+ case AlterViewAs(ResolvedView(ident, _), originalText, query) if query.resolved =>
AlterViewAsCommand(
ident.asTableIdentifier,
originalText,
@@ -116,7 +116,7 @@ case class CreateViewCommand(
if (viewType == LocalTempView) {
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
if (replace && needsToUncache(catalog.getRawTempView(name.table), aliasedPlan)) {
logInfo(s"Try to uncache ${name.quotedString} before replacing.")
logDebug(s"Try to uncache ${name.quotedString} before replacing.")
Contributor Author:

I changed this to logDebug because the permanent view path was already using it:

    logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")

If logInfo is better, I can change these to logInfo instead.

checkCyclicViewReference(analyzedPlan, Seq(name), name)
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
}
@@ -131,7 +131,7 @@
originalText))
} else {
TemporaryViewRelation(
- prepareTemporaryViewFromDataFrame(name, aliasedPlan),
+ prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan),
Some(aliasedPlan))
}
catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
@@ -140,7 +140,7 @@
val viewIdent = TableIdentifier(name.table, Option(db))
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
if (replace && needsToUncache(catalog.getRawGlobalTempView(name.table), aliasedPlan)) {
logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.")
logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
}
@@ -154,7 +154,7 @@
originalText))
} else {
TemporaryViewRelation(
- prepareTemporaryViewFromDataFrame(name, aliasedPlan),
+ prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan),
Some(aliasedPlan))
}
catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace)
@@ -268,30 +268,32 @@ case class AlterViewAsCommand(
name: TableIdentifier,
originalText: String,
query: LogicalPlan) extends RunnableCommand {
+ require(query.resolved)

import ViewHelper._

override def innerChildren: Seq[QueryPlan[_]] = Seq(query)

override def run(session: SparkSession): Seq[Row] = {
- // If the plan cannot be analyzed, throw an exception and don't proceed.
- val qe = session.sessionState.executePlan(query)
- qe.assertAnalyzed()
- val analyzedPlan = qe.analyzed
Contributor Author:

query will already be an analyzed plan here, since AlterViewAs is converted to AlterViewAsCommand during the analysis phase (in ResolveSessionCatalog).

Contributor:

Shall we remove the similar code in CreateViewCommand?

Contributor Author:

We cannot yet because of the following:

    override lazy val planToCache: LogicalPlan = {
      Dataset.ofRows(sparkSession,
        CreateViewCommand(
          name = TableIdentifier(tempViewName),
          userSpecifiedColumns = Nil,
          comment = None,
          properties = Map.empty,
          originalText = Some(originalText),
          child = query,
          allowExisting = false,

Here, query is not resolved yet. I will get to this though.


if (session.sessionState.catalog.isTempView(name)) {
- alterTemporaryView(session, analyzedPlan)
+ alterTemporaryView(session, query)
} else {
- alterPermanentView(session, analyzedPlan)
+ alterPermanentView(session, query)
}
Seq.empty[Row]
}

private def alterTemporaryView(session: SparkSession, analyzedPlan: LogicalPlan): Unit = {
+ checkCyclicViewReference(analyzedPlan, Seq(name), name)
+
+ logDebug(s"Try to uncache ${name.quotedString} before altering.")
+ CommandUtils.uncacheTableOrView(session, name.quotedString)
Contributor Author:

@cloud-fan Uncaching seems like the right thing to do when you alter a view?

Contributor:

Yeah, I think so, since the cached data is stale. This is similar to CREATE OR REPLACE TEMP VIEW, where we have an extra needsToUncache check. Shall we do it here as well?

Contributor Author:

Since the logic is very similar, I refactored this part to share code with CreateViewCommand.
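For reference, a rough sketch of the needsToUncache idea (a hypothetical simplification, not the actual check in CreateViewCommand):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    object UncacheCheckSketch {
      // Only uncache when a view with this name already exists and its stored
      // plan would produce a different result than the replacement plan.
      def needsToUncacheSketch(existing: Option[LogicalPlan], replacement: LogicalPlan): Boolean =
        existing.exists(old => !old.sameResult(replacement))
    }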


val tableDefinition = if (conf.storeAnalyzedPlanForView) {
- analyzedPlan
+ TemporaryViewRelation(
+   prepareTemporaryViewStoringAnalyzedPlan(name, analyzedPlan),
+   Some(analyzedPlan))
} else {
- checkCyclicViewReference(analyzedPlan, Seq(name), name)
TemporaryViewRelation(
prepareTemporaryView(
name, session, analyzedPlan, analyzedPlan.schema, Some(originalText)))
@@ -306,6 +308,9 @@
val viewIdent = viewMeta.identifier
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)

logDebug(s"Try to uncache ${viewIdent.quotedString} before altering.")
CommandUtils.uncacheTableOrView(session, viewIdent.quotedString)

val newProperties = generateViewProperties(
viewMeta.properties, session, analyzedPlan, analyzedPlan.schema.fieldNames)

@@ -627,17 +632,17 @@ object ViewHelper {
}

/**
- * Returns a [[CatalogTable]] that contains information for the temporary view created
- * from a dataframe.
+ * Returns a [[CatalogTable]] that contains information for the temporary view storing
+ * an analyzed plan.
*/
- def prepareTemporaryViewFromDataFrame(
+ def prepareTemporaryViewStoringAnalyzedPlan(
viewName: TableIdentifier,
analyzedPlan: LogicalPlan): CatalogTable = {
CatalogTable(
identifier = viewName,
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = analyzedPlan.schema,
- properties = Map((VIEW_CREATED_FROM_DATAFRAME, "true")))
+ properties = Map((VIEW_STORING_ANALYZED_PLAN, "true")))
}
}
@@ -1445,4 +1445,54 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
}
}
}

test("SPARK-34546: ALTER VIEW AS should uncache if a temp view is cached") {
Seq(true, false).foreach { storeAnalyzed =>
withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> storeAnalyzed.toString) {
withTempView("tv") {
sql("CREATE TEMPORARY VIEW tv AS SELECT 1")
sql("CACHE TABLE tv")
assert(spark.catalog.isCached("tv"))
assert(spark.sharedState.cacheManager.lookupCachedData(sql("SELECT 1")).nonEmpty)

sql("ALTER VIEW tv as SELECT 2")
assert(!spark.catalog.isCached("tv"))
assert(spark.sharedState.cacheManager.lookupCachedData(sql("SELECT 1")).isEmpty)
}
}
}
}

test("SPARK-34546: ALTER VIEW AS should uncache if a global temp view is cached") {
Seq(true, false).foreach { storeAnalyzed =>
withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> storeAnalyzed.toString) {
withGlobalTempView("global_tv") {
sql("CREATE GLOBAL TEMPORARY VIEW global_tv AS SELECT 1")

val db = spark.sharedState.globalTempViewManager.database
val gv = s"$db.global_tv"
sql(s"CACHE TABLE $gv")
assert(spark.catalog.isCached(gv))
assert(spark.sharedState.cacheManager.lookupCachedData(sql("SELECT 1")).nonEmpty)

sql(s"ALTER VIEW $gv as SELECT 2")
assert(!spark.catalog.isCached(gv))
assert(spark.sharedState.cacheManager.lookupCachedData(sql("SELECT 1")).isEmpty)
}
}
}
}

test("SPARK-34546: ALTER VIEW AS should uncache if a permanent temp view is cached") {
withView("view") {
sql("CREATE VIEW view AS SELECT 1")
sql("CACHE TABLE view")
assert(spark.catalog.isCached("view"))
assert(spark.sharedState.cacheManager.lookupCachedData(sql("SELECT 1")).nonEmpty)

sql("ALTER VIEW view as SELECT 2")
assert(!spark.catalog.isCached("view"))
assert(spark.sharedState.cacheManager.lookupCachedData(sql("SELECT 1")).isEmpty)
}
}
}