
ARROW-8062: [C++][Dataset] Implement ParquetDatasetFactory #7180

Conversation

@fsaintjacques (Contributor) commented May 14, 2020

This patch adds the option to create a dataset of parquet files via ParquetDatasetFactory. It reads a single _metadata parquet file created by systems like Dask and Spark, extracts the metadata of all fragments from that file, and populates each fragment with extra statistics for each column. The _metadata file can be created via pyarrow.parquet.write_metadata.
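For reference, a minimal sketch of the intended round trip, assuming the `write_metadata(..., metadata_collector=...)` capability discussed later in this thread; the paths and the example table are illustrative, not from this PR:

```python
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

table = pa.table({"total_amount": [10.0, 2000.0]})

# Write the dataset pieces, collecting each written file's parquet metadata.
metadata_collector = []
pq.write_to_dataset(table, root_path="dataset_root",
                    metadata_collector=metadata_collector)

# Summarise the collected metadata (including row-group statistics) into
# the single _metadata file that ParquetDatasetFactory consumes.
pq.write_metadata(table.schema, "dataset_root/_metadata",
                  metadata_collector=metadata_collector)

# Build a dataset from the _metadata file alone.
dataset = ds.parquet_dataset("dataset_root/_metadata")
```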

When the Scan operation is materialised, the row groups of the ParquetFileFragment are pruned against the statistics before the original file metadata is read. If no RowGroups from a file match the predicate of the Scan, the file is not read at all (including the metadata footer), thus avoiding expensive IO calls. The optimisation benefits are inversely proportional to the predicate's selectivity.

```python
# With the plain FileSystemDataset
%timeit t = nyc_tlc_fs_dataset.to_table(filter=da.field('total_amount') > 1000.0, ...)
1.55 s ± 26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# With ParquetDatasetFactory
%timeit t = nyc_tlc_parquet_dataset.to_table(filter=da.field('total_amount') > 1000.0, ...)
336 ms ± 17.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
  • Implement ParquetDatasetFactory
  • Replace ParquetFileFormat::GetRowGroupFragments with
    ParquetFileFragment::SplitByRowGroup (and the corresponding bindings; see the sketch after this list).
  • Add various optimizations, notably in ColumnChunkStatisticsAsExpression.
  • Consolidate RowGroupSkipper logic in ParquetFileFragment::ScanFile
  • Ensure FileMetaData::AppendRowGroups checks for schema equality.
  • Implement dataset._parquet_dataset
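As a rough illustration of the new SplitByRowGroup binding, here is a sketch of splitting metadata-backed fragments per row group under a predicate; the exact Python signature is an assumption, and the field name is reused from the benchmark above:

```python
import pyarrow.dataset as ds

dataset = ds.parquet_dataset("dataset_root/_metadata")  # illustrative path
predicate = ds.field("total_amount") > 1000.0

# Each file-level fragment splits into one fragment per row group; row
# groups whose statistics cannot match the predicate are pruned.
for fragment in dataset.get_fragments():
    for row_group_fragment in fragment.split_by_row_group(predicate):
        print(row_group_fragment)
```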

@fsaintjacques force-pushed the ARROW-8062-parquet-dataset-metadata branch from ec0a3c2 to 99c51ca on May 14, 2020 19:01
@fsaintjacques requested a review from xhochy May 18, 2020 18:06
@fsaintjacques force-pushed the ARROW-8062-parquet-dataset-metadata branch 6 times, most recently from aea5db3 to 2bb5686 on May 18, 2020 20:33
@jorisvandenbossche (Member) left a comment

Looking great! I tested on some dummy datasets created by dask, and that seems to work nicely (although it is of course hard to be sure there is actually no IO going on during the factory).

I will still check further against the existing parquet tests that use _metadata files.

Some non-inline comments:

  • Do we still need a partitioning keyword for parquet_dataset?

  • We might want to detect, when given a directory name, whether that directory includes a _metadata file (in some API; maybe this could live in the pyarrow.parquet code)

  • For a follow-up: handling of _common_metadata (just for inspecting the common schema)

  • Regarding the statistics stored as an expression:

    • Do we want to expose this in Python as well?
    • Currently, the statistics are only available when the fragments were constructed from a _metadata file (or after querying once), I think? Do we want to allow populating them on demand?
    • Statistics are only attached to a RowGroupInfo, and not to a fragment? Not needed for this PR, to be clear, but thinking more generally: we might want to enable creating a Fragment with custom statistics (e.g. in Kartothek? @xhochy)
  • Some tests are needed for the new write_metadata capabilities (I can write/push some if you want), and likewise for parquet_dataset/ParquetDatasetFactory.

I ran into a segfault if one of the files is not present (writing a reproducer / test case right now)

```cpp
    const std::shared_ptr<Expression>& predicate) {
  ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
  ARROW_ASSIGN_OR_RAISE(auto row_groups,
                        AugmentAndFilter(row_groups_, *predicate, reader.get()));
```
@jorisvandenbossche (Member):
Does this mean that using SplitByRowGroup currently always invokes IO? (In principle it could be filtered/split based on the already-read RowGroupInfo objects?)

@fsaintjacques (Author) commented May 19, 2020:

Correct. It could be filtered, but only if the dataset was generated via the _metadata file (or any explicit RowGroupInfo).

@jorisvandenbossche (Member):

I think it would be good to at least try to avoid IO if the statistics are already available in the RowGroupInfos (at least, from my understanding of how this is used in RAPIDS; I can check with them), but it is certainly not critical to do in this PR.

```python
    ParquetFileFormat,
    ParquetFileFragment,
    ParquetReadOptions,
    Partitioning,
    PartitioningFactory,
    RowGroupInfo,
```
@jorisvandenbossche (Member):

Maybe we should not expose this publicly? (is there a reason you would want to use this directly?)

@fsaintjacques (Author):

That's required for ParquetFileFragment.row_groups. I could change it to only return a list of integers.
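A hypothetical sketch of that accessor as described here; `id` is the only attribute implied by this thread, so anything beyond it would be an assumption, and the path is illustrative:

```python
import pyarrow.dataset as ds

dataset = ds.parquet_dataset("dataset_root/_metadata")  # illustrative path

# Inspect the RowGroupInfo objects attached to each fragment.
for fragment in dataset.get_fragments():
    row_group_ids = [row_group.id for row_group in fragment.row_groups]
    print(fragment.path, row_group_ids)
```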

@fsaintjacques
I don't get a segfault for the test you added, just the wrong exception being thrown.

```
>   raise IOError(errno, message)
E   FileNotFoundError: [Errno 2] Failed to open local file '/tmp/pytest-of-fsaintjacques/pytest-44/test_parquet_dataset_factory_i0/test_parquet_dataset/43bd0bd1002048e0b9bbc730f7614d18.parquet'. Detail: [errno 2] No such file or directory

pyarrow/error.pxi:98: FileNotFoundError
```

@jorisvandenbossche

> I don't get a segfault for the test you added, just the wrong exception being thrown.

A FileNotFoundError sounds good (the ValueError I added in the tests was just a bit random). I will rebuild locally to see if I still get this.

@fsaintjacques

I'm curious about the exception/segfault. If you can reproduce, feel free to share.

@jorisvandenbossche

It seems this failure doesn't happen all the time for me. Running it a few times, I also see the FileNotFoundError, but only in roughly 1 out of 2 cases.

Now, when running it on an actual example in the interactive terminal (from a small dataset where I actually deleted one of the files), I consistently see the segfault:

```python
In [1]: import pyarrow.dataset as ds

In [2]: dataset = ds.parquet_dataset("/tmp/tmp9qt6cph5/_metadata")

In [3]: dataset.to_table()
terminate called after throwing an instance of 'std::system_error'
  what():  Invalid argument
Aborted (core dumped)
```

I get a different stacktrace when running the tests, something about "unlink", so it might be that the way I remove the file in the test is not very robust, or not similar to deleting a file in a file browser.

@fsaintjacques force-pushed the ARROW-8062-parquet-dataset-metadata branch from 3fa0657 to fd5a4a3 on May 20, 2020 17:06
@fsaintjacques force-pushed the ARROW-8062-parquet-dataset-metadata branch from fd5a4a3 to 8b301d3 on May 20, 2020 17:11
@fsaintjacques force-pushed the ARROW-8062-parquet-dataset-metadata branch from 8b301d3 to 3b85adc on May 20, 2020 18:27
@bkietz (Member) left a comment

This looks great, thanks for doing this!

A few comments:

```cpp
  int num_row_groups_;
  int64_t rows_skipped_;
};

class ParquetScanTaskIterator {
 public:
  static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
```
@bkietz:

Nothing here can fail, so we can just make the constructor public.

```cpp
  std::vector<int> column_projection_;
  RowGroupSkipper skipper_;

  FileSource source_;
```
@bkietz:
What is this used for?

@fsaintjacques (Author):
For debugging purposes; it is extremely useful for introspecting the object.

"""
Split the fragment in multiple fragments.
@bkietz:
Suggested change:
```diff
-Split the fragment in multiple fragments.
+Split the fragment into multiple fragments.
```

Could you replicate this comment in C++?

```cpp
Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
    const std::shared_ptr<Expression>& predicate) {
  std::vector<RowGroupInfo> row_groups;
  if (HasCompleteMetadata()) {
```
@fsaintjacques (Author):
@jorisvandenbossche this is now lazy.

@jorisvandenbossche (Member):
> @jorisvandenbossche this is now lazy.

Cool, thanks!

@fsaintjacques force-pushed the ARROW-8062-parquet-dataset-metadata branch 2 times, most recently from dcf2a68 to 100a7b0 on May 21, 2020 15:40
@fsaintjacques force-pushed the ARROW-8062-parquet-dataset-metadata branch from 100a7b0 to cca91b3 on May 21, 2020 19:12
@fsaintjacques marked this pull request as ready for review May 21, 2020 21:20
@fsaintjacques force-pushed the ARROW-8062-parquet-dataset-metadata branch from 8df15e3 to 29f44d9 on May 25, 2020 18:42
pprudhvi pushed a commit to pprudhvi/arrow that referenced this pull request May 26, 2020
Closes apache#7180 from fsaintjacques/ARROW-8062-parquet-dataset-metadata

Lead-authored-by: François Saint-Jacques <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: François Saint-Jacques <[email protected]>
fsaintjacques pushed a commit that referenced this pull request Jun 3, 2020
Follow-up on ARROW-8062 (#7180)

Closes #7345 from jorisvandenbossche/ARROW-8946-metadata-write

Authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: François Saint-Jacques <[email protected]>