[SPARK-26917][SQL] Further reduce locks in CacheManager #24028

Closed · wants to merge 17 commits · Changes from 9 commits
@@ -144,16 +144,10 @@ class CacheManager extends Logging {
       } else {
         _.sameResult(plan)
       }
-    val plansToUncache = mutable.Buffer[CachedData]()
-    readLock {
-      val it = cachedData.iterator()
-      while (it.hasNext) {
-        val cd = it.next()
-        if (shouldRemove(cd.plan)) {
-          plansToUncache += cd
-        }
-      }
-    }
+    val cachedDataCopy = readLock {
+      cachedData.asScala.clone()
+    }
+    val plansToUncache = cachedDataCopy.filter(cd => shouldRemove(cd.plan))
Member:

It seems we always use while loops when performance should be a concern:
https://github.com/databricks/scala-style-guide#traversal-and-zipwithindex
Was there no actual performance difference between the two patterns on your heavy workload?
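
For illustration, a minimal, self-contained sketch of the two traversal patterns being compared; `CachedData`, `cachedData`, and `shouldRemove` here are simplified stand-ins for the CacheManager internals, not the real Spark definitions:

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable

object TraversalPatterns {
  // Simplified stand-ins for CacheManager internals (illustration only).
  case class CachedData(name: String)
  val cachedData = new java.util.LinkedList[CachedData]()
  def shouldRemove(cd: CachedData): Boolean = cd.name.startsWith("tmp")

  // Pattern 1: explicit while loop over the iterator, which the linked style
  // guide recommends for performance-sensitive traversals.
  def collectWithWhileLoop(): mutable.Buffer[CachedData] = {
    val result = mutable.Buffer[CachedData]()
    val it = cachedData.iterator()
    while (it.hasNext) {
      val cd = it.next()
      if (shouldRemove(cd)) {
        result += cd
      }
    }
    result
  }

  // Pattern 2: shallow copy of the list, then a filter, as in this change.
  def collectWithFilter(): mutable.Buffer[CachedData] =
    cachedData.asScala.clone().filter(shouldRemove)
}
```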

Member:

I suspect that it's the logic in shouldRemove that takes the time here, and that it can be done without the lock.

Contributor (author):

Yes, the problem is that the "shouldRemove" function is passed into this call. If that function is expensive, it causes the lock to be held for an arbitrarily long time.

When called from "recacheByPath", "shouldRemove" does a full traversal of the entire logical plan tree for every cached plan, and in the process regenerates path strings for every file referenced by every single plan. In our situation, at least, that is easily many orders of magnitude more memory overhead than a shallow copy of the list.

     plansToUncache.foreach { cd =>
       writeLock {
         cachedData.remove(cd)
@@ -194,16 +188,10 @@ class CacheManager extends Logging {
   private def recacheByCondition(
       spark: SparkSession,
       condition: CachedData => Boolean): Unit = {
-    val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
-    readLock {
-      val it = cachedData.iterator()
-      while (it.hasNext) {
-        val cd = it.next()
-        if (condition(cd)) {
-          needToRecache += cd
-        }
-      }
-    }
+    val cachedDataCopy = readLock {
+      cachedData.asScala.clone()
+    }
+    val needToRecache = cachedDataCopy.filter(condition)
     needToRecache.map { cd =>
       writeLock {
         // Remove the cache entry before we create a new one, so that we can have a different