
[SPARK-13996][SQL] Add more not null attributes for Filter codegen #11810

Closed
wants to merge 6 commits into from

Conversation

viirya
Member

@viirya viirya commented Mar 18, 2016

What changes were proposed in this pull request?

JIRA: https://issues.apache.org/jira/browse/SPARK-13996

Filter codegen determines which attributes are not null by matching IsNotNull(a) expressions, guarded by the condition child.output.contains(a). However, this check is not comprehensive, and we can improve it.
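In simplified form, the existing check can be sketched as follows (these case classes are hypothetical stand-ins, not Spark's actual Catalyst types):

```scala
// Miniature model of Filter's predicate split: IsNotNull checks on a
// bare child-output attribute go into one bucket, everything else
// (including IsNotNull over a computed expression) into the other.
sealed trait Expr
case class Attr(name: String) extends Expr
case class IsNotNull(child: Expr) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

def splitNotNull(predicates: Seq[Expr],
                 childOutput: Seq[Attr]): (Seq[Expr], Seq[Expr]) =
  predicates.partition {
    // Only a bare attribute reference qualifies; IsNotNull(k + 1)
    // falls through to the "other predicates" bucket.
    case IsNotNull(a: Attr) if childOutput.contains(a) => true
    case _ => false
  }
```

With this definition, `IsNotNull(Attr("k"))` is recognized as a not-null guarantee, but `IsNotNull(Add(Attr("k"), Attr("v")))` is not, which is exactly the gap this patch targets.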

E.g., for this plan:

val rdd = sqlContext.sparkContext.makeRDD(Seq(Row(1, "1"), Row(null, "1"), Row(2, "2")))
val schema = new StructType().add("k", IntegerType).add("v", StringType)
val smallDF = sqlContext.createDataFrame(rdd, schema)
val df = smallDF.filter("isnotnull(k + 1)")

The code snippet generated without this patch:

/* 031 */   protected void processNext() throws java.io.IOException {
/* 032 */     /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
/* 033 */
/* 034 */     /*** PRODUCE: INPUT */
/* 035 */
/* 036 */     while (!shouldStop() && inputadapter_input.hasNext()) {
/* 037 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 038 */       /*** CONSUME: Filter isnotnull((k#0 + 1)) */
/* 039 */       /* input[0, int] */
/* 040 */       boolean filter_isNull = inputadapter_row.isNullAt(0);
/* 041 */       int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
/* 042 */
/* 043 */       /* isnotnull((input[0, int] + 1)) */
/* 044 */       /* (input[0, int] + 1) */
/* 045 */       boolean filter_isNull3 = true;
/* 046 */       int filter_value3 = -1;
/* 047 */   
/* 048 */       if (!filter_isNull) {
/* 049 */         filter_isNull3 = false; // resultCode could change nullability.
/* 050 */         filter_value3 = filter_value + 1;
/* 051 */     
/* 052 */       }
/* 053 */       if (!(!(filter_isNull3))) continue;
/* 054 */   
/* 055 */       filter_metricValue.add(1);

With this patch:

/* 031 */   protected void processNext() throws java.io.IOException {
/* 032 */     /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
/* 033 */
/* 034 */     /*** PRODUCE: INPUT */
/* 035 */
/* 036 */     while (!shouldStop() && inputadapter_input.hasNext()) {
/* 037 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 038 */       /*** CONSUME: Filter isnotnull((k#0 + 1)) */
/* 039 */       /* input[0, int] */
/* 040 */       boolean filter_isNull = inputadapter_row.isNullAt(0);
/* 041 */       int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
/* 042 */
/* 043 */       if (filter_isNull) continue;
/* 044 */
/* 045 */       filter_metricValue.add(1);

How was this patch tested?

Existing tests.

@SparkQA

SparkQA commented Mar 18, 2016

Test build #53498 has finished for PR 11810 at commit 8e59a74.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 18, 2016

Test build #53511 has finished for PR 11810 at commit 376c6f0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 18, 2016

Test build #53523 has finished for PR 11810 at commit 3d73249.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  case _ => false
}

  // The columns that will filtered out by `IsNotNull` could be considered as not nullable.
- private val notNullAttributes = notNullPreds.flatMap(_.references)
+ private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId)
Member Author
The attributes from the conditions can have different qualifiers than child.output, which makes the later indexOf call fail. So we use exprId here instead.
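A toy illustration of why the exprId comparison is needed (MiniAttr is a made-up stand-in for Catalyst's AttributeReference, which compares qualifiers as part of equality):

```scala
// Made-up miniature of a Catalyst attribute reference: equality on
// the whole case class includes the qualifier, while exprId alone
// identifies the underlying column.
case class MiniAttr(name: String, exprId: Long, qualifier: Option[String])

val childOutput = Seq(MiniAttr("k", 1L, None))
// The same column as referenced from the filter condition, but
// carrying a table qualifier, e.g. "t.k".
val fromCondition = MiniAttr("k", 1L, Some("t"))

// Full-attribute equality misses the match because of the qualifier,
// so indexOf returns -1; comparing by exprId finds the column.
val byEquality = childOutput.indexOf(fromCondition)
val byExprId   = childOutput.indexWhere(_.exprId == fromCondition.exprId)
```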

@SparkQA

SparkQA commented Mar 19, 2016

Test build #53592 has finished for PR 11810 at commit 662c4e3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Mar 21, 2016

cc @davies

@viirya
Member Author

viirya commented Mar 23, 2016

ping @davies Please take a look and see if this looks good to you. Thanks.

@@ -79,16 +79,16 @@ case class Filter(condition: Expression, child: SparkPlan)

  // Split out all the IsNotNulls from condition.
  private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
-   case IsNotNull(a) if child.output.contains(a) => true
+   case IsNotNull(a) if a.references.subsetOf(child.outputSet) => true
Contributor
This is not correct. For example, IsNotNull(IsNull(a)) does not mean that a cannot be null.

Member Author
Ah, you are right. With the NullIntolerant trait introduced in #11809, we can solve this easily. Let's wait for that PR to be ready, then we can revisit this. Thanks!
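A rough sketch of the NullIntolerant idea (names and structure are simplified stand-ins, not the real Catalyst API): an expression is null-intolerant when a null input forces a null output, so IsNotNull(e) proves non-nullness only for attributes reachable through an unbroken chain of null-intolerant nodes.

```scala
// Simplified expression tree with a NullIntolerant marker trait.
sealed trait Expr { def children: Seq[Expr] = Nil }
trait NullIntolerant extends Expr
case class Attr(name: String) extends Expr with NullIntolerant
case class Add(left: Expr, right: Expr) extends Expr with NullIntolerant {
  override def children: Seq[Expr] = Seq(left, right)
}
// IsNull returns true/false even for a null input, so it is
// deliberately NOT null-intolerant.
case class IsNull(child: Expr) extends Expr {
  override def children: Seq[Expr] = Seq(child)
}

// Collect attributes proven non-null by IsNotNull over this subtree:
// descend only through null-intolerant nodes, so IsNull(a) under an
// IsNotNull contributes nothing.
def notNullAttrs(e: Expr): Seq[Attr] = e match {
  case a: Attr            => Seq(a)
  case ni: NullIntolerant => ni.children.flatMap(notNullAttrs)
  case _                  => Seq.empty // descent stops at IsNull and friends
}
```

Under this model, IsNotNull(k + 1) yields k as not null, while IsNotNull(IsNull(a)) yields nothing, matching the counterexample above.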

viirya added 2 commits April 2, 2016 07:07
…-attrs

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@SparkQA

SparkQA commented Apr 2, 2016

Test build #54767 has finished for PR 11810 at commit 0c7afe7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Apr 2, 2016

retest this please.

@SparkQA

SparkQA commented Apr 2, 2016

Test build #54771 has finished for PR 11810 at commit 0c7afe7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Apr 2, 2016

retest this please.

@viirya
Member Author

viirya commented Apr 2, 2016

Looks like StateStoreSuite is a bit flaky...

@SparkQA

SparkQA commented Apr 2, 2016

Test build #54774 has finished for PR 11810 at commit 0c7afe7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Apr 2, 2016

retest this please.

@SparkQA

SparkQA commented Apr 3, 2016

Test build #54786 has finished for PR 11810 at commit 0c7afe7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Apr 3, 2016

LGTM

@davies
Contributor

davies commented Apr 3, 2016

Merging this into master, thanks!

@viirya
Member Author

viirya commented Apr 3, 2016

Thanks for reviewing this.
