[SPARK-18206][ML]Add instrumentation for MLP,NB,LDA,AFT,GLM,Isotonic,LiR #15671

Status: Closed (wants to merge 21 commits)
@@ -160,7 +160,9 @@ class GBTClassifier @Since("1.4.0") (
val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Classification)

val instr = Instrumentation.create(this, oldDataset)
-instr.logParams(params: _*)
+instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
+  maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
+  seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval)
instr.logNumFeatures(numFeatures)
instr.logNumClasses(2)
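
The same pattern repeats across every estimator this PR touches: replace the blanket `logParams(params: _*)` (which passed every param of the estimator wholesale) with an explicit allowlist, log the dataset shape, and log success once a model exists. A minimal, self-contained sketch of that lifecycle, assuming a println-backed stand-in rather than Spark's actual private `Instrumentation` class:

```scala
// Stand-in for org.apache.spark.ml.util.Instrumentation, for illustration only.
class InstrSketch(estimator: String) {
  private def log(msg: String): Unit = println(s"[$estimator] $msg")
  // Logs an explicit allowlist of (name, value) pairs chosen by the caller.
  def logParams(params: (String, Any)*): Unit =
    params.foreach { case (name, value) => log(s"param $name = $value") }
  def logNumFeatures(n: Long): Unit = log(s"numFeatures = $n")
  def logNumClasses(n: Long): Unit = log(s"numClasses = $n")
  def logSuccess(): Unit = log("training finished")
}

object InstrSketchDemo extends App {
  val instr = new InstrSketch("GBTClassifier")
  instr.logParams("maxDepth" -> 5, "maxIter" -> 20, "stepSize" -> 0.1) // explicit allowlist
  instr.logNumFeatures(100)
  instr.logNumClasses(2)
  // ...the actual fit would happen here...
  instr.logSuccess()
}
```

An explicit list also lets each estimator skip params whose values are unbounded in size, which is exactly the issue raised in the LDA and AFT review threads below.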

@@ -232,8 +232,15 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
* @return Fitted model
*/
override protected def train(dataset: Dataset[_]): MultilayerPerceptronClassificationModel = {
+val instr = Instrumentation.create(this, dataset)
+instr.logParams(labelCol, featuresCol, predictionCol, layers, maxIter, tol,
+  blockSize, solver, stepSize, seed)
+
val myLayers = $(layers)
val labels = myLayers.last
+instr.logNumClasses(labels)
+instr.logNumFeatures(myLayers.head)
+
val lpData = extractLabeledPoints(dataset)
val data = lpData.map(lp => LabelConverter.encodeLabeledPoint(lp, labels))
val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, softmaxOnTop = true)
@@ -258,7 +265,10 @@
}
trainer.setStackSize($(blockSize))
val mlpModel = trainer.train(data)
-new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
+val model = new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
+
+instr.logSuccess(model)
+model
}
}
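
Worth noting about the MLP change: the feature and class counts fall out of the `layers` param for free, so logging them costs nothing. A small sketch with made-up layer sizes:

```scala
object MlpShapeSketch extends App {
  // layers = input size, hidden layer sizes..., output size (illustrative values)
  val myLayers = Array(4, 5, 4, 3)
  println(s"numFeatures = ${myLayers.head}") // 4 inputs
  println(s"numClasses  = ${myLayers.last}") // 3 output labels
}
```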

@@ -147,7 +147,12 @@ class NaiveBayes @Since("1.5.0") (
}
}

+val instr = Instrumentation.create(this, dataset)
+instr.logParams(labelCol, featuresCol, weightCol, predictionCol, rawPredictionCol,
+  probabilityCol, modelType, smoothing, thresholds)
+
val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+instr.logNumFeatures(numFeatures)
val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))

// Aggregates term frequencies per label.
@@ -169,6 +174,7 @@
}).collect().sortBy(_._1)

val numLabels = aggregated.length
+instr.logNumClasses(numLabels)
val numDocuments = aggregated.map(_._2._1).sum

val labelArray = new Array[Double](numLabels)
@@ -198,7 +204,9 @@

val pi = Vectors.dense(piArray)
val theta = new DenseMatrix(numLabels, numFeatures, thetaArray, true)
-new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
+val model = new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
+instr.logSuccess(model)
+model
}

@Since("1.5.0")
@@ -131,7 +131,9 @@ class RandomForestClassifier @Since("1.4.0") (
super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity)

val instr = Instrumentation.create(this, oldDataset)
-instr.logParams(params: _*)
+instr.logParams(labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol,
+  impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
+  minInstancesPerNode, seed, subsamplingRate, thresholds, cacheNodeIds, checkpointInterval)

val trees = RandomForest
.run(oldDataset, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))
mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala (11 additions, 1 deletion)
@@ -888,6 +888,12 @@ class LDA @Since("1.6.0") (
@Since("2.0.0")
override def fit(dataset: Dataset[_]): LDAModel = {
transformSchema(dataset.schema, logging = true)

+val instr = Instrumentation.create(this, dataset)
+instr.logParams(featuresCol, topicDistributionCol, k, maxIter, subsamplingRate,
+  checkpointInterval, keepLastCheckpoint, optimizeDocConcentration, topicConcentration,
Review comment (Member):

missing "docConcentration"

Reply (Contributor Author):

I left it out intentionally: docConcentration may be a double array of size k, which may be too large to log. (See the size sketch after this file's hunks.)

+  learningDecay, optimizer, learningOffset, seed)
+
val oldLDA = new OldLDA()
.setK($(k))
.setDocConcentration(getOldDocConcentration)
@@ -905,7 +911,11 @@
case m: OldDistributedLDAModel =>
new DistributedLDAModel(uid, m.vocabSize, m, dataset.sparkSession, None)
}
-copyValues(newModel).setParent(this)
+
+instr.logNumFeatures(newModel.vocabSize)
+val model = copyValues(newModel).setParent(this)
+instr.logSuccess(model)
+model
}

@Since("1.6.0")
@@ -457,10 +457,12 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
.map { row =>
Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
}

val instr = Instrumentation.create(this, ratings)
-instr.logParams(rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha,
-  userCol, itemCol, ratingCol, predictionCol, maxIter,
-  regParam, nonnegative, checkpointInterval, seed)
+instr.logParams(rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
+  itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
+  seed, intermediateStorageLevel, finalStorageLevel)
+
val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
maxIter = $(maxIter), regParam = $(regParam), implicitPrefs = $(implicitPrefs),
@@ -227,6 +227,12 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
val numFeatures = featuresStd.size

+val instr = Instrumentation.create(this, dataset)
+instr.logParams(labelCol, featuresCol, censorCol, predictionCol, quantilesCol,
Review comment (Member):

also log "quantileProbabilities"

Reply (Contributor Author, zhengruifeng, Jan 5, 2017):

It is a DoubleArrayParam, so it may be too large. Shall we always log it? (The compromise adopted below logs only the array's size; see the sketch after this file's hunks.)

+  fitIntercept, maxIter, tol, aggregationDepth)
+instr.logNamedValue("quantileProbabilities.size", $(quantileProbabilities).length)
+instr.logNumFeatures(numFeatures)
+
if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresSummarizer.mean(i) != 0.0 }) {
logWarning("Fitting AFTSurvivalRegressionModel without intercept on dataset with " +
@@ -276,8 +282,10 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
val coefficients = Vectors.dense(rawCoefficients)
val intercept = parameters(1)
val scale = math.exp(parameters(0))
-val model = new AFTSurvivalRegressionModel(uid, coefficients, intercept, scale)
-copyValues(model.setParent(this))
+val model = copyValues(new AFTSurvivalRegressionModel(uid, coefficients,
+  intercept, scale).setParent(this))
+instr.logSuccess(model)
+model
}

@Since("1.6.0")
@@ -148,7 +148,9 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Regression)

val instr = Instrumentation.create(this, oldDataset)
-instr.logParams(params: _*)
+instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
+  maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
+  seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval)
instr.logNumFeatures(numFeatures)

val (baseLearners, learnerWeights) = GradientBoostedTrees.run(oldDataset, boostingStrategy,
@@ -251,6 +251,11 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
val familyAndLink = new FamilyAndLink(familyObj, linkObj)

val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
+val instr = Instrumentation.create(this, dataset)
+instr.logParams(labelCol, featuresCol, weightCol, predictionCol, linkPredictionCol,
+  family, solver, fitIntercept, link, maxIter, regParam, tol)
+instr.logNumFeatures(numFeatures)
+
if (numFeatures > WeightedLeastSquares.MAX_NUM_FEATURES) {
val msg = "Currently, GeneralizedLinearRegression only supports number of features" +
s" <= ${WeightedLeastSquares.MAX_NUM_FEATURES}. Found $numFeatures in the input dataset."
@@ -264,7 +269,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
Instance(label, weight, features)
}

-if (familyObj == Gaussian && linkObj == Identity) {
+val model = if (familyObj == Gaussian && linkObj == Identity) {
// TODO: Make standardizeFeatures and standardizeLabel configurable.
val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), elasticNetParam = 0.0,
standardizeFeatures = true, standardizeLabel = true)
@@ -274,21 +279,23 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
.setParent(this))
val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
wlsModel.diagInvAtWA.toArray, 1, getSolver)
-  return model.setSummary(Some(trainingSummary))
+  model.setSummary(Some(trainingSummary))
+} else {
+  // Fit Generalized Linear Model by iteratively reweighted least squares (IRLS).
+  val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam))
+  val optimizer = new IterativelyReweightedLeastSquares(initialModel,
+    familyAndLink.reweightFunc, $(fitIntercept), $(regParam), $(maxIter), $(tol))
+  val irlsModel = optimizer.fit(instances)
+  val model = copyValues(
+    new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept)
+      .setParent(this))
+  val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
+    irlsModel.diagInvAtWA.toArray, irlsModel.numIterations, getSolver)
+  model.setSummary(Some(trainingSummary))
}

-// Fit Generalized Linear Model by iteratively reweighted least squares (IRLS).
-val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam))
-val optimizer = new IterativelyReweightedLeastSquares(initialModel, familyAndLink.reweightFunc,
-  $(fitIntercept), $(regParam), $(maxIter), $(tol))
-val irlsModel = optimizer.fit(instances)
-
-val model = copyValues(
-  new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept)
-    .setParent(this))
-val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
-  irlsModel.diagInvAtWA.toArray, irlsModel.numIterations, getSolver)
-model.setSummary(Some(trainingSummary))
+instr.logSuccess(model)
+model
}

@Since("2.0.0")
@@ -171,10 +171,16 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri
val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

+val instr = Instrumentation.create(this, dataset)
+instr.logParams(labelCol, featuresCol, weightCol, predictionCol, featureIndex, isotonic)
+instr.logNumFeatures(1)
+
val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonic))
val oldModel = isotonicRegression.run(instances)

-copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
+val model = copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
+instr.logSuccess(model)
+model
}

@Since("1.5.0")
@@ -91,7 +91,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
setDefault(regParam -> 0.0)

/**
- * Set if we should fit the intercept
+ * Set if we should fit the intercept.
* Default is true.
*
* @group setParam
@@ -204,6 +204,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
Instance(label, weight, features)
}

+val instr = Instrumentation.create(this, dataset)
+instr.logParams(labelCol, featuresCol, weightCol, predictionCol, solver, tol,
+  elasticNetParam, fitIntercept, maxIter, regParam, standardization, aggregationDepth)
+instr.logNumFeatures(numFeatures)
+
if (($(solver) == "auto" &&
numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == "normal") {
// For low dimensional data, WeightedLeastSquares is more efficient since the
@@ -226,7 +231,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
model.diagInvAtWA.toArray,
model.objectiveHistory)

-return lrModel.setSummary(Some(trainingSummary))
+lrModel.setSummary(Some(trainingSummary))
+instr.logSuccess(lrModel)
+return lrModel
}

val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
@@ -251,10 +258,10 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val rawYStd = math.sqrt(ySummarizer.variance(0))
if (rawYStd == 0.0) {
if ($(fitIntercept) || yMean == 0.0) {
-// If the rawYStd is zero and fitIntercept=true, then the intercept is yMean with
+// If the rawYStd==0 and fitIntercept==true, then the intercept is yMean with
// zero coefficient; as a result, training is not needed.
// Also, if yMean==0 and rawYStd==0, all the coefficients are zero regardless of
-// the fitIntercept
+// the fitIntercept.
if (yMean == 0.0) {
logWarning(s"Mean and standard deviation of the label are zero, so the coefficients " +
s"and the intercept will all be zero; as a result, training is not needed.")
@@ -279,7 +286,10 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
model,
Array(0D),
Array(0D))
-return model.setSummary(Some(trainingSummary))
+
+model.setSummary(Some(trainingSummary))
+instr.logSuccess(model)
+return model
} else {
require($(regParam) == 0.0, "The standard deviation of the label is zero. " +
"Model cannot be regularized.")
@@ -401,7 +411,10 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
model,
Array(0D),
objectiveHistory)

model.setSummary(Some(trainingSummary))
+instr.logSuccess(model)
+model
}

@Since("1.4.0")
@@ -122,7 +122,9 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
super.getOldStrategy(categoricalFeatures, numClasses = 0, OldAlgo.Regression, getOldImpurity)

val instr = Instrumentation.create(this, oldDataset)
-instr.logParams(params: _*)
+instr.logParams(labelCol, featuresCol, predictionCol, impurity, numTrees,
+  featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
+  minInstancesPerNode, seed, subsamplingRate, cacheNodeIds, checkpointInterval)

val trees = RandomForest
.run(oldDataset, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))
@@ -96,7 +96,7 @@ private[spark] class Instrumentation[E <: Estimator[_]] private (
}

/**
- * Logs the successful completion of the training session and the value of the learned model.
+ * Logs the successful completion of the training session.
*/
def logSuccess(model: Model[_]): Unit = {
log(s"training finished")