Skip to content

Commit

Permalink
[SPARK-17230] [SQL] Should not pass optimized query into QueryExecuti…
Browse files Browse the repository at this point in the history
…on in DataFrameWriter

## What changes were proposed in this pull request?

Some analyzer rules have assumptions on logical plans, optimizer may break these assumption, we should not pass an optimized query plan into QueryExecution (will be analyzed again), otherwise we may some weird bugs.

For example, we have a rule for decimal calculation to promote the precision before binary operations, use PromotePrecision as placeholder to indicate that this rule should not apply twice. But a Optimizer rule will remove this placeholder, that break the assumption, then the rule applied twice, cause wrong result.

Ideally, we should make all the analyzer rules all idempotent, that may require lots of effort to double checking them one by one (may be not easy).

An easier approach could be never feed a optimized plan into Analyzer, this PR fix the case for RunnableComand, they will be optimized, during execution, the passed `query` will also be passed into QueryExecution again. This PR make these `query` not part of the children, so they will not be optimized and analyzed again.

Right now, we did not know a logical plan is optimized or not, we could introduce a flag for that, and make sure a optimized logical plan will not be analyzed again.

## How was this patch tested?

Added regression tests.

Author: Davies Liu <[email protected]>

Closes #14797 from davies/fix_writer.
  • Loading branch information
Davies Liu authored and davies committed Sep 2, 2016
1 parent eac1d0e commit ed9c884
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.spark.sql.types._
*/
trait RunnableCommand extends LogicalPlan with logical.Command {
override def output: Seq[Attribute] = Seq.empty
override def children: Seq[LogicalPlan] = Seq.empty
final override def children: Seq[LogicalPlan] = Seq.empty
def run(sparkSession: SparkSession): Seq[Row]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ case class CreateDataSourceTableAsSelectCommand(
query: LogicalPlan)
extends RunnableCommand {

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
override protected def innerChildren: Seq[LogicalPlan] = Seq(query)

override def run(sparkSession: SparkSession): Seq[Row] = {
assert(table.tableType != CatalogTableType.VIEW)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
Expand Down Expand Up @@ -479,13 +480,23 @@ case class DataSource(
}
}

// SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
// not need to have the query as child, to avoid to analyze an optimized query,
// because InsertIntoHadoopFsRelationCommand will be optimized first.
val columns = partitionColumns.map { name =>
val plan = data.logicalPlan
plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
throw new AnalysisException(
s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]")
}.asInstanceOf[Attribute]
}
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
val plan =
InsertIntoHadoopFsRelationCommand(
outputPath,
partitionColumns.map(UnresolvedAttribute.quoted),
columns,
bucketSpec,
format,
() => Unit, // No existing table needs to be refreshed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {

InsertIntoHadoopFsRelationCommand(
outputPath,
t.partitionSchema.fields.map(_.name).map(UnresolvedAttribute(_)),
query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
t.bucketSpec,
t.fileFormat,
() => t.refresh(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ case class InsertIntoHadoopFsRelationCommand(
mode: SaveMode)
extends RunnableCommand {

override def children: Seq[LogicalPlan] = query :: Nil
override protected def innerChildren: Seq[LogicalPlan] = query :: Nil

override def run(sparkSession: SparkSession): Seq[Row] = {
// Most formats don't do well with duplicate columns, so lets not allow that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
}
}

test("SPARK-17230: write out results of decimal calculation") {
val df = spark.range(99, 101)
.selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num")
df.write.mode(SaveMode.Overwrite).parquet(dir)
val df2 = spark.read.parquet(dir)
checkAnswer(df2, df)
}

private def testRead(
df: => DataFrame,
expectedResult: Seq[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ case class CreateHiveTableAsSelectCommand(

private val tableIdentifier = tableDesc.identifier

override def children: Seq[LogicalPlan] = Seq(query)
override def innerChildren: Seq[LogicalPlan] = Seq(query)

override def run(sparkSession: SparkSession): Seq[Row] = {
lazy val metastoreRelation: MetastoreRelation = {
Expand Down

0 comments on commit ed9c884

Please sign in to comment.