diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index fbe8e5055a25c..e45038339520b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} +import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer @@ -45,6 +46,11 @@ object SparkPlan { /** The [[LogicalPlan]] inherited from its ancestor. */ val LOGICAL_PLAN_INHERITED_TAG = TreeNodeTag[LogicalPlan]("logical_plan_inherited") + + private val nextPlanId = new AtomicInteger(0) + + /** Register a new SparkPlan, returning its SparkPlan ID */ + private[execution] def newPlanId(): Int = nextPlanId.getAndIncrement() } /** @@ -63,6 +69,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def sparkContext = sqlContext.sparkContext + val id: Int = SparkPlan.newPlanId() + // sqlContext will be null when SparkPlan nodes are created without the active sessions. val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) { sqlContext.conf.subexpressionEliminationEnabled diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 56e81ce1b95a5..ce91f37f85209 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -717,6 +717,8 @@ case class SubqueryExec(name: String, child: SparkPlan) override def executeCollect(): Array[InternalRow] = { ThreadUtils.awaitResult(relationFuture, Duration.Inf) } + + override def stringArgs: Iterator[Any] = super.stringArgs ++ Iterator(s"[id=#$id]") } object SubqueryExec { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala index 1a5b7599bb7d9..9cfb62b719985 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala @@ -39,6 +39,8 @@ import org.apache.spark.sql.types.StructType */ abstract class Exchange extends UnaryExecNode { override def output: Seq[Attribute] = child.output + + override def stringArgs: Iterator[Any] = super.stringArgs ++ Iterator(s"[id=#$id]") } /**