diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 64c527aad8a57..86ebdac3480b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -46,6 +46,7 @@ case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
 class CacheManager extends Logging {
 
   @transient
+  @volatile
   private var cachedData = IndexedSeq[CachedData]()
 
   @transient
@@ -143,13 +144,12 @@ class CacheManager extends Logging {
     } else {
       _.sameResult(plan)
     }
-    val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
-    plansToUncache.foreach { cd =>
-      writeLock {
-        cachedData = cachedData.filter(_ != cd)
-      }
-      cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
-    }
+    val (plansToUncache, remainingPlans) = cachedData.partition(cd => shouldRemove(cd.plan))
+    writeLock {
+      cachedData = remainingPlans
+    }
+    plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) }
+
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
       recacheByCondition(spark, cd => {
@@ -184,13 +184,12 @@ class CacheManager extends Logging {
   private def recacheByCondition(
       spark: SparkSession,
      condition: CachedData => Boolean): Unit = {
-    val needToRecache = cachedData.filter(condition)
+    val (needToRecache, remainingPlans) = cachedData.partition(condition)
+    writeLock {
+      // Remove the cache entry before creating a new one.
+      cachedData = remainingPlans
+    }
     needToRecache.map { cd =>
-      writeLock {
-        // Remove the cache entry before we create a new one, so that we can have a different
-        // physical plan.
-        cachedData = cachedData.filter(_ != cd)
-      }
       cd.cachedRepresentation.cacheBuilder.clearCache()
       val plan = spark.sessionState.executePlan(cd.plan).executedPlan
       val newCache = InMemoryRelation(
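
Illustration (not part of the patch): a minimal Scala sketch of the concurrency pattern the diff adopts. Readers observe an immutable snapshot behind a @volatile reference and never take the lock; writers partition the snapshot once and swap in the remaining entries inside a single short critical section, then perform per-entry cleanup outside it. The names Entry, CacheRegistry, and release are hypothetical stand-ins for CachedData, CacheManager, and clearCache.

object CacheRegistrySketch {

  final case class Entry(key: String, release: () => Unit)

  class CacheRegistry {
    // @volatile makes each reference swap visible to lock-free readers;
    // the IndexedSeq itself is immutable and never modified in place.
    @volatile private var entries = IndexedSeq.empty[Entry]
    private val writeLock = new Object

    // Lock-free read: a plain volatile read of the current snapshot.
    def lookup(key: String): Option[Entry] = entries.find(_.key == key)

    def add(entry: Entry): Unit = writeLock.synchronized {
      entries = entries :+ entry
    }

    // Mirrors the diff: one partition and one reference swap, instead of
    // re-filtering the collection under the lock once per removed entry.
    def removeWhere(shouldRemove: Entry => Boolean): Unit = {
      val (toRemove, remaining) = entries.partition(shouldRemove)
      writeLock.synchronized {
        entries = remaining
      }
      // Potentially slow cleanup runs outside the critical section,
      // like clearCache(blocking) in the patch.
      toRemove.foreach(_.release())
    }
  }
}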