From 8e59a7450db57ca210010fa3df1cb41a84311fed Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 18 Mar 2016 03:36:40 +0000 Subject: [PATCH 1/4] Add more not null attributes for Filter codegen. --- .../org/apache/spark/sql/execution/basicOperators.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 6e2a5aa4f97c7..7fec582e82920 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -79,12 +79,13 @@ case class Filter(condition: Expression, child: SparkPlan) // Split out all the IsNotNulls from condition. private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition { - case IsNotNull(a) if child.output.contains(a) => true + case IsNotNull(a) if a.references.subsetOf(child.outputSet) => true case _ => false } // The columns that will filtered out by `IsNotNull` could be considered as not nullable. - private val notNullAttributes = notNullPreds.flatMap(_.references) + private val notNullAttributes = + notNullPreds.flatMap(_.references) ++ child.output.filterNot(_.nullable) override def output: Seq[Attribute] = { child.output.map { a => From 376c6f0f57767045732c47d20fa682d12e4f8c6d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 18 Mar 2016 06:05:42 +0000 Subject: [PATCH 2/4] Fix bug. 
--- .../org/apache/spark/sql/execution/basicOperators.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 7fec582e82920..ca27d42c88186 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -85,7 +85,7 @@ case class Filter(condition: Expression, child: SparkPlan) // The columns that will filtered out by `IsNotNull` could be considered as not nullable. private val notNullAttributes = - notNullPreds.flatMap(_.references) ++ child.output.filterNot(_.nullable) + (notNullPreds.flatMap(_.references) ++ child.output.filterNot(_.nullable)).distinct override def output: Seq[Attribute] = { child.output.map { a => @@ -114,7 +114,11 @@ case class Filter(condition: Expression, child: SparkPlan) // filter out the nulls val filterOutNull = notNullAttributes.map { a => val idx = child.output.indexOf(a) - s"if (${input(idx).isNull}) continue;" + if (idx != -1) { + s"if (${input(idx).isNull}) continue;" + } else { + "" + } }.mkString("\n") ctx.currentVars = input From 3d732491bce8898ff5ab167b792c9929c9471c1f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 18 Mar 2016 08:09:32 +0000 Subject: [PATCH 3/4] Fix it. 
--- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index ca27d42c88186..5f60f7f20d26f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -84,8 +84,7 @@ case class Filter(condition: Expression, child: SparkPlan) } // The columns that will filtered out by `IsNotNull` could be considered as not nullable. - private val notNullAttributes = - (notNullPreds.flatMap(_.references) ++ child.output.filterNot(_.nullable)).distinct + private val notNullAttributes = notNullPreds.flatMap(_.references).distinct override def output: Seq[Attribute] = { child.output.map { a => From 662c4e35b3c96df69f5c712a28e302abd623231e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 19 Mar 2016 02:24:58 +0000 Subject: [PATCH 4/4] Use exprId to deal with different qualifiers between child.output and attributes from conditions. 
- private val notNullAttributes = notNullPreds.flatMap(_.references).distinct + private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId) override def output: Seq[Attribute] = { child.output.map { a => - if (a.nullable && notNullAttributes.contains(a)) { + if (a.nullable && notNullAttributes.contains(a.exprId)) { a.withNullability(false) } else { a @@ -112,7 +112,7 @@ case class Filter(condition: Expression, child: SparkPlan) // filter out the nulls val filterOutNull = notNullAttributes.map { a => - val idx = child.output.indexOf(a) + val idx = child.output.map(_.exprId).indexOf(a) if (idx != -1) { s"if (${input(idx).isNull}) continue;" } else { @@ -139,7 +139,7 @@ case class Filter(condition: Expression, child: SparkPlan) // Reset the isNull to false for the not-null columns, then the followed operators could // generate better code (remove dead branches). val resultVars = input.zipWithIndex.map { case (ev, i) => - if (notNullAttributes.contains(child.output(i))) { + if (notNullAttributes.contains(child.output(i).exprId)) { ev.isNull = "false" } ev