
[SPARK-22707][ML] Optimize CrossValidator memory occupation by models in fitting #19904

Conversation

@WeichenXu123 WeichenXu123 commented Dec 6, 2017

What changes were proposed in this pull request?

Via some tests I found that CrossValidator still has a memory issue: it occupies O(n * sizeof(model)) memory to hold models while fitting, whereas, if well optimized, it should be O(parallelism * sizeof(model)).

This is because modelFutures holds a reference to each model object after its future completes (we can use future.value.get.get to fetch it), and Future.sequence and the modelFutures array hold references to each model future. So all model objects stay referenced, and the code still occupies O(n * sizeof(model)) memory.
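A completed Future keeps its result reachable for as long as the future itself is referenced. A minimal, standalone sketch (plain Scala, not the CrossValidator code; the arrays are just hypothetical stand-ins for fitted models):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration

    // Hypothetical stand-ins for expensive fitted models.
    val modelFutures = (1 to 3).map { i => Future { Array.fill(1000)(i) } }
    modelFutures.foreach(Await.ready(_, Duration.Inf))

    // Future.value is Option[Try[T]], so value.get.get returns the stored
    // result of a completed future. As long as modelFutures is reachable,
    // none of these "models" can be garbage collected.
    val stillHeld = modelFutures.map(_.value.get.get)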

I fix this by merging the modelFuture and foldMetricFuture together, and using an AtomicInteger to count completed fitting tasks; when all of them are done, it triggers trainingDataset.unpersist.

I commented on this issue on the old PR [SPARK-19357]
#16774 (review)
Unfortunately, at that time I did not realize that the issue still existed, but now I have confirmed it and created this PR to fix it.

Discussion

I list 3 approaches we can compare. After discussion I realized none of them is ideal; we have to make a trade-off.

After discussion with @jkbradley, approach 3 was chosen.

Approach 1

The approach proposed by @MrBago at #19904 (comment)
This approach resolves the model-object reference issue and allows the model objects to be GCed in time. BUT, in some cases, it still does not resolve the O(N) model memory occupation issue. Let me use an extreme case to describe it:
suppose we set parallelism = 1 and there are 100 paramMaps, so we have 100 fitting & evaluation tasks. In this approach, because parallelism = 1, the code has to wait for all 100 fitting tasks to complete (by which time the memory occupied by models has already reached 100 * sizeof(model)); only then does it unpersist the training dataset and run the 100 evaluation tasks.

Approach 2

This approach is my PR's old version, commit 2cc7c28.
This approach makes sure that, in every case, the peak memory occupied by models is O(numParallelism * sizeof(model)); but it has an issue: in some extreme cases, the "unpersist training dataset" step is delayed until most of the evaluation tasks have completed. Suppose
parallelism = 1 and there are 100 fitting & evaluation tasks; each fitting & evaluation task has to be executed one by one, so only after the first 99 fitting & evaluation tasks and the 100th fitting task complete will the "unpersist training dataset" be triggered.
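For reference, a rough sketch of the approach 2 idea (an illustration only, assuming the est, eval, epm, trainingDataset, validationDataset and executionContext values from the surrounding CrossValidator.fit method; not the exact content of commit 2cc7c28):

      // Fit and evaluate one paramMap per future, so each model becomes
      // unreachable as soon as its future completes.
      val completeFitCount = new AtomicInteger(0)
      val foldMetricFutures = epm.map { paramMap =>
        Future[Double] {
          val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
          // The last fitting task to finish unpersists the training data,
          // without waiting for the evaluation tasks.
          if (completeFitCount.incrementAndGet() == epm.length) {
            trainingDataset.unpersist()
          }
          eval.evaluate(model.transform(validationDataset, paramMap))
        } (executionContext)
      }
      val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
      validationDataset.unpersist()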

Approach 3

After comparing approach 1 and approach 2, I realized that, when parallelism is low but there are many fitting & evaluation tasks, we cannot achieve both of the following goals:

  • Keep the peak memory occupied by models (driver-side) at O(parallelism * sizeof(model))
  • Unpersist the training dataset before most of the evaluation tasks have started.

So I vote for the simpler approach: move the training-dataset unpersist to the end (does this really matter?).
Goal 1 is more important; we must make sure the peak memory occupied by models (driver-side) is O(parallelism * sizeof(model)), otherwise there is a high risk of OOM.
Like the following code:

      val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
        Future[Double] {
          val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
          //...other minor codes
          val metric = eval.evaluate(model.transform(validationDataset, paramMap))
          logDebug(s"Got metric metricformodeltrainedwithparamMap.")
          metric
        } (executionContext)
      }
      val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
      trainingDataset.unpersist() // <------- unpersist at the end
      validationDataset.unpersist()

How was this patch tested?

N/A

@WeichenXu123 WeichenXu123 force-pushed the fix_cross_validator_memory_issue branch from 7725fd8 to 8bfcc94 on December 6, 2017 03:26
@WeichenXu123 WeichenXu123 changed the title [SPARK-22707][ML] Optimize CrossValidator fitting memory occupation by models [SPARK-22707][ML] Optimize CrossValidator memory occupation by models in fitting Dec 6, 2017
SparkQA commented Dec 6, 2017

Test build #84522 has finished for PR 19904 at commit 7725fd8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 6, 2017

Test build #84523 has finished for PR 19904 at commit 8bfcc94.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

          // TODO: duplicate evaluator to take extra params from input
          val metric = eval.evaluate(model.transform(validationDataset, paramMap))
          logDebug(s"Got metric $metric for model trained with $paramMap.")
          metric
        } (executionContext)
      }
      Future {
        signal.synchronized {
          while (completeFitCount < epm.length) {
@MrBago MrBago Dec 8, 2017


Sorry, I'm not too familiar with Futures in Scala. Is it safe to create a blocking future like this, or do you risk starving the thread pool? Can we just use an if statement in the synchronized block above? Something like:

completeFitCount += 1
if (completeFitCount == epm.length) {
    trainingDataset.unpersist()
}

@WeichenXu123 WeichenXu123 Dec 8, 2017


Ah, your idea is really good. We don't need any "wait thread" to do the unpersist; your solution is simpler.
Btw, about the risk of starving the thread pool: if that were an issue, the current master code would have it too (the "Future.sequence" continuation uses the same thread pool). But that future is added to the thread pool last, so if it is scheduled last it won't cause the issue. It does seem to depend on the thread-pool implementation, though.

@WeichenXu123 WeichenXu123 force-pushed the fix_cross_validator_memory_issue branch from 8bfcc94 to 2cc7c28 on December 8, 2017 02:56
SparkQA commented Dec 8, 2017

Test build #84639 has finished for PR 19904 at commit 2cc7c28.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

sethah commented Dec 8, 2017

Can you share your test/results with us?

@@ -146,25 +147,18 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
logDebug(s"Train split $splitIndex with multiple sets of parameters.")

val completeFitCount = new AtomicInteger(0)

My understanding of Scala futures may be off here, but this seems to change the behavior to me. Now, the unpersist operation will happen in one of the training threads, instead of asynchronously in its own thread. I'm not sure how much of an effect that will have.

Why can't you just put all the logic in one map statement like below:

      // Fit models in a Future for training in parallel
      val modelFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
        Future[Model[_]] {
          val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]

          if (collectSubModelsParam) {
            subModels.get(splitIndex)(paramIndex) = model
          }
          // TODO: duplicate evaluator to take extra params from input
          val metric = eval.evaluate(model.transform(validationDataset, paramMap))
          logDebug(s"Got metric $metric for model trained with $paramMap.")
          metric
        } (executionContext)
      }

      // Unpersist training data only when all models have trained
      Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext)
        .onComplete { _ => trainingDataset.unpersist() } (executionContext)


We hope to unpersist the training dataset once all fitting is finished. But your idea here postpones the unpersist until all fitting & evaluation are done, and your code should have the same effect as:

val modelFutures = ...
val foldMetrics = modelFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
trainingDataset.unpersist()
validationDataset.unpersist()

And, about what you said:

"Now, the unpersist operation will happen in one of the training threads, instead of asynchronously in its own thread."

What's the possible effect or impact? trainingDataset.unpersist() itself is an async method and won't block, so will it have any effect? I think it can be put in any thread safely.


You can also turn a sequence of futures into a future then map on that single future and do the unpersist there.

@WeichenXu123 WeichenXu123 Dec 12, 2017


@holdenk Yes, that's what's done in the current master code, but in that case the future has to be split into a modelFuture and a foldMetricFuture, so it cannot avoid the issue that modelFuture still holds the computed model (after the future completes), which causes the memory issue.

@MrBago MrBago Dec 12, 2017


You can use futures to do this: you need to use a var for modelFutures, then map those futures to Unit, then collect them into a sequence, then map on that to unpersist, and also set modelFutures to null to release the references. But why go to the trouble? What's the concern with doing it in the final training thread? Why is this a change in behavior?


@MrBago About what you said:

You can use futures to do this, you need to use a var for modelFutures, then map on those futures to Unit, then collect those into a sequence, then map on that to unpersist, and also set modelFutures to null to release those references

Can you post some pseudo-code so I can check whether it works and what its peak memory occupation is?
Thanks!


I think this might work, but I think what you have in the PR now is better.

      // Fit models in a Future for training in parallel
      var modelFutures = epm.map { paramMap =>
        Future[Model[_]] {
          val model = est.fit(trainingDataset, paramMap)
          model.asInstanceOf[Model[_]]
        } (executionContext)
      }

      // Unpersist training data only when all models have trained
      val unitFutures = modelFutures.map{ _.map{ _ => () } (executionContext) }
      Future.sequence[Unit, Iterable](unitFutures)(implicitly, executionContext)
        .onComplete { _ => trainingDataset.unpersist() } (executionContext)

      // Evaluate models in a Future that will calculate a metric and allow model to be cleaned up
      val foldMetricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) =>
        modelFuture.map { model =>
          // TODO: duplicate evaluator to take extra params from input
          val metric = eval.evaluate(model.transform(validationDataset, paramMap))
          logDebug(s"Got metric $metric for model trained with $paramMap.")
          metric
        } (executionContext)
      }
      modelFutures = null

@WeichenXu123

@sethah To verify the memory issue, you can add one line of test code against the current master, here:

      val modelFutures = ...
      // Unpersist training data only when all models have trained
      Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext)
        .onComplete { _ => trainingDataset.unpersist() } (executionContext)
      // Evaluate models in a Future that will calculate a metric and allow model to be cleaned up
      val foldMetricFutures = ....
      // Wait for metrics to be calculated before unpersisting validation dataset
      val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
      validationDataset.unpersist()

      //add test code here, fetch all models
      val models = modelFutures.map(_.value.get.get)

      foldMetrics

The test code added here is val models = modelFutures.map(_.value.get.get). It proves that these models are still in memory; we can still fetch them.

@BryanCutler

Thanks for looking into this @WeichenXu123; this does change the behavior in a couple of ways though. Like @sethah said, the unpersist of the training data is not async anymore, but this also changes the order in which fit and evaluate are called, so that the training data is not unpersisted until all but the last of the models have also been evaluated. Before, all modelFutures would be executed before the metricFutures, so the training data could be unpersisted as soon as possible. I believe this is how it worked before adding the parallelism too.

I did some local testing where I put modelFutures in an inner function so that they are out of scope before awaitResult is called, and also mapped the Future.sequence similar to #19904 (comment), and this seemed to be enough to allow the models to be GC'd. I think this approach would be a little better.

MrBago commented Dec 13, 2017

@BryanCutler Thanks for the response, I think I understand the situation a little better here.

I think there is a fundamental tradeoff we cannot avoid. Specifically there is a tradeoff between distributed memory usage and driver memory usage. A model cannot be garbage collected until its evaluation future is done running and the training data cannot be unpersisted until all of the training tasks are finished.

Let's say we have 3 models to train and evaluate with parallelism=1, and let's call the training & evaluation tasks T1-3 and E1-3. If we schedule the tasks T1, T2, T3, unpersist, E1, E2, E3 then we can use less distributed memory but we cannot avoid holding all 3 models in memory. If we schedule the tasks T1, E1, T2, E2, T3, unpersist, E3 then we must use more distributed memory but we only ever need to hold 1 model in memory.

I'm not sure how Scala futures work; when T1 is done I don't know whether T2 or E1 gets priority in the executor pool. Can we guarantee that the JVM will not schedule T1, T2, T3, E1, E2, E3, unpersist?

WeichenXu123 commented Dec 14, 2017

@MrBago
Your code in #19904 (comment) also works fine, I think, although it is more complicated.

@BryanCutler

the unpersist of training data is not async anymore, but this also changes the order in which fit and evaluate are called so that training data is not unpersisted until all but the last models are also evaluated. Before, all modelFutures would be executed first before metricFutures and so training data could be unpersisted as soon as possible.

My current PR code also unpersists the training dataset once all fitting is finished (before evaluation), and calling df.unpersist() won't block (see the df.unpersist(blocking = false) parameter). So moving it into a training thread shouldn't cause any issue, I think.

So, all in all, I think my current PR works fine, its scheduling has the same effect as the current master, and it is much simpler.

WeichenXu123 commented Dec 15, 2017

I discussed with @MrBago offline; here is a summary of my current thinking.
I list 3 approaches we can compare. After discussion I realized none of them is ideal; we have to make a trade-off.

Approach 1

The approach proposed by @MrBago at #19904 (comment)
This approach resolves the model-object reference issue and allows the model objects to be GCed in time. BUT, in some cases, it still does not resolve the O(N) model memory occupation issue. Let me use an extreme case to describe it:
suppose we set parallelism = 1 and there are 100 paramMaps, so we have 100 fitting & evaluation tasks. In this approach, because parallelism = 1, the code has to wait for all 100 fitting tasks to complete (by which time the memory occupied by models has already reached 100 * sizeof(model)); only then does it unpersist the training dataset and run the 100 evaluation tasks.

Approach 2

This approach is my PR's old version, commit 2cc7c28.
This approach makes sure that, in every case, the peak memory occupied by models is O(numParallelism * sizeof(model)); but it has an issue: in some extreme cases, the "unpersist training dataset" step is delayed until most of the evaluation tasks have completed. Suppose
parallelism = 1 and there are 100 fitting & evaluation tasks; each fitting & evaluation task has to be executed one by one, so only after the first 99 fitting & evaluation tasks and the 100th fitting task complete will the "unpersist training dataset" be triggered.

Approach 3

After comparing approach 1 and approach 2, I realized that, when parallelism is low but there are many fitting & evaluation tasks, we cannot achieve both of the following goals:

  • Keep the peak memory occupied by models (driver-side) at O(parallelism * sizeof(model))
  • Unpersist the training dataset before most of the evaluation tasks have started.

So I vote for the simpler approach: move the training-dataset unpersist to the end (does this really matter?).
Goal 1 is more important; we must make sure the peak memory occupied by models (driver-side) is O(parallelism * sizeof(model)), otherwise there is a high risk of OOM.
Like the following code:

      val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
        Future[Double] {
          val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
          //...other minor codes
          val metric = eval.evaluate(model.transform(validationDataset, paramMap))
          logDebug(s"Got metric metricformodeltrainedwithparamMap.")
          metric
        } (executionContext)
      }
      val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
      trainingDataset.unpersist() // <------- unpersist at the end
      validationDataset.unpersist()

Gentle ping @jkbradley @MrBago @sethah @BryanCutler @holdenk

@jkbradley

Strong +1 for unpersisting the data at the end. In the long-term, I don't think we'll even cache the training and validation datasets. Our caching of the training & validation datasets is a temporary hack to get around the issue that we don't have a DataFrame k-fold splitting method. Our current workaround is to go down to RDDs: DataFrame -> RDD -> k-fold split -> DataFrame, and as I recall, we cache to lower the SerDe costs in these conversions. Once we have a k-fold split method for DataFrames, we can just cache the original (full) dataset and not the k splits.
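(For context, the current workaround looks roughly like the following sketch, assuming the surrounding CrossValidator.fit values such as dataset, sparkSession, numFolds and seed, plus org.apache.spark.mllib.util.MLUtils; it is not the exact source.)

      // Drop to the RDD API because there is no DataFrame k-fold split, then
      // rebuild and cache a DataFrame per fold. The caching mainly offsets the
      // SerDe cost of the DataFrame <-> RDD round trip.
      val schema = dataset.schema
      val splits = MLUtils.kFold(dataset.toDF.rdd, numFolds, seed)
      splits.zipWithIndex.foreach { case ((training, validation), splitIndex) =>
        val trainingDataset = sparkSession.createDataFrame(training, schema).cache()
        val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
        // ... fit, evaluate and unpersist for this fold ...
      }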

SparkQA commented Dec 19, 2017

Test build #85086 has finished for PR 19904 at commit ccd2689.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler BryanCutler left a comment


Seems ok now, if the long-term plan is to have k-fold splitting for DataFrames directly.

@@ -18,6 +18,7 @@
package org.apache.spark.ml.tuning

import java.util.{List => JList, Locale}
import java.util.concurrent.atomic.AtomicInteger

not needed anymore

SparkQA commented Dec 20, 2017

Test build #85183 has finished for PR 19904 at commit cad2104.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 25, 2017

Test build #4024 has finished for PR 19904 at commit cad2104.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jkbradley

LGTM
Sorry for the delay & thanks for the PR!
Merging with master

@asfgit asfgit closed this in fba0313 Dec 25, 2017
@WeichenXu123 WeichenXu123 deleted the fix_cross_validator_memory_issue branch December 25, 2017 09:24