ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory #7180
Conversation
Force-pushed ec0a3c2 to 99c51ca, then aea5db3 to 2bb5686.
Looking great! I tested on some dummy datasets created by dask, and that seems to work nicely (although it is of course hard to be sure there is actually no IO going on during the factory). Will still further check with the existing parquet tests that use `_metadata` files.

Some non-inline comments:

- Still need a `partitioning` keyword for `parquet_dataset`?
- We might want to detect that, when given a directory name, this directory includes a `_metadata` file? (in some API; maybe this can be in the `pyarrow.parquet` code)
- For a follow-up: handling of `_common_metadata` (just for inspecting the common schema)
- Regarding the statistics stored as an expression:
  - Do we want to expose this in Python as well?
  - Currently, the statistics are only available when the fragments were constructed from a `_metadata` file (or after querying once), I think? Do we want to allow populating them on demand?
  - Statistics are only attached to a RowGroupInfo, and not to a fragment? Not needed for this PR, to be clear, but thinking more generally: we might want to enable creating a Fragment with custom statistics (eg in Kartothek? @xhochy)
- There are some tests needed for the new `write_metadata` capabilities (I can write/push some if you want). And the same for `parquet_dataset`/`ParquetDatasetFactory` (a minimal sketch of that workflow follows below).

I ran into a segfault if one of the files is not present (writing a reproducer / test case right now).
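For context, a minimal sketch of the `_metadata` workflow under discussion, assuming the API shape introduced around this PR (`metadata_collector`, `pq.write_metadata`, `ds.parquet_dataset`); the paths and data are made up:

```python
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

table = pa.table({"value": [1.0, 2.0, 3.0, 4.0]})

# Collect each written file's FileMetaData while writing the dataset...
collector = []
pq.write_to_dataset(table, "data", metadata_collector=collector)

# ...then aggregate it into a single _metadata file next to the data files.
pq.write_metadata(table.schema, "data/_metadata",
                  metadata_collector=collector)

# parquet_dataset() reconstructs the dataset from _metadata alone,
# without opening the individual file footers.
dataset = ds.parquet_dataset("data/_metadata")
print(dataset.to_table(filter=ds.field("value") > 2.5).num_rows)
```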
```c++
    const std::shared_ptr<Expression>& predicate) {
  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
  ARROW_ASSIGN_OR_RAISE(auto row_groups,
                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
```
Does this mean that using `SplitByRowGroup` currently always invokes IO? (In principle it could be filtered/split based on the already-read RowGroupInfo objects?)
Correct. It could be filtered, but only if the dataset was generated via the `_metadata` file (or any explicit RowGroupInfo).
I think it would be good to at least try to avoid IO if the statistics are already available in the RowGroupInfos (at least, from my understanding of how this is used in RAPIDS; can check with them), but certainly not critical to do in this PR.
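To make the IO question concrete, a hypothetical sketch of the Python side of this discussion (`split_by_row_group` is the binding added in this PR; the dataset path and field name are carried over from the invented example above):

```python
import pyarrow.dataset as ds

# Built from _metadata, so RowGroupInfo statistics are already populated.
dataset = ds.parquet_dataset("data/_metadata")
fragment = next(iter(dataset.get_fragments()))

# With statistics already in hand, this split could prune row groups
# against the predicate without re-reading the file footer; otherwise
# it needs IO to fetch the footer first.
for piece in fragment.split_by_row_group(ds.field("value") > 2.5):
    print(piece.row_groups)
```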
```python
    ParquetFileFormat,
    ParquetFileFragment,
    ParquetReadOptions,
    Partitioning,
    PartitioningFactory,
    RowGroupInfo,
```
Maybe we should not expose this publicly? (is there a reason you would want to use this directly?)
That's required for `ParquetFileFragment.row_groups`. I could change it to only return a list of integers.
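For reference, a sketch of the two options being weighed here; the attribute names on `RowGroupInfo` (`id`, `statistics`) are assumptions about this PR's binding surface, for illustration only:

```python
# As exposed in this PR: row_groups is a list of RowGroupInfo objects...
for rg in fragment.row_groups:
    print(rg.id, rg.statistics)  # assumed attributes

# ...versus the alternative floated above: plain row-group indices.
# for rg_index in fragment.row_groups:
#     print(rg_index)
```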
I don't get a segfault for the test you added, just a wrong exception being thrown:

```
>       raise IOError(errno, message)
E       FileNotFoundError: [Errno 2] Failed to open local file '/tmp/pytest-of-fsaintjacques/pytest-44/test_parquet_dataset_factory_i0/test_parquet_dataset/43bd0bd1002048e0b9bbc730f7614d18.parquet'. Detail: [errno 2] No such file or directory

pyarrow/error.pxi:98: FileNotFoundError
```
A FileNotFoundError sounds good (the ValueError I added in the tests was just a bit random). Will rebuild locally to see if I still get this.

I'm curious about the exception/segfault. If you can reproduce, feel free to share.
It seems this failure doesn't happen all the time for me. Running it a few times, I also see the FileNotFoundError, but only in roughly 1 out of 2 cases. Now, when running it on an actual example in the interactive terminal (from a small dataset where I actually deleted one of the files), I consistently see the segfault.

I get a different stacktrace when running the tests, something about "unlink", so it might be that the way I remove the file in the test is not very robust / is not similar to deleting a file in the file browser.
Force-pushed 3fa0657 to fd5a4a3.
- Implement ParquetDatasetFactory
- Replace ParquetFileFormat::GetRowGroupFragments with ParquetFileFragment::SplitByRowGroup (and the corresponding bindings).
- Add various optimizations, notably in ColumnChunkStatisticsAsExpression.
- Consolidate RowGroupSkipper logic in ParquetFileFragment::GetRowGroupFragments.
- Ensure FileMetaData::AppendRowGroups checks for schema equality.
- Implement dataset._parquet_dataset
Force-pushed fd5a4a3 to 8b301d3, then 8b301d3 to 3b85adc.
This looks great, thanks for doing this!
A few comments:
```c++
  int num_row_groups_;
  int64_t rows_skipped_;
};

class ParquetScanTaskIterator {
 public:
  static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
```
Nothing here can fail, so we can just make the constructor public.
```c++
  std::vector<int> column_projection_;
  RowGroupSkipper skipper_;

  FileSource source_;
```
What is this used for?
For debug purposes: this is extremely useful for introspecting the object.
python/pyarrow/_dataset.pyx
""" | ||
Split the fragment in multiple fragments. |
```diff
-        Split the fragment in multiple fragments.
+        Split the fragment into multiple fragments.
```
Could you replicate this comment in C++?
```c++
Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
    const std::shared_ptr<Expression>& predicate) {
  std::vector<RowGroupInfo> row_groups;
  if (HasCompleteMetadata()) {
```
@jorisvandenbossche this is now lazy.
> @jorisvandenbossche this is now lazy.

Cool, thanks!
Force-pushed dcf2a68 to 100a7b0, then 100a7b0 to cca91b3, then 8df15e3 to 29f44d9.
This patch adds the option to create a dataset of parquet files via `ParquetDatasetFactory`. It reads a single `_metadata` parquet file created by systems like Dask and Spark, extracts the metadata of all fragments from said file, and populates each fragment with extra statistics for each column. The `_metadata` file can be created via `pyarrow.parquet.write_metadata`.

When the Scan operation is materialised, the row groups of the ParquetFileFragment are pruned using the statistics _before_ reading the original file metadata. If no RowGroups from a file match the predicate of the Scan, the file will not be read (including the metadata footer), thus avoiding expensive IO calls. The optimisation benefits are inversely proportional to the predicate's selectivity.

```python
# With the plain FileSystemDataset
%timeit t = nyc_tlc_fs_dataset.to_table(filter=da.field('total_amount') > 1000.0, ...)
1.55 s ± 26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# With ParquetDatasetFactory
%timeit t = nyc_tlc_parquet_dataset.to_table(filter=da.field('total_amount') > 1000.0, ...)
336 ms ± 17.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

- Implement ParquetDatasetFactory
- Replace ParquetFileFormat::GetRowGroupFragments with ParquetFileFragment::SplitByRowGroup (and the corresponding bindings).
- Add various optimizations, notably in ColumnChunkStatisticsAsExpression.
- Consolidate RowGroupSkipper logic in ParquetFileFragment::ScanFile
- Ensure FileMetaData::AppendRowGroups checks for schema equality.
- Implement dataset._parquet_dataset

Closes apache#7180 from fsaintjacques/ARROW-8062-parquet-dataset-metadata

Lead-authored-by: François Saint-Jacques <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: François Saint-Jacques <[email protected]>
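The `AppendRowGroups` schema check mentioned in the change list surfaces in Python roughly as follows; a hedged sketch using `FileMetaData.append_row_groups` (the exact exception type raised on mismatch is an assumption, hence the broad catch):

```python
import pyarrow as pa
import pyarrow.parquet as pq

pq.write_table(pa.table({"x": [1]}), "a.parquet")
pq.write_table(pa.table({"y": [1.0]}), "b.parquet")

md = pq.read_metadata("a.parquet")
try:
    # After this patch, appending row groups from a file with a different
    # schema is rejected instead of silently producing corrupt metadata.
    md.append_row_groups(pq.read_metadata("b.parquet"))
except Exception as exc:
    print(exc)
```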
Follow-up on ARROW-8062 (#7180)

Closes #7345 from jorisvandenbossche/ARROW-8946-metadata-write

Authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: François Saint-Jacques <[email protected]>