Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support user defined ParquetAccessPlan in ParquetExec, validation to ParquetAccessPlan::select #10813

Merged
merged 6 commits into from
Jun 9, 2024

Conversation

alamb
Copy link
Contributor

@alamb alamb commented Jun 6, 2024

This PR looks big but the 85% of it is tests and documentation

Which issue does this PR close?

Closes #9929

Rationale for this change

Many query engines / use cases have some sort of a specialized index for data stored in parquet. This index can be used to determine which row groups / selections within a file are needed

However, the DataFusion ParquetExec has no way for users to pass this information in. Instead it tries to prune row groups based on the min/max statistics and other information in the file's metadata.

This PR makes it possible for users to pass in a ParquetAccessPlan added in #10738 to ParquetExec with a starting plan, which is then further pruned based on the file's metadata.

What changes are included in this PR?

  1. Allow users to pass in a ParquetAccessPlan for each PartitionedFile read by ParquetExec
  2. Add error checking to ParquetAccessPlan now that it can be specified by users
  3. Document how this works
  4. Add tests for this new API

Are these changes tested?

Yes, new tests are added

Are there any user-facing changes?

a new API

Here is a complete end to end example of using this API: #10701

@github-actions github-actions bot added the core Core DataFusion crate label Jun 6, 2024
@alamb alamb force-pushed the alamb/user_access_plan branch from 3e839ef to 38fa45a Compare June 6, 2024 19:13
@alamb alamb changed the title Allow ParquetAccessPlan to be passed in to ParquetExec, add validation to ParquetAccessPlan::select Support user defined ParquetAccessPlan in ParquetExec, validation to ParquetAccessPlan::select Jun 6, 2024
@@ -182,6 +183,11 @@ impl ParquetAccessPlan {
/// is returned for *all* the rows in the row groups that are not skipped.
/// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`].
///
/// # Errors
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since users can now provide a ParquetAccessPlan it is important to do validation on the contents.

While technically we could avoid doing this validation when the selections came from the page pruning, I think it would be a good check to have to catch future bugs rather than subtle wrong results so I chose to always validate

@@ -366,7 +410,8 @@ mod test {
RowSelector::select(10),
// selectors from the second row group
RowSelector::select(5),
RowSelector::skip(7)
RowSelector::skip(7),
Copy link
Contributor Author

@alamb alamb Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

turns out that some of the existing unit tests were actually invalid. However, I think the issues are actually test problem, not actual code problems. All the actual parquet reader tests passed

@@ -139,7 +139,8 @@ impl FileOpener for ParquetOpener {
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
let access_plan = ParquetAccessPlan::new_all(rg_metadata.len());
let access_plan =
Copy link
Contributor Author

@alamb alamb Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the required plumbing, which I am quite pleased with -- it is quite straightforward now

let mut finder = MetricsFinder { metrics: None };
accept(physical_plan.as_ref(), &mut finder).unwrap();
let parquet_metrics = finder.metrics.unwrap();
let parquet_metrics =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pulled into a new file so I could reuse it

///
/// For a complete example, see the [`parquet_index_advanced` example]).
///
/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_index_advanced.rs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be added in #10701

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW #10701 (example of how to use this API) is ready for review

}

// validate all Selections
for (idx, (rg, rg_meta)) in self
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is new checking added as users can pass in ParquetAccessPlan and the semantics are quite subtle

Copy link
Contributor

@Jefffrey Jefffrey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good explanation, makes sense to me coming in with no context on the original issue 👍

Copy link
Contributor Author

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @Jefffrey for your review

@alamb alamb merged commit 9503456 into apache:main Jun 9, 2024
23 checks passed
@alamb alamb deleted the alamb/user_access_plan branch June 9, 2024 20:06
@alamb
Copy link
Contributor Author

alamb commented Jun 9, 2024

BTW I am happy to make additional corrections as follow on PRs if anyone has additional notes

cc @advancedxy @thinkharderdev @crepererum @NGA-TRAN and @Ted-Jiang @xinlifoobar and @hengfeiyang who reviewed the original PR to create ParquetAccessPlan -- this PR adds the ability to specify a starting access plan for a scan

Copy link
Contributor

@advancedxy advancedxy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @alamb, this is great work. I think the code is pretty good.

Sorry for being late to the party, left some minor style issues comment.

@@ -348,14 +384,22 @@ mod test {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
// select / skip all 20 rows in row group 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: to be consistent with L427 in this file, it would be better to call it as
specifies all 20 rows in row group .

fn test_invalid_too_few() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
// select 12 rows, but row group 1 has 20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: -> specifies 12 rows?

I think the select is referred to as selection, which the following code also includes a skip.

fn test_invalid_too_many() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
// select 22 rows, but row group 1 has only 20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Comment on lines +159 to +160
/// The `ParquetExec` will try and further reduce any provided
/// `ParquetAccessPlan` further based on the contents of `ParquetMetadata` and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: there are two further in this sentence. How about:

/// The `ParquetExec` will try and reduce any provided
/// `ParquetAccessPlan` further based on the contents ...


// check row group count matches the plan
return Ok(access_plan.clone());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: is it better to add a logging in the else branch?

@alamb
Copy link
Contributor Author

alamb commented Jun 12, 2024

Thanks @alamb, this is great work. I think the code is pretty good.

Sorry for being late to the party, left some minor style issues comment.

No worries -- thank you for the comments. I will make a PR to address them shortly

@alamb
Copy link
Contributor Author

alamb commented Jun 13, 2024

PR with comments: #10896

phillipleblanc pushed a commit to spiceai/datafusion that referenced this pull request Jul 8, 2024
… to `ParquetAccessPlan::select` (apache#10813)

* Allow `ParquetAccessPlan` to be passed in to `ParquetExec`, add validation to ParquetAccessPlan::select

* Add test for filtering and user supplied access plan

* fix on windows

* Apply suggestions from code review

Co-authored-by: Jeffrey Vo <[email protected]>

---------

Co-authored-by: Jeffrey Vo <[email protected]>
phillipleblanc added a commit to spiceai/datafusion that referenced this pull request Jul 8, 2024
… to `ParquetAccessPlan::select` (apache#10813) (#8)

* Allow `ParquetAccessPlan` to be passed in to `ParquetExec`, add validation to ParquetAccessPlan::select

* Add test for filtering and user supplied access plan

* fix on windows

* Apply suggestions from code review



---------

Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Jeffrey Vo <[email protected]>
findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
… to `ParquetAccessPlan::select` (apache#10813)

* Allow `ParquetAccessPlan` to be passed in to `ParquetExec`, add validation to ParquetAccessPlan::select

* Add test for filtering and user supplied access plan

* fix on windows

* Apply suggestions from code review

Co-authored-by: Jeffrey Vo <[email protected]>

---------

Co-authored-by: Jeffrey Vo <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

API in ParquetExec to pass in RowSelections to ParquetExec (enable custom indexes, finer grained pushdown)
3 participants