From 4a5c388693159c22e69c29da32a4c0c766d30249 Mon Sep 17 00:00:00 2001
From: Maryann Xue
Date: Wed, 20 Jun 2018 16:18:50 -0700
Subject: [PATCH 1/2] [SPARK-24613] Cache with UDF could not be matched with
 subsequent dependent caches

---
 .../apache/spark/sql/execution/CacheManager.scala |  6 +++---
 .../org/apache/spark/sql/DatasetCacheSuite.scala  | 15 +++++++++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 93bf91e56f1bd..2db7c02e86014 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ResolvedHint}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan, ResolvedHint}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.storage.StorageLevel
@@ -97,7 +97,7 @@ class CacheManager extends Logging {
       val inMemoryRelation = InMemoryRelation(
         sparkSession.sessionState.conf.useCompression,
         sparkSession.sessionState.conf.columnBatchSize, storageLevel,
-        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        sparkSession.sessionState.executePlan(AnalysisBarrier(planToCache)).executedPlan,
         tableName, planToCache)
       cachedData.add(CachedData(planToCache, inMemoryRelation))
     }
@@ -142,7 +142,7 @@
         // Remove the cache entry before we create a new one, so that we can have a different
         // physical plan.
         it.remove()
-        val plan = spark.sessionState.executePlan(cd.plan).executedPlan
+        val plan = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan
         val newCache = InMemoryRelation(
           cacheBuilder = cd.cachedRepresentation
             .cacheBuilder.copy(cachedPlan = plan)(_cachedColumnBuffers = null),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 82a93f74dd76c..5447bc845e000 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -132,4 +132,19 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     df.unpersist()
     assert(df.storageLevel == StorageLevel.NONE)
   }
+
+  test("SPARK-24613 Cache with UDF could not be matched with subsequent dependent caches") {
+    val expensiveUDF = udf({x: Int => Thread.sleep(10000); x})
+    val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
+    val df2 = df.agg(sum(df("b")))
+
+    df.cache()
+    df.count()
+    df2.cache()
+
+    // udf has been evaluated during caching, and thus should not be re-evaluated here
+    failAfter(5 seconds) {
+      df2.collect()
+    }
+  }
 }

From 377f2134e6b4990b4c1d080e5fd5119fe808e057 Mon Sep 17 00:00:00 2001
From: Maryann Xue
Date: Wed, 20 Jun 2018 23:09:08 -0700
Subject: [PATCH 2/2] Refine test

---
 .../org/apache/spark/sql/DatasetCacheSuite.scala | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 5447bc845e000..c4f056334cd1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import org.scalatest.concurrent.TimeLimits
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.storage.StorageLevel
@@ -134,17 +135,17 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
   }
 
   test("SPARK-24613 Cache with UDF could not be matched with subsequent dependent caches") {
-    val expensiveUDF = udf({x: Int => Thread.sleep(10000); x})
-    val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
+    val udf1 = udf({x: Int => x + 1})
+    val df = spark.range(0, 10).toDF("a").withColumn("b", udf1($"a"))
     val df2 = df.agg(sum(df("b")))
 
     df.cache()
     df.count()
     df2.cache()
 
-    // udf has been evaluated during caching, and thus should not be re-evaluated here
-    failAfter(5 seconds) {
-      df2.collect()
-    }
+    val plan = df2.queryExecution.withCachedData
+    assert(plan.isInstanceOf[InMemoryRelation])
+    val internalPlan = plan.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
+    assert(internalPlan.find(_.isInstanceOf[InMemoryTableScanExec]).isDefined)
   }
 }
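
For reference, a minimal sketch of the scenario this series fixes, meant to be pasted into spark-shell on a Spark 2.3-era build (the shell supplies the spark session and spark.implicits._). The UDF name plusOne is illustrative, mirroring udf1 in the refined test, and the assertions at the end restate that test's checks rather than adding new guarantees:

import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.functions._

val plusOne = udf((x: Int) => x + 1)
val df = spark.range(0, 10).toDF("a").withColumn("b", plusOne($"a"))
val df2 = df.agg(sum(df("b")))

df.cache()
df.count()    // materializes df's cache
df2.cache()

// Before the fix, compiling df2's cached plan re-ran analysis on df's
// already-analyzed plan; re-analysis could rewrite the UDF expression, so df's
// subplan no longer matched its cache entry and df2's cache recomputed df from
// scratch. With the AnalysisBarrier in CacheManager, df2's cached physical plan
// should instead contain a scan over df's InMemoryRelation:
val cached = df2.queryExecution.withCachedData.asInstanceOf[InMemoryRelation]
assert(cached.cacheBuilder.cachedPlan.find(_.isInstanceOf[InMemoryTableScanExec]).isDefined)

Compared with the first version of the test, this plan-level check does not depend on wall-clock timing: instead of requiring df2.collect() to beat a timeout, it verifies directly that df2's cached plan reads the first cache.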