
[SPARK-18206][ML] Add instrumentation for MLP,NB,LDA,AFT,GLM,Isotonic,LiR

## What changes were proposed in this pull request?

Add instrumentation for MLP (MultilayerPerceptronClassifier), NB (NaiveBayes), LDA, AFT (AFTSurvivalRegression), GLM (GeneralizedLinearRegression), Isotonic (IsotonicRegression) and LiR (LinearRegression).

## How was this patch tested?

Local test in spark-shell.
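
A session along these lines exercises the new code path (an illustration only, not the author's recorded commands; the expected log text is abbreviated and approximate):

```scala
// In spark-shell (Spark 2.x): fit a tiny model and watch the INFO log
// for the instrumentation output added by this commit.
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (0.0, Vectors.dense(1.0, 0.0)),
  (1.0, Vectors.dense(0.0, 1.0))
)).toDF("label", "features")

val model = new NaiveBayes().fit(df)
// Expected INFO lines (abbreviated): the logged param values, then
// numFeatures=2 and numClasses=2, then "training finished".
```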

Author: Zheng RuiFeng <[email protected]>
Author: Ruifeng Zheng <[email protected]>

Closes apache#15671 from zhengruifeng/lir_instr.
zhengruifeng authored and uzadude committed Jan 27, 2017
1 parent ba8eb58 commit ba27ed7
Showing 13 changed files with 105 additions and 33 deletions.
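
All of the changed files apply the same Instrumentation lifecycle inside the estimator's train()/fit() method. A minimal sketch of that pattern, assuming a hypothetical MyEstimator/MyModel pair (the concrete params logged differ per algorithm; the real call sites are in the diffs below):

```scala
// Hypothetical estimator illustrating the pattern this commit applies.
override protected def train(dataset: Dataset[_]): MyModel = {
  // 1. Open an instrumentation session bound to this estimator and dataset.
  val instr = Instrumentation.create(this, dataset)

  // 2. Record the Params that influence training, plus dataset-derived sizes.
  instr.logParams(labelCol, featuresCol, predictionCol, maxIter, tol)
  instr.logNumFeatures(numFeatures)   // e.g. size of the features vector
  instr.logNumClasses(numClasses)     // classifiers only

  // ... existing training logic, unchanged ...
  val model = copyValues(new MyModel(uid).setParent(this))

  // 3. Mark the session finished on every return path before returning the
  //    model; this is why several train() methods below are restructured to
  //    funnel all branches into a single `model` value.
  instr.logSuccess(model)
  model
}
```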
mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala

@@ -160,7 +160,9 @@ class GBTClassifier @Since("1.4.0") (
     val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Classification)
 
     val instr = Instrumentation.create(this, oldDataset)
-    instr.logParams(params: _*)
+    instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
+      maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
+      seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval)
     instr.logNumFeatures(numFeatures)
     instr.logNumClasses(2)
mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala

@@ -232,8 +232,15 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
    * @return Fitted model
    */
   override protected def train(dataset: Dataset[_]): MultilayerPerceptronClassificationModel = {
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(labelCol, featuresCol, predictionCol, layers, maxIter, tol,
+      blockSize, solver, stepSize, seed)
+
     val myLayers = $(layers)
     val labels = myLayers.last
+    instr.logNumClasses(labels)
+    instr.logNumFeatures(myLayers.head)
+
     val lpData = extractLabeledPoints(dataset)
     val data = lpData.map(lp => LabelConverter.encodeLabeledPoint(lp, labels))
     val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, softmaxOnTop = true)

@@ -258,7 +265,10 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
     }
     trainer.setStackSize($(blockSize))
     val mlpModel = trainer.train(data)
-    new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
+    val model = new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
+
+    instr.logSuccess(model)
+    model
   }
 }
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala

@@ -147,7 +147,12 @@ class NaiveBayes @Since("1.5.0") (
       }
     }
 
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, rawPredictionCol,
+      probabilityCol, modelType, smoothing, thresholds)
+
     val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+    instr.logNumFeatures(numFeatures)
     val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
 
     // Aggregates term frequencies per label.

@@ -169,6 +174,7 @@
     }).collect().sortBy(_._1)
 
     val numLabels = aggregated.length
+    instr.logNumClasses(numLabels)
     val numDocuments = aggregated.map(_._2._1).sum
 
     val labelArray = new Array[Double](numLabels)

@@ -198,7 +204,9 @@
 
     val pi = Vectors.dense(piArray)
     val theta = new DenseMatrix(numLabels, numFeatures, thetaArray, true)
-    new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
+    val model = new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
+    instr.logSuccess(model)
+    model
   }
 
   @Since("1.5.0")
mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala

@@ -131,7 +131,9 @@ class RandomForestClassifier @Since("1.4.0") (
       super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity)
 
     val instr = Instrumentation.create(this, oldDataset)
-    instr.logParams(params: _*)
+    instr.logParams(labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol,
+      impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
+      minInstancesPerNode, seed, subsamplingRate, thresholds, cacheNodeIds, checkpointInterval)
 
     val trees = RandomForest
       .run(oldDataset, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))
12 changes: 11 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -888,6 +888,12 @@ class LDA @Since("1.6.0") (
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): LDAModel = {
     transformSchema(dataset.schema, logging = true)
+
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(featuresCol, topicDistributionCol, k, maxIter, subsamplingRate,
+      checkpointInterval, keepLastCheckpoint, optimizeDocConcentration, topicConcentration,
+      learningDecay, optimizer, learningOffset, seed)
+
     val oldLDA = new OldLDA()
       .setK($(k))
       .setDocConcentration(getOldDocConcentration)

@@ -905,7 +911,11 @@
       case m: OldDistributedLDAModel =>
         new DistributedLDAModel(uid, m.vocabSize, m, dataset.sparkSession, None)
     }
-    copyValues(newModel).setParent(this)
+
+    instr.logNumFeatures(newModel.vocabSize)
+    val model = copyValues(newModel).setParent(this)
+    instr.logSuccess(model)
+    model
   }
 
   @Since("1.6.0")
mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala

@@ -457,10 +457,12 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
       .map { row =>
         Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
       }
+
     val instr = Instrumentation.create(this, ratings)
-    instr.logParams(rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha,
-      userCol, itemCol, ratingCol, predictionCol, maxIter,
-      regParam, nonnegative, checkpointInterval, seed)
+    instr.logParams(rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
+      itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
+      seed, intermediateStorageLevel, finalStorageLevel)
+
     val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
       numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
       maxIter = $(maxIter), regParam = $(regParam), implicitPrefs = $(implicitPrefs),
mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala

@@ -227,6 +227,12 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
     val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
     val numFeatures = featuresStd.size
 
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(labelCol, featuresCol, censorCol, predictionCol, quantilesCol,
+      fitIntercept, maxIter, tol, aggregationDepth)
+    instr.logNamedValue("quantileProbabilities.size", $(quantileProbabilities).length)
+    instr.logNumFeatures(numFeatures)
+
     if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
         featuresStd(i) == 0.0 && featuresSummarizer.mean(i) != 0.0 }) {
       logWarning("Fitting AFTSurvivalRegressionModel without intercept on dataset with " +

@@ -276,8 +282,10 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
     val coefficients = Vectors.dense(rawCoefficients)
     val intercept = parameters(1)
     val scale = math.exp(parameters(0))
-    val model = new AFTSurvivalRegressionModel(uid, coefficients, intercept, scale)
-    copyValues(model.setParent(this))
+    val model = copyValues(new AFTSurvivalRegressionModel(uid, coefficients,
+      intercept, scale).setParent(this))
+    instr.logSuccess(model)
+    model
   }
 
   @Since("1.6.0")
mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala

@@ -148,7 +148,9 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
     val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Regression)
 
     val instr = Instrumentation.create(this, oldDataset)
-    instr.logParams(params: _*)
+    instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
+      maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
+      seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval)
     instr.logNumFeatures(numFeatures)
 
     val (baseLearners, learnerWeights) = GradientBoostedTrees.run(oldDataset, boostingStrategy,
mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala

@@ -251,6 +251,11 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
     val familyAndLink = new FamilyAndLink(familyObj, linkObj)
 
     val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, linkPredictionCol,
+      family, solver, fitIntercept, link, maxIter, regParam, tol)
+    instr.logNumFeatures(numFeatures)
+
     if (numFeatures > WeightedLeastSquares.MAX_NUM_FEATURES) {
       val msg = "Currently, GeneralizedLinearRegression only supports number of features" +
         s" <= ${WeightedLeastSquares.MAX_NUM_FEATURES}. Found $numFeatures in the input dataset."

@@ -264,7 +269,7 @@
       Instance(label, weight, features)
     }
 
-    if (familyObj == Gaussian && linkObj == Identity) {
+    val model = if (familyObj == Gaussian && linkObj == Identity) {
       // TODO: Make standardizeFeatures and standardizeLabel configurable.
       val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), elasticNetParam = 0.0,
         standardizeFeatures = true, standardizeLabel = true)

@@ -274,21 +279,23 @@
         .setParent(this))
       val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
         wlsModel.diagInvAtWA.toArray, 1, getSolver)
-      return model.setSummary(Some(trainingSummary))
+      model.setSummary(Some(trainingSummary))
+    } else {
+      // Fit Generalized Linear Model by iteratively reweighted least squares (IRLS).
+      val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam))
+      val optimizer = new IterativelyReweightedLeastSquares(initialModel,
+        familyAndLink.reweightFunc, $(fitIntercept), $(regParam), $(maxIter), $(tol))
+      val irlsModel = optimizer.fit(instances)
+      val model = copyValues(
+        new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept)
+          .setParent(this))
+      val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
+        irlsModel.diagInvAtWA.toArray, irlsModel.numIterations, getSolver)
+      model.setSummary(Some(trainingSummary))
     }
 
-    // Fit Generalized Linear Model by iteratively reweighted least squares (IRLS).
-    val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam))
-    val optimizer = new IterativelyReweightedLeastSquares(initialModel, familyAndLink.reweightFunc,
-      $(fitIntercept), $(regParam), $(maxIter), $(tol))
-    val irlsModel = optimizer.fit(instances)
-
-    val model = copyValues(
-      new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept)
-        .setParent(this))
-    val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
-      irlsModel.diagInvAtWA.toArray, irlsModel.numIterations, getSolver)
-    model.setSummary(Some(trainingSummary))
+    instr.logSuccess(model)
+    model
   }
 
   @Since("2.0.0")
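Note that the GLM change is more than log statements: the early return in the Gaussian-plus-Identity fast path is gone, and the WLS and IRLS branches now both yield a single `model` value, so `instr.logSuccess(model)` runs on whichever path trained the model.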
mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala

@@ -171,10 +171,16 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
     if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
 
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, featureIndex, isotonic)
+    instr.logNumFeatures(1)
+
     val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonic))
     val oldModel = isotonicRegression.run(instances)
 
-    copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
+    val model = copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
+    instr.logSuccess(model)
+    model
   }
 
   @Since("1.5.0")
mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala

@@ -91,7 +91,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
   setDefault(regParam -> 0.0)
 
   /**
-   * Set if we should fit the intercept
+   * Set if we should fit the intercept.
    * Default is true.
    *
    * @group setParam

@@ -204,6 +204,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
       Instance(label, weight, features)
     }
 
+    val instr = Instrumentation.create(this, dataset)
+    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, solver, tol,
+      elasticNetParam, fitIntercept, maxIter, regParam, standardization, aggregationDepth)
+    instr.logNumFeatures(numFeatures)
+
     if (($(solver) == "auto" &&
       numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == "normal") {
       // For low dimensional data, WeightedLeastSquares is more efficient since the

@@ -226,7 +231,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
         model.diagInvAtWA.toArray,
         model.objectiveHistory)
 
-      return lrModel.setSummary(Some(trainingSummary))
+      lrModel.setSummary(Some(trainingSummary))
+      instr.logSuccess(lrModel)
+      return lrModel
     }
 
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE

@@ -251,10 +258,10 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
     val rawYStd = math.sqrt(ySummarizer.variance(0))
     if (rawYStd == 0.0) {
       if ($(fitIntercept) || yMean == 0.0) {
-        // If the rawYStd is zero and fitIntercept=true, then the intercept is yMean with
+        // If the rawYStd==0 and fitIntercept==true, then the intercept is yMean with
         // zero coefficient; as a result, training is not needed.
         // Also, if yMean==0 and rawYStd==0, all the coefficients are zero regardless of
-        // the fitIntercept
+        // the fitIntercept.
         if (yMean == 0.0) {
           logWarning(s"Mean and standard deviation of the label are zero, so the coefficients " +
             s"and the intercept will all be zero; as a result, training is not needed.")

@@ -279,7 +286,10 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
           model,
           Array(0D),
           Array(0D))
-        return model.setSummary(Some(trainingSummary))
+
+        model.setSummary(Some(trainingSummary))
+        instr.logSuccess(model)
+        return model
       } else {
         require($(regParam) == 0.0, "The standard deviation of the label is zero. " +
           "Model cannot be regularized.")

@@ -401,7 +411,10 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
       model,
       Array(0D),
       objectiveHistory)
+
     model.setSummary(Some(trainingSummary))
+    instr.logSuccess(model)
+    model
   }
 
   @Since("1.4.0")
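LinearRegression.train has three exit points (the WLS "normal" fast path, the zero-variance-label shortcut, and the iterative path at the end of the method); each now calls instr.logSuccess before returning, which is why the two early returns gain explicit logSuccess calls rather than being restructured.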
mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala

@@ -122,7 +122,9 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
       super.getOldStrategy(categoricalFeatures, numClasses = 0, OldAlgo.Regression, getOldImpurity)
 
     val instr = Instrumentation.create(this, oldDataset)
-    instr.logParams(params: _*)
+    instr.logParams(labelCol, featuresCol, predictionCol, impurity, numTrees,
+      featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
+      minInstancesPerNode, seed, subsamplingRate, cacheNodeIds, checkpointInterval)
 
     val trees = RandomForest
       .run(oldDataset, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))
mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala

@@ -96,7 +96,7 @@ private[spark] class Instrumentation[E <: Estimator[_]] private (
   }
 
   /**
-   * Logs the successful completion of the training session and the value of the learned model.
+   * Logs the successful completion of the training session.
   */
   def logSuccess(model: Model[_]): Unit = {
     log(s"training finished")
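
For orientation, here is a trimmed-down sketch of the Instrumentation surface these diffs call into. This is a simplified paraphrase under assumptions, not the actual Spark source: the real logParams JSON-encodes the param values, the session prefix also carries a dataset-derived id, and the exact signatures may differ.

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.param.Param
import org.apache.spark.sql.Dataset

// Simplified sketch of the helper API (indicative only).
private[spark] class Instrumentation[E <: Estimator[_]] private (
    estimator: E, dataset: Dataset[_]) extends Logging {

  // The real class also folds a dataset-derived id into this prefix.
  private val prefix = s"${estimator.getClass.getSimpleName}-${estimator.uid}: "

  def log(msg: String): Unit = logInfo(prefix + msg)

  // Log the values of the given Params that the user has set.
  def logParams(params: Param[_]*): Unit = {
    val pairs = for (p <- params; v <- estimator.get(p)) yield s"${p.name}=$v"
    log(pairs.mkString("training with params {", ", ", "}"))
  }

  def logNamedValue(name: String, value: Any): Unit = log(s"$name=$value")
  def logNumFeatures(num: Long): Unit = logNamedValue("numFeatures", num)
  def logNumClasses(num: Long): Unit = logNamedValue("numClasses", num)

  // Logs the successful completion of the training session.
  def logSuccess(model: Model[_]): Unit = log("training finished")
}

private[spark] object Instrumentation {
  def create[E <: Estimator[_]](estimator: E, dataset: Dataset[_]): Instrumentation[E] =
    new Instrumentation[E](estimator, dataset)
}
```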
