Dynamic filter pushdown for selective broadcast joins #103
Conversation
@mbasmanova has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.
You might prefer an API contract where the filters take effect on the next batch, not the next split. For the case where the filter replaces the whole operator, you need to know when it takes effect.
I was thinking about that. What would it take to allow changing ScanSpec mid-split?
You would need to keep a pointer to the ScanSpec of the current data source, and then you could install the filter right there. You would then call the predicate ordering function of ScanSpec. The next read of the top-level struct would probably evaluate the new filter first. This could work; the filter would find its place soon enough. I would have to step through the mechanics, but I don't think there is any particular difficulty.
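To make the mid-split discussion concrete, here is a minimal sketch of the idea using hypothetical, simplified types rather than the actual Velox classes: the data source keeps a pointer to the ScanSpec of the split it is currently reading, the filter is installed on the matching child spec, and predicate ordering is re-run so the next batch sees the new filter.

```cpp
#include <algorithm>
#include <memory>
#include <string>
#include <vector>

// All types below are hypothetical simplifications, not the actual Velox classes.

struct Filter {
  double selectivity;  // estimated fraction of rows that pass
};

struct ScanSpecNode {
  std::string column;
  std::shared_ptr<Filter> filter;  // null when the column has no filter
};

struct ScanSpec {
  std::vector<ScanSpecNode> children;

  // Predicate ordering: most selective filters first.
  void reorderFilters() {
    std::sort(children.begin(), children.end(),
              [](const ScanSpecNode& a, const ScanSpecNode& b) {
                double sa = a.filter ? a.filter->selectivity : 1.0;
                double sb = b.filter ? b.filter->selectivity : 1.0;
                return sa < sb;
              });
  }
};

struct DataSource {
  // Pointer to the ScanSpec driving the split that is currently being read.
  ScanSpec* currentScanSpec = nullptr;

  // Installs a join-produced filter mid-split; the next batch read from the
  // top-level struct sees the updated predicate order.
  void addDynamicFilter(const std::string& column,
                        std::shared_ptr<Filter> filter) {
    for (auto& child : currentScanSpec->children) {
      if (child.column == column) {
        child.filter = std::move(filter);
        break;
      }
    }
    currentScanSpec->reorderFilters();
  }
};
```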
@oerling Orri, thank you for reviewing. I appreciate the feedback. I updated the PR to apply the pushed-down filter to the next batch rather than the next split. With this we can make a follow-up PR that turns the join into a no-op under certain conditions after pushing the filter down. Would you take another look?
When filters are installed, it is not obvious whether cached filtering results for dictionaries get reset. This will be somewhat rare but could happen if many joins add filters to the same dictionary-encoded column, or if there already is a non-join filter on the dictionary. If there was no filter before pushdown, the dictionary may be loaded while the filter cache is not initialized, which could crash.
The ScanSpec does not reference the reader, but it can have a version number that the dictionary readers then check. Just a 'changed' flag would do.
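A rough sketch of the 'changed' flag suggestion, again with hypothetical types (not the actual reader classes): the ScanSpec bumps a version number whenever a filter is installed, and a dictionary reader compares its recorded version before trusting its cached filter results, rebuilding the cache when they differ.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical simplified types; real readers hold much more state.

struct ScanSpec {
  uint64_t filterVersion = 0;

  // Called when a dynamic filter is installed on this spec.
  void bumpFilterVersion() {
    ++filterVersion;  // tells readers their cached filter results are stale
  }
};

struct DictionaryColumnReader {
  const ScanSpec* spec = nullptr;
  uint64_t cachedVersion = 0;
  std::vector<uint8_t> filterCache;    // pass/fail per dictionary entry
  std::vector<bool> filterCacheValid;  // whether an entry was evaluated yet

  // Called before using the cache; rebuilds it if the filter changed or the
  // dictionary was (re)loaded without the cache being initialized, which is
  // the crash scenario described above.
  void ensureFilterCache(size_t dictionarySize) {
    if (cachedVersion != spec->filterVersion ||
        filterCache.size() != dictionarySize) {
      filterCache.assign(dictionarySize, 0);
      filterCacheValid.assign(dictionarySize, false);
      cachedVersion = spec->filterVersion;
    }
  }
};
```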
Maybe in the documentation one could say colocated instead of broadcast, since this also applies to copartitioned cases.
@oerling Orri, thanks for bringing up the issue with filter caches. I added SelectiveColumnReader::resetFilterCaches() and DwrfRowReader::columnReader() methods and updated HiveDataSource::addDynamicFilter to use these:
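The exact code excerpt is not shown above. Below is a rough, self-contained sketch of how those pieces could fit together; the signatures and bodies are assumptions for illustration, not the actual Velox/DWRF implementation.

```cpp
#include <memory>
#include <string>
#include <utility>

// Simplified, assumed signatures; not the actual declarations.

struct Filter {};

struct ScanSpec {
  // Installs the filter on the matching child spec and re-runs predicate
  // ordering (details omitted in this sketch).
  void addFilter(const std::string& /*column*/, std::unique_ptr<Filter> /*f*/) {}
};

struct SelectiveColumnReader {
  // Drops cached per-dictionary filter results so the new filter is
  // re-evaluated on the next batch; real readers would also recurse into
  // child readers.
  virtual void resetFilterCaches() {}
  virtual ~SelectiveColumnReader() = default;
};

struct DwrfRowReader {
  SelectiveColumnReader reader_;
  // Exposes the top-level column reader so callers can reset filter caches
  // after installing a filter mid-split.
  SelectiveColumnReader& columnReader() { return reader_; }
};

struct HiveDataSource {
  ScanSpec* scanSpec_ = nullptr;
  DwrfRowReader* rowReader_ = nullptr;

  void addDynamicFilter(const std::string& column,
                        std::unique_ptr<Filter> filter) {
    scanSpec_->addFilter(column, std::move(filter));
    if (rowReader_ != nullptr) {
      rowReader_->columnReader().resetFilterCaches();
    }
  }
};
```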
@mbasmanova merged this pull request in a6b6dca.
This commit introduces a mechanism to push down dynamically generated filters from one operator to another operator upstream. The new mechanism is used to push a filter on the join keys from a highly selective broadcast (or simply colocated) hash join down into the probe-side table scan. This allows the ORC reader to skip whole files and/or row groups based on the extra filter.
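As an illustration of the idea (hypothetical, simplified types, not the code from this PR): once the build side of a selective hash join is complete, its join-key values can be summarized into a filter (for example, a min/max range) that the probe-side scan then uses to skip files and row groups whose statistics cannot match.

```cpp
#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical simplified types; not the classes added by this PR.

struct BigintRangeFilter {
  int64_t min;
  int64_t max;

  // True if a file or row group whose column statistics span [lo, hi] may
  // contain matching rows; false means it can be skipped entirely.
  bool testRange(int64_t lo, int64_t hi) const {
    return hi >= min && lo <= max;
  }
};

// Summarizes the build-side join keys into a pushable filter.
BigintRangeFilter makeDynamicFilter(const std::vector<int64_t>& buildKeys) {
  int64_t lo = std::numeric_limits<int64_t>::max();
  int64_t hi = std::numeric_limits<int64_t>::min();
  for (int64_t key : buildKeys) {
    lo = std::min(lo, key);
    hi = std::max(hi, key);
  }
  return {lo, hi};
}

struct ProbeSideScan {
  std::vector<BigintRangeFilter> filters;

  // Called by the join once its build side is ready; the scan consults the
  // filters when deciding which files and row groups to read next.
  void addDynamicFilter(const BigintRangeFilter& filter) {
    filters.push_back(filter);
  }
};
```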
The workflow is:
Individual changes that make up the new mechanism are: