[SPARK-13817][BUILD][SQL] Re-enable MiMA and remove object DataFrame #11656

Closed · wants to merge 2 commits
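This PR removes the leftover private[sql] object DataFrame, folds its factory method into object Dataset as newDataFrame, updates all internal call sites, adds the matching MiMA exclusions for the DataFrame-to-Dataset migration, and re-enables the MiMA check in dev/run-tests.py. User-facing code should be unaffected; a minimal sketch of what keeps compiling, assuming the usual type DataFrame = Dataset[Row] alias in the sql package object (illustrative only, not part of this diff):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}

object DataFrameAliasSketch {
  // Hypothetical example: SQLContext.sql still declares DataFrame as its result
  // type, and DataFrame is just an alias for Dataset[Row], so nothing changes
  // for callers outside the sql package.
  def demo(sqlContext: SQLContext): Unit = {
    val df: DataFrame = sqlContext.sql("SELECT 1 AS id")
    val ds: Dataset[Row] = df // same type via the alias; no conversion needed
    ds.show()
  }
}
```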
3 changes: 1 addition & 2 deletions dev/run-tests.py
@@ -573,8 +573,7 @@ def main():
# backwards compatibility checks
if build_tool == "sbt":
# Note: compatibility tests only supported in sbt for now
-# TODO Temporarily disable MiMA check for DF-to-DS migration prototyping
-# detect_binary_inop_with_mima()
+detect_binary_inop_with_mima()
# Since we did not build assembly/assembly before running dev/mima, we need to
# do it here because the tests still rely on it; see SPARK-13294 for details.
build_spark_assembly_sbt(hadoop_version)
@@ -26,7 +26,6 @@
import org.apache.spark.ml.regression.AFTSurvivalRegression;
import org.apache.spark.ml.regression.AFTSurvivalRegressionModel;
import org.apache.spark.mllib.linalg.*;
-import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -31,7 +31,6 @@
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
-import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.types.DataTypes;
// $example off$

@@ -27,7 +27,6 @@

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.Binarizer;
-import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
@@ -33,7 +33,6 @@
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
-import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -33,7 +33,6 @@
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -29,7 +29,6 @@
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// $example off$
@@ -32,7 +32,6 @@
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
-import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
@@ -28,7 +28,6 @@
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
-import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -29,7 +29,6 @@
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
-import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -24,7 +24,6 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.SQLTransformer;
-import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
22 changes: 22 additions & 0 deletions project/MimaExcludes.scala
@@ -296,6 +296,28 @@ object MimaExcludes {
// SPARK-12073: backpressure rate controller consumes events preferentially from lagging partitions
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.KafkaTestUtils.createTopic"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.DirectKafkaInputDStream.maxMessagesPerPartition")
+) ++ Seq(
+// [SPARK-13244][SQL] Migrates DataFrame to Dataset
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.apply"),
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.toDF"),
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.toDF"),
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.copy"),
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.copy$default$1"),
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.df$1"),
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.this"),
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.tables"),
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.tables"),
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.sql"),
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.baseRelationToDataFrame"),
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.table"),
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrame.apply"),
+
+ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame"),
+ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame$"),
+
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.evaluation.MultilabelMetrics.this"),
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.predictions"),
+ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.predictions")
)
case v if v.startsWith("1.6") =>
Seq(
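For reviewers unfamiliar with MiMA: these filters acknowledge binary-compatibility breaks against the 1.6 artifacts that are an intended consequence of the migration rather than regressions. A rough sketch of why a method like SQLContext.sql now trips IncompatibleResultTypeProblem, using simplified stand-in types rather than the real Spark sources:

```scala
// Simplified stand-ins, for illustration only.
class Dataset[T]
class Row

object SparkSql16 {
  class DataFrame                       // 1.6: DataFrame is a concrete class
  def sql(query: String): DataFrame = new DataFrame
}

object SparkSql20 {
  type DataFrame = Dataset[Row]         // 2.0: DataFrame is only a type alias
  def sql(query: String): DataFrame = new Dataset[Row]
  // The compiled signature now returns Dataset instead of DataFrame, so MiMA
  // reports IncompatibleResultTypeProblem (plus MissingClassProblem for the
  // removed org.apache.spark.sql.DataFrame class), even though source code
  // calling sql(...) still compiles unchanged.
}
```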
16 changes: 7 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -48,18 +48,16 @@ import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

-private[sql] object DataFrame {
-def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
-val qe = sqlContext.executePlan(logicalPlan)
-qe.assertAnalyzed()
-new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
-}
-}
-
private[sql] object Dataset {
def apply[T: Encoder](sqlContext: SQLContext, logicalPlan: LogicalPlan): Dataset[T] = {
new Dataset(sqlContext, logicalPlan, implicitly[Encoder[T]])
}
+
+def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
+val qe = sqlContext.executePlan(logicalPlan)
+qe.assertAnalyzed()
+new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
+}
}

/**
@@ -2129,7 +2127,7 @@ class Dataset[T] private[sql](

/** A convenient function to wrap a logical plan and produce a DataFrame. */
@inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
-DataFrame(sqlContext, logicalPlan)
+Dataset.newDataFrame(sqlContext, logicalPlan)
}

/** A convenient function to wrap a logical plan and produce a DataFrame. */
@@ -128,7 +128,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap)
-DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
+Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
}

/**
@@ -175,7 +175,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap)
-DataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
+Dataset.newDataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
}

/**
@@ -345,7 +345,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
InferSchema.infer(jsonRDD, sqlContext.conf.columnNameOfCorruptRecord, parsedOptions)
}

-DataFrame(
+Dataset.newDataFrame(
sqlContext,
LogicalRDD(
schema.toAttributes,
@@ -393,7 +393,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @since 1.4.0
*/
def table(tableName: String): DataFrame = {
-DataFrame(sqlContext,
+Dataset.newDataFrame(sqlContext,
sqlContext.catalog.lookupRelation(sqlContext.sqlParser.parseTableIdentifier(tableName)))
}

@@ -55,17 +55,17 @@ class GroupedData protected[sql](

groupType match {
case GroupedData.GroupByType =>
-DataFrame(
+Dataset.newDataFrame(
df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
case GroupedData.RollupType =>
-DataFrame(
+Dataset.newDataFrame(
df.sqlContext, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
case GroupedData.CubeType =>
-DataFrame(
+Dataset.newDataFrame(
df.sqlContext, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
case GroupedData.PivotType(pivotCol, values) =>
val aliasedGrps = groupingExprs.map(alias)
-DataFrame(
+Dataset.newDataFrame(
df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))
}
}
@@ -64,7 +64,7 @@ class GroupedDataset[K, V] private[sql](

private def groupedData =
new GroupedData(
-DataFrame(sqlContext, logicalPlan), groupingAttributes, GroupedData.GroupByType)
+Dataset.newDataFrame(sqlContext, logicalPlan), groupingAttributes, GroupedData.GroupByType)

/**
* Returns a new [[GroupedDataset]] where the type of the key has been mapped to the specified
28 changes: 14 additions & 14 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -374,7 +374,7 @@ class SQLContext private[sql](
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val attributeSeq = schema.toAttributes
val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
-DataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
+Dataset.newDataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
}

/**
@@ -389,7 +389,7 @@
SQLContext.setActive(self)
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val attributeSeq = schema.toAttributes
-DataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
+Dataset.newDataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
}

/**
@@ -399,7 +399,7 @@
* @since 1.3.0
*/
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
-DataFrame(this, LogicalRelation(baseRelation))
+Dataset.newDataFrame(this, LogicalRelation(baseRelation))
}

/**
@@ -454,7 +454,7 @@ class SQLContext private[sql](
rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
}
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
-DataFrame(this, logicalPlan)
+Dataset.newDataFrame(this, logicalPlan)
}


@@ -489,7 +489,7 @@ class SQLContext private[sql](
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
// schema differs from the existing schema on any field data type.
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
-DataFrame(this, logicalPlan)
+Dataset.newDataFrame(this, logicalPlan)
}

/**
@@ -517,7 +517,7 @@ class SQLContext private[sql](
*/
@DeveloperApi
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
-DataFrame(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
+Dataset.newDataFrame(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
}

/**
@@ -536,7 +536,7 @@ class SQLContext private[sql](
val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className))
SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
}
-DataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
+Dataset.newDataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
}

/**
@@ -564,7 +564,7 @@ class SQLContext private[sql](
val className = beanClass.getName
val beanInfo = Introspector.getBeanInfo(beanClass)
val rows = SQLContext.beansToRows(data.asScala.iterator, beanInfo, attrSeq)
-DataFrame(self, LocalRelation(attrSeq, rows.toSeq))
+Dataset.newDataFrame(self, LocalRelation(attrSeq, rows.toSeq))
}

/**
@@ -770,7 +770,7 @@
*/
@Experimental
def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = {
-DataFrame(this, Range(start, end, step, numPartitions))
+Dataset.newDataFrame(this, Range(start, end, step, numPartitions))
}

/**
@@ -781,7 +781,7 @@
* @since 1.3.0
*/
def sql(sqlText: String): DataFrame = {
-DataFrame(this, parseSql(sqlText))
+Dataset.newDataFrame(this, parseSql(sqlText))
}

/**
@@ -795,7 +795,7 @@
}

private def table(tableIdent: TableIdentifier): DataFrame = {
-DataFrame(this, catalog.lookupRelation(tableIdent))
+Dataset.newDataFrame(this, catalog.lookupRelation(tableIdent))
}

/**
@@ -807,7 +807,7 @@
* @since 1.3.0
*/
def tables(): DataFrame = {
-DataFrame(this, ShowTablesCommand(None))
+Dataset.newDataFrame(this, ShowTablesCommand(None))
}

/**
@@ -819,7 +819,7 @@
* @since 1.3.0
*/
def tables(databaseName: String): DataFrame = {
-DataFrame(this, ShowTablesCommand(Some(databaseName)))
+Dataset.newDataFrame(this, ShowTablesCommand(Some(databaseName)))
}

/**
@@ -886,7 +886,7 @@ class SQLContext private[sql](
schema: StructType): DataFrame = {

val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
-DataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
+Dataset.newDataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
}

/**
@@ -21,7 +21,7 @@ import java.util.NoSuchElementException

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{Dataset, Row, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -252,7 +252,7 @@ case class CacheTableCommand(

override def run(sqlContext: SQLContext): Seq[Row] = {
plan.foreach { logicalPlan =>
-sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName)
+sqlContext.registerDataFrameAsTable(Dataset.newDataFrame(sqlContext, logicalPlan), tableName)
}
sqlContext.cacheTable(tableName)

@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
import org.apache.spark.sql.sources._
@@ -154,7 +154,7 @@ case class DataSource(
}

def dataFrameBuilder(files: Array[String]): DataFrame = {
-DataFrame(
+Dataset.newDataFrame(
sqlContext,
LogicalRelation(
DataSource(
@@ -34,7 +34,7 @@ private[sql] case class InsertIntoDataSource(

override def run(sqlContext: SQLContext): Seq[Row] = {
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
-val data = DataFrame(sqlContext, query)
+val data = Dataset.newDataFrame(sqlContext, query)
// Apply the schema of the existing table to the new data.
val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
relation.insert(df, overwrite)
@@ -114,7 +114,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = query.output.filterNot(partitionSet.contains)

-val queryExecution = DataFrame(sqlContext, query).queryExecution
+val queryExecution = Dataset.newDataFrame(sqlContext, query).queryExecution
SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
val relation =
WriteRelation(