Unexpected results with group by and random() #7876

Closed
Blajda opened this issue Oct 20, 2023 · 7 comments
Labels
bug Something isn't working

Comments

@Blajda

Blajda commented Oct 20, 2023

Describe the bug

I have a table t1 with a column called file_path.
I want to obtain a list of unique file_path values and then take a random subset of those rows.
I thought that this could be achieved with the following code.

  let files = ctx.sql("select file_path from t1 group by file_path").await.unwrap()
      .with_column("r", random() ).unwrap()
      .filter(col("r").lt_eq(lit(0.2))).unwrap();
  files.show().await.unwrap();

However, the output of my query contains a record that should have been filtered out (row A, whose r is about 0.80):

| A                    | 0.8023022275259943   |
| B                    | 0.05829777789599211  |
| C                    | 0.14330028518553894  |

This is the calculated logical plan

Projection: t1.file_path, random() AS r
    Aggregate: groupBy=[[t1.file_path]], aggr=[[]]
        Filter: random() <= Float64(0.2) 
           TableScan: t1 projection=[file_path]

In this case I would expect the filter to be applied after the aggregate operation, not before.
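A minimal sketch of why the position of the filter matters, assuming only that random() yields a fresh value on every evaluation. It does not use DataFusion: the function names are hypothetical, and a scripted iterator of numbers stands in for random() so the outcome is reproducible.

```rust
/// Intended plan: deduplicate, attach one draw per group as `r`,
/// then filter on that same value.
fn filter_after_aggregate(
    paths: &[&str],
    draws: &mut impl Iterator<Item = f64>,
) -> Vec<(String, f64)> {
    let mut unique: Vec<&str> = paths.to_vec();
    unique.sort();
    unique.dedup();
    unique
        .into_iter()
        .map(|p| (p.to_string(), draws.next().unwrap()))
        .filter(|(_, r)| *r <= 0.2)
        .collect()
}

/// Plan from the report: filter every input row on its own draw,
/// deduplicate, then attach a second, unrelated draw as column `r`.
fn filter_before_aggregate(
    paths: &[&str],
    draws: &mut impl Iterator<Item = f64>,
) -> Vec<(String, f64)> {
    let mut kept: Vec<&str> = paths
        .iter()
        .copied()
        .filter(|_| draws.next().unwrap() <= 0.2)
        .collect();
    kept.sort();
    kept.dedup();
    kept.into_iter()
        .map(|p| (p.to_string(), draws.next().unwrap()))
        .collect()
}

fn main() {
    let paths = ["A", "A", "B", "C"];
    // Scripted stand-in for successive random() results (an assumption of this sketch).
    let script = vec![0.1, 0.9, 0.05, 0.15, 0.80, 0.06, 0.14];
    println!("{:?}", filter_after_aggregate(&paths, &mut script.clone().into_iter()));
    println!("{:?}", filter_before_aggregate(&paths, &mut script.into_iter()));
}
```

In the second function the value shown as `r` is never the value the filter tested, which is how a row with r above 0.2 can appear in the output.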

To Reproduce

No response

Expected behavior

No response

Additional context

No response

@Blajda Blajda added the bug Something isn't working label Oct 20, 2023
@jonahgao
Member

jonahgao commented Oct 20, 2023

After some initial debugging, I found that this issue is likely caused by the optimization rule PushDownFilter.
I think we should avoid pushing down non-deterministic predicates.

Before optimization:

Filter: r <= Float64(0.2)
  Projection: aggregate_test_100.c1, random() AS r
    Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]]
      Projection: aggregate_test_100.c1
        TableScan: aggregate_test_100

After:

Projection: aggregate_test_100.c1, random() AS r
  Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]]
    Projection: aggregate_test_100.c1
      Filter: random() <= Float64(0.2)
        TableScan: aggregate_test_100, partial_filters=[random() <= Float64(0.2)]
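The guard suggested above could look roughly like the following. This is a sketch under stated assumptions, not DataFusion's PushDownFilter code: `Expr`, `is_volatile`, `substitute`, and `push_through_projection` are hypothetical names defined here for illustration. The point is that the volatility check has to run after the alias `r` is replaced by the expression it stands for.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified expression tree (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(f64),
    Random,
    LtEq(Box<Expr>, Box<Expr>),
}

/// True if evaluating the expression twice may give different results.
fn is_volatile(expr: &Expr) -> bool {
    match expr {
        Expr::Random => true,
        Expr::Column(_) | Expr::Literal(_) => false,
        Expr::LtEq(l, r) => is_volatile(l) || is_volatile(r),
    }
}

/// Replaces column references with the projection expressions they alias.
fn substitute(expr: &Expr, aliases: &HashMap<String, Expr>) -> Expr {
    match expr {
        Expr::Column(name) => aliases.get(name).cloned().unwrap_or_else(|| expr.clone()),
        Expr::LtEq(l, r) => Expr::LtEq(
            Box::new(substitute(l, aliases)),
            Box::new(substitute(r, aliases)),
        ),
        other => other.clone(),
    }
}

/// Returns the rewritten predicate if it may move below the projection,
/// or `None` if the filter has to stay above it.
fn push_through_projection(predicate: &Expr, aliases: &HashMap<String, Expr>) -> Option<Expr> {
    let rewritten = substitute(predicate, aliases);
    if is_volatile(&rewritten) {
        None
    } else {
        Some(rewritten)
    }
}

fn main() {
    // Projection: c1, random() AS r
    let aliases = HashMap::from([("r".to_string(), Expr::Random)]);
    let on_r = Expr::LtEq(Box::new(Expr::Column("r".into())), Box::new(Expr::Literal(0.2)));
    let on_c1 = Expr::LtEq(Box::new(Expr::Column("c1".into())), Box::new(Expr::Literal(0.2)));
    println!("{:?}", push_through_projection(&on_r, &aliases));
    println!("{:?}", push_through_projection(&on_c1, &aliases));
}
```

A predicate on `r` is refused because it expands to random(), while a predicate on the plain column `c1` can still be pushed down.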

@haohuaijin
Contributor

I found that even if we don't push down non-deterministic predicates, the results will still be wrong, because when we convert the Statement into a LogicalPlan, we dereference any aliases in the HAVING clause in the section below:
https://github.com/apache/arrow-datafusion/blob/1dd887cdff518ede1d1de457f4b20c22a9c7228f/datafusion/sql/src/select.rs#L110-L122

Before:

select name, random() as r from test group by name having r > 0.2

After:

select name, random() as r from test group by name having random() > 0.2
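To illustrate the effect of that rewrite, here is a small sketch that does not use DataFusion. The function names are hypothetical, and a scripted iterator stands in for random(). Once the alias is replaced by its expression, the HAVING predicate makes its own draw, independent of the value returned as `r`.

```rust
/// HAVING r > 0.2, evaluated against the value already computed for the select list.
fn having_on_alias(
    names: &[&str],
    draws: &mut impl Iterator<Item = f64>,
) -> Vec<(String, f64)> {
    names
        .iter()
        .map(|n| (n.to_string(), draws.next().unwrap()))
        .filter(|(_, r)| *r > 0.2)
        .collect()
}

/// HAVING random() > 0.2, after the alias has been replaced by its expression.
fn having_dereferenced(
    names: &[&str],
    draws: &mut impl Iterator<Item = f64>,
) -> Vec<(String, f64)> {
    names
        .iter()
        .filter_map(|n| {
            let r = draws.next().unwrap(); // select list: random() AS r
            let predicate = draws.next().unwrap(); // HAVING: a second random() call
            if predicate > 0.2 {
                Some((n.to_string(), r))
            } else {
                None
            }
        })
        .collect()
}

fn main() {
    let names = ["a", "b"];
    // Scripted stand-in for successive random() results (an assumption of this sketch).
    let script = vec![0.1, 0.9, 0.5, 0.05];
    println!("{:?}", having_on_alias(&names, &mut script.clone().into_iter()));
    println!("{:?}", having_dereferenced(&names, &mut script.into_iter()));
}
```

With the dereferenced form, group "a" is returned with r = 0.1 even though the query asked for r > 0.2, and group "b" is dropped although its r is 0.5.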

@alamb
Contributor

alamb commented Oct 20, 2023

I think we should avoid pushing down non-deterministic predicates.

🤔 I was thinking about this

Would it be ok to push random() through a Sort?

It seems like these two queries would be equivalent:

Non-pushdown

SELECT * FROM (SELECT * FROM table ORDER BY time) WHERE random() <= 0.2;

Pushdown

SELECT * FROM (SELECT * FROM table WHERE random() <= 0.2) ORDER BY time;

However, we definitely shouldn't push non-deterministic (volatile) predicates on group keys through the GroupBy, for the reasons explained in this ticket. Perhaps we can't push such predicates through Group or Join at all 🤔
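One way to see the difference between Sort and GroupBy, as a sketch that assumes each evaluation of `random() <= p` is an independent draw passing with probability p: a Sort keeps one row per input row, so every row is tested exactly once whether the filter runs above or below it. An aggregate collapses k duplicate rows into one group, so a filter below it tests the group k times and keeps it if any test passes. The function names here are hypothetical.

```rust
/// Probability that a group made of `k` duplicate input rows appears in the
/// output when the volatile filter runs below the aggregate (once per row).
fn keep_prob_filter_below_aggregate(p: f64, k: u32) -> f64 {
    1.0 - (1.0 - p).powi(k as i32)
}

/// Probability that the group appears when the filter runs above the
/// aggregate (once per group).
fn keep_prob_filter_above_aggregate(p: f64) -> f64 {
    p
}

fn main() {
    let p = 0.2;
    for k in 1..=5 {
        println!(
            "k = {k}: below aggregate = {:.3}, above aggregate = {:.3}",
            keep_prob_filter_below_aggregate(p, k),
            keep_prob_filter_above_aggregate(p),
        );
    }
}
```

The two placements agree only when every group has a single row (k = 1), so pushing the filter below the aggregate changes the sampling rate as well as the displayed value of `r`.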

@haohuaijin
Contributor

Perhaps we can't push such predicate through Group or Join at all 🤔

When I use a subquery in place of the HAVING clause to meet the requirements in the ticket, like below:

select t.c1, t.r from (select c1, random() as r from aggregate_test_100 group by c1) as t where t.r > 0.8

I still get the wrong result, because push_down_filter pushes random() down. When I disable the push_down_filter optimizer rule, I get the correct result.

So should we consider prohibiting the use of volatile function aliases in the HAVING clause, like

select name, random() as r from test group by name having r > 0.2

and never push non-deterministic (volatile) predicates down through an Aggregate, so that the SQL below returns the correct result?

select t.c1, t.r from (select c1, random() as r from aggregate_test_100 group by c1) as t where t.r > 0.8

@alamb what do you think? If you don't mind, I will try to fix it.

@jonahgao
Member

jonahgao commented Oct 21, 2023

@alamb @haohuaijin .

I think we can start with the projection plan first because:

  • It should be relatively simple.
  • It covers the case mentioned in this issue, where the child plan of the filter is initially a projection.
    Pushing random() through the projection can also cause problems.
    If that is fixed, random() will no longer be pushed further through the aggregation in this case.

We can carefully consider other logical plans afterward.

@alamb
Contributor

alamb commented Dec 12, 2023

This may be a dupe / related to #7976

@alamb
Contributor

alamb commented Jun 14, 2024

I think we have fixed this issue in subsequent releases, so I am closing this ticket. Let's reopen it or file a new ticket if we find something is still not working.

@alamb alamb closed this as completed Jun 14, 2024
4 participants