[SPARK-22707][ML] Optimize CrossValidator memory occupation by models in fitting #19904
Conversation
Force-pushed from 7725fd8 to 8bfcc94.
Test build #84522 has finished for PR 19904 at commit …
Test build #84523 has finished for PR 19904 at commit …
      // TODO: duplicate evaluator to take extra params from input
      val metric = eval.evaluate(model.transform(validationDataset, paramMap))
      logDebug(s"Got metric $metric for model trained with $paramMap.")
      metric
    } (executionContext)
  }
  Future {
    signal.synchronized {
      while (completeFitCount < epm.length) {
Sorry, I'm not too familiar with Futures in Scala. Is it safe to create a blocking future like this? Do you risk starving the thread pool? Can we just use an if statement in the synchronized block above? Something like:
completeFitCount += 1
if (completeFitCount == epm.length) {
trainingDataset.unpersist()
}
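If it helps, here is a rough sketch of how that could look once the separate wait-thread future is dropped. It uses an AtomicInteger (as the updated diff further down does) rather than a synchronized counter, and it assumes the surrounding CrossValidator names from the snippets above (est, eval, epm, trainingDataset, validationDataset, executionContext); it is illustrative, not the exact PR code:
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import org.apache.spark.ml.Model

// Sketch only: fit and evaluate in a single future per paramMap; whichever
// thread finishes the last fit unpersists the training data, so no extra
// waiting future is needed.
val completeFitCount = new AtomicInteger(0)
val foldMetricFutures = epm.map { paramMap =>
  Future[Double] {
    val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
    // Unpersist as soon as the last fit completes, before evaluation finishes.
    if (completeFitCount.incrementAndGet() == epm.length) {
      trainingDataset.unpersist()
    }
    eval.evaluate(model.transform(validationDataset, paramMap))
  } (executionContext)
}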
Ah! Your idea is really good. We don't need any "wait thread" to do this unpersist; your solution will be simpler.
Btw, about the risk of starving the thread pool: if that issue existed, the current master code would also have it (because the "Future.sequence" thread uses the same thread pool). But that thread is added to the thread pool last, so if it is scheduled to launch last, it won't cause the issue. It seems to depend on the thread pool implementation, though.
Force-pushed from 8bfcc94 to 2cc7c28.
Test build #84639 has finished for PR 19904 at commit …
Can you share your test/results with us?
@@ -146,25 +147,18 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
      val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
      logDebug(s"Train split $splitIndex with multiple sets of parameters.")

      val completeFitCount = new AtomicInteger(0)
My understanding of Scala futures may be off here, but this seems to change the behavior to me. Now the unpersist operation will happen in one of the training threads, instead of asynchronously in its own thread. I'm not sure how much of an effect that will have.
Why can't you just put all the logic in one map statement, like below:
// Fit models in a Future for training in parallel
val modelFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
  Future[Double] {
    val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
    if (collectSubModelsParam) {
      subModels.get(splitIndex)(paramIndex) = model
    }
    // TODO: duplicate evaluator to take extra params from input
    val metric = eval.evaluate(model.transform(validationDataset, paramMap))
    logDebug(s"Got metric $metric for model trained with $paramMap.")
    metric
  } (executionContext)
}
// Unpersist training data only when all models have trained
Future.sequence[Double, Iterable](modelFutures)(implicitly, executionContext)
  .onComplete { _ => trainingDataset.unpersist() } (executionContext)
We want to unpersist the training dataset as soon as all fitting has finished. But your idea here will postpone the unpersist until all fitting & evaluation are done, and your code should have the same effect as:
val modelFutures = ...
val foldMetrics = modelFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
trainingDataset.unpersist()
validationDataset.unpersist()
And about what you said:
"Now, the unpersist operation will happen in one of the training threads, instead of asynchronously in its own thread."
What's the possible effect or impact? trainingDataset.unpersist() itself is an async method and won't block, so will it have any effect? I think it can be put in any thread safely.
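For reference, a minimal sketch of the non-blocking behaviour being relied on here; Dataset.unpersist also accepts a blocking flag (this is my reading of the API, not something stated in the thread):
// Default form returns immediately; cached blocks are removed asynchronously.
trainingDataset.unpersist()
// Blocking variant, if a caller ever needs to wait until the blocks are freed.
trainingDataset.unpersist(blocking = true)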
You can also turn a sequence of futures into a future then map on that single future and do the unpersist there.
@holdenk Yes, that's what's done in the current master code, but in that case the future has to be split into a modelFuture and a foldMetricFuture, so it cannot avoid the issue that the modelFuture still holds the model it computed (after the future completes), which causes the memory issue.
You can use futures to do this: you need to use a var for modelFutures, then map on those futures to Unit, then collect those into a sequence, then map on that to unpersist, and also set modelFutures to null to release those references. But why go to the trouble? What's the concern with doing it in the final training thread? Why is this a change in behavior?
@MrBago About what you said:
"You can use futures to do this, you need to use a var for modelFutures, then map on those futures to Unit, then collect those into a sequence, then map on that to unpersist, and also set modelFutures to null to release those references"
Can you post some pseudo code so I can check whether it works fine and what its peak memory occupation is?
Thanks!
I think this might work, but I think what you have in the PR now is better.
// Fit models in a Future for training in parallel
var modelFutures = epm.map { paramMap =>
  Future[Model[_]] {
    val model = est.fit(trainingDataset, paramMap)
    model.asInstanceOf[Model[_]]
  } (executionContext)
}
// Unpersist training data only when all models have trained
val unitFutures = modelFutures.map { _.map { _ => () } (executionContext) }
Future.sequence[Unit, Iterable](unitFutures)(implicitly, executionContext)
  .onComplete { _ => trainingDataset.unpersist() } (executionContext)
// Evaluate models in a Future that will calculate a metric and allow model to be cleaned up
val foldMetricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) =>
  modelFuture.map { model =>
    // TODO: duplicate evaluator to take extra params from input
    val metric = eval.evaluate(model.transform(validationDataset, paramMap))
    logDebug(s"Got metric $metric for model trained with $paramMap.")
    metric
  } (executionContext)
}
modelFutures = null
@sethah To verify the memory issue, you can add one line of test code against the current master code. The test code I add is val models = modelFutures.map(_.value.get.get); it proves that these models are still in memory, since we can still get them.
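To make the retention point concrete, here is a small, self-contained illustration (a hypothetical example, not code from the PR): as long as the array of completed futures is referenced, every result stays reachable through future.value, so none of the models can be garbage collected.
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Stand-in for a fitted model; in CrossValidator this would be a Model[_].
case class DummyModel(weights: Array[Double])

val modelFutures = (1 to 3).map { i =>
  Future { DummyModel(Array.fill(1000)(i.toDouble)) }
}

// Wait for all "fits" to finish.
modelFutures.foreach(Await.ready(_, Duration.Inf))

// Each completed future still references its result, so every DummyModel is
// reachable here even though the work finished long ago.
val models = modelFutures.map(_.value.get.get)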
Thanks for looking into this @WeichenXu123, this does change the behavior in a couple of ways though. Like @sethah said, the unpersist of the training data is not async anymore, but this also changes the order in which … I did some local testing where I put …
@BryanCutler Thanks for the response, I think I understand the situation a little better here. I think there is a fundamental tradeoff we cannot avoid: specifically, a tradeoff between distributed memory usage and driver memory usage. A model cannot be garbage collected until its evaluation future is done running, and the training data cannot be unpersisted until all of the training tasks are finished. Let's say we have 3 models to train and evaluate … I'm not sure how Scala futures work; when T1 is done I don't know whether T2 or E1 gets priority in the executor pool. Can we guarantee that the JVM will not schedule T1, T2, T3, E1, E2, E3, unpersist?
@MrBago …
I discussed with @MrBago offline; here is a summary of what I think now:
Approach 1 …
Approach 2 …
Approach 3: after comparing approach 1 and approach 2, I realized that, in the case where parallelism is low but there are many fitting & evaluation tasks, we cannot achieve both goals at once (see the full write-up in the PR description below). So I vote for a simpler approach: move the unpersist of the training dataset to the end (does this really matter?).
Gentle ping @jkbradley @MrBago @sethah @BryanCutler @holdenk
Strong +1 for unpersisting the data at the end. In the long term, I don't think we'll even cache the training and validation datasets. Our caching of the training & validation datasets is a temporary hack to get around the issue that we don't have a DataFrame k-fold splitting method. Our current workaround is to go down to RDDs: DataFrame -> RDD -> k-fold split -> DataFrame, and as I recall, we cache to lower the SerDe costs in these conversions. Once we have a k-fold split method for DataFrames, we can just cache the original (full) dataset and not the k splits.
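For context, a rough sketch of the RDD round-trip workaround being described. MLUtils.kFold is the real MLlib helper; the wrapper function and its name are illustrative, not the exact CrossValidator implementation:
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative only: split a DataFrame into k folds by dropping to the RDD API
// and converting back, caching each per-fold DataFrame so the DataFrame <-> RDD
// SerDe cost is paid only once.
def kFoldDataFrames(spark: SparkSession, df: DataFrame, numFolds: Int, seed: Int)
  : Array[(DataFrame, DataFrame)] = {
  val schema = df.schema
  MLUtils.kFold(df.rdd, numFolds, seed).map { case (training, validation) =>
    val trainingDF = spark.createDataFrame(training, schema).cache()
    val validationDF = spark.createDataFrame(validation, schema).cache()
    (trainingDF, validationDF)
  }
}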
Test build #85086 has finished for PR 19904 at commit …
seems ok now if the long term plan is for a k-fold in DataFrames directly
@@ -18,6 +18,7 @@
package org.apache.spark.ml.tuning

import java.util.{List => JList, Locale}
import java.util.concurrent.atomic.AtomicInteger
not needed anymore
Test build #85183 has finished for PR 19904 at commit …
Test build #4024 has finished for PR 19904 at commit …
LGTM
What changes were proposed in this pull request?
Via some tests I found that CrossValidator still has a memory issue: it will still occupy O(n * sizeof(model)) memory for holding models while fitting, whereas a well-optimized implementation should occupy only O(parallelism * sizeof(model)).
This is because modelFutures will hold the reference to each model object after its future completes (we can use future.value.get.get to fetch it), and Future.sequence and the modelFutures array hold references to each model future. So all model objects remain referenced, and the code still occupies O(n * sizeof(model)) memory.
I fix this by merging the modelFuture and the foldMetricFuture together, and using an AtomicInteger to count the completed fitting tasks; when all of them are done, trainingDataset.unpersist is triggered.
I commented on this issue on the old PR [SPARK-19357] #16774 (review); unfortunately, at that time I did not realize that the issue still existed, but now I have confirmed it and created this PR to fix it.
Discussion
I give 3 approaches that we can compare; after discussion I realized none of them is ideal, so we have to make a trade-off.
After discussion with @jkbradley, approach 3 was chosen.
Approach 1
The approach proposed by @MrBago at #19904 (comment). This approach resolves the model-object reference issue and allows the model objects to be GCed in time. BUT, in some cases, it still does not resolve the O(N) model memory occupation issue. Let me use an extreme case to describe it: suppose we set parallelism = 1 and there are 100 paramMaps, so we have 100 fitting & evaluation tasks. In this approach, because parallelism = 1, the code has to wait for all 100 fitting tasks to complete (by which time the memory occupied by models has already reached 100 * sizeof(model)); only then will it unpersist the training dataset and run the 100 evaluation tasks.
Approach 2
This approach is my PR's old version of the code, 2cc7c28. It can make sure that in any case the peak memory occupation by models is O(numParallelism * sizeof(model)), but it has an issue: in some extreme cases, the "unpersist training dataset" step is delayed until most of the evaluation tasks are complete. Suppose parallelism = 1 and there are 100 fitting & evaluation tasks; each fitting & evaluation task has to be executed one by one, so only after the first 99 fitting & evaluation tasks and the 100th fitting task complete will the "unpersist training dataset" be triggered.
Approach 3
After I compared approach 1 and approach 2, I realized that, in the case where parallelism is low but there are many fitting & evaluation tasks, we cannot achieve both of the following two goals:
1. keep the peak memory occupied by models (driver-side) at O(parallelism * sizeof(model));
2. unpersist the training dataset as soon as all fitting finishes, without waiting for the evaluation tasks.
So I vote for a simpler approach: move the unpersist of the training dataset to the end (does this really matter?). Because goal 1 is more important, we must make sure the peak memory occupation by models (driver-side) is O(parallelism * sizeof(model)); otherwise it brings a high risk of OOM.
Like the following code:
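(A sketch of what approach 3 looks like inside the per-fold loop, using the names from the snippets above — est, eval, epm, subModels, splitIndex, trainingDataset, validationDataset, executionContext, ThreadUtils; the exact merged code may differ.)
// Fit and evaluate each paramMap in a single future so that no completed
// model future is kept around once its metric has been computed.
val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
  Future[Double] {
    val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
    if (collectSubModelsParam) {
      subModels.get(splitIndex)(paramIndex) = model
    }
    // TODO: duplicate evaluator to take extra params from input
    val metric = eval.evaluate(model.transform(validationDataset, paramMap))
    logDebug(s"Got metric $metric for model trained with $paramMap.")
    metric
  } (executionContext)
}

// Wait for all metrics, then unpersist both cached datasets at the end.
val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
trainingDataset.unpersist()
validationDataset.unpersist()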
How was this patch tested?
N/A