# [SPARK-22707][ML] Optimize CrossValidator memory occupation by models in fitting

## What changes were proposed in this pull request?

Through some testing I found that CrossValidator still has a memory issue: it occupies `O(n*sizeof(model))` memory to hold models while fitting, whereas a well-optimized implementation should only need `O(parallelism*sizeof(model))`.

This is because each `modelFuture` holds a reference to its model object after the future completes (the model can still be fetched via `future.value.get.get`), and both `Future.sequence` and the `modelFutures` array hold references to every model future. So all model objects stay referenced, and the code still occupies `O(n*sizeof(model))` memory.
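
To illustrate the retention issue, here is a minimal, self-contained sketch (plain Scala, not the Spark code) showing that a completed `Future` keeps its result strongly reachable through `future.value.get.get` for as long as the future itself is referenced:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object FutureRetentionDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for a large fitted model (~100 MB).
    val modelFuture = Future { new Array[Byte](100 * 1024 * 1024) }
    Await.ready(modelFuture, Duration.Inf)
    // Even after completion, the result is still reachable through the
    // future, so holding n such futures pins n "models" in memory.
    val stillReferenced = modelFuture.value.get.get
    println(stillReferenced.length) // 104857600
  }
}
```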

I fix this by merging `modelFuture` and `foldMetricFuture` into a single future, and using an `AtomicInteger` to count completed fitting tasks; when all of them are done, it triggers `trainingDataset.unpersist`.
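
A rough sketch of that counting idea (reusing identifiers from the surrounding `CrossValidator.fit` code; `completedFits` is an illustrative name, not necessarily what the actual patch used):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future

// Sketch only: fit and evaluate inside one future, so the model reference
// dies with the task; count completed fits to unpersist as early as possible.
val completedFits = new AtomicInteger(0)
val foldMetricFutures = epm.map { paramMap =>
  Future[Double] {
    val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
    if (completedFits.incrementAndGet() == epm.length) {
      trainingDataset.unpersist() // all fitting tasks have finished
    }
    eval.evaluate(model.transform(validationDataset, paramMap))
  } (executionContext)
}
```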

I commented on this issue earlier, on the old PR [SPARK-19357]:
#16774 (review)
Unfortunately, at that time I did not realize that the issue still existed, but now I have confirmed it and created this PR to fix it.

## Discussion
I present three approaches for comparison below. After discussion I realized that none of them is ideal, so we have to make a trade-off.

**After discussion with jkbradley, approach 3 was chosen.**

### Approach 1
~~The approach proposed by MrBago at~~ #19904 (comment)
~~This approach resolves the model-reference issue and allows the model objects to be GCed in time. **BUT, in some cases, it still does not resolve the O(N) model memory occupation issue.** Let me use an extreme case to describe it:~~
~~Suppose we set `parallelism = 1` and there are 100 paramMaps, so we have 100 fitting & evaluation tasks. In this approach, because `parallelism = 1`, the code has to wait for all 100 fitting tasks to complete **(at which point the memory occupied by models already reaches 100 * sizeof(model))**, and only then does it unpersist the training dataset and run the 100 evaluation tasks.~~

### Approach 2
~~This approach is the old version of this PR's code~~ 2cc7c28
~~This approach guarantees that in every case the peak memory occupied by models is `O(numParallelism * sizeof(model))`, but it has the drawback that, in some extreme cases, the "unpersist training dataset" call is delayed until most of the evaluation tasks have completed. Suppose `parallelism = 1` and there are 100 fitting & evaluation tasks: each fitting & evaluation task has to be executed one by one, so "unpersist training dataset" is triggered only after the first 99 fitting & evaluation tasks and the 100th fitting task have completed.~~

### Approach 3
After comparing approach 1 and approach 2, I realized that, in the case where parallelism is low but there are many fitting & evaluation tasks, we cannot achieve both of the following goals:
- keep the peak memory occupied by models (driver-side) at O(parallelism * sizeof(model));
- unpersist the training dataset before most of the evaluation tasks have started.

So I vote for the simpler approach: move the unpersisting of the training dataset to the end. (Does this really matter?)
Goal 1 is the more important one: we must make sure the peak memory occupied by models (driver-side) is O(parallelism * sizeof(model)); otherwise there is a high risk of OOM.
Like the following code:
```scala
      val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
        Future[Double] {
          val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
          // ... other minor code ...
          val metric = eval.evaluate(model.transform(validationDataset, paramMap))
          logDebug(s"Got metric $metric for model trained with $paramMap.")
          metric
        } (executionContext)
      }
      val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
      trainingDataset.unpersist() // <------- unpersist at the end
      validationDataset.unpersist()
```
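
For completeness: the O(parallelism * sizeof(model)) bound relies on `executionContext` being backed by a pool of bounded size, so that at most `parallelism` fitting tasks (and therefore models) are alive at once. A minimal sketch of such a setup (a plain fixed-size pool; not necessarily the pool Spark actually constructs):

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

val parallelism = 4 // assumed value for illustration
// At most `parallelism` futures run concurrently; each model becomes
// unreachable as soon as its future finishes computing the metric.
implicit val executionContext: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(parallelism))
```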

## How was this patch tested?

N/A

Author: WeichenXu <[email protected]>

Closes #19904 from WeichenXu123/fix_cross_validator_memory_issue.
WeichenXu123 authored and jkbradley committed Dec 25, 2017
1 parent 0bf1a74 commit fba0313
Showing 1 changed file with 3 additions and 14 deletions.
```diff
@@ -147,24 +147,12 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
       logDebug(s"Train split $splitIndex with multiple sets of parameters.")
 
       // Fit models in a Future for training in parallel
-      val modelFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
-        Future[Model[_]] {
+      val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
+        Future[Double] {
           val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
-
           if (collectSubModelsParam) {
             subModels.get(splitIndex)(paramIndex) = model
           }
-          model
-        } (executionContext)
-      }
-
-      // Unpersist training data only when all models have trained
-      Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext)
-        .onComplete { _ => trainingDataset.unpersist() } (executionContext)
-
-      // Evaluate models in a Future that will calulate a metric and allow model to be cleaned up
-      val foldMetricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) =>
-        modelFuture.map { model =>
           // TODO: duplicate evaluator to take extra params from input
           val metric = eval.evaluate(model.transform(validationDataset, paramMap))
           logDebug(s"Got metric $metric for model trained with $paramMap.")
@@ -174,6 +162,7 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
 
       // Wait for metrics to be calculated before unpersisting validation dataset
       val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
+      trainingDataset.unpersist()
       validationDataset.unpersist()
       foldMetrics
     }.transpose.map(_.sum / $(numFolds)) // Calculate average metric over all splits
```
