Custom window frame logic (support `ROWS`, `RANGE`, `PRECEDING` and `FOLLOWING` for window functions) #3570

Conversation
Partial implementation of custom window frames
@alamb, mentioning you since you showed interest in this. Once this gets some feedback, we will perform any fixes if necessary.

Thanks @ozankabak -- I will try and find time to review this later this week, though it may not be until the weekend.
I took a brief look through this PR, and it looks quite cool -- will provide detailed comments later. For now I have started the CI checks.

Seems like we forgot the license header in one file 🙂 CI checks are passing except that. We will fix the license thing along with any other changes based on the review.
@metesynnada thank you very much for this contribution. The `retract_rows` approach is a very clever way to implement this feature.
As a team, this is our first contribution to Datafusion and we hope to contribute further to both Datafusion and Ballista in the future.
That is wonderful news and welcome!
> We also would like to get your ideas on how to close this issue completely since we are providing a partial implementation as a first step. You can see which cases we cover in integration tests. As a draft, we currently do not support: …
I think in general we have taken the approach of incrementally adding feature support. Thus if a feature is not entirely supported, implementing it incrementally (e.g. adding support for more of, but not all of, the `OVER` clause) is both "allowed" and encouraged, as it makes the PRs easier to review given our limited reviewer bandwidth.
While I am typing this, I would like to encourage you and your team to feel free to review / approve other PRs. One of the maintainers will review prior to merging and we have found that having community reviews helps with code quality, spreads knowledge, and makes the PR merge process more efficient.
cc @jimexist who contributed the initial implementation of windows functions
```rust
}

/// The partition by clause conducts sorting according to given partition column by default. If the
/// sorting columns have non unique values, the unstable sorting may produce indeterminate results.
/// Therefore, we are commenting out the following test for now.
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One way to handle this type of non-determinism would be to add a `WHERE` clause that filters out any non-unique values.
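To make the non-determinism concrete, here is a small plain-Rust sketch (not DataFusion code): a stable sort keeps equal keys in their original order, while an unstable sort makes no such promise, so window results computed over ties can legitimately differ between engines.

```rust
// Sketch: stable vs. unstable sorting on a non-unique key.
// `sort_by_key` is stable (ties keep their input order);
// `sort_unstable_by_key` leaves the order among equal keys unspecified.
fn sort_both() -> (Vec<(i32, &'static str)>, Vec<(i32, &'static str)>) {
    let input = vec![(1, "a"), (0, "b"), (1, "c"), (0, "d")];
    let mut stable = input.clone();
    let mut unstable = input;
    stable.sort_by_key(|&(k, _)| k); // ties keep input order: b, d, a, c
    unstable.sort_unstable_by_key(|&(k, _)| k); // tie order unspecified
    (stable, unstable)
}

fn main() {
    let (stable, unstable) = sort_both();
    println!("stable:   {:?}", stable);
    println!("unstable: {:?}", unstable);
}
```

Filtering out (or never generating) duplicate sort keys, as suggested above, sidesteps the problem because the sort result is then unique regardless of stability.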
```diff
@@ -217,141 +205,6 @@ macro_rules! sum_row {
     }};
 }

// TODO implement this in arrow-rs with simd
```
There is a long-standing tension between adding more functionality to `ScalarValue` and keeping it minimal. Basically, the theory is that doing any sort of arithmetic on more than a few `ScalarValue`s is likely to have terrible performance compared to using the optimized / vectorized kernels from arrow. So the thinking goes: if we make it easier to write code that operates directly on `ScalarValue`, we will end up with some very bad performance.

However, this is at least the second PR (the first one is adding variance in the first place, see #1525 -- #1525 (review) from @realno) I have seen that has found this sum code and moved it into `ScalarValue`, and so my thinking has evolved on this matter.

I now think we should add basic support in `ScalarValue` for arithmetic for the initial implementation, and then we can work on optimizing performance as follow-on PRs.
```rust
/// we separate left and right with compile time lookup.
/// To use bisect_left give true in the template argument
/// To use bisect_right give false in the template argument
pub fn bisect<const SIDE: bool>(
```
While reading this I can't help but wonder if the same logic could be found using https://doc.rust-lang.org/std/primitive.slice.html#method.binary_search, though the subtlety of `left` and `right` would need some finagling.
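For what it's worth, the standard library can express both variants via `slice::partition_point`; a minimal sketch over plain `i32` slices (not the `ArrayRef`/`ScalarValue` inputs the PR's `bisect` handles):

```rust
// Sketch: bisect_left / bisect_right over a sorted slice.
// `partition_point` returns the first index at which the predicate
// stops holding, which is exactly the insertion point we want.
fn bisect_left(items: &[i32], target: i32) -> usize {
    items.partition_point(|&x| x < target) // first index with items[i] >= target
}

fn bisect_right(items: &[i32], target: i32) -> usize {
    items.partition_point(|&x| x <= target) // first index with items[i] > target
}

fn main() {
    let v = [1, 2, 2, 2, 3];
    println!("left: {}, right: {}", bisect_left(&v, 2), bisect_right(&v, 2));
}
```

`binary_search` alone returns an unspecified index among equal elements, which is why the left/right distinction needs the predicate form.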
I am still working through more of this code, but in summary I think it is very good, and well tested 👍 I went through the code in datafusion/physical-expr/src/window/aggregate.rs quite carefully and it looks really nice.

If possible, it would be great to add some negative SQL level tests showing what types of SQL OVER clauses (e.g. with groups, and what types of bounds) are not supported, so, for example, we don't inadvertently plan them incorrectly.

My only real concern is about the growth of features of `ScalarValue` (and automatic coercion), but otherwise I think this PR is looking very close to mergeable.

For follow-on work, in this project we typically file tickets to track the missing features and merge in the initial feature.

So all in all, thank you @metesynnada -- this is a great initial PR.
datafusion/common/src/scalar.rs (Outdated)

```rust
(ScalarValue::Int16(lhs), ScalarValue::Int16(rhs)) => {
    typed_op!(lhs, rhs, Int16, i16, $OPERATION)
}
_ => impl_common_symmetic_cases_op!(
```
I don't think we should be doing coercion in `ScalarValue` -- by the time the execution plan runs, the coercion should have already been done elsewhere. In other words, I would expect `Scalar::Float64 + Scalar::Float64` to work, but `Scalar::Float32 + Scalar::Float64` to fail.

I fear that by automatically coercing in `ScalarValue` we will hide errors / failures to coerce elsewhere, thus masking bugs.

@liukun4515 has been working on improving coercion and moving it earlier in the plan (#3396), which is perhaps related.
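The policy above can be sketched with a toy enum (hypothetical names, not DataFusion's actual `ScalarValue`): arithmetic on identical variants succeeds, while a type mismatch is surfaced as an error instead of being silently coerced.

```rust
// Toy sketch of "no coercion in scalar arithmetic": only matching
// variants may be added; a mismatch returns an error so that missing
// coercion earlier in the plan is not masked at execution time.
#[derive(Debug, PartialEq)]
enum Scalar {
    Float32(f32),
    Float64(f64),
}

fn add(lhs: &Scalar, rhs: &Scalar) -> Result<Scalar, String> {
    match (lhs, rhs) {
        (Scalar::Float32(a), Scalar::Float32(b)) => Ok(Scalar::Float32(a + b)),
        (Scalar::Float64(a), Scalar::Float64(b)) => Ok(Scalar::Float64(a + b)),
        // No implicit Float32 -> Float64 promotion here: the planner
        // should have inserted a cast before execution.
        (l, r) => Err(format!("type mismatch: {:?} + {:?}", l, r)),
    }
}

fn main() {
    println!("{:?}", add(&Scalar::Float64(1.0), &Scalar::Float64(2.0)));
    println!("{:?}", add(&Scalar::Float32(1.0), &Scalar::Float64(2.0)));
}
```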
```rust
pub fn bisect<const SIDE: bool>(
    item_columns: &[ArrayRef],
    target: &[ScalarValue],
) -> Result<usize> {
```
if this returns an index then it's a binary search instead of bisect?
thanks for the contribution. i feel that the bisect and scalar changes are better extracted and merged first
Thanks for the reviews, everybody! We are working on changes that address your reviews and plan to send them your way in a day or two.
Hello again! We resolved almost all the points you've mentioned. Additionally, we identified some challenging corner cases regarding NULL treatment and updated our code to handle these cases, along with new unit tests for them. Four main discussion points remain:

**PostgreSQL compatibility of new unit tests.** Expected values of all the new unit tests were sanity-checked against PostgreSQL. To remain in sync with PostgreSQL, we also think that it could be a good idea to add psql-parity tests for NULL-treatment cases. If you agree, let's make this the subject of another PR. We can open an issue to track this (and will be happy to work on a PR to resolve it).

**Changes to `ScalarValue`.** At this point, we do not really have any new functionality or complexity added to this type. We just tidied up/moved some code. We agree that there should be no coercions in general at this stage -- any necessary work along these lines should be handled previously in the code path. However, we don't want to extend the scope of this PR to remove already-existing coercions from the codebase. If there is no issue about this, we are happy to open an issue for it and help resolve it in the near future, along with similar issues like #3511. Some teaser: I have done some preliminary tests and it seems like removing all the coercions will not be a tough task at all. However, removing this kind of code always requires more careful testing than just preliminary testing and deserves a careful study/separate PR 🙂

**Bisect.** We considered using the standard library's `binary_search` as suggested in the review.

**Overflows.** Overflows are not handled well by the codebase currently: if an overflow happens, the whole process panics and quits as there is no machinery to handle this. If you let us know of your thinking on this matter, we will be happy to help get this fixed in general in a subsequent PR so that other code can benefit too.

Thanks again for all the reviews. Excited to see this merged and put to use!
Thanks @metesynnada and @ozankabak -- I'll check this out more carefully tomorrow or Friday.
Codecov Report

```
@@            Coverage Diff             @@
##           master    #3570      +/-   ##
==========================================
+ Coverage   85.91%   86.07%   +0.16%
==========================================
  Files         301      301
  Lines       56218    56884     +666
==========================================
+ Hits        48301    48965     +664
- Misses       7917     7919       +2
```
Thanks again! I went through this PR again, and I think it is ready to merge.
> Expected values of all the new unit tests were sanity-checked against PostgreSQL. To remain in sync with PostgreSQL, we also think that it could be a good idea to add psql-parity tests for NULL-treatment cases. If you agree, let's make this the subject of another PR. We can open an issue to track this (and will be happy to work on a PR to resolve it).
Yes, a follow on issue would be great. Thank you
> Currently, if an overflow happens, the whole process panics and quits as there is no machinery to handle this in the codebase. If you let us know of your thinking on this matter, we will be happy to help get this fixed in general in a subsequent PR so that other code can benefit too.
Likewise, I think follow on PRs are the right way to go -- thank you.
```rust
.await;
assert_eq!(
    df.err().unwrap().to_string(),
    "Execution error: Invalid window frame: end bound cannot be unbounded preceding"
```
👍
```rust
let ctx = SessionContext::new();
register_aggregate_null_cases_csv(&ctx).await?;
let sql = "SELECT
    SUM(c1) OVER (ORDER BY c2 DESC)
```
I think `sum` will produce the same value regardless of the window order -- perhaps using `first()` might add better coverage to these tests.
```diff
@@ -312,7 +326,11 @@ impl Accumulator for SumAccumulator {
     fn evaluate(&self) -> Result<ScalarValue> {
         // TODO: add the checker for overflow
         // For the decimal(precision,_) data type, the absolute of value must be less than 10^precision.
-        Ok(self.sum.clone())
+        if self.count == 0 {
```
It took me a while to understand why this `count` field is necessary. I am a little worried that it might add overhead to the aggregate function (aka slow down aggregates). However, we can always implement a special-case sum for window functions if this is a problem.

I think adding some comments saying that the count is needed to distinguish the case of "no values" if all values are removed via `remove_batches` would help future readers.

I did try commenting it out and there was a test diff, so 👍 for the coverage.
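A minimal sketch of the idea (hypothetical and simplified to `f64`; the real `SumAccumulator` works on `ScalarValue` and Arrow arrays): the `count` field lets `evaluate` distinguish an empty frame, which should produce NULL, from a frame whose values happen to sum to zero.

```rust
// Sketch: a retractable sum accumulator for sliding window frames.
// `retract` undoes an earlier `update`; `count` tracks how many values
// remain so an empty frame evaluates to None (SQL NULL) rather than 0.
#[derive(Default)]
struct SlidingSum {
    sum: f64,
    count: usize,
}

impl SlidingSum {
    fn update(&mut self, v: f64) {
        self.sum += v;
        self.count += 1;
    }
    fn retract(&mut self, v: f64) {
        self.sum -= v;
        self.count -= 1;
    }
    fn evaluate(&self) -> Option<f64> {
        if self.count == 0 {
            None // empty frame: NULL, not 0.0
        } else {
            Some(self.sum)
        }
    }
}

fn main() {
    let mut acc = SlidingSum::default();
    acc.update(3.0);
    acc.update(-3.0);
    println!("{:?}", acc.evaluate()); // values present, sum is zero
    acc.retract(3.0);
    acc.retract(-3.0);
    println!("{:?}", acc.evaluate()); // frame now empty
}
```

Without `count`, the two final states above would be indistinguishable, which is exactly the case the test diff exercised.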
I'll plan to merge this tomorrow unless I hear otherwise or anyone else would like longer to review. Thanks again @metesynnada and @ozankabak -- this looks epic.

Thanks @alamb! In our first follow-up PR, we will tweak the test you noted to increase coverage and also enrich the comments to explain subtle points such as the one you found.

🚀 -- great first contribution

Benchmark runs are scheduled for baseline = a9f7cac and contender = 65a5c6b. 65a5c6b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
We offer a partial implementation of windows with custom window frames and improve the situation on #361.
As a team, this is our first contribution to Datafusion and we hope to contribute further to both Datafusion and Ballista in the future. Since we are creating a PR for this project for the first time, we would like to get feedback on how we are doing in terms of code quality, alignment with the project roadmap etc. We also would like to get your ideas on how to close this issue completely since we are providing a partial implementation as a first step. You can see which cases we cover in integration tests.
Rationale for this change
Datafusion currently does not support custom window frames, but it is on the roadmap.
What changes are included in this PR?
For now, we implemented `ROWS` and `RANGE` modes supporting `PRECEDING` and `FOLLOWING`. As a draft, we currently do not support:

- `GROUPS` mode
- `RANGE BETWEEN '1 day' PRECEDING AND '10 days' FOLLOWING`, since the logical planner does not support types other than integers
- `EXCLUDE CURRENT ROW`
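As an illustration of the supported semantics (a plain-Rust sketch, not the PR's implementation), a `ROWS BETWEEN p PRECEDING AND f FOLLOWING` frame computes, for each row, an aggregate over a fixed-size window of physically neighboring rows:

```rust
// Sketch: SUM over ROWS BETWEEN `preceding` PRECEDING AND `following`
// FOLLOWING. For each row i, sum the values in rows
// [i - preceding, i + following], clamped to the partition bounds.
fn rows_frame_sums(values: &[i64], preceding: usize, following: usize) -> Vec<i64> {
    (0..values.len())
        .map(|i| {
            let start = i.saturating_sub(preceding); // clamp at partition start
            let end = (i + following + 1).min(values.len()); // clamp at end
            values[start..end].iter().sum()
        })
        .collect()
}

fn main() {
    // Like: SUM(v) OVER (ORDER BY k ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
    println!("{:?}", rows_frame_sums(&[1, 2, 3, 4], 1, 1));
}
```

`RANGE` mode differs in that the bounds are resolved by comparing `ORDER BY` values (e.g. via bisection) rather than by physical row offsets.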
Next steps

- `GROUPS` mode implementation by extending the `calculate_current_window` method.
- … by extending the `calculate_current_window` method.

Observations

Some aggregation function implementations are not generic, but use `f64`. This can create issues with statistical aggregation functions like `CORR(x, y)` when greater precision is required. Fortunately, they can be enhanced to support other data types, similar to the `SUM(x)` aggregation.

Also, `evaluation()` of the `CovarianceAccumulator` should be changed to become compatible with PostgreSQL. However, these issues are separate from this PR and we can discuss them under a new issue. For this reason, we deferred supporting functions like `CORR(x, y)` to the future.

Since unstable sorting is used, some queries output different results than PostgreSQL. We use only unique columns for `ORDER BY` clauses while testing `ROWS` mode. An example query: … The Datafusion output is: … and in PostgreSQL: …

There is a minor problem in the logical planner: it should run … without a problem; however, it produces …