Avoid reading partitioned files if all information is present in Hive partition paths #14936

Open
deanm0000 opened this issue Mar 8, 2024 · 9 comments
Labels
A-io-parquet Area: reading/writing Parquet files A-io-partitioning Area: reading/writing (Hive) partitioned files enhancement New feature or an improvement of an existing feature performance Performance issues or improvements

Comments

@deanm0000
Collaborator

Description

This is easy to see on the cloud, but here's one way to see it locally in an MRE:

Setup:

import os
os.environ['POLARS_VERBOSE']='1'
import polars as pl
from pathlib import Path
for i in range(10):
    Path(f"./test/folder={i}").mkdir(exist_ok=True, parents=True)
    pl.select(a=10).write_parquet(f"./test/folder={i}/0000.parquet")

df=pl.scan_parquet("./test/*/*.parquet")

Delete one of the files and then do a filtered collect:

Path("./test/folder=3/0000.parquet").unlink()
Path("./test/folder=3").rmdir()

df.filter(pl.col('folder')==1).collect()
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
hive partitioning: skipped 9 files, first file : test/folder=0/0000.parquet

We can see that the filtered collect works, and it doesn't mind that one of the files is missing because it skipped that file.

Now let's try to see all the unique values of folder. Of course, there is no data about folder in any of the files, it only ever existed as part of the directory paths.

df.select(pl.col("folder").unique()).collect()
# FileNotFoundError: No such file or directory (os error 2)

Even if we don't delete the file, there are no verbose messages indicating whether it's reading any or all of the files. I deleted a file before running this only to demonstrate that it tries to read (at least one of) the files. My point isn't that we should be able to read from hives from which files were deleted; that was just the simplest way I could think of to show that it's attempting to read the files when it shouldn't need to. With a big cloud dataset, it becomes very apparent that it scans all the files when it doesn't need to.

The request here is to make it not scan any of the files when it has all the info it needs from just the directory names.
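A minimal sketch of the requested behaviour, in plain Python rather than Polars internals: the partition values can be recovered from the file list alone by parsing `key=value` directory segments, so `unique()` on a partition column never needs to open a file. `parse_hive_partitions` and `unique_partition_values` are hypothetical helpers for illustration; a full implementation would also need to handle URL-encoded values and type inference, which this ignores (values stay strings).

```python
from pathlib import PurePosixPath


def parse_hive_partitions(path: str) -> dict[str, str]:
    """Return the key=value pairs found in the directory part of a path."""
    partitions = {}
    for part in PurePosixPath(path).parts[:-1]:  # skip the file name itself
        key, sep, value = part.partition("=")
        if sep:  # only segments that look like key=value
            partitions[key] = value
    return partitions


def unique_partition_values(paths: list[str], key: str) -> list[str]:
    """Distinct values of one partition key, computed from paths only (no file I/O)."""
    values = (parse_hive_partitions(p).get(key) for p in paths)
    # dict.fromkeys de-duplicates while keeping first-seen order
    return list(dict.fromkeys(v for v in values if v is not None))


# The file list a glob like "./test/*/*.parquet" would expand to,
# with folder=3 missing as in the example above.
paths = [f"test/folder={i}/0000.parquet" for i in range(10) if i != 3]
print(unique_partition_values(paths, "folder"))
```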

@deanm0000 deanm0000 added the enhancement New feature or an improvement of an existing feature label Mar 8, 2024
@deanm0000 deanm0000 changed the title selecting unique column derived from hive folder causes scan of all files Prevent lazy collect of unique column values derived from hive folder to scan all files Mar 8, 2024
@deanm0000 deanm0000 added A-io Area: reading and writing data A-io-parquet Area: reading/writing Parquet files labels Mar 8, 2024
@deanm0000 deanm0000 changed the title Prevent lazy collect of unique column values derived from hive folder to scan all files Lazy query that only uses hive directories will read all files when only directory info is needed. Mar 18, 2024
@stinodego stinodego added A-io-partitioning Area: reading/writing (Hive) partitioned files and removed A-io Area: reading and writing data labels Mar 29, 2024
@deanm0000 deanm0000 changed the title Lazy query that only uses hive directories will read all files when only directory info is needed. Lazy query that only filters by hive directories will read all files when it should read none. Apr 2, 2024
@stinodego
Contributor

I'm having trouble understanding the issue here. Is there a bug? Is it a performance issue?

@deanm0000
Collaborator Author

@stinodego Sorry, I jumped right into the weeds of proving the problem, so I didn't do a good job of summarizing it.

The problem is that if we run a query where all the relevant information is in the file paths, it reads every file when it shouldn't need to read any of them.

Suppose we have this directory structure

mydataset/fruit=apple/0000.parquet
mydataset/fruit=banana/0000.parquet

and then we do

df=pl.scan_parquet("mydataset/**/*.parquet")

The initial scan is going to get the list of files and directories immediately so that if we do

df.select(pl.col('fruit').unique()).collect()

then it should be able to give us ['apple', 'banana'] without reading any files. In fact, reading the files provides no help in completing that query, because the column doesn't even exist in the files.

Despite there being no relevant data in any of the files, it reads all the files.

With any small, simple MRE you wouldn't notice that it's doing this, which is why my first example deletes a file: so you can see that it really is trying to read it. For big data, especially on the cloud where we're subject to network speeds and read charges, that is the difference between a free, instant answer and one where we download GBs (or maybe TBs) of data from the cloud for no reason.

I don't know if this is a bug per se, but it is definitely a performance issue.

To extend this a bit:

Even if I did something like

df.select(pl.col('fruit')).collect() # not with unique

then it should still only read the metadata of the files to get the number of rows, rather than reading any of the underlying data.
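To illustrate, here is a sketch of producing the full (non-unique) `fruit` column once each file's row count is known, e.g. from the Parquet footer, without decoding any column data. The row counts below are made-up placeholder numbers, and `partition_value` / `materialize_partition_column` are hypothetical helpers, not Polars functions.

```python
from itertools import repeat
from pathlib import PurePosixPath


def partition_value(path: str, key: str) -> str:
    """Extract the value of `key` from a Hive-style path like .../key=value/file."""
    for part in PurePosixPath(path).parts[:-1]:
        k, sep, v = part.partition("=")
        if sep and k == key:
            return v
    raise KeyError(f"{key!r} not found in {path!r}")


def materialize_partition_column(row_counts: dict[str, int], key: str) -> list[str]:
    """Repeat each file's partition value once per row in that file."""
    column: list[str] = []
    for path, num_rows in row_counts.items():
        column.extend(repeat(partition_value(path, key), num_rows))
    return column


# Placeholder row counts; in practice these would come from file metadata.
row_counts = {
    "mydataset/fruit=apple/0000.parquet": 2,
    "mydataset/fruit=banana/0000.parquet": 3,
}
print(materialize_partition_column(row_counts, "fruit"))
```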

@deanm0000
Collaborator Author

@stinodego When the optimizer is looking at stats to determine what it needs to do, can it do a check similar to this pseudo-code?

# pseudo-code: a column that is constant within a file can be built from its stats
if stats.min_value == stats.max_value and stats.null_count == 0:
    return [stats.min_value] * stats.num_rows
else:
    return read_physical_source()
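A runnable version of that check, assuming a hypothetical `ColumnStats` record (not Polars' internal type) and a caller-supplied `read_physical_source` callback. In Python the combined condition needs `and`: `&` binds more tightly than `==`, so `a == b & c == 0` would parse as the chained comparison `a == (b & c) == 0`.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class ColumnStats:
    """Hypothetical per-file statistics for one column."""
    min_value: Any
    max_value: Any
    null_count: int
    num_rows: int


def project_column(stats: ColumnStats, read_physical_source: Callable[[], list]) -> list:
    # min == max with no nulls means every row holds the same value,
    # so the column can be synthesized from the statistics alone.
    if stats.min_value == stats.max_value and stats.null_count == 0:
        return [stats.min_value] * stats.num_rows
    # Otherwise fall back to reading the actual data.
    return read_physical_source()


def fail_if_read() -> list:
    raise RuntimeError("file should not have been read")


# A Hive partition column: every row in this file has fruit == "apple".
hive_stats = ColumnStats(min_value="apple", max_value="apple", null_count=0, num_rows=3)
print(project_column(hive_stats, fail_if_read))
```

The same check applies to any column that happens to be constant within a file, not only to Hive partition columns.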

@stinodego stinodego changed the title Lazy query that only filters by hive directories will read all files when it should read none. Avoid reading partitioned files if all information is present in Hive partition paths Apr 17, 2024
@stinodego
Copy link
Contributor

Ok, I see what you're saying. So we should push down predicates somehow and determine whether all information is already present in the paths themselves rather than the files.

So this is a performance issue. It would definitely be great if we could implement this.

@stinodego stinodego added the performance Performance issues or improvements label Apr 17, 2024
@deanm0000
Collaborator Author

I don't think this is a predicate pushdown issue; it's a projection issue, isn't it? That is to say, it's not filtering anything.

When it does the initial scan of a hive path, it parses the directories and creates statistics with max=min=[value] for all the hive directories. If those stats are available to the projection optimizer, then it seems it could generically check whether max==min and null_count==0, and if so skip reading anything and just project that value. I would think that check would work even beyond hive partitions, for any highly repeated data. Am I off base, or could that be a way to implement it?

@stinodego
Contributor

I meant to refer to projection pushdown indeed.

@deanm0000
Collaborator Author

@nameexhaustion, care to take a look at this one? I ask because I see you crushing other hive-related issues, and there might be some economies of scale.

@nameexhaustion
Collaborator

I don't think we can do this, we need to know how many rows the files have in order to project the correct number of rows, even if the projection is hive only.

@adamreeve
Contributor

> I don't think we can do this, we need to know how many rows the files have in order to project the correct number of rows, even if the projection is hive only.

In this example, a unique expression is applied to the column, so the number of rows isn't needed. Other aggregation expressions like min and max that don't need to know the number of rows could similarly be optimised. I can see that pushing that information down into the Parquet reader to optimise this could be pretty awkward, though.
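A sketch of the distinction drawn here, assuming the partition values are already known from the paths: `unique`, `min` and `max` depend only on which values occur, so they can be answered from the partition values alone, whereas a row count depends on how many rows each file holds. `answer_from_partitions` and its aggregation names are illustrative only, not Polars' optimizer API. Values are plain strings here, so `min`/`max` compare lexically, and `unique` is sorted only to make the output deterministic.

```python
from typing import Optional


def answer_from_partitions(values: list[str], agg: str,
                           row_counts: Optional[list[int]] = None):
    """Evaluate an aggregation over a partition column using path info only.

    values[i] is the partition value of file i; row_counts[i] is its row count.
    """
    if agg == "unique":
        return sorted(set(values))
    if agg == "min":
        return min(values)
    if agg == "max":
        return max(values)
    if agg == "len":
        # The number of rows cannot be derived from directory names.
        if row_counts is None:
            raise ValueError("'len' needs per-file row counts (e.g. from metadata)")
        return sum(row_counts)
    raise ValueError(f"unsupported aggregation: {agg!r}")


values = ["apple", "banana"]
print(answer_from_partitions(values, "unique"))       # no file access needed
print(answer_from_partitions(values, "len", [2, 3]))  # needs row counts
```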

> Even if I did something like
>
> df.select(pl.col('fruit')).collect() # not with unique
>
> then it should still only read the metadata of the files to get the number of rows rather than reading any of the underlying data.

I'm pretty sure this is the current behaviour (as of commit 047e578e3b). I was just debugging a very similar example: the metadata of all files is read, but when it comes to reading data, the projections vector is empty, so no column data is read:

let columns = projection
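The behaviour described in this comment can be modelled as splitting the requested projection into Hive columns (filled in from the path) and file columns (read from the Parquet data); when the file part is empty, only the footer metadata is needed for the row count. This is a simplified model with hypothetical names (`split_projection`, `plan_file_read`), not the actual reader code referenced above.

```python
def split_projection(projection: list[str], hive_keys: set[str]) -> tuple[list[str], list[str]]:
    """Split requested columns into (hive_columns, file_columns), preserving order."""
    hive_columns = [c for c in projection if c in hive_keys]
    file_columns = [c for c in projection if c not in hive_keys]
    return hive_columns, file_columns


def plan_file_read(projection: list[str], hive_keys: set[str]) -> str:
    _, file_columns = split_projection(projection, hive_keys)
    if not file_columns:
        # Only the row count is needed, which the Parquet footer provides.
        return "metadata only"
    return f"read columns {file_columns}"


print(plan_file_read(["fruit"], {"fruit"}))
print(plan_file_read(["fruit", "a"], {"fruit"}))
```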
