diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index f7a78ea0cb55b..a5f62f43c21c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import scala.collection.immutable.IndexedSeq
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
@@ -46,38 +45,22 @@ case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
  */
 class CacheManager extends Logging {
 
-  @transient
-  private val cachedData = new java.util.LinkedList[CachedData]
-
-  @transient
-  private val cacheLock = new ReentrantReadWriteLock
-
-  /** Acquires a read lock on the cache for the duration of `f`. */
-  private def readLock[A](f: => A): A = {
-    val lock = cacheLock.readLock()
-    lock.lock()
-    try f finally {
-      lock.unlock()
-    }
-  }
-
-  /** Acquires a write lock on the cache for the duration of `f`. */
-  private def writeLock[A](f: => A): A = {
-    val lock = cacheLock.writeLock()
-    lock.lock()
-    try f finally {
-      lock.unlock()
-    }
-  }
+  /**
+   * Maintains the list of cached plans as an immutable sequence. Any updates to the list
+   * should be protected in a "this.synchronized" block which includes the reading of the
+   * existing value and the update of the cachedData var.
+   */
+  @transient @volatile
+  private var cachedData = IndexedSeq[CachedData]()
 
   /** Clears all cached tables. */
-  def clearCache(): Unit = writeLock {
-    cachedData.asScala.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
-    cachedData.clear()
+  def clearCache(): Unit = this.synchronized {
+    cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
+    cachedData = IndexedSeq[CachedData]()
   }
 
   /** Checks if the cache is empty. */
-  def isEmpty: Boolean = readLock {
+  def isEmpty: Boolean = {
     cachedData.isEmpty
   }
 
@@ -101,11 +84,11 @@ class CacheManager extends Logging {
       sparkSession.sessionState.executePlan(planToCache).executedPlan,
       tableName,
       planToCache)
-    writeLock {
+    this.synchronized {
       if (lookupCachedData(planToCache).nonEmpty) {
         logWarning("Data has already been cached.")
       } else {
-        cachedData.add(CachedData(planToCache, inMemoryRelation))
+        cachedData = CachedData(planToCache, inMemoryRelation) +: cachedData
       }
     }
   }
@@ -144,22 +127,12 @@
     } else {
       _.sameResult(plan)
     }
-    val plansToUncache = mutable.Buffer[CachedData]()
-    readLock {
-      val it = cachedData.iterator()
-      while (it.hasNext) {
-        val cd = it.next()
-        if (shouldRemove(cd.plan)) {
-          plansToUncache += cd
-        }
-      }
-    }
-    plansToUncache.foreach { cd =>
-      writeLock {
-        cachedData.remove(cd)
-      }
-      cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
+    val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
+    this.synchronized {
+      cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd))
     }
+    plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) }
+    // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
       recacheByCondition(spark, cd => {
@@ -194,46 +167,36 @@
   private def recacheByCondition(
       spark: SparkSession,
      condition: CachedData => Boolean): Unit = {
-    val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
-    readLock {
-      val it = cachedData.iterator()
-      while (it.hasNext) {
-        val cd = it.next()
-        if (condition(cd)) {
-          needToRecache += cd
-        }
-      }
+    val needToRecache = cachedData.filter(condition)
+    this.synchronized {
+      // Remove the cache entries before creating new ones.
+      cachedData = cachedData.filterNot(cd => needToRecache.exists(_ eq cd))
     }
     needToRecache.map { cd =>
-      writeLock {
-        // Remove the cache entry before we create a new one, so that we can have a different
-        // physical plan.
-        cachedData.remove(cd)
-      }
       cd.cachedRepresentation.cacheBuilder.clearCache()
       val plan = spark.sessionState.executePlan(cd.plan).executedPlan
       val newCache = InMemoryRelation(
         cacheBuilder = cd.cachedRepresentation.cacheBuilder.copy(cachedPlan = plan),
         logicalPlan = cd.plan)
       val recomputedPlan = cd.copy(cachedRepresentation = newCache)
-      writeLock {
+      this.synchronized {
         if (lookupCachedData(recomputedPlan.plan).nonEmpty) {
           logWarning("While recaching, data was already added to cache.")
         } else {
-          cachedData.add(recomputedPlan)
+          cachedData = recomputedPlan +: cachedData
         }
       }
     }
   }
 
   /** Optionally returns cached data for the given [[Dataset]] */
-  def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock {
+  def lookupCachedData(query: Dataset[_]): Option[CachedData] = {
     lookupCachedData(query.logicalPlan)
   }
 
   /** Optionally returns cached data for the given [[LogicalPlan]]. */
-  def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
-    cachedData.asScala.find(cd => plan.sameResult(cd.plan))
+  def lookupCachedData(plan: LogicalPlan): Option[CachedData] = {
+    cachedData.find(cd => plan.sameResult(cd.plan))
   }
 
   /** Replaces segments of the given logical plan with cached versions where possible. */
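
For context on the concurrency pattern the patch switches to: reads go through a @volatile reference to an immutable sequence and take no lock, while every read-modify-write of that reference is serialized inside this.synchronized. A minimal, self-contained sketch of the same pattern (class and method names are hypothetical, not part of the patch):

    import scala.collection.immutable.IndexedSeq

    // Hypothetical sketch of the copy-on-write registry pattern adopted here.
    class CopyOnWriteRegistry[A <: AnyRef] {
      // @volatile ensures readers always observe the latest published sequence.
      @volatile private var entries = IndexedSeq.empty[A]

      // Lock-free read: operates on an immutable snapshot of the sequence.
      def find(p: A => Boolean): Option[A] = entries.find(p)

      // Writers synchronize so that reading `entries` and publishing the new
      // sequence form a single atomic read-modify-write step.
      def add(a: A): Unit = this.synchronized {
        entries = a +: entries
      }

      // Removes exactly the given instances by reference identity, mirroring
      // cachedData.filterNot(cd => plansToUncache.exists(_ eq cd)) in the patch.
      def removeAll(toRemove: Seq[A]): Unit = this.synchronized {
        entries = entries.filterNot(e => toRemove.exists(_ eq e))
      }
    }

The trade-off relative to the old ReentrantReadWriteLock: lookups never block behind writers, at the cost of copying the sequence on every update, which is cheap for a typically small list of cached plans.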
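
Why `_ eq cd` (reference identity) rather than `==`? CachedData is a case class, so two entries built from equivalent plans compare structurally equal; because the `filter` snapshot is taken outside the synchronized block, value-based removal could also drop a structurally equal entry added concurrently in the meantime. A small runnable illustration with a hypothetical `Entry` type:

    object EqVsEqualsDemo extends App {
      case class Entry(key: String)

      val snapshot = Entry("t1")    // the entry we decided to remove
      val concurrent = Entry("t1")  // structurally equal entry added later
      val entries = IndexedSeq(snapshot, concurrent)

      // Value-based removal drops both entries, including the one we never saw:
      assert(entries.filterNot(_ == snapshot).isEmpty)

      // Identity-based removal drops only the instance from our snapshot:
      assert(entries.filterNot(e => Seq(snapshot).exists(_ eq e)) == IndexedSeq(concurrent))

      println("identity-based removal keeps the concurrently added entry")
    }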