
Custom window frame logic (support ROWS, RANGE, PRECEDING and FOLLOWING for window functions) #3570

Merged
11 commits, merged Sep 30, 2022

Conversation

metesynnada
Contributor

Which issue does this PR close?

We offer a partial implementation of windows with custom window frames, improving the situation described in #361.

As a team, this is our first contribution to DataFusion, and we hope to contribute further to both DataFusion and Ballista in the future. Since this is our first PR for this project, we would like feedback on how we are doing in terms of code quality, alignment with the project roadmap, etc. We would also like your ideas on how to close this issue completely, since we are providing a partial implementation as a first step. You can see which cases we cover in the integration tests.

Rationale for this change

DataFusion currently does not support custom window frames, but this feature is on the roadmap.

What changes are included in this PR?

For now, we implemented ROWS and RANGE modes supporting PRECEDING and FOLLOWING.

As a draft, we currently do not support:

  • GROUPS mode
  • Timestamp ranges, e.g. RANGE BETWEEN '1 day' PRECEDING AND '10 days' FOLLOWING, since the logical planner does not support types other than integers.
  • Frame exclusion, i.e. EXCLUDE CURRENT ROW
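The implemented ROWS-mode boundary logic amounts to index arithmetic around the current row, clamped to the partition. A minimal self-contained sketch (illustrative only, not the actual DataFusion implementation):

```rust
/// Frame boundaries for `ROWS BETWEEN p PRECEDING AND f FOLLOWING`.
/// Returns a half-open [start, end) range clamped to the partition.
fn rows_frame_bounds(row: usize, len: usize, preceding: u64, following: u64) -> (usize, usize) {
    // The start saturates at the partition start instead of underflowing.
    let start = row.saturating_sub(preceding as usize);
    // The end is exclusive and clamped at the partition end.
    let end = usize::min(len, row + following as usize + 1);
    (start, end)
}

/// Naive windowed SUM over i64 values using the boundaries above.
fn windowed_sums(values: &[i64], preceding: u64, following: u64) -> Vec<i64> {
    (0..values.len())
        .map(|i| {
            let (start, end) = rows_frame_bounds(i, values.len(), preceding, following);
            values[start..end].iter().sum()
        })
        .collect()
}
```

For example, `SUM(x) OVER (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)` over `[1, 2, 3, 4]` yields `[3, 6, 9, 7]` with this scheme.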

Next steps

  • GROUPS mode implementation, by extending the calculate_current_window method.
  • Frame exclusion, by extending the logical planner and adapting the calculate_current_window method.

Observations

  • Some aggregation function implementations are not generic, but use f64. This can create issues with statistical aggregation functions like CORR(x, y) when greater precision is required. Fortunately, they can be enhanced to support other data types, similar to the SUM(x) aggregation.

    Also, the evaluate() method of the CovarianceAccumulator should change as follows

    @@ -374,12 +374,6 @@ impl Accumulator for CovarianceAccumulator {
      };
    
      if count <= 1 {
    -      return Err(DataFusionError::Internal(
    -          "At least two values are needed to calculate covariance".to_string(),
    -      ));
    -  }
    -
    -  if self.count == 0 {
          Ok(ScalarValue::Float64(None))
      } else {
          Ok(ScalarValue::Float64(Some(self.algo_const / count as f64)))
    

    to become compatible with PostgreSQL. However, these issues are separate from this PR and we can discuss them under a new issue. For this reason, we deferred supporting functions like CORR(x, y) to the future.
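The proposed change can be put as a standalone sketch, with a plain `Option<f64>` standing in for `ScalarValue::Float64` (illustrative only, not the actual accumulator type):

```rust
/// Sketch of the proposed covariance evaluation: fewer than two samples
/// yields SQL NULL (None) instead of an internal error, matching
/// PostgreSQL's behavior. `algo_const` mirrors the running co-moment
/// field discussed above; names are illustrative.
fn evaluate_covariance(count: u64, algo_const: f64) -> Option<f64> {
    if count <= 1 {
        None // PostgreSQL returns NULL here rather than raising an error
    } else {
        Some(algo_const / count as f64)
    }
}
```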

  • Since unstable sorting is used, some queries produce output that differs from PostgreSQL. We therefore use only unique columns in ORDER BY clauses when testing ROWS mode.

    An example query:

    SELECT c2, c3,
       SUM(c2) OVER(ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation2,
       SUM(c3) OVER(ORDER BY c2 ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation3,
       SUM(c3) OVER(ORDER BY c1 ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation4
    FROM test
    LIMIT 10;

    The DataFusion output is:

    +----+-----+------------+------------+------------+
    | c2 | c3  | summation2 | summation3 | summation4 |
    +----+-----+------------+------------+------------+
    | 1  | 12  | 2          | 132        | -13        |
    | 1  | 120 | 3          | 203        | 263        |
    | 1  | 71  | 4          | 118        | 447        |
    | 1  | -85 | 5          | 154        | 37         |
    | 1  | 36  | 5          | 43         | 358        |
    | 1  | -99 | 5          | 48         | -81        |
    | 1  | 125 | 5          | -31        | 247        |
    | 1  | -8  | 5          | 111        | 215        |
    | 1  | 57  | 5          | 3          | 238        |
    | 1  | -72 | 5          | 140        | 247        |
    +----+-----+------------+------------+------------+

    and the PostgreSQL output is:

    +----+-----+------------+------------+------------+
    | c2 | c3  | summation2 | summation3 | summation4 |
    +----+-----+------------+------------+------------+
    | 1  | -85 | 2          | -49        | -251       |
    | 1  | 36  | 3          | 71         | 330        |
    | 1  | 120 | 4          | 46         | 284        |
    | 1  | -25 | 5          | 149        | -184       |
    | 1  | 103 | 5          | 305        | -15        |
    | 1  | 71  | 5          | 323        | 251        |
    | 1  | 54  | 5          | 286        | 48         |
    | 1  | 83  | 5          | 255        | 166        |
    | 1  | -56 | 5          | 222        | -79        |
    | 1  | 70  | 5          | 180        | -233       |
    +----+-----+------------+------------+------------+
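    The underlying issue can be sketched in a few lines: a stable sort preserves the input order of rows with equal ORDER BY keys, while an unstable sort is free to permute them, so position-based frames see different neighbours. (Plain tuples stand in for table rows here.)

```rust
/// Stable sort on the c2 key: rows with equal c2 keep their input order.
fn order_by_c2_stable(rows: &[(i64, i64)]) -> Vec<(i64, i64)> {
    let mut sorted = rows.to_vec();
    sorted.sort_by_key(|&(c2, _)| c2); // stable: ties keep input order
    sorted
}

/// Unstable sort on the same key: ties may be reordered arbitrarily,
/// which is why ROWS-frame results can differ from PostgreSQL.
fn order_by_c2_unstable(rows: &[(i64, i64)]) -> Vec<(i64, i64)> {
    let mut sorted = rows.to_vec();
    sorted.sort_unstable_by_key(|&(c2, _)| c2); // ties may be permuted
    sorted
}
```

    Only the key order is guaranteed for the unstable variant; the relative order of tied rows is unspecified, which is exactly the nondeterminism observed above.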
  • There is a minor problem in the logical planner: it should run

    SELECT 
      SUM(c2) OVER(ORDER BY c5, c6 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum
    FROM test;

    without a problem; however, it produces

    Error: Plan("With window frame of type RANGE, the order by expression must be of length 1, got 2")
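    One way the check could be relaxed is to require a single ORDER BY expression only when a RANGE bound is an actual offset, since purely unbounded or current-row bounds do not depend on the order key's type. A hypothetical sketch with illustrative types (not DataFusion's real WindowFrameBound):

```rust
/// Illustrative stand-in for a window frame bound.
#[derive(Clone, Copy)]
enum Bound {
    UnboundedPreceding,
    Preceding(u64),
    CurrentRow,
    Following(u64),
    UnboundedFollowing,
}

/// A RANGE frame needs a single ORDER BY key only when some bound is a
/// numeric offset that must be added to / subtracted from that key.
fn needs_single_order_key(start: Bound, end: Bound) -> bool {
    let is_offset = |b: Bound| matches!(b, Bound::Preceding(_) | Bound::Following(_));
    is_offset(start) || is_offset(end)
}

/// Sketch of the relaxed planner validation.
fn validate_range_frame(start: Bound, end: Bound, order_by_len: usize) -> Result<(), String> {
    if needs_single_order_key(start, end) && order_by_len != 1 {
        return Err(format!(
            "With window frame of type RANGE, the order by expression must be of length 1, got {}",
            order_by_len
        ));
    }
    Ok(())
}
```

    Under this rule the UNBOUNDED PRECEDING / UNBOUNDED FOLLOWING query above would validate even with two ORDER BY expressions.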
    

@metesynnada metesynnada marked this pull request as draft September 21, 2022 11:29
@ozankabak
Contributor

@alamb, mentioning you since you showed interest in this. Once this gets some feedback, we will perform any fixes if necessary and then add GROUPS functionality so that this work can get merged.

Note that EXCLUDE CURRENT ROW and timestamp handling require changes to planning logic, so we will leave those to another PR.

@github-actions bot added labels on Sep 21, 2022: core (Core DataFusion crate), logical-expr (Logical plan and expressions), physical-expr (Physical Expressions), sql (SQL Planner)
@alamb
Contributor

alamb commented Sep 21, 2022

Thanks @ozankabak -- I will try and find time to review this later this week, though it may not be until the weekend

@alamb alamb changed the title Custom window frame logic Custom window frame logic (support ROWS, RANGE, PRECEDING and FOLLOWING for window functions) Sep 21, 2022
@alamb
Contributor

alamb commented Sep 21, 2022

I took a brief look through this PR, and it looks quite cool -- will provide detailed comments later. For now I have started the CI checks

@ozankabak
Contributor

Seems like we forgot the license header in one file 🙂 CI checks are passing except for that. We will fix the license issue along with any other changes based on the review.


@alamb alamb left a comment


@metesynnada thank you very much for this contribution. The retract_rows approach is a very clever way to implement this feature.

As a team, this is our first contribution to Datafusion and we hope to contribute further to both Datafusion and Ballista in the future

That is wonderful news and welcome!

We also would like to get your ideas on how to close this issue completely since we are providing a partial implementation as a first step. You can see which cases we cover in integration tests.

As a draft, we currently do not support:

I think in general we have taken the approach of incrementally adding feature support. Thus, if a feature is not entirely supported, implementing it incrementally (e.g. adding support for more of, but not all of, the OVER clause) is both "allowed" and encouraged, as it makes the PRs easier to review given our limited reviewer bandwidth.

While I am typing this, I would like to encourage you and your team to feel free to review / approve other PRs. One of the maintainers will review prior to merging and we have found that having community reviews helps with code quality, spreads knowledge, and makes the PR merge process more efficient.


cc @jimexist who contributed the initial implementation of windows functions

}
/// The partition by clause conducts sorting according to given partition column by default. If the
/// sorting columns have non unique values, the unstable sorting may produce indeterminate results.
/// Therefore, we are commenting out the following test for now.


One way to handle this type of non-determinism would be to add a WHERE clause that filters out any non-unique values

@@ -217,141 +205,6 @@ macro_rules! sum_row {
}};
}

// TODO implement this in arrow-rs with simd


There is a long-standing tension around adding more functionality to ScalarValue.

Basically, the theory is that doing any sort of arithmetic on more than a few ScalarValues is likely to have terrible performance compared to using the optimized / vectorized kernels from arrow. So the thinking goes: if we make it easier to write code that operates directly on ScalarValue, we will end up with some very bad performance.

However, this is at least the second PR (the first one added variance in the first place; see #1525 and #1525 (review) from @realno) I have seen that has found this sum code and moved it into ScalarValue, and so my thinking has evolved on this matter.

I now think we should add basic support for arithmetic in ScalarValue for the initial implementation, and then we can work on optimizing performance in follow-on PRs.

/// we separate left and right with compile time lookup.
/// To use bisect_left give true in the template argument
/// To use bisect_right give false in the template argument
pub fn bisect<const SIDE: bool>(


While reading this I can't help but wonder if the same logic could be found using https://doc.rust-lang.org/std/primitive.slice.html#method.binary_search

Though the subtlety of left and right would need some finagling


@alamb alamb left a comment


I am still working through more of this code, but in summary I think it is very good, and well tested 👍 I went through the code in datafusion/physical-expr/src/window/aggregate.rs quite carefully and it looks really nice

If possible, it would be great to add some negative SQL-level tests showing what types of SQL OVER clauses (e.g. with GROUPS, and what types of bounds) are not supported, so that, for example, we don't inadvertently plan them incorrectly.

My only real concern is about the growth of features of ScalarValue (and automatic coercion), but otherwise I think this PR is looking very close to mergeable.

For follow-on work, in this project we typically file tickets to track the missing features and merge in the initial feature.

So all in all, thank you @metesynnada -- this is a great initial PR

(ScalarValue::Int16(lhs), ScalarValue::Int16(rhs)) => {
typed_op!(lhs, rhs, Int16, i16, $OPERATION)
}
_ => impl_common_symmetic_cases_op!(


I don't think we should be doing coercion in ScalarValue -- by the time the execution plan runs, the coercion should have already been done elsewhere.

In other words, I would expect Scalar::Float64 + Scalar::Float64 to work, but Scalar::Float32 + Scalar::Float64 to fail

I fear that by automatically coercing in ScalarValue we will hide errors / failures to coerce elsewhere, thus masking bugs

@liukun4515 has been working on improving coercion and moving it earlier in the plan #3396 which is perhaps related
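The expectation described above can be sketched with a stand-in Scalar enum (not DataFusion's actual ScalarValue): same-type addition succeeds, mixed types return an error instead of silently widening.

```rust
/// Illustrative stand-in for a scalar value; None represents SQL NULL.
#[derive(Debug, PartialEq)]
enum Scalar {
    Float32(Option<f32>),
    Float64(Option<f64>),
}

/// Addition without implicit coercion: mismatched types are an error,
/// because coercion should already have happened during planning.
fn add(lhs: &Scalar, rhs: &Scalar) -> Result<Scalar, String> {
    match (lhs, rhs) {
        (Scalar::Float32(a), Scalar::Float32(b)) => Ok(Scalar::Float32(match (a, b) {
            (Some(a), Some(b)) => Some(a + b),
            _ => None, // NULL propagates
        })),
        (Scalar::Float64(a), Scalar::Float64(b)) => Ok(Scalar::Float64(match (a, b) {
            (Some(a), Some(b)) => Some(a + b),
            _ => None,
        })),
        _ => Err("mismatched types: coercion must happen before execution".to_string()),
    }
}
```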

pub fn bisect<const SIDE: bool>(
item_columns: &[ArrayRef],
target: &[ScalarValue],
) -> Result<usize> {


if this returns an index then it's a binary search instead of bisect?


@jimexist jimexist left a comment


thanks for the contribution

i feel that the bisect and scalar changes are better extracted and merged first

@ozankabak
Contributor

Thanks for the reviews, everybody! We are working on changes that address your reviews and plan to send them your way in a day or two.

@ozankabak
Contributor

ozankabak commented Sep 28, 2022

Hello again! We resolved almost all the points you've mentioned. Additionally, we identified some challenging corner cases regarding NULL treatment and updated our code to handle these cases, along with new unit tests for these corner cases. Four main discussion points remain:

PostgreSQL compatibility of new unit tests

Expected values of all the new unit tests were sanity-checked against PostgreSQL. To remain in sync with PostgreSQL, we also think that it could be a good idea to add psql-parity tests for NULL-treatment cases. If you agree, let's make this subject of another PR. We can open an issue to track this (and will be happy to work on a PR to resolve it).

Changes to ScalarValue

At this point, we do not really have any new functionality or complexity added to this type. We just tidied up/moved some ScalarValue-only code that was scattered across multiple modules to scalar.rs. For example, the coercions you saw were already a part of the code here:

https://github.com/apache/arrow-datafusion/blob/87faf868f2276b84e63cad6721ca08bd79ed9cb8/datafusion/physical-expr/src/aggregate/sum.rs#L261

We agree that there should be no coercions in general at this stage -- any necessary work along these lines should be handled earlier in the code path. However, we don't want to extend the scope of this PR to remove already-existing coercions from the codebase. If there is no existing issue about this, we are happy to open one and help resolve it in the near future, along with similar issues like #3511.

Some teaser: I have done some preliminary tests and it seems like removing all the coercions will not be a tough task at all. However, removing this kind of code always requires more careful testing than just preliminary testing and deserves a careful study/separate PR 🙂

Bisect

We considered using the partition_point function. This requires converting &[ArrayRef] to &[&[ScalarValue]], where the inner &[ScalarValue] corresponds to each row -- we are searching for the insertion place of target &[ScalarValue] among rows. Unless we are missing something, doing this will require unnecessary memory usage and CPU time since it will involve copying. On the contrary, our implementation takes data in original columnar format and calculates rows from the index only when the comparison is done. Since we are doing log(n) comparisons on average, we need log(n) copies instead of n copies. If you have any suggestions for adding bisect functionality without changing the columnar orientation of the data, we can create an issue for this and help resolve it in a subsequent PR.
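The idea can be sketched with plain vectors standing in for Arrow arrays: the data stays columnar, and a row tuple is materialized only at each probe index, so only O(log n) rows are ever copied. (A sketch under those assumptions, not the actual bisect in this PR.)

```rust
/// Materialize a single row from columnar storage; this is the only
/// place a row tuple is copied, and it happens once per probe.
fn row_at(columns: &[Vec<i64>], idx: usize) -> Vec<i64> {
    columns.iter().map(|col| col[idx]).collect()
}

/// Bisect over lexicographically sorted columnar data.
/// SIDE = true behaves like bisect_left, SIDE = false like bisect_right.
fn bisect<const SIDE: bool>(columns: &[Vec<i64>], target: &[i64]) -> usize {
    let (mut low, mut high) = (0, columns[0].len());
    while low < high {
        let mid = (low + high) / 2;
        let probe = row_at(columns, mid); // lazily materialize one row
        let go_right = if SIDE {
            probe.as_slice() < target // bisect_left: first row >= target
        } else {
            probe.as_slice() <= target // bisect_right: first row > target
        };
        if go_right {
            low = mid + 1;
        } else {
            high = mid;
        }
    }
    low
}
```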

Overflows

Overflows are not handled well by the ScalarValue type in general. Since we all agree that we do not want to change this type too much in this PR, we left one TODO in the window frame boundary code so that we can upgrade the overflow-sensitive part of the code in the future once there is an agreement on how to fix it. Currently, if an overflow happens, the whole process panics and quits as there is no machinery to handle this in the codebase. If you let us know of your thinking on this matter, we will be happy to help get this fixed in general in a subsequent PR so that other code can benefit too.
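One hedged possibility for the overflow-sensitive boundary arithmetic is checked arithmetic that surfaces an error instead of panicking; a minimal sketch (the error type and call sites in DataFusion would differ):

```rust
/// Sketch: checked arithmetic for window-frame boundary math, returning
/// an error on overflow rather than panicking the whole process.
fn add_offset(row: u64, following: u64) -> Result<u64, String> {
    row.checked_add(following)
        .ok_or_else(|| "window frame boundary arithmetic overflowed".to_string())
}
```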

Thanks again for all the reviews. Excited to see this merge and get used!

@metesynnada metesynnada marked this pull request as ready for review September 28, 2022 20:14
@alamb
Contributor

alamb commented Sep 28, 2022

Thanks @metesynnada and @ozankabak -- I'll check this out more carefully tomorrow or Friday

@codecov-commenter

Codecov Report

Merging #3570 (b1b27d2) into master (11abfb9) will increase coverage by 0.16%.
The diff coverage is 85.76%.

@@            Coverage Diff             @@
##           master    #3570      +/-   ##
==========================================
+ Coverage   85.91%   86.07%   +0.16%     
==========================================
  Files         301      301              
  Lines       56218    56884     +666     
==========================================
+ Hits        48301    48965     +664     
- Misses       7917     7919       +2     
Impacted Files Coverage Δ
datafusion/common/src/lib.rs 0.00% <ø> (ø)
datafusion/expr/src/accumulator.rs 58.33% <0.00%> (-19.45%) ⬇️
datafusion/expr/src/window_frame.rs 96.19% <ø> (+2.91%) ⬆️
...tafusion/physical-expr/src/aggregate/covariance.rs 91.91% <0.00%> (-6.80%) ⬇️
datafusion/physical-expr/src/aggregate/variance.rs 93.17% <0.00%> (-5.14%) ⬇️
datafusion/sql/src/planner.rs 81.25% <ø> (+<0.01%) ⬆️
...afusion/physical-expr/src/aggregate/correlation.rs 96.00% <42.85%> (-1.96%) ⬇️
datafusion/physical-expr/src/window/aggregate.rs 80.64% <79.25%> (+5.17%) ⬆️
datafusion/core/tests/sql/mod.rs 97.59% <90.90%> (-0.19%) ⬇️
datafusion/common/src/scalar.rs 85.84% <92.94%> (+0.72%) ⬆️
... and 68 more



@alamb alamb left a comment


Thanks again! I went through this PR again, and I think it is ready to merge.

Expected values of all the new unit tests were sanity-checked against PostgreSQL. To remain in sync with PostgreSQL, we also think that it could be a good idea to add psql-parity tests for NULL-treatment cases. If you agree, let's make this subject of another PR. We can open an issue to track this (and will be happy to work on a PR to resolve it).

Yes, a follow on issue would be great. Thank you

Currently, if an overflow happens, the whole process panics and quits as there is no machinery to handle this in the codebase. If you let us know of your thinking on this matter, we will be happy to help get this fixed in general in a subsequent PR so that other code can benefit too.

Likewise, I think follow on PRs are the right way to go -- thank you.

.await;
assert_eq!(
df.err().unwrap().to_string(),
"Execution error: Invalid window frame: end bound cannot be unbounded preceding"


👍

let ctx = SessionContext::new();
register_aggregate_null_cases_csv(&ctx).await?;
let sql = "SELECT
SUM(c1) OVER (ORDER BY c2 DESC)


I think sum will produce the same value regardless of the window order -- perhaps using first() might add better coverage to these tests

@@ -312,7 +326,11 @@ impl Accumulator for SumAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
// TODO: add the checker for overflow
// For the decimal(precision,_) data type, the absolute of value must be less than 10^precision.
Ok(self.sum.clone())
if self.count == 0 {


It took me a while to understand why this count field is necessary. I am a little worried that it might add overhead to the aggregate function (aka slow down aggregates). However, we can always implement a special case sum for window functions if this is a problem.

I think adding some comments saying that the count is needed to distinguish the case of "no values" if all values are removed via remove_batches would help future readers.

I did try commenting it out and there was a test diff, so 👍 for the coverage.
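The pattern under discussion can be sketched with plain f64 standing in for ScalarValue (illustrative, not the actual SumAccumulator): the count field distinguishes an empty frame (SQL NULL) from rows that happen to sum to zero.

```rust
/// Sketch of a retractable SUM accumulator for sliding window frames.
/// Note: retracting f64 values can accumulate rounding error; this is
/// an illustration of the count bookkeeping, not a production design.
#[derive(Default)]
struct SumAccumulator {
    sum: f64,
    count: u64,
}

impl SumAccumulator {
    /// Rows entering the window frame.
    fn update_batch(&mut self, values: &[f64]) {
        self.sum += values.iter().sum::<f64>();
        self.count += values.len() as u64;
    }

    /// Rows sliding out of the window frame.
    fn retract_batch(&mut self, values: &[f64]) {
        self.sum -= values.iter().sum::<f64>();
        self.count -= values.len() as u64;
    }

    /// count == 0 means "no rows in frame", which is NULL, not 0.
    fn evaluate(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum)
        }
    }
}
```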

@alamb
Contributor

alamb commented Sep 29, 2022

I'll plan to merge this tomorrow unless I hear otherwise or anyone else would like longer to review. Thanks again @metesynnada and @ozankabak -- this looks epic

@ozankabak
Contributor

Thanks @alamb! In our first follow-up PR, we will tweak the test you noted to increase coverage and also enrich the comments to explain subtle points such as the one you found.

@alamb alamb merged commit 65a5c6b into apache:master Sep 30, 2022
@alamb
Contributor

alamb commented Sep 30, 2022

🚀 -- great first contribution

@ursabot

ursabot commented Sep 30, 2022

Benchmark runs are scheduled for baseline = a9f7cac and contender = 65a5c6b. 65a5c6b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
