-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-1544 Add support for deep decision trees. #475
Changes from 7 commits
50b143a
abc5a23
2f6072c
2f1e093
0287772
fecf89a
719d009
9dbdabe
1517155
718506b
e0426ee
dad9652
cbd9f14
5e82202
4731cda
5eca9e4
8053fed
426bb28
b27ad2c
ce004a1
7fc9545
968ca9d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ import org.apache.spark.mllib.tree.impurity.{Entropy, Gini, Impurity, Variance} | |
import org.apache.spark.mllib.tree.model._ | ||
import org.apache.spark.rdd.RDD | ||
import org.apache.spark.util.random.XORShiftRandom | ||
import org.apache.spark.util.Utils.memoryStringToMb | ||
import org.apache.spark.mllib.linalg.{Vector, Vectors} | ||
|
||
/** | ||
|
@@ -58,7 +59,8 @@ class DecisionTree (private val strategy: Strategy) extends Serializable with Lo | |
// Find the splits and the corresponding bins (interval between the splits) using a sample | ||
// of the input data. | ||
val (splits, bins) = DecisionTree.findSplitsBins(input, strategy) | ||
logDebug("numSplits = " + bins(0).length) | ||
val numBins = bins(0).length | ||
logDebug("numBins = " + numBins) | ||
|
||
// depth of the decision tree | ||
val maxDepth = strategy.maxDepth | ||
|
@@ -72,7 +74,28 @@ class DecisionTree (private val strategy: Strategy) extends Serializable with Lo | |
val parentImpurities = new Array[Double](maxNumNodes) | ||
// dummy value for top node (updated during first split calculation) | ||
val nodes = new Array[Node](maxNumNodes) | ||
// num features | ||
val numFeatures = input.take(1)(0).features.size | ||
|
||
// Calculate level for single group construction | ||
|
||
// Max memory usage for aggregates | ||
val maxMemoryUsage = strategy.maxMemory * 1024 * 1024 | ||
logDebug("max memory usage for aggregates = " + maxMemoryUsage) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
val numElementsPerNode = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not necessary to have an extra |
||
strategy.algo match { | ||
case Classification => 2 * numBins * numFeatures | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove extra space between |
||
case Regression => 3 * numBins * numFeatures | ||
} | ||
} | ||
logDebug("numElementsPerNode = " + numElementsPerNode) | ||
val arraySizePerNode = 8 * numElementsPerNode // approx. memory usage for bin aggregate array | ||
val maxNumberOfNodesPerGroup = scala.math.max(maxMemoryUsage / arraySizePerNode, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not just use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @techaddict Happy to change it. It is cosmetic or is there something more to it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @manishamde just cleanliness. |
||
logDebug("maxNumberOfNodesPerGroup = " + maxNumberOfNodesPerGroup) | ||
// nodes at a level is 2^(level-1). level is zero indexed. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
val maxLevelForSingleGroup = scala.math.max( | ||
(scala.math.log(maxNumberOfNodesPerGroup) / scala.math.log(2)).floor.toInt - 1, 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here |
||
logDebug("max level for single group = " + maxLevelForSingleGroup) | ||
|
||
/* | ||
* The main idea here is to perform level-wise training of the decision tree nodes thus | ||
|
@@ -92,7 +115,7 @@ class DecisionTree (private val strategy: Strategy) extends Serializable with Lo | |
|
||
// Find best split for all nodes at a level. | ||
val splitsStatsForLevel = DecisionTree.findBestSplits(input, parentImpurities, strategy, | ||
level, filters, splits, bins) | ||
level, filters, splits, bins, maxLevelForSingleGroup) | ||
|
||
for ((nodeSplitStats, index) <- splitsStatsForLevel.view.zipWithIndex) { | ||
// Extract info for nodes at the current level. | ||
|
@@ -110,6 +133,10 @@ class DecisionTree (private val strategy: Strategy) extends Serializable with Lo | |
} | ||
} | ||
|
||
logDebug("#####################################") | ||
logDebug("Extracting tree model") | ||
logDebug("#####################################") | ||
|
||
// Initialize the top or root node of the tree. | ||
val topNode = nodes(0) | ||
// Build the full tree using the node info calculated in the level-wise best split calculations. | ||
|
@@ -260,6 +287,7 @@ object DecisionTree extends Serializable with Logging { | |
* @param filters Filters for all nodes at a given level | ||
* @param splits possible splits for all features | ||
* @param bins possible bins for all features | ||
* @param maxLevelForSingleGroup the deepest level for single-group level-wise computation. | ||
* @return array of splits with best splits for all nodes at a given level. | ||
*/ | ||
protected[tree] def findBestSplits( | ||
|
@@ -269,7 +297,50 @@ object DecisionTree extends Serializable with Logging { | |
level: Int, | ||
filters: Array[List[Filter]], | ||
splits: Array[Array[Split]], | ||
bins: Array[Array[Bin]]): Array[(Split, InformationGainStats)] = { | ||
bins: Array[Array[Bin]], | ||
maxLevelForSingleGroup: Int): Array[(Split, InformationGainStats)] = { | ||
// split into groups to avoid memory overflow during aggregation | ||
if (level > maxLevelForSingleGroup) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove extra space after |
||
val numGroups = scala.math.pow(2, (level - maxLevelForSingleGroup)).toInt | ||
logDebug("numGroups = " + numGroups) | ||
var groupIndex = 0 | ||
var bestSplits = new Array[(Split, InformationGainStats)](0) | ||
while (groupIndex < numGroups) { | ||
val bestSplitsForGroup = findBestSplitsPerGroup(input, parentImpurities, strategy, level, | ||
filters, splits, bins, numGroups, groupIndex) | ||
bestSplits = Array.concat(bestSplits, bestSplitsForGroup) | ||
groupIndex += 1 | ||
} | ||
bestSplits | ||
} else { | ||
findBestSplitsPerGroup(input, parentImpurities, strategy, level, filters, splits, bins) | ||
} | ||
} | ||
|
||
/** | ||
* Returns an array of optimal splits for a group of nodes at a given level | ||
* | ||
* @param input RDD of [[org.apache.spark.mllib.regression.LabeledPoint]] used as training data | ||
* for DecisionTree | ||
* @param parentImpurities Impurities for all parent nodes for the current level | ||
* @param strategy [[org.apache.spark.mllib.tree.configuration.Strategy]] instance containing | ||
* parameters for construction the DecisionTree | ||
* @param level Level of the tree | ||
* @param filters Filters for all nodes at a given level | ||
* @param splits possible splits for all features | ||
* @param bins possible bins for all features | ||
* @return array of splits with best splits for all nodes at a given level. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add docs for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mengxr I already added more documentation. Is there something more I am missing here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see the docs of |
||
*/ | ||
private def findBestSplitsPerGroup( | ||
input: RDD[LabeledPoint], | ||
parentImpurities: Array[Double], | ||
strategy: Strategy, | ||
level: Int, | ||
filters: Array[List[Filter]], | ||
splits: Array[Array[Split]], | ||
bins: Array[Array[Bin]], | ||
numGroups: Int = 1, | ||
groupIndex: Int = 0): Array[(Split, InformationGainStats)] = { | ||
|
||
/* | ||
* The high-level description for the best split optimizations are noted here. | ||
|
@@ -296,20 +367,23 @@ object DecisionTree extends Serializable with Logging { | |
*/ | ||
|
||
// common calculations for multiple nested methods | ||
val numNodes = scala.math.pow(2, level).toInt | ||
val numNodes = scala.math.pow(2, level).toInt / numGroups | ||
logDebug("numNodes = " + numNodes) | ||
// Find the number of features by looking at the first sample. | ||
val numFeatures = input.first().features.size | ||
logDebug("numFeatures = " + numFeatures) | ||
val numBins = bins(0).length | ||
logDebug("numBins = " + numBins) | ||
|
||
// shift when more than one group is used at deep tree level | ||
val groupShift = numNodes * groupIndex | ||
|
||
/** Find the filters used before reaching the current code. */ | ||
def findParentFilters(nodeIndex: Int): List[Filter] = { | ||
if (level == 0) { | ||
List[Filter]() | ||
} else { | ||
val nodeFilterIndex = scala.math.pow(2, level).toInt - 1 + nodeIndex | ||
val nodeFilterIndex = scala.math.pow(2, level).toInt - 1 + nodeIndex + groupShift | ||
filters(nodeFilterIndex) | ||
} | ||
} | ||
|
@@ -878,7 +952,7 @@ object DecisionTree extends Serializable with Logging { | |
// Iterating over all nodes at this level | ||
var node = 0 | ||
while (node < numNodes) { | ||
val nodeImpurityIndex = scala.math.pow(2, level).toInt - 1 + node | ||
val nodeImpurityIndex = scala.math.pow(2, level).toInt - 1 + node + groupShift | ||
val binsForNode: Array[Double] = getBinDataForNode(node) | ||
logDebug("nodeImpurityIndex = " + nodeImpurityIndex) | ||
val parentNodeImpurity = parentImpurities(nodeImpurityIndex) | ||
|
@@ -1085,10 +1159,13 @@ object DecisionTree extends Serializable with Logging { | |
|
||
val maxDepth = options.getOrElse('maxDepth, "1").toString.toInt | ||
val maxBins = options.getOrElse('maxBins, "100").toString.toInt | ||
val maxMemUsage = memoryStringToMb(options.getOrElse('maxMemory, "128m").toString) | ||
|
||
val strategy = new Strategy(algo, impurity, maxDepth, maxBins) | ||
val strategy = new Strategy(algo, impurity, maxDepth, maxBins, maxMemory=maxMemUsage) | ||
val model = DecisionTree.train(trainData, strategy) | ||
|
||
|
||
|
||
// Load test data. | ||
val testData = loadLabeledData(sc, options.get('testDataDir).get.toString) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,9 @@ import org.apache.spark.mllib.tree.configuration.QuantileStrategy._ | |
* k) implies the feature n is categorical with k categories 0, | ||
* 1, 2, ... , k-1. It's important to note that features are | ||
* zero-indexed. | ||
* @param maxMemory maximum memory in MB allocated to histogram aggregation. Default value is | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. |
||
* 128 MB. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the indentation correct? |
||
* | ||
*/ | ||
@Experimental | ||
class Strategy ( | ||
|
@@ -43,4 +46,5 @@ class Strategy ( | |
val maxDepth: Int, | ||
val maxBins: Int = 100, | ||
val quantileCalculationStrategy: QuantileStrategy = Sort, | ||
val categoricalFeaturesInfo: Map[Int, Int] = Map[Int, Int]()) extends Serializable | ||
val categoricalFeaturesInfo: Map[Int, Int] = Map[Int, Int](), | ||
val maxMemory: Int = 128) extends Serializable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI, the decision tree guide is now in
mllib-decision-tree.md
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.