[SPARK-19650] Commands should not trigger a Spark job
Spark executes SQL commands eagerly, by creating an RDD that contains the command's results. The downside is that any action on this RDD triggers a Spark job, which is expensive and unnecessary.

This PR fixes this by avoiding the materialization of an `RDD` for `Command`s; it simply materializes the results and puts them in a `LocalRelation`.
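
For illustration, a hedged sketch of the user-visible effect (assuming a running `SparkSession` named `spark`; any SQL command would do):

```scala
// Commands are executed eagerly when the Dataset is created, and their rows
// now live in a driver-side LocalRelation instead of being wrapped in an RDD.
// Actions on the result therefore read local rows without scheduling a job.
val databases = spark.sql("SHOW DATABASES")
databases.head() // before this patch: scheduled a Spark job to scan the result RDD
                 // after this patch: returns the already-collected rows locally
```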

Added a regression test to `SQLQuerySuite`.

Author: Herman van Hovell <[email protected]>

Closes #17027 from hvanhovell/no-job-command.
hvanhovell authored and cloud-fan committed Feb 25, 2017
1 parent 4cb025a commit 8f0511e
Showing 8 changed files with 39 additions and 23 deletions.
20 changes: 7 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
```diff
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
 import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.DataStreamWriter
@@ -175,19 +175,13 @@ class Dataset[T] private[sql](
   }
 
   @transient private[sql] val logicalPlan: LogicalPlan = {
-    def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
-      case _: Command |
-           _: InsertIntoTable => true
-      case _ => false
-    }
-
+    // For various commands (like DDL) and queries with side effects, we force query execution
+    // to happen right away to let these side effects take place eagerly.
    queryExecution.analyzed match {
-      // For various commands (like DDL) and queries with side effects, we force query execution
-      // to happen right away to let these side effects take place eagerly.
-      case p if hasSideEffects(p) =>
-        LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
-      case Union(children) if children.forall(hasSideEffects) =>
-        LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sparkSession)
+      case c: Command =>
+        LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
+      case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
+        LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
       case _ =>
         queryExecution.analyzed
     }
```
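To make the control flow above concrete, here is a self-contained sketch of the eager-materialization pattern, using hypothetical simplified stand-ins for `Command`, `LocalRelation`, and the analyzed plan (not Spark's real classes):

```scala
// Simplified model: a "command" produces its rows on the driver; materializing
// it eagerly captures those rows in a local relation, so later actions need no
// distributed execution (and hence no job).
sealed trait Plan { def output: Seq[String] }
final case class Command(output: Seq[String], run: () => Seq[Seq[Any]]) extends Plan
final case class LocalRelation(output: Seq[String], rows: Seq[Seq[Any]]) extends Plan
final case class Query(output: Seq[String]) extends Plan

object Demo extends App {
  def logicalPlan(analyzed: Plan): Plan = analyzed match {
    case c: Command => LocalRelation(c.output, c.run()) // run once, eagerly
    case other      => other                            // ordinary queries stay lazy
  }

  val cmd = Command(Seq("databaseName"), () => Seq(Seq("default")))
  println(logicalPlan(cmd))              // LocalRelation(List(databaseName),List(List(default)))
  println(logicalPlan(Query(Seq("id")))) // Query(List(id)) -- left untouched
}
```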
```diff
@@ -125,8 +125,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     // SHOW TABLES in Hive only output table names, while ours outputs database, table name, isTemp.
     case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
       command.executeCollect().map(_.getString(1))
-    case command: ExecutedCommandExec =>
-      command.executeCollect().map(_.getString(0))
     case other =>
       val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
       // We need the types so we can output struct field names
```
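A side effect of deleting the generic `ExecutedCommandExec` case: command output now flows through the `executeCollectPublic()` branch, which renders every column instead of only column 0. That is why the golden files below change from printing just the key of a `SET` command to printing the key and the value. A hedged sketch of the formatting difference (illustrative values only, not Spark's actual rendering code):

```scala
object FormatDemo extends App {
  val row = Seq("spark.sql.caseSensitive", "false")
  println(row.head)           // old generic command path: key only
  println(row.mkString("\t")) // new path via the generic branch: key and value
}
```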
```diff
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SaveMode, Strategy}
+import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution.streaming._
```
```diff
@@ -196,7 +196,7 @@ SET spark.sql.caseSensitive=false
 -- !query 19 schema
 struct<key:string,value:string>
 -- !query 19 output
-spark.sql.caseSensitive
+spark.sql.caseSensitive	false
 
 
 -- !query 20
@@ -212,7 +212,7 @@ SET spark.sql.caseSensitive=true
 -- !query 21 schema
 struct<key:string,value:string>
 -- !query 21 output
-spark.sql.caseSensitive
+spark.sql.caseSensitive	true
 
 
 -- !query 22
```
```diff
@@ -177,7 +177,7 @@ set spark.sql.groupByOrdinal=false
 -- !query 17 schema
 struct<key:string,value:string>
 -- !query 17 output
-spark.sql.groupByOrdinal
+spark.sql.groupByOrdinal	false
 
 
 -- !query 18
```
```diff
@@ -114,7 +114,7 @@ set spark.sql.orderByOrdinal=false
 -- !query 9 schema
 struct<key:string,value:string>
 -- !query 9 output
-spark.sql.orderByOrdinal
+spark.sql.orderByOrdinal	false
 
 
 -- !query 10
```
```diff
@@ -63,7 +63,7 @@ set spark.sql.crossJoin.enabled = true
 -- !query 5 schema
 struct<key:string,value:string>
 -- !query 5 output
-spark.sql.crossJoin.enabled
+spark.sql.crossJoin.enabled	true
 
 
 -- !query 6
@@ -85,4 +85,4 @@ set spark.sql.crossJoin.enabled = false
 -- !query 7 schema
 struct<key:string,value:string>
 -- !query 7 output
-spark.sql.crossJoin.enabled
+spark.sql.crossJoin.enabled	false
```
25 changes: 25 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
```diff
@@ -20,8 +20,10 @@ package org.apache.spark.sql
 import java.io.File
 import java.math.MathContext
 import java.sql.Timestamp
+import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.spark.{AccumulatorSuite, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
@@ -2564,4 +2566,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     checkAnswer(sql(badQuery), Row(1) :: Nil)
   }
 
+  test("SPARK-19650: An action on a Command should not trigger a Spark job") {
+    // Create a listener that checks if new jobs have started.
+    val jobStarted = new AtomicBoolean(false)
+    val listener = new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobStarted.set(true)
+      }
+    }
+
+    // Make sure no spurious job starts are pending in the listener bus.
+    sparkContext.listenerBus.waitUntilEmpty(500)
+    sparkContext.addSparkListener(listener)
+    try {
+      // Execute the command.
+      sql("show databases").head()
+
+      // Make sure we have seen all events triggered by DataFrame.head().
+      sparkContext.listenerBus.waitUntilEmpty(500)
+    } finally {
+      sparkContext.removeSparkListener(listener)
+    }
+    assert(!jobStarted.get(), "Command should not trigger a Spark job.")
+  }
 }
```
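The test's listener pattern generalizes. Below is a hedged sketch of a reusable job counter for similar assertions (assumes a `SparkSession` named `spark`; note that `listenerBus.waitUntilEmpty` is `private[spark]`, so outside Spark's own source tree this sketch falls back to a crude sleep, since listener events are delivered asynchronously):

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.SparkSession

object JobCounter {
  // Count the Spark jobs started while `body` runs.
  def countJobs(spark: SparkSession)(body: => Unit): Int = {
    val jobs = new AtomicInteger(0)
    val listener = new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        jobs.incrementAndGet()
      }
    }
    spark.sparkContext.addSparkListener(listener)
    try {
      body
      Thread.sleep(500) // crude stand-in for the private listenerBus.waitUntilEmpty(500)
    } finally {
      spark.sparkContext.removeSparkListener(listener)
    }
    jobs.get()
  }
}

// Example: after this patch a command should report zero jobs:
//   JobCounter.countJobs(spark) { spark.sql("SHOW DATABASES").head() } == 0
```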
