
[SPARK-24934][SQL] Explicitly whitelist supported types in upper/lower bounds for in-memory partition pruning #21882

Closed
wants to merge 3 commits into master from HyukjinKwon:stats-filter

Conversation

HyukjinKwon
Member

@HyukjinKwon commented Jul 26, 2018

What changes were proposed in this pull request?

It looks like we intentionally set null for the upper/lower bounds of complex types and don't use them. However, these bounds appear to be used in in-memory partition pruning, which ends up producing incorrect results.

This PR proposes to explicitly whitelist the supported types.

```scala
val df = Seq(Array("a", "b"), Array("c", "d")).toDF("arrayCol")
df.cache().filter("arrayCol > array('a', 'b')").show()
```

```scala
val df = sql("select cast('a' as binary) as a")
df.cache().filter("a == cast('a' as binary)").show()
```

Before:

```
+--------+
|arrayCol|
+--------+
+--------+
```

```
+---+
|  a|
+---+
+---+
```

After:

```
+--------+
|arrayCol|
+--------+
|  [c, d]|
+--------+
```

```
+----+
|   a|
+----+
|[61]|
+----+
```

How was this patch tested?

Unit tests were added, and the change was also tested manually.

@HyukjinKwon
Member Author

cc @cloud-fan, mind taking a look please?

@pwoody

pwoody commented Jul 26, 2018

This is the same problem as #20935, yeah?

@HyukjinKwon
Member Author

Oops, it is. I didn't know. But this PR can target lower branches too, since it's a correctness issue :-)

@HyukjinKwon
Member Author

Also, I believe this issue can still happen even after your PR?

@pwoody

pwoody commented Jul 26, 2018

The test case here works fine at least. The linked PR focuses on accurately collecting stats, so null bounds should be correct if they occur.

@mgaido91
Contributor

@pwoody I am not sure I 100% agree with your last sentence. I agree that we should correct null bounds, but letting users hit bugs that return wrong results while we track down all the possible cases we have not thought of is not the right way to go, I think. Moreover, if a datatype is not orderable, we cannot even fix the lack of an upper and lower bound...

I think the approach proposed here is safer and I like it. It would be great (but I am not sure it is feasible) if we could emit a WARN message when a null is found (in testing we can throw an exception using AssertNotNull), in order to let users know that they are hitting a case which should not happen, so that they can report it to us.
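For illustration, a minimal sketch of the kind of safeguard being suggested; the object and method names here are hypothetical and not part of this PR:

```scala
// Hypothetical sketch only: warn when a stats bound is unexpectedly null for
// an attribute whose type should have bounds, so that users can report it.
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.AttributeReference

object BoundSanityCheck extends Logging {
  def warnIfNullBound(attr: AttributeReference, lowerBound: Any, upperBound: Any): Unit = {
    if (lowerBound == null || upperBound == null) {
      logWarning(s"Null upper/lower bound found for attribute '${attr.name}' of type " +
        s"${attr.dataType}; in-memory partition pruning will skip this filter.")
    }
  }
}
```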

@HyukjinKwon
Member Author

HyukjinKwon commented Jul 26, 2018

We should backport this one anyway. Actually, the stats are logged at DEBUG level, so I think we are fine. I guess there is no harm in adding this safeguard and closing the hole that was found, and this doesn't block your PR either. We can proceed orthogonally.

// bounds.
private def nullSafeEval(
    attr: AttributeReference)(func: AttributeReference => Expression): Expression = {
  attr.isNull || func(attr)
Contributor

this adds an extra runtime null check and may introduce a perf regression. How about we follow the Hive partition pruning and only create filters for non-complex types? e.g.

    object ExtractableLiteral {
      def unapply(expr: Expression): Option[Expression] = {
        if (expr.dataType.isInstanceOf[AtomicType]) Some(expr) else None
      }
    }
...
case EqualTo(a: AttributeReference, ExtractableLiteral(l)) =>

Member Author

Thanks, @cloud-fan.

Contributor

This basically means turning off the filtering for complex types. Although this may not be a big deal, since we probably won't have complex types here often, can't we instead add the isNull filter only for complex types?
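For reference, a rough sketch of the alternative being asked about, assuming a hypothetical helper (this is not the code in this PR): keep the plain filter for atomic types and add the null guard only for complex types.

```scala
// Hypothetical sketch: guard against null bounds only for complex types and
// keep the unguarded bound filter for atomic types.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.types.AtomicType

def guardedEval(attr: AttributeReference)(func: AttributeReference => Expression): Expression = {
  attr.dataType match {
    case _: AtomicType => func(attr)     // bounds are populated; no extra null check needed
    case _ => attr.isNull || func(attr)  // complex types: bounds may be null
  }
}
```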

Member Author

Hmmmm... if this can whitelist the cases we support, I think it's okay to use the suggestion above. BTW, it looks like we should exclude binary type too. It will still support isNull or isNotNull though.
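A sketch of the whitelist shape being discussed; the first lines match the extractor quoted later in this thread, and the remaining arms are a best-guess completion of that truncated diff:

```scala
// Only extract literals of atomic types, excluding BinaryType, so that
// upper/lower-bound filters are built only for types whose bounds are usable.
// IsNull/IsNotNull filters do not go through this extractor, so they keep
// working for all types.
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.types.{AtomicType, BinaryType}

private object ExtractableLiteral {
  def unapply(expr: Expression): Option[Literal] = expr match {
    case lit: Literal => lit.dataType match {
      case BinaryType => None          // binary bounds are not supported
      case _: AtomicType => Some(lit)  // whitelist atomic types
      case _ => None                   // complex types are excluded
    }
    case _ => None
  }
}
```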

checkBatchPruning("SELECT _1 FROM pruningArrayData WHERE _1 <= array(1)", 5, 10)(Seq(Array(1)))
checkBatchPruning("SELECT _1 FROM pruningArrayData WHERE _1 >= array(1)", 5, 10)(
testArrayData.map(_._1))

Contributor

nit: unneeded blank line

@HyukjinKwon HyukjinKwon changed the title [SPARK-24934][SQL] Handle missing upper/lower bounds case in in-memory partition pruning [SPARK-24934][SQL] Explicitly whitelist supported types in upper/lower bounds for in-memory partition pruning Jul 26, 2018
@SparkQA

SparkQA commented Jul 26, 2018

Test build #93597 has finished for PR 21882 at commit ea38b56.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

HyukjinKwon commented Jul 26, 2018

Oh BTW, please allow me to merge this one when there are some sign-offs and we are ready. I should test #21880 :-) ... I tested this against another PR. It's fine now.

@SparkQA

SparkQA commented Jul 26, 2018

Test build #93608 has finished for PR 21882 at commit 8cd100e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 27, 2018

Test build #93649 has finished for PR 21882 at commit 7f1040e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

private object ExtractableLiteral {
  def unapply(expr: Expression): Option[Literal] = expr match {
    case lit: Literal => lit.dataType match {
      case BinaryType => None
Contributor

can we also add a test for binary type?

Member Author

Will add late tonight or tomorrow

@cloud-fan
Contributor

LGTM

checkBatchPruning("SELECT _1 FROM pruningArrayData WHERE _1 >= array(1)", 5, 10)(
testArrayData.map(_._1))
// Do not filter on binary type
checkBatchPruning(
Member Author

Before this change, this test failed with `Expected Array(Array(1)), but got Array()` (wrong query result).

pruningArrayData.createOrReplaceTempView("pruningArrayData")
spark.catalog.cacheTable("pruningArrayData")

val pruningBinaryData = sparkContext.makeRDD(testBinaryData, 5).toDF()
Member Author

scala> spark.sparkContext.makeRDD((1 to 100).map { key => Tuple1(Array.fill(key)(key.toByte)) }, 5).toDF().printSchema()
root
 |-- _1: binary (nullable = true)

// Do not filter on binary type
checkBatchPruning(
  title = "SELECT _1 FROM pruningBinaryData WHERE _1 == 0x01 (binary literal)",
  actual = spark.table("pruningBinaryData").filter($"_1".equalTo(Array[Byte](1.toByte))),
Member Author
@HyukjinKwon Jul 28, 2018

The problem here is that there seems to be no SQL binary literal, so I had to use the Scala API.

Contributor

not very elegant, but we can do binary(chr(5)) in order to get a binary literal
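For example, something along these lines should work against the pruningBinaryData view used in these tests (a usage sketch, not code from the PR):

```scala
// SQL-only binary literal via binary(chr(...)), avoiding the Scala API.
spark.sql("SELECT _1 FROM pruningBinaryData WHERE _1 = binary(chr(1))").show()
```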

Member Author

Oops right.

@SparkQA

SparkQA commented Jul 28, 2018

Test build #93716 has finished for PR 21882 at commit 3e7b319.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 28, 2018

Test build #93720 has finished for PR 21882 at commit 1a0a2d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 28, 2018

Test build #93722 has finished for PR 21882 at commit fe3c0a0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

override protected def afterEach(): Unit = {
  try {
    spark.catalog.uncacheTable("pruningData")
    spark.catalog.uncacheTable("pruningStringData")
    spark.catalog.uncacheTable("pruningArrayData")
Contributor

uncache the pruningBinaryData too
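That is, the cleanup block would presumably also need (a sketch of the suggested addition):

```scala
spark.catalog.uncacheTable("pruningBinaryData")
```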

@SparkQA

SparkQA commented Jul 30, 2018

Test build #93761 has finished for PR 21882 at commit deb20ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Jul 30, 2018
[SPARK-24934][SQL] Explicitly whitelist supported types in upper/lower bounds for in-memory partition pruning

Author: hyukjinkwon <[email protected]>

Closes #21882 from HyukjinKwon/stats-filter.

(cherry picked from commit bfe60fc)
Signed-off-by: Wenchen Fan <[email protected]>
@cloud-fan
Contributor

thanks, merging to master/2.3!

@asfgit asfgit closed this in bfe60fc Jul 30, 2018
@HyukjinKwon
Member Author

Thank you @pwoody, @mgaido91 and @cloud-fan.
