[SPARK-23727][SQL] Support for pushing down filters for DateType in parquet #20851
Conversation
ok to test
def d: Date = new Date(Date.valueOf("2018-03-01").getTime + 24 * 60 * 60 * 1000 * (int - 1))
}

withParquetDataFrame((1 to 4).map(i => Tuple1(i.d))) { implicit df =>
These test cases cover only a limited set of cases. We need to check the boundary values.
Could you kindly give me some examples of what kind of boundary tests you mean? I checked the Parquet integer push-down and the ORC date-type push-down, and it seems we have covered all of their tests.
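For illustration, here is the kind of boundary coverage a date push-down test could add; these specific values are a hypothetical sketch, not code from the PR:

// Hypothetical boundary dates: epoch, pre-epoch (negative day number), far future.
val boundaryDates = Seq(
  Date.valueOf("1970-01-01"),
  Date.valueOf("1969-12-31"),
  Date.valueOf("9999-12-31"))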
@@ -148,6 +193,15 @@ private[parquet] object ParquetFilters {
    case BinaryType =>
      (n: String, v: Any) =>
        FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
    case DateType =>
Add a new internal SQLConf to make it configurable. Users can turn it off if they hit a bug introduced in this PR
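For reference, a sketch of what such an internal conf could look like; it mirrors the definition that appears further down in this diff, so treat it as a sketch rather than the final code:

val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED =
  buildConf("spark.sql.parquet.filterPushdown.date")
    .doc("If true, enables Parquet filter push-down optimization for Date.")
    .internal()
    .booleanConf
    .createWithDefault(true)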
I have added it; kindly help review.
Test build #88344 has finished for PR 20851 at commit
(n: String, v: Any) => {
  FilterApi.eq(
    intColumn(n),
    Option(v).map{ date =>
nit: map{ -> map {
Test build #88353 has finished for PR 20851 at commit
Test build #88356 has finished for PR 20851 at commit
    intColumn(n),
    Option(v).map { date =>
      val days = date.asInstanceOf[java.sql.Date].getTime / (24 * 60 * 60 * 1000)
      days.toInt.asInstanceOf[Integer]
Use DateTimeUtils.fromJavaDate here and below?
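A sketch of why the manual millisecond division is fragile (assuming the catalyst DateTimeUtils API): java.sql.Date#getTime returns local-midnight millis, and integer division truncates toward zero, so pre-epoch dates can map to the wrong day in some JVM timezones.

val d = java.sql.Date.valueOf("1969-12-31")
val manual = (d.getTime / (24L * 60 * 60 * 1000)).toInt // timezone-dependent; can be 0 instead of -1
val robust = DateTimeUtils.fromJavaDate(d)              // -1, the correct days-since-epoch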
@@ -353,6 +353,12 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
    .doc("If true, enables Parquet filter push-down optimization for Date. " +
Should be an internal conf, i.e., .internal()?
@gatorsmile any idea?
Yes, it should be an internal conf. In the Spark 3.0 release, we will revisit all the internal confs and remove the unnecessary ones.
(n: String, v: Any) => {
  FilterApi.notEq(
    intColumn(n),
    Option(v).map { date =>
These 3 lines are repeated several times in this change. Please refactor to a new method.
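For reference, a minimal sketch of the extracted helper; dateToDays is the name that appears later in this PR's diff, though the exact signature here is an assumption:

// Convert a java.sql.Date filter value to Parquet's INT32 days-since-epoch form.
private def dateToDays(date: java.sql.Date): Int = DateTimeUtils.fromJavaDate(date)

Each case then shrinks to a one-liner such as Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull.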
Test build #88367 has finished for PR 20851 at commit
- withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ withSQLConf(
+   SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+   SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true") {
nit:
withSQLConf(
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
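(Presumably the extra PARQUET_VECTORIZED_READER_ENABLED -> "false" entry is there because record-level filter evaluation only happens on the non-vectorized Parquet read path; the vectorized reader filters at row-group granularity, so disabling it lets the test observe the pushed-down predicate row by row.)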
@@ -313,6 +316,36 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
    }
  }

  test("filter pushdown - date") {
    implicit class IntToDate(int: Int) {
Shall we pass a string here and convert it into a date?
How about this way? It is from ORC's date push down.
test("filter pushdown - date") {
val dates = Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map { day =>
Date.valueOf(day)
}
withOrcDataFrame(dates.map(Tuple1(_))) { implicit df =>
checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
checkFilterPredicate('_1 === dates(0), PredicateLeaf.Operator.EQUALS)
checkFilterPredicate('_1 <=> dates(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate('_1 < dates(1), PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate('_1 > dates(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate('_1 <= dates(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate('_1 >= dates(3), PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(dates(0)) === '_1, PredicateLeaf.Operator.EQUALS)
checkFilterPredicate(Literal(dates(0)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
checkFilterPredicate(Literal(dates(1)) > '_1, PredicateLeaf.Operator.LESS_THAN)
checkFilterPredicate(Literal(dates(2)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(dates(0)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
checkFilterPredicate(Literal(dates(3)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
}
}
Either way looks fine, but I usually stick to what the surrounding code does around the fix.
Ok, do you mean this way? Looks like we need more words :).
implicit class StringToDate(s: String) {
def d: Date = Date.valueOf(s)
}
withParquetDataFrame(
Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map(i => Tuple1(i.d))) {
implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]],
Seq("2017-08-18", "2017-08-19", "2017-08-20", "2017-08-21").map(i => Row.apply(i.d)))
checkFilterPredicate('_1 === "2017-08-18".d, classOf[Eq[_]], "2017-08-18".d)
checkFilterPredicate('_1 =!= "2017-08-18".d, classOf[NotEq[_]],
Seq("2017-08-19", "2017-08-20", "2017-08-21").map(i => Row.apply(i.d)))
I think "2017-08-19".d
is at least better than 1.d
.
Yup, I think this one is better than the current one.
As per @cloud-fan's suggestion, I have changed 1.d to 1.date. Which one is preferred?
- "2017-08-18".d
- 1.date
- "2017-08-18".date
the string one is more readable to me
@@ -72,6 +82,14 @@ private[parquet] object ParquetFilters {
      (n: String, v: Any) => FilterApi.notEq(
        binaryColumn(n),
        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
    case DateType if SQLConf.get.parquetFilterPushDownDate =>
      (n: String, v: Any) => {
nit:
(n: String, v: Any) => FilterApi.notEq(
intColumn(n),
Option(v).map { d =>
DateTimeUtils.fromJavaDate(d.asInstanceOf[java.sql.Date]).asInstanceOf[Integer]
}.orNull)
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.d)))

checkFilterPredicate('_1 === 1.d, classOf[Eq[_]], 1.d)
1.d is weird, can we name it 1.date?
I agree 1.date is better, but binary uses 1.b; shall we keep the same pattern?
test("filter pushdown - binary") {
implicit class IntToBinary(int: Int) {
def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
}
b might be OK, but d is confusing as it's also a postfix for double.
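The clash in a nutshell, as a quick sketch:

val asDouble = 1d // in Scala, `d` is already the Double-literal suffix, so this is 1.0
// hence a Date helper invoked as 1.d reads like a numeric literal rather than a date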
Got it, thanks very much for the explanation.
(n: String, v: Any) => FilterApi.eq(
  intColumn(n),
  Option(v).map { d =>
    DateTimeUtils.fromJavaDate(d.asInstanceOf[java.sql.Date]).asInstanceOf[Integer]
shall we respect session local timezone here?
I think we should do DateTimeUtils.fromJavaDate(d.asInstanceOf[java.sql.Date], SQLConf.get.sessionLocalTimeZone).asInstanceOf[Integer]
Test build #88426 has finished for PR 20851 at commit
Test build #88454 has finished for PR 20851 at commit
"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.") | ||
.internal() | ||
.booleanConf | ||
.createWithDefault(true) |
For this kind of thing, we had better start with false.
+1
case DateType if SQLConf.get.parquetFilterPushDownDate =>
  (n: String, v: Any) => FilterApi.eq(
    intColumn(n),
    Option(v).map(date => dateToDays(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer])
Since we imported Date at line 20, we can remove java.sql. There are 6 occurrences.
- Option(v).map(date => dateToDays(date.asInstanceOf[java.sql.Date]).asInstanceOf[Integer])
- .orNull)
+ Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
'_1 < "2018-03-19".date || '_1 > "2018-03-20".date, | ||
classOf[Operators.Or], | ||
Seq(Row("2018-03-18".date), Row("2018-03-21".date))) | ||
} |
nit. Indentation.
Could you kindly show me how to improve? Thanks!
test("filter pushdown - date") {
implicit class StringToDate(s: String) {
def date: Date = Date.valueOf(s)
}
val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")
withParquetDataFrame(data.map(i => Tuple1(i.date))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], data.map(i => Row.apply(i.date)))
checkFilterPredicate('_1 === "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate('_1 <=> "2018-03-18".date, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate('_1 =!= "2018-03-18".date, classOf[NotEq[_]],
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(i.date)))
checkFilterPredicate('_1 < "2018-03-19".date, classOf[Lt[_]], "2018-03-18".date)
checkFilterPredicate('_1 > "2018-03-20".date, classOf[Gt[_]], "2018-03-21".date)
checkFilterPredicate('_1 <= "2018-03-18".date, classOf[LtEq[_]], "2018-03-18".date)
checkFilterPredicate('_1 >= "2018-03-21".date, classOf[GtEq[_]], "2018-03-21".date)
checkFilterPredicate(
Literal("2018-03-18".date) === '_1, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-18".date) <=> '_1, classOf[Eq[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-19".date) > '_1, classOf[Lt[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-20".date) < '_1, classOf[Gt[_]], "2018-03-21".date)
checkFilterPredicate(
Literal("2018-03-18".date) >= '_1, classOf[LtEq[_]], "2018-03-18".date)
checkFilterPredicate(
Literal("2018-03-21".date) <= '_1, classOf[GtEq[_]], "2018-03-21".date)
checkFilterPredicate(!('_1 < "2018-03-21".date), classOf[GtEq[_]], "2018-03-21".date)
checkFilterPredicate(
'_1 < "2018-03-19".date || '_1 > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row("2018-03-18".date), Row("2018-03-21".date)))
}
}
Oops, I edited it.
Thanks, @HyukjinKwon! That's better than what I thought.
Seems fine otherwise.
'_1 < "2018-03-19".date || '_1 > "2018-03-20".date, | ||
classOf[Operators.Or], | ||
Seq(Row("2018-03-18".date), Row("2018-03-21".date))) | ||
} |
This is different from what @HyukjinKwon gave you. :)
Sorry for the mistake! Is it the same now? @HyukjinKwon @dongjoon-hyun
Test build #88498 has finished for PR 20851 at commit
Test build #88505 has finished for PR 20851 at commit
Test build #88506 has finished for PR 20851 at commit
retest this please
Test build #88512 has finished for PR 20851 at commit
"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.") | ||
.internal() | ||
.booleanConf | ||
.createWithDefault(false) |
an internal by-default-false conf usually means it's not available for users...
@dongjoon-hyun and @HyukjinKwon suggested that such a configuration should start with "false", but ORC actually supports Date push-down by default. Any idea?
I am fine either way.
@yucai, the reason is that spark.sql.orc.filterPushdown is still false in Spark 2.3 while spark.sql.parquet.filterPushdown is true. We don't know whether this is safe or not.
Anyway, we have 6 or more months until Apache Spark 2.4. We may enable this in the master branch temporarily, for testing purposes only, and we can disable it at the last moment of the 2.4 release, like we did with the ORC conf, if there is some issue.
BTW, do you use this a lot in production at your company?
I think it's common to turn on a new feature by default if there is no known regression, and turn it off if we find a regression later.
If you think so, +1. :)
BTW, based on the Apache Spark way, I assume that this will not land on branch-2.3 with spark.sql.parquet.filterPushdown.date=true.
This is not a bug fix. We will not backport it to branch-2.3.
Great! Thank you for the confirmation.
@dongjoon-hyun, we are investigating using this feature in some of eBay's queries; if its performance is good, it will benefit us a lot.
As per our discussion, I will turn it on by default. Thanks very much!
case DateType if SQLConf.get.parquetFilterPushDownDate =>
  (n: String, v: Any) => FilterApi.eq(
    intColumn(n),
    Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
Sorry, I was wrong. I took a look at how these dates get created, in DataSourceStrategy.translateFilter. Actually they are created via DateTimeUtils.toJavaDate without a timezone, which means here we should not use a timezone either.
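A sketch of the round trip the comment describes (assuming catalyst's DateTimeUtils API): the filter value arrives as a java.sql.Date produced by toJavaDate, so fromJavaDate, its inverse, recovers the original day number with no extra timezone handling.

val days = 17609                                   // 2018-03-19 as days-since-epoch
val asDate = DateTimeUtils.toJavaDate(days)        // what translateFilter hands to ParquetFilters
assert(DateTimeUtils.fromJavaDate(asDate) == days) // symmetric; no timezone applied here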
@@ -129,6 +154,10 @@ private[parquet] object ParquetFilters {
    case BinaryType =>
      (n: String, v: Any) =>
        FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
    case DateType if SQLConf.get.parquetFilterPushDownDate =>
We should refactor it so that adding a new data type doesn't need to touch so many places. This can be done later.
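One possible shape for that later refactor, sketched here with hypothetical names not taken from the PR: centralize the Catalyst-to-Parquet value conversion so a new type is handled in one place instead of in every makeEq/makeNotEq/makeLt/... case.

private def toParquetValue(dataType: DataType, v: Any): Any = dataType match {
  case DateType => dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer]
  case BinaryType => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])
  case other => v // types whose Catalyst and Parquet representations already match
}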
Test build #88575 has finished for PR 20851 at commit
Test build #88700 has finished for PR 20851 at commit
thanks, merging to master!
[SPARK-23727][SQL] Support for pushing down filters for DateType in parquet

Author: yucai <[email protected]>

Closes apache#20851 from yucai/SPARK-23727.
What changes were proposed in this pull request?
This PR adds support for pushing down filters for DateType in Parquet.
How was this patch tested?
Added a UT and tested locally.