
[SPARK-26917][SQL] Further reduce locks in CacheManager #24028

Closed
wants to merge 17 commits

Conversation

DaveDeCaprio
Contributor

What changes were proposed in this pull request?

Further load increases in our production environment have shown that even the read locks can cause some contention, since they contain a mechanism that turns a read lock into an exclusive lock if a writer has been starved out. This PR reduces the potential for lock contention even further than #23833. Additionally, it uses more idiomatic Scala than the previous implementation.

@cloud-fan & @gatorsmile This is a relatively minor improvement to the previous CacheManager changes. At this point, I think we are finally doing the minimum possible amount of locking.
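
For context, a minimal sketch of the pattern this revision moves toward (the names and structure are simplified stand-ins, not the actual CacheManager code): hold the read lock only long enough to take a shallow copy, run the caller-supplied predicate on the copy with no lock held, and re-acquire the write lock only for the brief mutation.

// Sketch only: CacheEntry and SimplifiedCache are illustrative stand-ins, not Spark classes.
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable.ArrayBuffer

case class CacheEntry(key: String, payload: AnyRef)

class SimplifiedCache {
  private val lock = new ReentrantReadWriteLock
  private val entries = new ArrayBuffer[CacheEntry]

  def uncache(shouldRemove: CacheEntry => Boolean): Unit = {
    // Hold the read lock only long enough to take a shallow copy.
    lock.readLock().lock()
    val snapshot = try entries.toIndexedSeq finally lock.readLock().unlock()

    // The potentially expensive predicate runs on the copy with no lock held.
    val toRemove = snapshot.filter(shouldRemove)

    // Re-acquire the write lock only for the brief mutation.
    lock.writeLock().lock()
    try entries --= toRemove finally lock.writeLock().unlock()
  }
}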

How was this patch tested?

It has been tested on a live system where the blocking was causing major issues, and it is working well.
CacheManager has no explicit unit test but is used in many places internally as part of the SharedState.

@kiszk
Member

kiszk commented Mar 9, 2019

looks reasonable

Member

@srowen srowen left a comment

Seems OK to me. I can't think of a material difference in behavior here that could cause a problem. The tradeoff seems to be the cost of clone() vs. less time spent holding the lock. I could believe that's a win, but I wonder if it's only a win under heavy load or at scale -- would it materially slow things down in other cases?

}
val plansToUncache = cachedDataCopy.filter(cd => shouldRemove(cd.plan))
Member

It seems we always use while loops when performance is a concern:
https://github.com/databricks/scala-style-guide#traversal-and-zipwithindex
Is there any actual performance difference between the two patterns on your heavy workload?

Member

I suspect that it's the logic in shouldRemove that takes the time here, and that it can be done without the lock.

Contributor Author

Yes, the problem is that the "shouldRemove" function is passed into this call. If that function is expensive, it causes the lock to be held for an arbitrarily long time.

"shouldRemove" when called from "recacheByPath" causes a full traversal of the entire logical plan tree for every cached plan. In the process of doing this it will regenerate path strings for every file referenced by every single plan. In our situations at least this is easily many orders of magnitude more memory overhead than a shallow copy of the list.

Member

@felixcheung felixcheung left a comment

is this going to put pressure on the heap or GC with the "temp" copy?

@kiszk
Member

kiszk commented Mar 10, 2019

Good question. I had the same question in my mind. I just think that it is OK since this is a shallow copy of a list of objects with two fields.
It would be good if there were statistics on the number of objects in multiple scenarios.
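
For a sense of scale, an illustrative sketch of what the shallow copy costs (CachedEntry here is a stand-in for Spark's CachedData, which pairs a plan with its cached representation): only the per-entry references are duplicated, never the plans themselves.

// Illustration only: CachedEntry stands in for Spark's two-field CachedData.
case class CachedEntry(plan: AnyRef, cachedRepresentation: AnyRef)

object ShallowCopySketch extends App {
  val original = IndexedSeq(CachedEntry(new AnyRef, new AnyRef))
  val copy = original.map(identity)  // a new seq of references, a few words per entry
  assert(copy.head eq original.head) // the heavy objects themselves are shared
}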

@srowen
Member

srowen commented Mar 10, 2019

I don't think this will be a significant allocation as it's a shallow copy. At smaller scale, it shouldn't matter much. I'd take the win at larger scale.

@cloud-fan
Contributor

ok to test

@cloud-fan
Contributor

shall we use some "copy-on-write" thread-safe collections instead of doing the clone manually?
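
For reference, one off-the-shelf option on the JVM is java.util.concurrent.CopyOnWriteArrayList, which copies the backing array on every mutation so that readers never need a lock; whether its copy-per-write cost fits CacheManager's workload is a separate question. A tiny illustration of the idea (not a proposal for CacheManager itself):

import java.util.concurrent.CopyOnWriteArrayList

object CopyOnWriteSketch extends App {
  val cache = new CopyOnWriteArrayList[String]()
  cache.add("plan-a") // each mutation allocates a fresh copy of the backing array
  cache.add("plan-b")

  // Readers iterate over an immutable snapshot and never block writers.
  val it = cache.iterator()
  while (it.hasNext) println(it.next())
}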

@SparkQA

SparkQA commented Mar 11, 2019

Test build #103294 has finished for PR 24028 at commit 27ab9f0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Mar 11, 2019

retest this please

@SparkQA

SparkQA commented Mar 11, 2019

Test build #103304 has finished for PR 24028 at commit 27ab9f0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@DaveDeCaprio
Contributor Author

In response to @cloud-fan, I updated this PR to store the cached data in an immutable Scala IndexedSeq.

As a result, there is no longer any need for read locks at all. cachedData is changed to a var, and all we need is a write lock around the code paths that update it. From a code-clarity perspective I think this solution is cleaner. In addition to removing the read locks, it removes several Java-to-Scala collection conversions.
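
Roughly, the resulting access pattern looks like the sketch below (illustrative names; the real field lives in CacheManager and holds CachedData entries):

// Illustrative sketch of the access pattern; Entry stands in for Spark's CachedData.
case class Entry(id: String)

class VolatileSeqCache {
  // Readers just dereference the volatile var; no lock is needed because the
  // IndexedSeq it points to is immutable.
  @volatile private var cachedData: IndexedSeq[Entry] = IndexedSeq.empty

  def lookup(id: String): Option[Entry] = cachedData.find(_.id == id)

  // Writers build a new seq and swap the reference; this.synchronized makes
  // each read-modify-write of the reference atomic.
  def add(e: Entry): Unit = this.synchronized {
    cachedData = e +: cachedData
  }

  def remove(p: Entry => Boolean): Unit = this.synchronized {
    cachedData = cachedData.filterNot(p)
  }
}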

Member

@srowen srowen left a comment

Looks good. I couldn't see any new ways that concurrent updates to cachedData could cause a problem. I suppose you could clear it while uncaching plans and add something back after it was cleared, but that was already possible.

@SparkQA

SparkQA commented Mar 11, 2019

Test build #103344 has finished for PR 24028 at commit 1d6f84a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@maropu maropu left a comment

Looks good except for one minor comment

@SparkQA

SparkQA commented Mar 12, 2019

Test build #103375 has finished for PR 24028 at commit a5977f0.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 12, 2019

Test build #103384 has finished for PR 24028 at commit 1de029c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 13, 2019

Test build #103400 has finished for PR 24028 at commit 755d484.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

/** Checks if the cache is empty. */
def isEmpty: Boolean = readLock {
Contributor

can we remove the readLock method?

Contributor

I think we can remove writeLock as well, and simply use this.synchronized

Contributor Author

I made this change, but while doing that I realized there was an issue with the earlier change to the "partition" function. See the comment below.

val (plansToUncache, remainingPlans) = cachedData.partition(cd => shouldRemove(cd.plan))
// If a new plan is cached by a different thread at this point, it will be in the cachedData object,
// but not in plansToUncache or remainingPlans. So the next line will remove it.
writeLock {
cachedData = remainingPlans
}

Contributor Author

So I reverted this back to the previous behavior.
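
In other words, the behavior reverted to is the pattern already shown elsewhere in the diff: the predicate still runs outside the lock, but the new value is derived from the current cachedData inside the lock, so a concurrently added entry cannot be dropped. A fragment for illustration, using the PR's own identifiers:

// The potentially expensive predicate runs with no lock held ...
val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
this.synchronized {
  // ... but the new value is computed from the current cachedData, so an entry
  // added by another thread in the meantime is preserved.
  cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd))
}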

}
}
@transient @volatile
private var cachedData = IndexedSeq[CachedData]()
Contributor

we'd better add some comments to explain the access pattern of cachedData, e.g. adding/removing elements should be done by creating a new seq, with updates locked on this.
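
For illustration, the invariant could be spelled out on the declaration itself (sketch only; this wording is not from the PR):

/**
 * Sketch of one possible comment, not the PR's actual text.
 * cachedData is an immutable sequence held in a volatile var:
 *  - readers may access it without taking any lock;
 *  - adding/removing elements must build a new sequence and assign it while
 *    synchronized on `this`, so that concurrent updates are not lost.
 */
@transient @volatile
private var cachedData = IndexedSeq[CachedData]()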

@@ -101,11 +79,11 @@ class CacheManager extends Logging {
sparkSession.sessionState.executePlan(planToCache).executedPlan,
tableName,
planToCache)
writeLock {
this.synchronized {
Contributor

nit: please use spaces instead of tabs.

}
cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
this.synchronized {
Contributor

ditto

@SparkQA

SparkQA commented Mar 13, 2019

Test build #103443 has finished for PR 24028 at commit d402533.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 13, 2019

Test build #103448 has started for PR 24028 at commit 1d693af.

@DaveDeCaprio
Contributor Author

It looks like the build for the latest commit got messed up. It says here that it is still going, but on the Jenkins server it looks like it completed.

@cloud-fan
Contributor

LGTM, pending jenkins

@maropu
Member

maropu commented Mar 14, 2019

It seems the Jenkins run stopped in the middle?

@maropu
Member

maropu commented Mar 14, 2019

retest this please

@SparkQA

SparkQA commented Mar 14, 2019

Test build #103478 has finished for PR 24028 at commit 1d693af.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal
Contributor

retest this please

@SparkQA

SparkQA commented Mar 14, 2019

Test build #103485 has finished for PR 24028 at commit 1d693af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 14, 2019

Test build #103504 has finished for PR 24028 at commit 1bb2511.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

val needToRecache = cachedData.filter(condition)
this.synchronized {
// Remove the cache entries before creating new ones.
cachedData = cachedData.filterNot(cd => needToRecache.exists(_ eq cd))
Member

I talked with @maropu. Since a write to a volatile reference is atomic, I think we could remove the this.synchronized if we allowed the operation cachedData.filterNot(cd => needToRecache.exists(_ eq cd)) to run on multiple threads.

Since the performance bottleneck was in the read lock, I do not recommend applying this change any time soon.

Contributor Author

The write itself is atomic, but you need the this.synchronized to ensure that the value you are writing was computed from the most up-to-date value of cachedData. The value is read as part of cachedData.filterNot, and you need to make sure cachedData doesn't change between the time it is read and the time it is written.
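
A small sketch of the lost-update hazard that the synchronized block rules out (illustrative code, not from the PR):

object LostUpdateSketch {
  @volatile private var data: IndexedSeq[Int] = IndexedSeq(1, 2, 3)

  // Two threads can both read the same stale seq here and then each write back
  // a result computed from it; whichever write lands last silently discards
  // the other thread's change, even though each individual write is atomic.
  def unsafeRemove(p: Int => Boolean): Unit = {
    val snapshot = data            // read (another thread may update data now)
    data = snapshot.filterNot(p)   // write: can overwrite that concurrent update
  }

  // Reading and writing under one lock makes the read-modify-write atomic.
  def safeRemove(p: Int => Boolean): Unit = this.synchronized {
    data = data.filterNot(p)
  }
}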
