-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-34546][SQL] AlterViewAs.query should be analyzed during the analysis phase, and AlterViewAs should invalidate the cache #31652
Changes from all commits
ba766f7
12af3a7
eeb3712
2b2d35e
ba42775
51c66e8
8c8b0ef
31cb500
9302be9
e86d5e3
7e4815a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -22,8 +22,9 @@ import scala.collection.mutable | |||||||||||||||||||||
import org.json4s.JsonAST.{JArray, JString} | ||||||||||||||||||||||
import org.json4s.jackson.JsonMethods._ | ||||||||||||||||||||||
|
||||||||||||||||||||||
import org.apache.spark.internal.Logging | ||||||||||||||||||||||
import org.apache.spark.sql.{AnalysisException, Row, SparkSession} | ||||||||||||||||||||||
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} | ||||||||||||||||||||||
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier} | ||||||||||||||||||||||
import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, ViewType} | ||||||||||||||||||||||
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation} | ||||||||||||||||||||||
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression} | ||||||||||||||||||||||
|
@@ -115,48 +116,27 @@ case class CreateViewCommand( | |||||||||||||||||||||
|
||||||||||||||||||||||
if (viewType == LocalTempView) { | ||||||||||||||||||||||
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) | ||||||||||||||||||||||
if (replace && needsToUncache(catalog.getRawTempView(name.table), aliasedPlan)) { | ||||||||||||||||||||||
logInfo(s"Try to uncache ${name.quotedString} before replacing.") | ||||||||||||||||||||||
checkCyclicViewReference(analyzedPlan, Seq(name), name) | ||||||||||||||||||||||
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
// If there is no sql text (e.g. from Dataset API), we will always store the analyzed plan | ||||||||||||||||||||||
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) { | ||||||||||||||||||||||
TemporaryViewRelation( | ||||||||||||||||||||||
prepareTemporaryView( | ||||||||||||||||||||||
name, | ||||||||||||||||||||||
sparkSession, | ||||||||||||||||||||||
analyzedPlan, | ||||||||||||||||||||||
aliasedPlan.schema, | ||||||||||||||||||||||
originalText)) | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
TemporaryViewRelation( | ||||||||||||||||||||||
prepareTemporaryViewFromDataFrame(name, aliasedPlan), | ||||||||||||||||||||||
Some(aliasedPlan)) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
val tableDefinition = createTemporaryViewRelation( | ||||||||||||||||||||||
name, | ||||||||||||||||||||||
sparkSession, | ||||||||||||||||||||||
replace, | ||||||||||||||||||||||
catalog.getRawTempView, | ||||||||||||||||||||||
originalText, | ||||||||||||||||||||||
analyzedPlan, | ||||||||||||||||||||||
aliasedPlan) | ||||||||||||||||||||||
catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace) | ||||||||||||||||||||||
} else if (viewType == GlobalTempView) { | ||||||||||||||||||||||
val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) | ||||||||||||||||||||||
val viewIdent = TableIdentifier(name.table, Option(db)) | ||||||||||||||||||||||
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) | ||||||||||||||||||||||
if (replace && needsToUncache(catalog.getRawGlobalTempView(name.table), aliasedPlan)) { | ||||||||||||||||||||||
logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.") | ||||||||||||||||||||||
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent) | ||||||||||||||||||||||
CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) { | ||||||||||||||||||||||
TemporaryViewRelation( | ||||||||||||||||||||||
prepareTemporaryView( | ||||||||||||||||||||||
viewIdent, | ||||||||||||||||||||||
sparkSession, | ||||||||||||||||||||||
analyzedPlan, | ||||||||||||||||||||||
aliasedPlan.schema, | ||||||||||||||||||||||
originalText)) | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
TemporaryViewRelation( | ||||||||||||||||||||||
prepareTemporaryViewFromDataFrame(viewIdent, aliasedPlan), | ||||||||||||||||||||||
Some(aliasedPlan)) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
val tableDefinition = createTemporaryViewRelation( | ||||||||||||||||||||||
viewIdent, | ||||||||||||||||||||||
sparkSession, | ||||||||||||||||||||||
replace, | ||||||||||||||||||||||
catalog.getRawGlobalTempView, | ||||||||||||||||||||||
originalText, | ||||||||||||||||||||||
analyzedPlan, | ||||||||||||||||||||||
aliasedPlan) | ||||||||||||||||||||||
catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace) | ||||||||||||||||||||||
} else if (catalog.tableExists(name)) { | ||||||||||||||||||||||
val tableMetadata = catalog.getTableMetadata(name) | ||||||||||||||||||||||
|
@@ -192,20 +172,6 @@ case class CreateViewCommand( | |||||||||||||||||||||
Seq.empty[Row] | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
/** | ||||||||||||||||||||||
* Checks if need to uncache the temp view being replaced. | ||||||||||||||||||||||
*/ | ||||||||||||||||||||||
private def needsToUncache( | ||||||||||||||||||||||
rawTempView: Option[LogicalPlan], | ||||||||||||||||||||||
aliasedPlan: LogicalPlan): Boolean = rawTempView match { | ||||||||||||||||||||||
// The temp view doesn't exist, no need to uncache. | ||||||||||||||||||||||
case None => false | ||||||||||||||||||||||
// Do not need to uncache if the to-be-replaced temp view plan and the new plan are the | ||||||||||||||||||||||
// same-result plans. | ||||||||||||||||||||||
case Some(TemporaryViewRelation(_, Some(p))) => !p.sameResult(aliasedPlan) | ||||||||||||||||||||||
case Some(p) => !p.sameResult(aliasedPlan) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
/** | ||||||||||||||||||||||
* If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns, | ||||||||||||||||||||||
* else return the analyzed plan directly. | ||||||||||||||||||||||
|
@@ -274,28 +240,29 @@ case class AlterViewAsCommand( | |||||||||||||||||||||
override def innerChildren: Seq[QueryPlan[_]] = Seq(query) | ||||||||||||||||||||||
|
||||||||||||||||||||||
override def run(session: SparkSession): Seq[Row] = { | ||||||||||||||||||||||
// If the plan cannot be analyzed, throw an exception and don't proceed. | ||||||||||||||||||||||
val qe = session.sessionState.executePlan(query) | ||||||||||||||||||||||
qe.assertAnalyzed() | ||||||||||||||||||||||
val analyzedPlan = qe.analyzed | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we remove the similar code in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We cannot yet because of the following: spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala Lines 96 to 105 in f340857
Here, |
||||||||||||||||||||||
|
||||||||||||||||||||||
if (session.sessionState.catalog.isTempView(name)) { | ||||||||||||||||||||||
alterTemporaryView(session, analyzedPlan) | ||||||||||||||||||||||
alterTemporaryView(session, query) | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
alterPermanentView(session, analyzedPlan) | ||||||||||||||||||||||
alterPermanentView(session, query) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
Seq.empty[Row] | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
private def alterTemporaryView(session: SparkSession, analyzedPlan: LogicalPlan): Unit = { | ||||||||||||||||||||||
val tableDefinition = if (conf.storeAnalyzedPlanForView) { | ||||||||||||||||||||||
analyzedPlan | ||||||||||||||||||||||
val catalog = session.sessionState.catalog | ||||||||||||||||||||||
val getRawTempView: String => Option[LogicalPlan] = if (name.database.isEmpty) { | ||||||||||||||||||||||
catalog.getRawTempView | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
checkCyclicViewReference(analyzedPlan, Seq(name), name) | ||||||||||||||||||||||
TemporaryViewRelation( | ||||||||||||||||||||||
prepareTemporaryView( | ||||||||||||||||||||||
name, session, analyzedPlan, analyzedPlan.schema, Some(originalText))) | ||||||||||||||||||||||
catalog.getRawGlobalTempView | ||||||||||||||||||||||
} | ||||||||||||||||||||||
val tableDefinition = createTemporaryViewRelation( | ||||||||||||||||||||||
name, | ||||||||||||||||||||||
session, | ||||||||||||||||||||||
replace = true, | ||||||||||||||||||||||
getRawTempView, | ||||||||||||||||||||||
Some(originalText), | ||||||||||||||||||||||
analyzedPlan, | ||||||||||||||||||||||
aliasedPlan = analyzedPlan) | ||||||||||||||||||||||
session.sessionState.catalog.alterTempViewDefinition(name, tableDefinition) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
|
@@ -306,6 +273,9 @@ case class AlterViewAsCommand( | |||||||||||||||||||||
val viewIdent = viewMeta.identifier | ||||||||||||||||||||||
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent) | ||||||||||||||||||||||
|
||||||||||||||||||||||
logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.") | ||||||||||||||||||||||
CommandUtils.uncacheTableOrView(session, viewIdent.quotedString) | ||||||||||||||||||||||
|
||||||||||||||||||||||
val newProperties = generateViewProperties( | ||||||||||||||||||||||
viewMeta.properties, session, analyzedPlan, analyzedPlan.schema.fieldNames) | ||||||||||||||||||||||
|
||||||||||||||||||||||
|
@@ -349,7 +319,7 @@ case class ShowViewsCommand( | |||||||||||||||||||||
} | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
object ViewHelper { | ||||||||||||||||||||||
object ViewHelper extends SQLConfHelper with Logging { | ||||||||||||||||||||||
|
||||||||||||||||||||||
private val configPrefixDenyList = Seq( | ||||||||||||||||||||||
SQLConf.MAX_NESTED_VIEW_DEPTH.key, | ||||||||||||||||||||||
|
@@ -596,19 +566,80 @@ object ViewHelper { | |||||||||||||||||||||
(collectTempViews(child), collectTempFunctions(child)) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
/** | ||||||||||||||||||||||
* Returns a [[TemporaryViewRelation]] that contains information about a temporary view | ||||||||||||||||||||||
* to create, given an analyzed plan of the view. If a temp view is to be replaced and it is | ||||||||||||||||||||||
* cached, it will be uncached before being replaced. | ||||||||||||||||||||||
* | ||||||||||||||||||||||
* @param name the name of the temporary view to create/replace. | ||||||||||||||||||||||
* @param session the spark session. | ||||||||||||||||||||||
* @param replace if true and the existing view is cached, it will be uncached. | ||||||||||||||||||||||
* @param getRawTempView the function that returns an optional raw plan of the local or | ||||||||||||||||||||||
* global temporary view. | ||||||||||||||||||||||
* @param originalText the original SQL text of this view, can be None if this view is created via | ||||||||||||||||||||||
* Dataset API or spark.sql.legacy.storeAnalyzedPlanForView is set to true. | ||||||||||||||||||||||
* @param analyzedPlan the logical plan that represents the view; this is used to generate the | ||||||||||||||||||||||
* logical plan for temporary view and the view schema. | ||||||||||||||||||||||
* @param aliasedPlan the aliased logical plan based on the user specified columns. If there are | ||||||||||||||||||||||
* no user specified plans, this should be same as `analyzedPlan`. | ||||||||||||||||||||||
*/ | ||||||||||||||||||||||
def createTemporaryViewRelation( | ||||||||||||||||||||||
name: TableIdentifier, | ||||||||||||||||||||||
session: SparkSession, | ||||||||||||||||||||||
replace: Boolean, | ||||||||||||||||||||||
getRawTempView: String => Option[LogicalPlan], | ||||||||||||||||||||||
originalText: Option[String], | ||||||||||||||||||||||
analyzedPlan: LogicalPlan, | ||||||||||||||||||||||
aliasedPlan: LogicalPlan): TemporaryViewRelation = { | ||||||||||||||||||||||
val uncache = getRawTempView(name.table).map { r => | ||||||||||||||||||||||
needsToUncache(r, aliasedPlan) | ||||||||||||||||||||||
}.getOrElse(false) | ||||||||||||||||||||||
if (replace && uncache) { | ||||||||||||||||||||||
logDebug(s"Try to uncache ${name.quotedString} before replacing.") | ||||||||||||||||||||||
checkCyclicViewReference(analyzedPlan, Seq(name), name) | ||||||||||||||||||||||
CommandUtils.uncacheTableOrView(session, name.quotedString) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) { | ||||||||||||||||||||||
TemporaryViewRelation( | ||||||||||||||||||||||
prepareTemporaryView( | ||||||||||||||||||||||
name, | ||||||||||||||||||||||
session, | ||||||||||||||||||||||
analyzedPlan, | ||||||||||||||||||||||
aliasedPlan.schema, | ||||||||||||||||||||||
originalText.get)) | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
TemporaryViewRelation( | ||||||||||||||||||||||
prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan), | ||||||||||||||||||||||
Some(aliasedPlan)) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
/** | ||||||||||||||||||||||
* Checks if need to uncache the temp view being replaced. | ||||||||||||||||||||||
*/ | ||||||||||||||||||||||
private def needsToUncache( | ||||||||||||||||||||||
rawTempView: LogicalPlan, | ||||||||||||||||||||||
aliasedPlan: LogicalPlan): Boolean = rawTempView match { | ||||||||||||||||||||||
// If TemporaryViewRelation doesn't store the analyzed view, always uncache. | ||||||||||||||||||||||
case TemporaryViewRelation(_, None) => true | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is unnecessary because |
||||||||||||||||||||||
// Do not need to uncache if the to-be-replaced temp view plan and the new plan are the | ||||||||||||||||||||||
// same-result plans. | ||||||||||||||||||||||
case TemporaryViewRelation(_, Some(p)) => !p.sameResult(aliasedPlan) | ||||||||||||||||||||||
case p => !p.sameResult(aliasedPlan) | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we still need it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to first update spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala Lines 93 to 100 in 3a299aa
, which I will get to right after this PR. Once the PR is done, we can update Let me actually create a JIRA to capture these as subtasks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I created https://issues.apache.org/jira/browse/SPARK-34698 to track these. |
||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
/** | ||||||||||||||||||||||
* Returns a [[CatalogTable]] that contains information for temporary view. | ||||||||||||||||||||||
* Generate the view-specific properties(e.g. view default database, view query output | ||||||||||||||||||||||
* column names) and store them as properties in the CatalogTable, and also creates | ||||||||||||||||||||||
* the proper schema for the view. | ||||||||||||||||||||||
*/ | ||||||||||||||||||||||
def prepareTemporaryView( | ||||||||||||||||||||||
private def prepareTemporaryView( | ||||||||||||||||||||||
viewName: TableIdentifier, | ||||||||||||||||||||||
session: SparkSession, | ||||||||||||||||||||||
analyzedPlan: LogicalPlan, | ||||||||||||||||||||||
viewSchema: StructType, | ||||||||||||||||||||||
originalText: Option[String]): CatalogTable = { | ||||||||||||||||||||||
originalText: String): CatalogTable = { | ||||||||||||||||||||||
|
||||||||||||||||||||||
val catalog = session.sessionState.catalog | ||||||||||||||||||||||
val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, analyzedPlan) | ||||||||||||||||||||||
|
@@ -622,22 +653,22 @@ object ViewHelper { | |||||||||||||||||||||
tableType = CatalogTableType.VIEW, | ||||||||||||||||||||||
storage = CatalogStorageFormat.empty, | ||||||||||||||||||||||
schema = viewSchema, | ||||||||||||||||||||||
viewText = originalText, | ||||||||||||||||||||||
viewText = Some(originalText), | ||||||||||||||||||||||
properties = newProperties) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
/** | ||||||||||||||||||||||
* Returns a [[CatalogTable]] that contains information for the temporary view created | ||||||||||||||||||||||
* from a dataframe. | ||||||||||||||||||||||
* Returns a [[CatalogTable]] that contains information for the temporary view storing | ||||||||||||||||||||||
* an analyzed plan. | ||||||||||||||||||||||
*/ | ||||||||||||||||||||||
def prepareTemporaryViewFromDataFrame( | ||||||||||||||||||||||
private def prepareTemporaryViewStoringAnalyzedPlan( | ||||||||||||||||||||||
viewName: TableIdentifier, | ||||||||||||||||||||||
analyzedPlan: LogicalPlan): CatalogTable = { | ||||||||||||||||||||||
CatalogTable( | ||||||||||||||||||||||
identifier = viewName, | ||||||||||||||||||||||
tableType = CatalogTableType.VIEW, | ||||||||||||||||||||||
storage = CatalogStorageFormat.empty, | ||||||||||||||||||||||
schema = analyzedPlan.schema, | ||||||||||||||||||||||
properties = Map((VIEW_CREATED_FROM_DATAFRAME, "true"))) | ||||||||||||||||||||||
properties = Map((VIEW_STORING_ANALYZED_PLAN, "true"))) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed because this type of view can also be created not from a dataframe - e.g.,
ALTER VIEW ... AS
withspark.sql.legacy.storeAnalyzedPlanForView
set to true.