Parquet Scan Filter #1191
I think it would be best to implement this in DataFusion if at all possible -- this logic and need is not at all specific to IOx, so the community would benefit (and would also likely help maintain it) if it lived there rather than in IOx.

One thing to consider for the strategy described is that it may actually slow down parquet scanning for non-selective predicates (e.g. predicates that filter out none of the rows). Another is that the order in which predicates are evaluated may make a difference: if one predicate filters out all but a few rows and another filters out none, applying the more selective predicate first is likely to be faster.

I think the challenge of non-selective predicates and ordering should not be handled by the actual parquet reader -- that is, the query engine should specify which predicates to apply and in what order.
In the Impala implementation there was negligible impact on unsorted/random data: https://blog.cloudera.com/speeding-up-select-queries-with-parquet-page-indexes/. If parquet-rs can correctly store and retrieve the column sort statistics, that would help in deciding whether to use a page index or not.

I think the predicate evaluation would best live in parquet, as it can get complex for some pages; DataFusion and other processing engines implementing the logic on their own would each repeat this non-trivial work.
Agreed, I was somewhat hedging here 😆
That's a great link 👍, and yeah the ability to prune on aggregate statistics is very dependent on the sort order.
I was somewhat hoping to avoid using the page index, partly for this reason, as it would require pushing predicate evaluation into the parquet crate, but also because of the small matter that we don't currently read or write it 😅

I believe in most cases a selection mask will perform similarly or significantly better, allowing skipping of pages and even runs within pages, whilst also not requiring predicate evaluation logic to leak into the parquet crate?
I agree that predicate evaluation should be done in parquet -- what @e-dard did in the IOx ReadBuffer was to implement predicates that could be evaluated very fast (e.g. https://github.com/influxdata/influxdb_iox/blob/main/read_buffer/src/row_group.rs#L1517-L1524). I recommend a similar (or the same!) approach in parquet.
Shall I create a separate ticket in that case for directly evaluating predicates against encoded data? I think the two problems are separable.
I probably don't fully understand what "evaluating predicates against encoded data" means compared to what you have proposed in this ticket, so I am not sure about the need for a second ticket.
In this ticket I propose passing a bitmask down to the scan; the parquet crate would have no involvement in generating this mask, nor would it understand the predicates involved in generating it. Think of it like the take kernel: you give it a mask and it returns those rows, but it has no idea why the query engine is requesting them.

This would mean streaming out data as an arrow RecordBatch (or Array) in order to evaluate predicates. However, my hope is that with dictionary preservation this will be relatively cheap, and successive predicate evaluations will retrieve successively fewer rows.

A further optimisation would then be to evaluate the predicates directly on the underlying parquet data, without first decoding to an arrow representation. This is what I'm wondering if I should create a ticket for? It still needs the "take" support in order for the generated masks to be useful, but it will likely speed up mask generation vs decoding to an arrow representation first.
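To illustrate the proposed semantics, here is a minimal sketch using a plain `Vec<bool>` in place of an arrow `BooleanArray`; the function name and signature are hypothetical, not part of the parquet crate's API. The reader is handed a mask and materializes only rows whose bit is set, without knowing why the query engine selected them.

```rust
/// Hypothetical sketch of selection-mask semantics: return only the values
/// of `column` whose corresponding bit in `mask` is set, analogous to how
/// the arrow take/filter kernels select rows without knowing the predicate.
fn scan_with_mask<T: Clone>(column: &[T], mask: &[bool]) -> Vec<T> {
    column
        .iter()
        .zip(mask)
        .filter_map(|(value, &selected)| selected.then(|| value.clone()))
        .collect()
}
```

A real implementation would of course skip decoding (and ideally IO) for pages whose mask bits are all unset, rather than decoding then filtering as shown here.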
I think that makes sense as a follow up item; 👍 |
We can also explore lazy materialization, that is, only decode and decompress (or even incur the IO cost) when a parquet page is actually needed. I think this is especially useful when many columns are selected and only a few very selective predicates are applied on some of the columns. In some sense it is similar to page skipping based on the column index, but more powerful. It would be very nice to implement row group and page level skipping in arrow-rs also, so that engines don't have to duplicate the work.
Definitely. Irrespective of how the pruning takes place - whether via a scan mask as proposed here, information derived from a page index, or some other mechanism - there's definitely no point doing IO for pages that aren't going to be used. 👍

My hope with the scan masks is to provide an API that allows query engines to express what rows they want, without having to worry about the mechanics of which pages etc. to fetch and decode for those rows, what statistics are available, how the writer interpreted the ambiguous parquet specifications, and so on. This would leave the implementation in arrow-rs free to choose the most efficient way to get the requested rows of data. As an added benefit this approach would integrate well with the async plumbing I stubbed out in #1154, which needs to know what data is going to be needed ahead of time.

This scan mask approach is heavily geared to the use-case of IOx, where the page index is likely to not be very helpful, but they're definitely complementary approaches. I'd imagine it is possible to construct the partial scan logic in such a way that it works well for both. Ultimately the page index is just a way to generate a low granularity scan mask 😄

All that is to say, I agree entirely with handling these concerns in arrow-rs as you suggest.
Is this scan mask some sort of bitmap alongside an Arrow vector, or is it associated with a parquet column chunk? You can also check out the paper "Filter Representation in Vectorized Query Execution" to compare that against a selection vector based approach. AFAIK the latter may yield better performance due to no branching cost.
I mean I was just planning to use an arrow BooleanArray that you'd pass in... I'll give the linked paper a read 👍 |
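For readers unfamiliar with the two representations being compared: a bitmap stores one bool per row, while a selection vector stores only the indices of the rows that pass the filter, which downstream kernels can iterate without a branch per row. A minimal stdlib-only sketch of the conversion (the function name is illustrative, not an arrow-rs API):

```rust
/// Convert a per-row bitmap filter into a selection vector: a list of the
/// indices of rows whose bit is set. Downstream kernels can then iterate
/// the indices directly instead of branching on every row's bit.
fn bitmap_to_selection_vector(mask: &[bool]) -> Vec<usize> {
    mask.iter()
        .enumerate()
        .filter_map(|(row, &keep)| keep.then_some(row))
        .collect()
}
```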
Just wanted to chip in and say that implementing PageIndex would be great even if parquet-rs doesn't use it internally for predicate pushdown, as it can be used by other engines/implementations that do. In analytics systems these files get passed around between different systems.

I'm currently rewriting a Java project in Rust that uses parquet 1.11.1 (so no Hadoop, Impala or Spark) and does predicate pushdown using ColumnIndex and OffsetIndex. I'm using parquet-rs to write the file and datafusion to read it back; unfortunately the current system can't read the file due to the absence of the PageIndex. Having existing systems be able to read parquet-rs files would be a boon for backward/forward compatibility.
@ParadoxShmaradox makes sense. I can try to find some time to bash something out in the next few weeks, time permitting. If you were willing to assist that might speed things along, but if not, no worries 😅 |
@tustvold Sure! Hit me up and I can test file generation in rust and read in Java, I can probably share the files from both implementations. |
Filed #1705 to track adding |
I think the high-level work to support this is now complete, so marking this as done. There are definitely further improvements to be made, but those can be tracked separately.
🎉 |
I hope to work on this after finishing up dictionary preservation (#1180) and async (#1154); creating this now for feedback and visibility.
Background
IOx is a columnar, time-series database that uses parquet for its persistence format. The data model consists of a number of relatively low cardinality tag columns, a timestamp column and a number of value columns. Data is stored in order of column cardinality, with the lowest cardinality columns first.
Almost all IOx queries contain highly selective predicates on tag columns, and therefore predicate pushdown is very important to IOx's performance story.
Currently the parquet crate only supports predicate pushdown in the form of row group pruning, that is, using statistics to skip reading entire row groups. Unfortunately this is only effective if the file is large enough to contain multiple row groups, and the predicate is on a column early in the sort order.
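The idea behind statistics-based row group pruning can be sketched as follows; the struct and function here are illustrative stand-ins, not the parquet crate's actual statistics API. A row group may be skipped only when its min/max range proves the predicate can match no value in it.

```rust
/// Illustrative min/max statistics for one column of a row group
/// (a stand-in for the metadata the parquet crate reads from the footer).
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// Returns true if rows matching `column == target` may exist in the row
/// group, i.e. it must be read; false means it can safely be skipped.
fn may_contain(stats: &RowGroupStats, target: i64) -> bool {
    stats.min <= target && target <= stats.max
}
```

Note why sort order matters: on an unsorted column almost every row group's [min, max] range will straddle the target, so `may_contain` returns true everywhere and nothing is pruned.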
An obvious next step might be to perform page-pruning, that is using page metadata to skip pages. However, the page metadata is typically stored inside the page header, and so you likely end up reading the entire page from the backing store and decompressing it only to then throw it away, which is not ideal.
There is an optional extension to parquet called PageIndex that tries to address this. However, I'm not aware of any systems outside of Impala that support this functionality, and it is still constrained by the page granularity and the limitations of relying on aggregate statistics.
Proposal
Instead I would like to propose adding functionality to allow providing a row selection mask when scanning a column chunk. When provided, only values (null or otherwise) with a corresponding set bit in the selection mask will be returned in the output. For simplicity nested data will not be supported, at least initially.
This would allow IOx, or potentially DataFusion depending on where the logic for this eventually sits, to do the following for pushing down predicates, in addition to the current row group filtering:
For each column in an order determined by some heuristic:
Once all the predicates that can be evaluated in this way have been evaluated, a final scan can be performed with the final selection mask across the full set of projected columns.
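The iterative refinement described above could be sketched like this; every name here is hypothetical (there is no such function in the parquet crate), and a plain `Vec<bool>` stands in for an arrow `BooleanArray`. Each column pass shrinks the mask, so later, less selective columns decode fewer values.

```rust
/// Hypothetical sketch of one refinement pass: decode only the currently
/// selected rows of a column, evaluate the predicate, and clear the mask
/// bit for every row that fails. Repeating this per column yields the
/// final selection mask used for the full projected scan.
fn refine_mask<T>(
    mask: &mut Vec<bool>,
    // Stand-in for a masked column scan: yields (row index, value) pairs
    // for rows whose mask bit is currently set.
    decode_selected: impl Fn(&[bool]) -> Vec<(usize, T)>,
    predicate: impl Fn(&T) -> bool,
) {
    for (row, value) in decode_selected(&mask[..]) {
        if !predicate(&value) {
            mask[row] = false;
        }
    }
}
```

In the real proposal the decode step would be the parquet scan itself, able to skip whole pages whose mask bits are all unset.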
Potential Problems
I'm currently aware of the following potential problems with this approach, but let me know if I've missed something:
Despite this I am optimistic about the feasibility of this approach, as it is very similar to the one @e-dard has successfully applied to IOx's read buffer - an in-memory, read-only, queryable data structure.