Commit 6b3086c

rxin committed Feb 13, 2015 · 1 parent 807e8b1

- toDataFrame -> toDF
- Dsl -> functions
- implicits moved into SQLContext.implicits
- addColumn -> withColumn
- renameColumn -> withColumnRenamed

Showing 59 changed files with 251 additions and 229 deletions.
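For orientation before the per-file hunks, here is a rough sketch of what caller code looks like after these renames. It is not taken from the diff: the case class, column names, and app name are illustrative assumptions, and it targets the Spark 1.3-era DataFrame API this commit belongs to.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._        // was: org.apache.spark.sql.Dsl._

    // Illustrative case class, not from the diff.
    case class Record(key: Int, value: String)

    object RenamedApiSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("RenamedApiSketch").setMaster("local"))
        val sqlContext = new SQLContext(sc)
        // RDD-to-DataFrame conversions now come from the SQLContext instance.
        import sqlContext.implicits._

        val df = sc.parallelize(1 to 10).map(i => Record(i, s"val_$i")).toDF   // was: toDataFrame

        val withDoubled = df.withColumn("doubled", col("key") * 2)             // was: addColumn
        val renamed = withDoubled.withColumnRenamed("value", "label")          // was: renameColumn
        renamed.collect().foreach(println)

        sc.stop()
      }
    }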
@@ -90,7 +90,7 @@ object CrossValidatorExample {
crossval.setNumFolds(2) // Use 3+ in practice

// Run cross-validation, and choose the best set of parameters.
- val cvModel = crossval.fit(training.toDataFrame)
+ val cvModel = crossval.fit(training.toDF)

// Prepare test documents, which are unlabeled.
val test = sc.parallelize(Seq(
@@ -100,7 +100,7 @@ object CrossValidatorExample {
Document(7L, "apache hadoop")))

// Make predictions on test documents. cvModel uses the best model found (lrModel).
- cvModel.transform(test.toDataFrame)
+ cvModel.transform(test.toDF)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>

@@ -58,7 +58,7 @@ object DeveloperApiExample {
lr.setMaxIter(10)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
- val model = lr.fit(training.toDataFrame)
+ val model = lr.fit(training.toDF)

// Prepare test data.
val test = sc.parallelize(Seq(
@@ -67,7 +67,7 @@ object DeveloperApiExample {
LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))

// Make predictions on test data.
- val sumPredictions: Double = model.transform(test.toDataFrame)
+ val sumPredictions: Double = model.transform(test.toDF)
.select("features", "label", "prediction")
.collect()
.map { case Row(features: Vector, label: Double, prediction: Double) =>

@@ -137,9 +137,9 @@ object MovieLensALS {
.setRegParam(params.regParam)
.setNumBlocks(params.numBlocks)

- val model = als.fit(training.toDataFrame)
+ val model = als.fit(training.toDF)

- val predictions = model.transform(test.toDataFrame).cache()
+ val predictions = model.transform(test.toDF).cache()

// Evaluate the model.
// TODO: Create an evaluator to compute RMSE.
@@ -158,7 +158,7 @@ object MovieLensALS {

// Inspect false positives.
predictions.registerTempTable("prediction")
- sc.textFile(params.movies).map(Movie.parseMovie).toDataFrame.registerTempTable("movie")
+ sc.textFile(params.movies).map(Movie.parseMovie).toDF.registerTempTable("movie")
sqlContext.sql(
"""
|SELECT userId, prediction.movieId, title, rating, prediction

@@ -58,7 +58,7 @@ object SimpleParamsExample {
.setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
- val model1 = lr.fit(training.toDataFrame)
+ val model1 = lr.fit(training.toDF)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
@@ -77,7 +77,7 @@ object SimpleParamsExample {

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
- val model2 = lr.fit(training.toDataFrame, paramMapCombined)
+ val model2 = lr.fit(training.toDF, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.fittingParamMap)

// Prepare test data.
@@ -90,7 +90,7 @@ object SimpleParamsExample {
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
- model2.transform(test.toDataFrame)
+ model2.transform(test.toDF)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>

@@ -69,7 +69,7 @@ object SimpleTextClassificationPipeline {
.setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
- val model = pipeline.fit(training.toDataFrame)
+ val model = pipeline.fit(training.toDF)

// Prepare test documents, which are unlabeled.
val test = sc.parallelize(Seq(
@@ -79,7 +79,7 @@ object SimpleTextClassificationPipeline {
Document(7L, "apache hadoop")))

// Make predictions on test documents.
- model.transform(test.toDataFrame)
+ model.transform(test.toDF)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>

@@ -81,7 +81,7 @@ object DatasetExample {
println(s"Loaded ${origData.count()} instances from file: ${params.input}")

// Convert input data to DataFrame explicitly.
- val df: DataFrame = origData.toDataFrame
+ val df: DataFrame = origData.toDF
println(s"Inferred schema:\n${df.schema.prettyJson}")
println(s"Converted to DataFrame with ${df.count()} records")


@@ -19,7 +19,7 @@ package org.apache.spark.examples.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
- import org.apache.spark.sql.Dsl._
+ import org.apache.spark.sql.functions._

// One method for defining the schema of an RDD is to make a case class with the desired column
// names and types.
@@ -34,7 +34,7 @@ object RDDRelation {
// Importing the SQL context gives access to all the SQL functions and implicit conversions.
import sqlContext.implicits._

- val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDataFrame
+ val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF
// Any RDD containing case classes can be registered as a table. The schema of the table is
// automatically inferred using scala reflection.
df.registerTempTable("records")

@@ -68,7 +68,7 @@ object HiveFromSpark {

// You can also register RDDs as temporary tables within a HiveContext.
val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
- rdd.toDataFrame.registerTempTable("records")
+ rdd.toDF.registerTempTable("records")

// Queries can then join RDD data with data stored in Hive.
println("Result of SELECT *:")

6 changes: 3 additions & 3 deletions mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
@@ -23,7 +23,7 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param._
import org.apache.spark.sql.DataFrame
- import org.apache.spark.sql.Dsl._
+ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/**
@@ -97,7 +97,7 @@ private[ml] abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, O
override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
transformSchema(dataset.schema, paramMap, logging = true)
val map = this.paramMap ++ paramMap
- dataset.select($"*", callUDF(
-   this.createTransformFunc(map), outputDataType, dataset(map(inputCol))).as(map(outputCol)))
+ dataset.withColumn(map(outputCol),
+   callUDF(this.createTransformFunc(map), outputDataType, dataset(map(inputCol))))
}
}
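The change just above is the template for most of the mllib edits in this commit: a column that used to be appended with select($"*", expr.as(name)) is now appended with withColumn(name, expr). A minimal sketch of the equivalence, assuming a hypothetical DataFrame df with a string column "text" (these names and the strLen function are illustrative, not from the diff):

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.IntegerType

    // assumes sqlContext: SQLContext and df: DataFrame with a string column "text"
    import sqlContext.implicits._

    val strLen = (s: String) => s.length
    // Old style: re-select every existing column plus the computed one.
    val before = df.select($"*", callUDF(strLen, IntegerType, df("text")).as("textLen"))
    // New style used throughout this commit: append the column directly.
    val after = df.withColumn("textLen", callUDF(strLen, IntegerType, df("text")))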

@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{DeveloperApi, AlphaComponent}
import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, PredictorParams}
import org.apache.spark.ml.param.{Params, ParamMap, HasRawPredictionCol}
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
- import org.apache.spark.sql.Dsl._
+ import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, DoubleType, StructType}

@@ -180,24 +180,22 @@ private[ml] object ClassificationModel {
if (map(model.rawPredictionCol) != "") {
// output raw prediction
val features2raw: FeaturesType => Vector = model.predictRaw
- tmpData = tmpData.select($"*",
-   callUDF(features2raw, new VectorUDT,
-     col(map(model.featuresCol))).as(map(model.rawPredictionCol)))
+ tmpData = tmpData.withColumn(map(model.rawPredictionCol),
+   callUDF(features2raw, new VectorUDT, col(map(model.featuresCol))))
numColsOutput += 1
if (map(model.predictionCol) != "") {
val raw2pred: Vector => Double = (rawPred) => {
rawPred.toArray.zipWithIndex.maxBy(_._1)._2
}
- tmpData = tmpData.select($"*", callUDF(raw2pred, DoubleType,
-   col(map(model.rawPredictionCol))).as(map(model.predictionCol)))
+ tmpData = tmpData.withColumn(map(model.predictionCol),
+   callUDF(raw2pred, DoubleType, col(map(model.rawPredictionCol))))
numColsOutput += 1
}
} else if (map(model.predictionCol) != "") {
// output prediction
val features2pred: FeaturesType => Double = model.predict
- tmpData = tmpData.select($"*",
-   callUDF(features2pred, DoubleType,
-     col(map(model.featuresCol))).as(map(model.predictionCol)))
+ tmpData = tmpData.withColumn(map(model.predictionCol),
+   callUDF(features2pred, DoubleType, col(map(model.featuresCol))))
numColsOutput += 1
}
(numColsOutput, tmpData)

@@ -22,7 +22,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.{VectorUDT, BLAS, Vector, Vectors}
import org.apache.spark.sql.DataFrame
- import org.apache.spark.sql.Dsl._
+ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.storage.StorageLevel

@@ -124,44 +124,39 @@ class LogisticRegressionModel private[ml] (
var numColsOutput = 0
if (map(rawPredictionCol) != "") {
val features2raw: Vector => Vector = (features) => predictRaw(features)
- tmpData = tmpData.select($"*",
-   callUDF(features2raw, new VectorUDT, col(map(featuresCol))).as(map(rawPredictionCol)))
+ tmpData = tmpData.withColumn(map(rawPredictionCol),
+   callUDF(features2raw, new VectorUDT, col(map(featuresCol))))
numColsOutput += 1
}
if (map(probabilityCol) != "") {
if (map(rawPredictionCol) != "") {
- val raw2prob: Vector => Vector = { (rawPreds: Vector) =>
+ val raw2prob = udf { (rawPreds: Vector) =>
val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
- Vectors.dense(1.0 - prob1, prob1)
+ Vectors.dense(1.0 - prob1, prob1): Vector
}
- tmpData = tmpData.select($"*",
-   callUDF(raw2prob, new VectorUDT, col(map(rawPredictionCol))).as(map(probabilityCol)))
+ tmpData = tmpData.withColumn(map(probabilityCol), raw2prob(col(map(rawPredictionCol))))
} else {
- val features2prob: Vector => Vector = (features: Vector) => predictProbabilities(features)
- tmpData = tmpData.select($"*",
-   callUDF(features2prob, new VectorUDT, col(map(featuresCol))).as(map(probabilityCol)))
+ val features2prob = udf { (features: Vector) => predictProbabilities(features) : Vector }
+ tmpData = tmpData.withColumn(map(probabilityCol), features2prob(col(map(featuresCol))))
}
numColsOutput += 1
}
if (map(predictionCol) != "") {
val t = map(threshold)
if (map(probabilityCol) != "") {
- val predict: Vector => Double = { probs: Vector =>
+ val predict = udf { probs: Vector =>
if (probs(1) > t) 1.0 else 0.0
}
- tmpData = tmpData.select($"*",
-   callUDF(predict, DoubleType, col(map(probabilityCol))).as(map(predictionCol)))
+ tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(probabilityCol))))
} else if (map(rawPredictionCol) != "") {
- val predict: Vector => Double = { rawPreds: Vector =>
+ val predict = udf { rawPreds: Vector =>
val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
if (prob1 > t) 1.0 else 0.0
}
- tmpData = tmpData.select($"*",
-   callUDF(predict, DoubleType, col(map(rawPredictionCol))).as(map(predictionCol)))
+ tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(rawPredictionCol))))
} else {
- val predict: Vector => Double = (features: Vector) => this.predict(features)
- tmpData = tmpData.select($"*",
-   callUDF(predict, DoubleType, col(map(featuresCol))).as(map(predictionCol)))
+ val predict = udf { features: Vector => this.predict(features) }
+ tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(featuresCol))))
}
numColsOutput += 1
}
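Besides the withColumn rewrite, the LogisticRegressionModel changes above also drop callUDF with an explicit return DataType in favor of the udf helper, which infers the schema from the closure's types (hence the ": Vector" ascriptions, which keep the result typed as the Vector trait). A minimal sketch of that pattern, assuming a hypothetical DataFrame df with a vector-valued column "probability" (the names and the 0.5 threshold are illustrative, not from the diff):

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.functions._

    // assumes df: DataFrame with a vector-valued column "probability"
    val t = 0.5
    val predict = udf { probs: Vector => if (probs(1) > t) 1.0 else 0.0 }
    val withPrediction = df.withColumn("prediction", predict(col("probability")))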

@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{AlphaComponent, DeveloperApi}
import org.apache.spark.ml.param.{HasProbabilityCol, ParamMap, Params}
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql.DataFrame
- import org.apache.spark.sql.Dsl._
+ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}


@@ -120,8 +120,8 @@ private[spark] abstract class ProbabilisticClassificationModel[
val features2probs: FeaturesType => Vector = (features) => {
tmpModel.predictProbabilities(features)
}
- outputData.select($"*",
-   callUDF(features2probs, new VectorUDT, col(map(featuresCol))).as(map(probabilityCol)))
+ outputData.withColumn(map(probabilityCol),
+   callUDF(features2probs, new VectorUDT, col(map(featuresCol))))
} else {
if (numColsOutput == 0) {
this.logWarning(s"$uid: ProbabilisticClassificationModel.transform() was called as NOOP" +

@@ -23,7 +23,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.mllib.feature
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql._
- import org.apache.spark.sql.Dsl._
+ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}

/**
@@ -82,7 +82,7 @@ class StandardScalerModel private[ml] (
transformSchema(dataset.schema, paramMap, logging = true)
val map = this.paramMap ++ paramMap
val scale = udf((v: Vector) => { scaler.transform(v) } : Vector)
- dataset.select($"*", scale(col(map(inputCol))).as(map(outputCol)))
+ dataset.withColumn(map(outputCol), scale(col(map(inputCol))))
}

private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {

@@ -24,7 +24,7 @@ import org.apache.spark.mllib.linalg.{VectorUDT, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
- import org.apache.spark.sql.Dsl._
+ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, DoubleType, StructType}


@@ -209,7 +209,7 @@ private[spark] abstract class PredictionModel[FeaturesType, M <: PredictionModel
val pred: FeaturesType => Double = (features) => {
tmpModel.predict(features)
}
- dataset.select($"*", callUDF(pred, DoubleType, col(map(featuresCol))).as(map(predictionCol)))
+ dataset.withColumn(map(predictionCol), callUDF(pred, DoubleType, col(map(featuresCol))))
} else {
this.logWarning(s"$uid: Predictor.transform() was called as NOOP" +
" since no output columns were set.")

@@ -36,7 +36,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.mllib.optimization.NNLS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
- import org.apache.spark.sql.Dsl._
+ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -124,8 +124,8 @@ class ALSModel private[ml] (
override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
import dataset.sqlContext.implicits._
val map = this.paramMap ++ paramMap
- val users = userFactors.toDataFrame("id", "features")
- val items = itemFactors.toDataFrame("id", "features")
+ val users = userFactors.toDF("id", "features")
+ val items = itemFactors.toDF("id", "features")

// Register a UDF for DataFrame, and then
// create a new column named map(predictionCol) by running the predict UDF.

@@ -102,7 +102,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))

// Create Parquet data.
- val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDataFrame
+ val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF
dataRDD.saveAsParquetFile(dataPath(path))
}


@@ -62,7 +62,7 @@ private[classification] object GLMClassificationModel {

// Create Parquet data.
val data = Data(weights, intercept, threshold)
- sc.parallelize(Seq(data), 1).toDataFrame.saveAsParquetFile(Loader.dataPath(path))
+ sc.parallelize(Seq(data), 1).toDF.saveAsParquetFile(Loader.dataPath(path))
}

/**

@@ -187,8 +187,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
- model.userFeatures.toDataFrame("id", "features").saveAsParquetFile(userPath(path))
- model.productFeatures.toDataFrame("id", "features").saveAsParquetFile(productPath(path))
+ model.userFeatures.toDF("id", "features").saveAsParquetFile(userPath(path))
+ model.productFeatures.toDF("id", "features").saveAsParquetFile(productPath(path))
}

def load(sc: SparkContext, path: String): MatrixFactorizationModel = {

@@ -58,7 +58,7 @@ private[regression] object GLMRegressionModel {

// Create Parquet data.
val data = Data(weights, intercept)
- val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDataFrame
+ val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF
// TODO: repartition with 1 partition after SPARK-5532 gets fixed
dataRDD.saveAsParquetFile(Loader.dataPath(path))
}

@@ -197,7 +197,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] {
val nodes = model.topNode.subtreeIterator.toSeq
val dataRDD: DataFrame = sc.parallelize(nodes)
.map(NodeData.apply(0, _))
- .toDataFrame
+ .toDF
dataRDD.saveAsParquetFile(Loader.dataPath(path))
}


@@ -289,7 +289,7 @@ private[tree] object TreeEnsembleModel {
// Create Parquet data.
val dataRDD = sc.parallelize(model.trees.zipWithIndex).flatMap { case (tree, treeId) =>
tree.topNode.subtreeIterator.toSeq.map(node => NodeData(treeId, node))
- }.toDataFrame
+ }.toDF
dataRDD.saveAsParquetFile(Loader.dataPath(path))
}

(Diff truncated in this view; the remaining changed files are not shown.)
