Skip to content

Commit

Permalink
[SPARK-24613][SQL] Cache with UDF could not be matched with subsequen…
Browse files Browse the repository at this point in the history
…t dependent caches

Wrap the logical plan with a `AnalysisBarrier` for execution plan compilation in CacheManager, in order to avoid the plan being analyzed again.

Add one test in `DatasetCacheSuite`

Author: Maryann Xue <[email protected]>

Closes #21602 from maryannxue/cache-mismatch.
  • Loading branch information
maryannxue authored and gatorsmile committed Jun 27, 2018
1 parent db538b2 commit 6e1f5e0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ResolvedHint}
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan, ResolvedHint}
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.storage.StorageLevel
Expand Down Expand Up @@ -97,7 +97,7 @@ class CacheManager extends Logging {
val inMemoryRelation = InMemoryRelation(
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize, storageLevel,
sparkSession.sessionState.executePlan(planToCache).executedPlan,
sparkSession.sessionState.executePlan(AnalysisBarrier(planToCache)).executedPlan,
tableName,
planToCache.stats)
cachedData.add(CachedData(planToCache, inMemoryRelation))
Expand Down Expand Up @@ -146,7 +146,7 @@ class CacheManager extends Logging {
useCompression = cd.cachedRepresentation.useCompression,
batchSize = cd.cachedRepresentation.batchSize,
storageLevel = cd.cachedRepresentation.storageLevel,
child = spark.sessionState.executePlan(cd.plan).executedPlan,
child = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan,
tableName = cd.cachedRepresentation.tableName,
statsOfPlanToCache = cd.plan.stats)
needToRecache += cd.copy(cachedRepresentation = newCache)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql

import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.storage.StorageLevel
Expand Down Expand Up @@ -96,4 +97,19 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
agged.unpersist()
assert(agged.storageLevel == StorageLevel.NONE, "The Dataset agged should not be cached.")
}

test("SPARK-24613 Cache with UDF could not be matched with subsequent dependent caches") {
val udf1 = udf({x: Int => x + 1})
val df = spark.range(0, 10).toDF("a").withColumn("b", udf1($"a"))
val df2 = df.agg(sum(df("b")))

df.cache()
df.count()
df2.cache()

val plan = df2.queryExecution.withCachedData
assert(plan.isInstanceOf[InMemoryRelation])
val internalPlan = plan.asInstanceOf[InMemoryRelation].child
assert(internalPlan.find(_.isInstanceOf[InMemoryTableScanExec]).isDefined)
}
}

0 comments on commit 6e1f5e0

Please sign in to comment.