[SPARK-26917][SQL] Further reduce locks in CacheManager #24028
Conversation
merge in spark
looks reasonable
Seems OK to me. I can't think of a material difference in behavior here that could cause a problem. The tradeoff seems to be the cost of clone() vs less time spent with the lock. I could believe that's a win but wonder if it's only a win under heavy load or at scale -- would it materially slow things down for other cases?
}
val plansToUncache = cachedDataCopy.filter(cd => shouldRemove(cd.plan))
It seems we always use while loops when performance should be a concern:
https://github.com/databricks/scala-style-guide#traversal-and-zipwithindex
No actual performance impact between the two patterns in your heavy workload?
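For concreteness, a hypothetical side-by-side of the two traversal styles in question (the types and names are stand-ins, not Spark code); as the replies below note, the predicate itself is usually the dominant cost either way:

import scala.collection.mutable.ArrayBuffer

object TraversalStyles {
  // Stand-in for a cached entry.
  final case class Cached(plan: String)

  // Functional style: concise; per-element cost is a call through the predicate.
  def selectWithFilter(cached: IndexedSeq[Cached], shouldRemove: Cached => Boolean): IndexedSeq[Cached] =
    cached.filter(shouldRemove)

  // While-loop style from the linked guide: avoids the generic traversal
  // machinery, which can matter in very hot code paths.
  def selectWithWhile(cached: IndexedSeq[Cached], shouldRemove: Cached => Boolean): IndexedSeq[Cached] = {
    val out = new ArrayBuffer[Cached](cached.length)
    var i = 0
    while (i < cached.length) {
      val cd = cached(i)
      if (shouldRemove(cd)) out += cd
      i += 1
    }
    out.toIndexedSeq
  }
}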
I suspect that it's the logic in shouldRemove that takes the time here, and can be done without the lock.
Yes, the problem is that the "shouldRemove" function is passed into this call. If that call is expensive, it causes the lock to be held for arbitrarily long amounts of time.
"shouldRemove" when called from "recacheByPath" causes a full traversal of the entire logical plan tree for every cached plan. In the process of doing this it will regenerate path strings for every file referenced by every single plan. In our situations at least this is easily many orders of magnitude more memory overhead than a shallow copy of the list.
Is this going to put pressure on the heap or GC with the "temp" copy?
Good question; I had the same question in mind. I just think that it is OK since this is a shallow copy of a list of objects with two fields.
I don't think this will be a significant allocation as it's a shallow copy. At smaller scale, it shouldn't matter much. I'd take the win at larger scale.
ok to test
Shall we use some "copy-on-write" thread-safe collections instead of doing the clone manually?
Test build #103294 has finished for PR 24028 at commit
retest this please
Test build #103304 has finished for PR 24028 at commit
In response to @cloud-fan, I updated this PR to store the cached data in a Scala immutable IndexedSeq. As a result, there is no longer any need for read locks at all: cachedData is changed to a var, and all we need is a write lock for things that want to update the var. From a code-clarity perspective I think this solution is cleaner. In addition to removing read locks, it removes several Java-to-Scala collection conversions.
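Roughly, that shape looks like the following self-contained sketch (names are illustrative; the real CacheManager stores CachedData entries and has more operations):

// Readers take a plain volatile read of an immutable IndexedSeq; only writers
// synchronize while swapping in a new sequence (copy-on-write).
final case class CachedEntry(name: String)

final class ImmutableSeqCache {
  @volatile private var entries: IndexedSeq[CachedEntry] = IndexedSeq.empty

  // No lock needed: a volatile read yields a consistent, immutable snapshot.
  def isEmpty: Boolean = entries.isEmpty
  def lookup(name: String): Option[CachedEntry] = entries.find(_.name == name)

  def add(e: CachedEntry): Unit = this.synchronized {
    entries = e +: entries
  }

  def removeMatching(shouldRemove: CachedEntry => Boolean): Seq[CachedEntry] = {
    // The potentially expensive predicate runs against a snapshot, outside the lock.
    val toRemove = entries.filter(shouldRemove)
    this.synchronized {
      // Re-read `entries` under the lock and drop by identity, so entries added
      // concurrently since the snapshot are kept.
      entries = entries.filterNot(cd => toRemove.exists(_ eq cd))
    }
    toRemove
  }
}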
Looks good. I couldn't see any new ways that concurrent updates to cachedData could cause a problem. I suppose you could clear it while uncaching plans and add something back after it was cleared, but that was already possible.
Test build #103344 has finished for PR 24028 at commit
Looks good except for one minor comment
Test build #103375 has finished for PR 24028 at commit
Test build #103384 has finished for PR 24028 at commit
Test build #103400 has finished for PR 24028 at commit
}

/** Checks if the cache is empty. */
def isEmpty: Boolean = readLock {
can we remove the readLock method?
I think we can remove writeLock as well, and simply use this.synchronized
I made this change, but while doing that I realized there was an issue with the earlier change to the "partition" function. See comment below
val (plansToUncache, remainingPlans) = cachedData.partition(cd => shouldRemove(cd.plan))
// If a new plan is cached by a different thread at this point, it will be in the cachedData object,
// but not in plansToUncache or remainingPlans. So the next line will remove it.
writeLock {
cachedData = remainingPlans
}
So I reverted this back to the previous behavior.
}
}
@transient @volatile
private var cachedData = IndexedSeq[CachedData]()
We'd better add some comments to explain the access pattern of cachedData, e.g. adding or removing elements should be done by creating a new seq, guarded by this.synchronized.
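One possible wording for such a comment on the declaration quoted above (illustrative, not the committed text):

/**
 * Cached plans, kept in an immutable IndexedSeq so that reads need no lock.
 * Never mutate the sequence in place: to add or remove entries, build a new
 * sequence and assign it to this var while holding `this.synchronized`.
 */
@transient @volatile
private var cachedData = IndexedSeq[CachedData]()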
@@ -101,11 +79,11 @@ class CacheManager extends Logging {
       sparkSession.sessionState.executePlan(planToCache).executedPlan,
       tableName,
       planToCache)
-    writeLock {
+    this.synchronized {
nit: please use space instead of tab.
}
cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
this.synchronized {
ditto
Test build #103443 has finished for PR 24028 at commit
Test build #103448 has started for PR 24028 at commit
It looks like the build for the latest commit got messed up. It says here that it is still going, but on the Jenkins server it looks like it completed.
LGTM, pending Jenkins
It seems the Jenkins run stopped in the middle?
retest this please
Test build #103478 has finished for PR 24028 at commit
retest this please
Test build #103485 has finished for PR 24028 at commit
Test build #103504 has finished for PR 24028 at commit
Thanks, merging to master!
val needToRecache = cachedData.filter(condition)
this.synchronized {
  // Remove the cache entry before creating a new one.
  cachedData = cachedData.filterNot(cd => needToRecache.exists(_ eq cd))
I talked with @maropu. Since a write to a volatile reference is atomic, I think we could remove this.synchronized if we allowed the operation cachedData.filterNot(cd => needToRecache.exists(_ eq cd)) to run on multiple threads. Since the performance bottleneck was in the read lock, though, I do not recommend applying this change right away.
The write itself is atomic, but you need the this.synchronized to ensure that the value you are writing is computed from the most up-to-date value of cachedData. The value is read as part of cachedData.filterNot, and you need to make sure cachedData doesn't change between the time it is read and the time it is written.
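A small, self-contained sketch of the lost-update race described here (illustrative; the sleeps just widen the window):

object LostUpdateSketch extends App {
  @volatile private var data: IndexedSeq[Int] = IndexedSeq(1, 2, 3)

  // Read-modify-write with no lock: the final write is atomic, but it is based
  // on a snapshot that may be stale by the time it is written back.
  def unsafeRemoveEven(): Unit = {
    val snapshot = data
    Thread.sleep(50)                        // window where another thread can update `data`
    data = snapshot.filterNot(_ % 2 == 0)   // clobbers anything added in the meantime
  }

  // The adder synchronizes, but that does not help if the remover's
  // read-modify-write is not done under the same lock.
  def add(x: Int): Unit = this.synchronized { data = data :+ x }

  val remover = new Thread(() => unsafeRemoveEven())
  val adder = new Thread(() => { Thread.sleep(10); add(99) })
  remover.start(); adder.start(); remover.join(); adder.join()

  // Typically prints Vector(1, 3): the concurrently added 99 was lost, which is
  // why the filterNot must re-read and write back under the same lock.
  println(data)
}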
What changes were proposed in this pull request?
Further load increases in our production environment have shown that even the read locks can cause some contention, since they contain a mechanism that turns a read lock into an exclusive lock if a writer has been starved out. This PR reduces the potential for lock contention even further than #23833. Additionally, it uses more idiomatic scala than the previous implementation.
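For intuition on that claim, here is a small standalone experiment (illustrative only, not part of the PR): with java.util.concurrent's ReentrantReadWriteLock in its default non-fair mode, a new reader typically queues behind a waiting writer instead of sharing the read lock, so even "read-only" callers can stall.

import java.util.concurrent.locks.ReentrantReadWriteLock

object ReadLockContention extends App {
  val lock = new ReentrantReadWriteLock()

  // Reader 1 holds the read lock for a while.
  val longReader = new Thread(() => {
    lock.readLock().lock()
    try Thread.sleep(2000) finally lock.readLock().unlock()
  })

  // A writer arrives and parks, waiting for the read lock to be released.
  val writer = new Thread(() => {
    lock.writeLock().lock()
    try Thread.sleep(10) finally lock.writeLock().unlock()
  })

  // Reader 2 arrives after the writer: it typically queues behind the pending
  // writer rather than sharing the read lock with reader 1, so this "read"
  // can stall for roughly the remaining two seconds.
  val lateReader = new Thread(() => {
    val start = System.nanoTime()
    lock.readLock().lock()
    try println(s"late reader waited ${(System.nanoTime() - start) / 1e6} ms")
    finally lock.readLock().unlock()
  })

  longReader.start(); Thread.sleep(100)
  writer.start(); Thread.sleep(100)
  lateReader.start()
  Seq(longReader, writer, lateReader).foreach(_.join())
}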
@cloud-fan & @gatorsmile This is a relatively minor improvement to the previous CacheManager changes. At this point, I think we finally are doing the minimum possible amount of locking.
How was this patch tested?
Has been tested on a live system where the blocking was causing major issues and it is working well.
CacheManager has no explicit unit test but is used in many places internally as part of the SharedState.