
[SPARK-23727][SQL] Support for pushing down filters for DateType in parquet #20851

Closed
wants to merge 13 commits

Conversation

yucai
Contributor

@yucai yucai commented Mar 18, 2018

What changes were proposed in this pull request?

This PR adds support for pushing down filters for DateType in Parquet.

How was this patch tested?

Added UTs and tested locally.
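
For illustration (not part of the PR text), here is a minimal sketch of the kind of query this change speeds up, assuming a local Parquet table with a DateType column; the table path and column name are hypothetical.

    import java.sql.Date
    import org.apache.spark.sql.SparkSession

    object DatePushdownSketch extends App {
      val spark = SparkSession.builder()
        .master("local[*]")
        .appName("date-pushdown-sketch")
        .getOrCreate()
      import spark.implicits._

      // Hypothetical Parquet table with a DateType column named event_date.
      val df = spark.read.parquet("/tmp/events")

      // With this PR (and spark.sql.parquet.filterPushdown.date enabled), the
      // date predicate below is translated into an integer comparison on
      // days-since-epoch and evaluated inside the Parquet reader.
      df.filter($"event_date" > Date.valueOf("2018-03-01")).show()
    }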

@gatorsmile
Member

ok to test

implicit class IntToDate(int: Int) {
  // Maps 1 -> 2018-03-01, 2 -> 2018-03-02, and so on.
  def d: Date = new Date(Date.valueOf("2018-03-01").getTime + 24 * 60 * 60 * 1000 * (int - 1))
}

withParquetDataFrame((1 to 4).map(i => Tuple1(i.d))) { implicit df =>
Member

These test cases only cover limited cases. We need to check the boundary values.

Contributor Author

Could you kindly give me some examples of what kinds of boundary tests are needed? I checked the Parquet integer push down and the ORC date type push down; it seems I have covered all of their tests.

@@ -148,6 +193,15 @@ private[parquet] object ParquetFilters {
    case BinaryType =>
      (n: String, v: Any) =>
        FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
    case DateType =>
Member
@gatorsmile gatorsmile Mar 18, 2018

Add a new internal SQLConf to make it configurable. Users can turn it off if they hit a bug introduced in this PR.
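
For reference, a minimal sketch of such a conf, using the name and wording the PR later adopts (see the SQLConf hunk further down in this review):

    val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED =
      buildConf("spark.sql.parquet.filterPushdown.date")
        .doc("If true, enables Parquet filter push-down optimization for Date. " +
          "This configuration only has an effect when " +
          "'spark.sql.parquet.filterPushdown' is enabled.")
        .internal()
        .booleanConf
        .createWithDefault(true)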

Contributor Author

I have added it; kindly help review.

@SparkQA

SparkQA commented Mar 18, 2018

Test build #88344 has finished for PR 20851 at commit 079af71.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

(n: String, v: Any) => {
  FilterApi.eq(
    intColumn(n),
    Option(v).map{ date =>
Member

nit: map{ -> map { (add a space before the brace)

@yucai yucai changed the title from "[SPARK-23727][SQL] Support DATE predict push down in parquet" to "[SPARK-23727][SQL] Support for pushing down filters for DateType in parquet" on Mar 18, 2018
@SparkQA

SparkQA commented Mar 18, 2018

Test build #88353 has finished for PR 20851 at commit 15bd28d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 18, 2018

Test build #88356 has finished for PR 20851 at commit 1f2b450.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    intColumn(n),
    Option(v).map { date =>
      val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
      days.toInt.asInstanceOf[Integer]
Member

Use DateTimeUtils.fromJavaDate here and below?

@@ -353,6 +353,12 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
    .doc("If true, enables Parquet filter push-down optimization for Date. " +
Member

Should be an internal conf, i.e., .internal()?

Contributor Author

@gatorsmile any idea?

Member

Yes, it should be an internal conf. In the Spark 3.0 release, we will revisit all the internal confs and remove all the unnecessary ones.

@gatorsmile
Member

cc @michal-databricks @cloud-fan

(n: String, v: Any) => {
  FilterApi.notEq(
    intColumn(n),
    Option(v).map { date =>

These 3 lines are repeated several times in this change. Please refactor to a new method.
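
The merged change ends up extracting a dateToDays helper for this (it appears in later hunks of this review); a minimal sketch of the idea, assuming Spark's catalyst DateTimeUtils:

    import java.sql.Date
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    object DateConversionSketch {
      // Shared conversion: java.sql.Date -> days since epoch. Parquet's
      // FilterApi compares date columns as integers, so each call site
      // boxes the result:
      //   Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull
      def dateToDays(date: Date): Int = DateTimeUtils.fromJavaDate(date)
    }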

@SparkQA

SparkQA commented Mar 19, 2018

Test build #88367 has finished for PR 20851 at commit 8f36d1e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

- withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ withSQLConf(
+   SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+   SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true") {
Member

nit:

withSQLConf(
  SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
  SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
  SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {

@@ -313,6 +316,36 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
    }
  }

  test("filter pushdown - date") {
    implicit class IntToDate(int: Int) {
Member

Shall we pass a string here and convert it into a date?

Contributor Author

How about this way? It is from ORC's date push down.

  test("filter pushdown - date") {
    val dates = Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map { day =>
      Date.valueOf(day)
    }
    withOrcDataFrame(dates.map(Tuple1(_))) { implicit df =>
      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)

      checkFilterPredicate('_1 === dates(0), PredicateLeaf.Operator.EQUALS)
      checkFilterPredicate('_1 <=> dates(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)

      checkFilterPredicate('_1 < dates(1), PredicateLeaf.Operator.LESS_THAN)
      checkFilterPredicate('_1 > dates(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
      checkFilterPredicate('_1 <= dates(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
      checkFilterPredicate('_1 >= dates(3), PredicateLeaf.Operator.LESS_THAN)

      checkFilterPredicate(Literal(dates(0)) === '_1, PredicateLeaf.Operator.EQUALS)
      checkFilterPredicate(Literal(dates(0)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
      checkFilterPredicate(Literal(dates(1)) > '_1, PredicateLeaf.Operator.LESS_THAN)
      checkFilterPredicate(Literal(dates(2)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
      checkFilterPredicate(Literal(dates(0)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
      checkFilterPredicate(Literal(dates(3)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
    }
  }

Member

Either way looks fine, but I usually stick to what the surrounding code does though.

Contributor Author

OK, do you mean this way? Looks like we need more words :).

    implicit class StringToDate(s: String) {
      def d: Date = Date.valueOf(s)
    }

    withParquetDataFrame(
      Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map(i => Tuple1(i.d))) {
        implicit df =>
          checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
          checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]],
            Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map(i => Row.apply(i.d)))

          checkFilterPredicate('_1 === "2017-08-18".d, classOf[Eq[_]], "2017-08-18".d)
          checkFilterPredicate('_1 =!= "2017-08-18".d, classOf[NotEq[_]],
            Seq("2017-08-19", "2017-08-20", "2017-08-21").map(i => Row.apply(i.d)))

Member

I think "2017-08-19".d is at least better than 1.d.

Member

Yup, I think this one is better than the current one.

Contributor Author

As per @cloud-fan's suggestion, I have changed 1.d to 1.date. Which one is preferred?

  1. "2017-08-18".d
  2. 1.date
  3. "2017-08-18".date

Contributor

the string one is more readable to me

@@ -72,6 +82,14 @@ private[parquet] object ParquetFilters {
      (n: String, v: Any) => FilterApi.notEq(
        binaryColumn(n),
        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
    case DateType if SQLConf.get.parquetFilterPushDownDate =>
      (n: String, v: Any) => {
Member

nit:

(n: String, v: Any) => FilterApi.notEq(
  intColumn(n),
  Option(v).map { d =>
    DateTimeUtils.fromJavaDate(d.asInstanceOf[java.sql.Date]).asInstanceOf[Integer]
  }.orNull)

checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.d)))

checkFilterPredicate('_1 === 1.d, classOf[Eq[_]], 1.d)
Contributor

1.d is weird, can we name it 1.date?

Contributor Author
@yucai yucai Mar 21, 2018

I agree 1.date is better, but binary is using "1.b"; shall we keep the same pattern as it?

  test("filter pushdown - binary") {
    implicit class IntToBinary(int: Int) {
      def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
    }

Contributor

b might be OK, but d is confusing as it's also a postfix for double.
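
For context: d is Scala's Double literal suffix, which is why a .d extension method on Int reads ambiguously.

    val x = 1d    // Double literal: 1.0
    val y = 1.0d  // also a Double: 1.0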

Contributor Author

Got it, thanks very much for the explanation.

(n: String, v: Any) => FilterApi.eq(
  intColumn(n),
  Option(v).map { d =>
    DateTimeUtils.fromJavaDate(d.asInstanceOf[java.sql.Date]).asInstanceOf[Integer]
Contributor

shall we respect session local timezone here?

Contributor

I think we should do DateTimeUtils.fromJavaDate(d.asInstanceOf[java.sql.Date], SQLConf.get.sessionLocalTimeZone).asInstanceOf[Integer]

@SparkQA

SparkQA commented Mar 20, 2018

Test build #88426 has finished for PR 20851 at commit 82c5a73.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 21, 2018

Test build #88454 has finished for PR 20851 at commit 7946bea.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.booleanConf
.createWithDefault(true)
Member

For this kind of thing, we had better start with false.

Member

+1

case DateType if SQLConf.get.parquetFilterPushDownDate =>
  (n: String, v: Any) => FilterApi.eq(
    intColumn(n),
    Option(v).map(date => dateToDays(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer])
Member
@dongjoon-hyun dongjoon-hyun Mar 22, 2018

Since we imported Date at line 20, we can remove java.sql. There are 6 occurrences.

- Option(v).map(date => dateToDays(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer])
-   .orNull)
+ Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)

'_1 < "2018-03-19".date || '_1 > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row("2018-03-18".date), Row("2018-03-21".date)))
}
Member

nit. Indentation.

Contributor Author

Could you kindly show me how to improve it? Thanks!

Member
@HyukjinKwon HyukjinKwon Mar 22, 2018

  test("filter pushdown - date") {
    implicit class StringToDate(s: String) {
      def date: Date = Date.valueOf(s)
    }

    val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")

    withParquetDataFrame(data.map(i => Tuple1(i.date))) { implicit df =>
      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], data.map(i => Row.apply(i.date)))

      checkFilterPredicate('_1 === "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
      checkFilterPredicate('_1 <=> "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
      checkFilterPredicate('_1 =!= "2018-03-18".date, classOf[NotEq[_]],
        Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(i.date)))

      checkFilterPredicate('_1 < "2018-03-19".date, classOf[Lt[_]], "2018-03-18".date)
      checkFilterPredicate('_1 > "2018-03-20".date, classOf[Gt[_]], "2018-03-21".date)
      checkFilterPredicate('_1 <= "2018-03-18".date, classOf[LtEq[_]], "2018-03-18".date)
      checkFilterPredicate('_1 >= "2018-03-21".date, classOf[GtEq[_]], "2018-03-21".date)

      checkFilterPredicate(
        Literal("2018-03-18".date) === '_1, classOf[Eq[_]], "2018-03-18".date)
      checkFilterPredicate(
        Literal("2018-03-18".date) <=> '_1, classOf[Eq[_]], "2018-03-18".date)
      checkFilterPredicate(
        Literal("2018-03-19".date) > '_1, classOf[Lt[_]], "2018-03-18".date)
      checkFilterPredicate(
        Literal("2018-03-20".date) < '_1, classOf[Gt[_]], "2018-03-21".date)
      checkFilterPredicate(
        Literal("2018-03-18".date) >= '_1, classOf[LtEq[_]], "2018-03-18".date)
      checkFilterPredicate(
        Literal("2018-03-21".date) <= '_1, classOf[GtEq[_]], "2018-03-21".date)

      checkFilterPredicate(!('_1 < "2018-03-21".date), classOf[GtEq[_]], "2018-03-21".date)
      checkFilterPredicate(
        '_1 < "2018-03-19".date || '_1 > "2018-03-20".date,
        classOf[Operators.Or],
        Seq(Row("2018-03-18".date), Row("2018-03-21".date)))
    }
  }

Oops, I edited it.

Member

Thanks, @HyukjinKwon! That's better than what I thought.

@HyukjinKwon
Member

Seems fine otherwise.

'_1 < "2018-03-19".date || '_1 > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row("2018-03-18".date), Row("2018-03-21".date)))
}
Member

This is different from what @HyukjinKwon gave you. :)

@yucai
Contributor Author

yucai commented Mar 22, 2018

Sorry for the mistake! Is it the same now? @HyukjinKwon @dongjoon-hyun

@SparkQA

SparkQA commented Mar 22, 2018

Test build #88498 has finished for PR 20851 at commit 423a938.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • implicit class StringToDate(s: String)

@SparkQA

SparkQA commented Mar 22, 2018

Test build #88505 has finished for PR 20851 at commit a58eec8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 22, 2018

Test build #88506 has finished for PR 20851 at commit b43e36c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Mar 22, 2018

Test build #88512 has finished for PR 20851 at commit b43e36c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.booleanConf
.createWithDefault(false)
Contributor

an internal by-default-false conf usually means it's not available for users...

Contributor Author

@dongjoon-hyun and @HyukjinKwon suggested that this kind of configuration had better start with "false", but ORC actually does support Date push down by default. Any idea?

Member

I am fine either way.

Member

@yucai, the reason is that spark.sql.orc.filterPushdown is still false in Spark 2.3 while spark.sql.parquet.filterPushdown is true, so we don't know whether this is safe or not.

Anyway, we have 6 or more months for Apache Spark 2.4. We may enable this in the master branch temporarily for testing purposes only, and we can disable it at the last moment of the 2.4 release, like we did with the ORC conf, if there is some issue.

BTW, do you use this a lot in production at your company?

Contributor

I think it's common to turn on a new feature by default if there is no known regression, and turn it off if we find a regression later.

Member

If you think so, +1. :)

BTW, based on the Apache Spark way, I assume that this will not land on branch-2.3 with spark.sql.parquet.filterPushdown.date=true.

Member

This is not a bug fix, so we will not backport it to branch-2.3.

Member

Great! Thank you for the confirmation.

Contributor Author

@dongjoon-hyun, we are investigating using this feature in some of eBay's queries; if its performance is good, it will benefit us a lot.
As per our discussion, I will turn it on by default. Thanks very much!

case DateType if SQLConf.get.parquetFilterPushDownDate =>
  (n: String, v: Any) => FilterApi.eq(
    intColumn(n),
    Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
Contributor

Sorry, I was wrong. I took a look at how these dates get created, in DataSourceStrategy.translateFilter. They are actually created via DateTimeUtils.toJavaDate without a timezone, which means we should not use a timezone here either.
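
In other words, the pushed-down conversion only needs to be the inverse of the one used when the filter value was created; a sketch of that round trip, assuming Spark 2.x's DateTimeUtils signatures (17608 is an arbitrary day number):

    import java.sql.Date
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // days-since-epoch -> java.sql.Date, as DataSourceStrategy.translateFilter does...
    val javaDate: Date = DateTimeUtils.toJavaDate(17608)
    // ...and back, as the pushed-down Parquet filter needs; no timezone on either side.
    val days: Int = DateTimeUtils.fromJavaDate(javaDate)
    assert(days == 17608)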

@@ -129,6 +154,10 @@ private[parquet] object ParquetFilters {
    case BinaryType =>
      (n: String, v: Any) =>
        FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
    case DateType if SQLConf.get.parquetFilterPushDownDate =>
Contributor

We should refactor it so that adding a new data type doesn't need to touch so many places. This can be done later.
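
One possible shape for that later refactor, purely illustrative (valueConverter and its wiring are hypothetical, not from this PR): centralize the per-type value conversion inside ParquetFilters so makeEq, makeNotEq, makeLt, and the rest can share it.

    import java.sql.Date
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.types._

    // Hypothetical: one converter per data type, so supporting a new type
    // means adding one case here instead of touching every comparison builder.
    private def valueConverter(dataType: DataType): Any => Any = dataType match {
      case DateType if SQLConf.get.parquetFilterPushDownDate =>
        v => dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer]
      case _ => identity
    }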

@SparkQA

SparkQA commented Mar 26, 2018

Test build #88575 has finished for PR 20851 at commit b70def0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 29, 2018

Test build #88700 has finished for PR 20851 at commit 759f468.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in b02e76c Mar 30, 2018
mshtelma pushed a commit to mshtelma/spark that referenced this pull request Apr 5, 2018
[SPARK-23727][SQL] Support for pushing down filters for DateType in parquet

Author: yucai <[email protected]>

Closes apache#20851 from yucai/SPARK-23727.