diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index e455fae4675f4..1d2292e69904d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.execution.command.ViewHelper.createTemporaryViewRelation +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.types._ /** @@ -90,12 +91,30 @@ case class CreateTempViewUsing( options = options) val catalog = sparkSession.sessionState.catalog - val viewDefinition = Dataset.ofRows( + val analyzedPlan = Dataset.ofRows( sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan if (global) { + val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) + val viewIdent = TableIdentifier(tableIdent.table, Option(db)) + val viewDefinition = createTemporaryViewRelation( + viewIdent, + sparkSession, + replace, + catalog.getRawGlobalTempView, + originalText = None, + analyzedPlan, + aliasedPlan = analyzedPlan) catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace) } else { + val viewDefinition = createTemporaryViewRelation( + tableIdent, + sparkSession, + replace, + catalog.getRawTempView, + originalText = None, + analyzedPlan, + aliasedPlan = analyzedPlan) catalog.createTempView(tableIdent.table, viewDefinition, replace) } diff --git a/sql/core/src/test/resources/sql-tests/results/show-tables.sql.out b/sql/core/src/test/resources/sql-tests/results/show-tables.sql.out index 139004345accb..9f877a2ab93ec 100644 --- a/sql/core/src/test/resources/sql-tests/results/show-tables.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/show-tables.sql.out @@ -127,6 +127,7 @@ Created Time [not included in comparison] Last Access [not included in comparison] Created By [not included in comparison] Type: VIEW +Table Properties: [view.storingAnalyzedPlan=true] Schema: root |-- e: integer (nullable = true) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 2313a1a206ab0..a98f4d5f49d34 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -1509,4 +1509,49 @@ class CachedTableSuite extends QueryTest with SQLTestUtils assert(spark.sharedState.cacheManager.lookupCachedData(sql("SELECT 1")).isEmpty) } } + + test("SPARK-34699: CREATE TEMP VIEW USING should uncache correctly") { + withTempView("tv") { + testCreateTemporaryViewUsingWithCache(TableIdentifier("tv")) + } + } + + test("SPARK-34699: CREATE GLOBAL TEMP VIEW USING should uncache correctly") { + withGlobalTempView("global_tv") { + val db = spark.sharedState.globalTempViewManager.database + testCreateTemporaryViewUsingWithCache(TableIdentifier("global_tv", Some(db))) + } + } + + private def testCreateTemporaryViewUsingWithCache(ident: TableIdentifier): Unit = { + withTempDir { dir => + val path1 = new File(dir, "t1").getCanonicalPath + val path2 = new File(dir, "t2").getCanonicalPath + Seq(1).toDF.write.parquet(path1) + Seq(1).toDF.write.parquet(path2) + + val (tempViewStr, viewName) = if (ident.database.nonEmpty) { + ("GLOBAL TEMPORARY VIEW", s"${ident.database.get}.${ident.table}") + } else { + ("TEMPORARY VIEW", ident.table) + } + + sql(s"CREATE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path1')") + + sql(s"CACHE TABLE $viewName") + assert(spark.catalog.isCached(viewName)) + + // Replacing with the same relation. The cache shouldn't be uncached. + sql(s"CREATE OR REPLACE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path1')") + assert(spark.catalog.isCached(viewName)) + + // Replacing with a different relation. The cache should be cleared. + sql(s"CREATE OR REPLACE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path2')") + assert(!spark.catalog.isCached(viewName)) + + // Validate that the cache is cleared by creating a temp view with the same relation. + sql(s"CREATE OR REPLACE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path1')") + assert(!spark.catalog.isCached(viewName)) + } + } }