Re-enable MiMA and remove object DataFrame
liancheng committed Mar 11, 2016
1 parent 8fff0f9 commit b46e876
Showing 33 changed files with 78 additions and 69 deletions.
3 changes: 1 addition & 2 deletions dev/run-tests.py
@@ -573,8 +573,7 @@ def main():
# backwards compatibility checks
if build_tool == "sbt":
# Note: compatibility tests only supported in sbt for now
# TODO Temporarily disable MiMA check for DF-to-DS migration prototyping
# detect_binary_inop_with_mima()
detect_binary_inop_with_mima()
# Since we did not build assembly/assembly before running dev/mima, we need to
# do it here because the tests still rely on it; see SPARK-13294 for details.
build_spark_assembly_sbt(hadoop_version)
@@ -26,7 +26,6 @@
import org.apache.spark.ml.regression.AFTSurvivalRegression;
import org.apache.spark.ml.regression.AFTSurvivalRegressionModel;
import org.apache.spark.mllib.linalg.*;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -31,7 +31,6 @@
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.types.DataTypes;
// $example off$

@@ -27,7 +27,6 @@

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
@@ -33,7 +33,6 @@
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -33,7 +33,6 @@
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -29,7 +29,6 @@
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// $example off$
@@ -32,7 +32,6 @@
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
@@ -28,7 +28,6 @@
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -29,7 +29,6 @@
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -24,7 +24,6 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
22 changes: 22 additions & 0 deletions project/MimaExcludes.scala
@@ -296,6 +296,28 @@ object MimaExcludes {
// SPARK-12073: backpressure rate controller consumes events preferentially from lagging partitions
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.KafkaTestUtils.createTopic"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.DirectKafkaInputDStream.maxMessagesPerPartition")
) ++ Seq(
// [SPARK-13244][SQL] Migrates DataFrame to Dataset
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.apply"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.toDF"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.toDF"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.copy"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.copy$default$1"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.df$1"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.tables"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.tables"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.sql"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.baseRelationToDataFrame"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.table"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrame.apply"),

ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame$"),

ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.evaluation.MultilabelMetrics.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.predictions"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.predictions")
)
case v if v.startsWith("1.6") =>
Seq(
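For context, the entries added above follow the standard MiMA filter pattern: each ProblemFilters.exclude[...] rule silences exactly one binary-compatibility report, keyed by the problem type and the fully qualified name of the affected class or member. A minimal illustrative sketch of that shape (the object name and the single rule shown are examples only, not part of this commit):

import com.typesafe.tools.mima.core._

object ExampleExcludes {
  def excludes(version: String) = version match {
    case v if v.startsWith("2.0") =>
      Seq(
        // One rule per report: this one tells MiMA to ignore the removal of a class
        // that was never part of the supported public API.
        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame")
      )
    case _ => Seq.empty
  }
}

Grouping the DF-to-DS breakages under the SPARK-13244 comment keeps the re-enabled detect_binary_inop_with_mima() run in dev/run-tests.py green without weakening the checks for unrelated modules.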
16 changes: 7 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -48,18 +48,16 @@ import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

private[sql] object DataFrame {
def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
val qe = sqlContext.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
}
}

private[sql] object Dataset {
def apply[T: Encoder](sqlContext: SQLContext, logicalPlan: LogicalPlan): Dataset[T] = {
new Dataset(sqlContext, logicalPlan, implicitly[Encoder[T]])
}

def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
val qe = sqlContext.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
}
}

/**
@@ -2129,7 +2127,7 @@ class Dataset[T] private[sql](

/** A convenient function to wrap a logical plan and produce a DataFrame. */
@inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
DataFrame(sqlContext, logicalPlan)
Dataset.newDataFrame(sqlContext, logicalPlan)
}

/** A convenient function to wrap a logical plan and produce a DataFrame. */
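With the object DataFrame factory gone, the only DataFrame-specific code assumed to remain is the type alias introduced by the DF-to-DS migration. A sketch of how the org.apache.spark.sql package object presumably reads at this point (not part of this diff):

package org.apache.spark

package object sql {
  // DataFrame is no longer its own class or companion object; it is just a Dataset
  // of Rows, which is why every internal factory call now funnels through
  // Dataset.newDataFrame instead of DataFrame.apply.
  type DataFrame = Dataset[Row]
}

Keeping the alias preserves source compatibility for user code, while dropping the companion object is exactly the binary break the MimaExcludes entries above account for.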
@@ -128,7 +128,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap)
DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
}

/**
@@ -175,7 +175,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap)
DataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
Dataset.newDataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
}

/**
@@ -345,7 +345,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
InferSchema.infer(jsonRDD, sqlContext.conf.columnNameOfCorruptRecord, parsedOptions)
}

DataFrame(
Dataset.newDataFrame(
sqlContext,
LogicalRDD(
schema.toAttributes,
@@ -393,7 +393,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @since 1.4.0
*/
def table(tableName: String): DataFrame = {
DataFrame(sqlContext,
Dataset.newDataFrame(sqlContext,
sqlContext.catalog.lookupRelation(sqlContext.sqlParser.parseTableIdentifier(tableName)))
}

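None of the DataFrameReader changes above are observable from the public API: every read path still returns a DataFrame, which is now just Dataset[Row]. A small usage sketch under that assumption (app name and input path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}

object ReaderSmokeTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("reader-smoke").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // json(), table(), etc. still declare DataFrame as their result type; with the
    // DataFrame = Dataset[Row] alias the assignment below needs no conversion.
    val df: DataFrame = sqlContext.read.json("examples/src/main/resources/people.json")
    val ds: Dataset[Row] = df
    ds.show()

    sc.stop()
  }
}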
@@ -55,17 +55,17 @@ class GroupedData protected[sql](

groupType match {
case GroupedData.GroupByType =>
DataFrame(
Dataset.newDataFrame(
df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
case GroupedData.RollupType =>
DataFrame(
Dataset.newDataFrame(
df.sqlContext, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
case GroupedData.CubeType =>
DataFrame(
Dataset.newDataFrame(
df.sqlContext, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
case GroupedData.PivotType(pivotCol, values) =>
val aliasedGrps = groupingExprs.map(alias)
DataFrame(
Dataset.newDataFrame(
df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))
}
}
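Each branch of the match above backs one public aggregation entry point on DataFrame. A hedged end-to-end sketch showing which user-facing call reaches which case (the schema and data are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.avg

object GroupedDataExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("grouped-data").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = Seq(("eng", "SF", 100.0), ("eng", "NY", 90.0), ("ops", "SF", 80.0))
      .toDF("dept", "city", "salary")

    df.groupBy("dept").agg(avg("salary")).show()          // GroupByType -> Aggregate
    df.rollup("dept", "city").count().show()              // RollupType  -> Aggregate(Rollup(...))
    df.cube("dept", "city").count().show()                // CubeType    -> Aggregate(Cube(...))
    df.groupBy("dept").pivot("city").sum("salary").show() // PivotType   -> Pivot

    sc.stop()
  }
}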
@@ -64,7 +64,7 @@ class GroupedDataset[K, V] private[sql](

private def groupedData =
new GroupedData(
DataFrame(sqlContext, logicalPlan), groupingAttributes, GroupedData.GroupByType)
Dataset.newDataFrame(sqlContext, logicalPlan), groupingAttributes, GroupedData.GroupByType)

/**
* Returns a new [[GroupedDataset]] where the type of the key has been mapped to the specified
28 changes: 14 additions & 14 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -374,7 +374,7 @@ class SQLContext private[sql](
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val attributeSeq = schema.toAttributes
val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
DataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
Dataset.newDataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
}

/**
@@ -389,7 +389,7 @@ class SQLContext private[sql](
SQLContext.setActive(self)
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val attributeSeq = schema.toAttributes
DataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
Dataset.newDataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
}

/**
@@ -399,7 +399,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
DataFrame(this, LogicalRelation(baseRelation))
Dataset.newDataFrame(this, LogicalRelation(baseRelation))
}

/**
@@ -454,7 +454,7 @@ class SQLContext private[sql](
rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
}
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
DataFrame(this, logicalPlan)
Dataset.newDataFrame(this, logicalPlan)
}


@@ -489,7 +489,7 @@ class SQLContext private[sql](
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
// schema differs from the existing schema on any field data type.
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
DataFrame(this, logicalPlan)
Dataset.newDataFrame(this, logicalPlan)
}

/**
@@ -517,7 +517,7 @@
*/
@DeveloperApi
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
DataFrame(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
Dataset.newDataFrame(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
}

/**
@@ -536,7 +536,7 @@ class SQLContext private[sql](
val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className))
SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
}
DataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
Dataset.newDataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
}

/**
@@ -564,7 +564,7 @@ class SQLContext private[sql](
val className = beanClass.getName
val beanInfo = Introspector.getBeanInfo(beanClass)
val rows = SQLContext.beansToRows(data.asScala.iterator, beanInfo, attrSeq)
DataFrame(self, LocalRelation(attrSeq, rows.toSeq))
Dataset.newDataFrame(self, LocalRelation(attrSeq, rows.toSeq))
}

/**
@@ -770,7 +770,7 @@ class SQLContext private[sql](
*/
@Experimental
def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = {
DataFrame(this, Range(start, end, step, numPartitions))
Dataset.newDataFrame(this, Range(start, end, step, numPartitions))
}

/**
@@ -781,7 +781,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def sql(sqlText: String): DataFrame = {
DataFrame(this, parseSql(sqlText))
Dataset.newDataFrame(this, parseSql(sqlText))
}

/**
@@ -795,7 +795,7 @@ class SQLContext private[sql](
}

private def table(tableIdent: TableIdentifier): DataFrame = {
DataFrame(this, catalog.lookupRelation(tableIdent))
Dataset.newDataFrame(this, catalog.lookupRelation(tableIdent))
}

/**
@@ -807,7 +807,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tables(): DataFrame = {
DataFrame(this, ShowTablesCommand(None))
Dataset.newDataFrame(this, ShowTablesCommand(None))
}

/**
@@ -819,7 +819,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tables(databaseName: String): DataFrame = {
DataFrame(this, ShowTablesCommand(Some(databaseName)))
Dataset.newDataFrame(this, ShowTablesCommand(Some(databaseName)))
}

/**
@@ -886,7 +886,7 @@ class SQLContext private[sql](
schema: StructType): DataFrame = {

val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
DataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
Dataset.newDataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
}

/**
@@ -21,7 +21,7 @@ import java.util.NoSuchElementException

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.{Dataset, Row, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -252,7 +252,7 @@ case class CacheTableCommand(

override def run(sqlContext: SQLContext): Seq[Row] = {
plan.foreach { logicalPlan =>
sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName)
sqlContext.registerDataFrameAsTable(Dataset.newDataFrame(sqlContext, logicalPlan), tableName)
}
sqlContext.cacheTable(tableName)

@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
import org.apache.spark.sql.sources._
@@ -154,7 +154,7 @@ case class DataSource(
}

def dataFrameBuilder(files: Array[String]): DataFrame = {
DataFrame(
Dataset.newDataFrame(
sqlContext,
LogicalRelation(
DataSource(
@@ -34,7 +34,7 @@ private[sql] case class InsertIntoDataSource(

override def run(sqlContext: SQLContext): Seq[Row] = {
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
val data = DataFrame(sqlContext, query)
val data = Dataset.newDataFrame(sqlContext, query)
// Apply the schema of the existing table to the new data.
val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
relation.insert(df, overwrite)
@@ -114,7 +114,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = query.output.filterNot(partitionSet.contains)

val queryExecution = DataFrame(sqlContext, query).queryExecution
val queryExecution = Dataset.newDataFrame(sqlContext, query).queryExecution
SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
val relation =
WriteRelation(