-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-34546][SQL] AlterViewAs.query should be analyzed during the analysis phase, and AlterViewAs should invalidate the cache #31652
Changes from 6 commits
ba766f7
12af3a7
eeb3712
2b2d35e
ba42775
51c66e8
8c8b0ef
31cb500
9302be9
e86d5e3
7e4815a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -116,7 +116,7 @@ case class CreateViewCommand( | |||||||||||||||||||||
if (viewType == LocalTempView) { | ||||||||||||||||||||||
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) | ||||||||||||||||||||||
if (replace && needsToUncache(catalog.getRawTempView(name.table), aliasedPlan)) { | ||||||||||||||||||||||
logInfo(s"Try to uncache ${name.quotedString} before replacing.") | ||||||||||||||||||||||
logDebug(s"Try to uncache ${name.quotedString} before replacing.") | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed this to spark/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala Line 174 in 9ec8696
If |
||||||||||||||||||||||
checkCyclicViewReference(analyzedPlan, Seq(name), name) | ||||||||||||||||||||||
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
@@ -131,7 +131,7 @@ case class CreateViewCommand( | |||||||||||||||||||||
originalText)) | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
TemporaryViewRelation( | ||||||||||||||||||||||
prepareTemporaryViewFromDataFrame(name, aliasedPlan), | ||||||||||||||||||||||
prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan), | ||||||||||||||||||||||
Some(aliasedPlan)) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace) | ||||||||||||||||||||||
|
@@ -140,7 +140,7 @@ case class CreateViewCommand( | |||||||||||||||||||||
val viewIdent = TableIdentifier(name.table, Option(db)) | ||||||||||||||||||||||
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) | ||||||||||||||||||||||
if (replace && needsToUncache(catalog.getRawGlobalTempView(name.table), aliasedPlan)) { | ||||||||||||||||||||||
logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.") | ||||||||||||||||||||||
logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.") | ||||||||||||||||||||||
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent) | ||||||||||||||||||||||
CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
@@ -154,7 +154,7 @@ case class CreateViewCommand( | |||||||||||||||||||||
originalText)) | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
TemporaryViewRelation( | ||||||||||||||||||||||
prepareTemporaryViewFromDataFrame(name, aliasedPlan), | ||||||||||||||||||||||
prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan), | ||||||||||||||||||||||
Some(aliasedPlan)) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace) | ||||||||||||||||||||||
|
@@ -268,30 +268,32 @@ case class AlterViewAsCommand( | |||||||||||||||||||||
name: TableIdentifier, | ||||||||||||||||||||||
originalText: String, | ||||||||||||||||||||||
query: LogicalPlan) extends RunnableCommand { | ||||||||||||||||||||||
require(query.resolved) | ||||||||||||||||||||||
imback82 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||
|
||||||||||||||||||||||
import ViewHelper._ | ||||||||||||||||||||||
|
||||||||||||||||||||||
override def innerChildren: Seq[QueryPlan[_]] = Seq(query) | ||||||||||||||||||||||
|
||||||||||||||||||||||
override def run(session: SparkSession): Seq[Row] = { | ||||||||||||||||||||||
// If the plan cannot be analyzed, throw an exception and don't proceed. | ||||||||||||||||||||||
val qe = session.sessionState.executePlan(query) | ||||||||||||||||||||||
qe.assertAnalyzed() | ||||||||||||||||||||||
val analyzedPlan = qe.analyzed | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we remove the similar code in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We cannot yet because of the following: spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala Lines 96 to 105 in f340857
Here, |
||||||||||||||||||||||
|
||||||||||||||||||||||
if (session.sessionState.catalog.isTempView(name)) { | ||||||||||||||||||||||
alterTemporaryView(session, analyzedPlan) | ||||||||||||||||||||||
alterTemporaryView(session, query) | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
alterPermanentView(session, analyzedPlan) | ||||||||||||||||||||||
alterPermanentView(session, query) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
Seq.empty[Row] | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
private def alterTemporaryView(session: SparkSession, analyzedPlan: LogicalPlan): Unit = { | ||||||||||||||||||||||
checkCyclicViewReference(analyzedPlan, Seq(name), name) | ||||||||||||||||||||||
|
||||||||||||||||||||||
logDebug(s"Try to uncache ${name.quotedString} before altering.") | ||||||||||||||||||||||
CommandUtils.uncacheTableOrView(session, name.quotedString) | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cloud-fan Uncaching seems like a right thing to do when you alter a view? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yea I think so, since the cached data is stale. It's similar to CREATE OR REPLACE TEMP VIEW, seems we have an extra check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the logic is very similar, I refactored this part to share code with |
||||||||||||||||||||||
|
||||||||||||||||||||||
val tableDefinition = if (conf.storeAnalyzedPlanForView) { | ||||||||||||||||||||||
analyzedPlan | ||||||||||||||||||||||
TemporaryViewRelation( | ||||||||||||||||||||||
prepareTemporaryViewStoringAnalyzedPlan(name, analyzedPlan), | ||||||||||||||||||||||
Some(analyzedPlan)) | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
checkCyclicViewReference(analyzedPlan, Seq(name), name) | ||||||||||||||||||||||
TemporaryViewRelation( | ||||||||||||||||||||||
prepareTemporaryView( | ||||||||||||||||||||||
name, session, analyzedPlan, analyzedPlan.schema, Some(originalText))) | ||||||||||||||||||||||
|
@@ -306,6 +308,9 @@ case class AlterViewAsCommand( | |||||||||||||||||||||
val viewIdent = viewMeta.identifier | ||||||||||||||||||||||
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent) | ||||||||||||||||||||||
|
||||||||||||||||||||||
logDebug(s"Try to uncache ${viewIdent.quotedString} before altering.") | ||||||||||||||||||||||
CommandUtils.uncacheTableOrView(session, viewIdent.quotedString) | ||||||||||||||||||||||
|
||||||||||||||||||||||
val newProperties = generateViewProperties( | ||||||||||||||||||||||
viewMeta.properties, session, analyzedPlan, analyzedPlan.schema.fieldNames) | ||||||||||||||||||||||
|
||||||||||||||||||||||
|
@@ -627,17 +632,17 @@ object ViewHelper { | |||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
/** | ||||||||||||||||||||||
* Returns a [[CatalogTable]] that contains information for the temporary view created | ||||||||||||||||||||||
* from a dataframe. | ||||||||||||||||||||||
* Returns a [[CatalogTable]] that contains information for the temporary view storing | ||||||||||||||||||||||
* an analyzed plan. | ||||||||||||||||||||||
*/ | ||||||||||||||||||||||
def prepareTemporaryViewFromDataFrame( | ||||||||||||||||||||||
def prepareTemporaryViewStoringAnalyzedPlan( | ||||||||||||||||||||||
viewName: TableIdentifier, | ||||||||||||||||||||||
analyzedPlan: LogicalPlan): CatalogTable = { | ||||||||||||||||||||||
CatalogTable( | ||||||||||||||||||||||
identifier = viewName, | ||||||||||||||||||||||
tableType = CatalogTableType.VIEW, | ||||||||||||||||||||||
storage = CatalogStorageFormat.empty, | ||||||||||||||||||||||
schema = analyzedPlan.schema, | ||||||||||||||||||||||
properties = Map((VIEW_CREATED_FROM_DATAFRAME, "true"))) | ||||||||||||||||||||||
properties = Map((VIEW_STORING_ANALYZED_PLAN, "true"))) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed because this type of view can also be created not from a dataframe - e.g.,
ALTER VIEW ... AS
withspark.sql.legacy.storeAnalyzedPlanForView
set to true.