Allow parquet column chunks to be deserialized to multiple arrays (e.g. iterator) #768
It isn't so much that decompressed column chunks are smaller than the corresponding array, as that a single decompressed page within a column chunk is smaller than an array of the entire column chunk. Ideally you would stream pages from the column chunk, decompressing them as needed, and only keep each page around until all of its values have been yielded.
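A minimal sketch of that streaming idea, using only the Rust standard library. The run-length `decompress_page` is a toy stand-in for a real codec, and `PageStream` is a hypothetical name, not arrow2's API. The iterator decompresses the next page only once every value of the current page has been yielded, so it keeps just the page currently being drained.

```rust
/// Toy "decompression": a page is a run-length encoded sequence of
/// (run_length, value) byte pairs. Stands in for a real codec such as Snappy or Zstd.
fn decompress_page(page: &[u8]) -> Vec<u8> {
    page.chunks_exact(2)
        .flat_map(|pair| std::iter::repeat(pair[1]).take(pair[0] as usize))
        .collect()
}

/// Hypothetical lazy reader over the pages of one column chunk.
struct PageStream {
    pages: std::vec::IntoIter<Vec<u8>>, // compressed pages not yet touched
    current: std::vec::IntoIter<u8>,    // values of the page being drained
    decompressed_pages: usize,          // pages decompressed so far (for inspection)
}

impl PageStream {
    fn new(pages: Vec<Vec<u8>>) -> Self {
        Self {
            pages: pages.into_iter(),
            current: Vec::new().into_iter(),
            decompressed_pages: 0,
        }
    }
}

impl Iterator for PageStream {
    type Item = u8;

    fn next(&mut self) -> Option<u8> {
        loop {
            if let Some(v) = self.current.next() {
                return Some(v);
            }
            // Current page exhausted: replace it with the next decompressed page, if any.
            let page = self.pages.next()?;
            self.current = decompress_page(&page).into_iter();
            self.decompressed_pages += 1;
        }
    }
}
```

For example, after taking the first three values from `PageStream::new(vec![vec![3, 7], vec![2, 9, 1, 4]])`, only the first page has been decompressed; the second is decompressed when the fourth value is requested.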
I have been thinking about this. I haven't found this analysis anywhere else, so I will just leave it here for posterity.
Data is split into row groups to allow for group pruning at the expense of lower compression: larger groups allow higher compression but fewer possible prunes, while smaller groups allow more pruning but can't be compressed as much. We could separate this analysis between dict-encoded and non-dict-encoded columns, but the overall conclusion is the same: the more values we pack into a compressor, the higher the maximum possible compression gains. For dict-encoded columns, this is because dicts aren't shared between row groups.
The potential benefit of using more pages would be to allow page pruning. However, parquet itself deprecated writing page statistics in 2018. Thus, I do not really see how more pages per column chunk help us. From this perspective, imo we should assume the optimal storage conditions: reading a column chunk with a single page (non-dict-encoded) or a column chunk with 2-3 pages (dict page + indices page + fallback page for groups with more than
Strategy 1
One strategy is to load a parquet row group in "mini-batches" of size
Strategy 2
An alternative load strategy is to load column by column:
Consequences of each
Let's now see how this pans out in two different data carrier patterns, "stream of chunks" and "vector of streams".
Strategy 1 on stream of chunks
Strategy 1 on vector of streams
Strategy 2 on stream of chunks
Strategy 2 on vector of streams
Analysis
From this, my understanding is that the core difference here is between
Let's do some math and see the difference. Defining
Defining
The ratio
which is the important formula here. We can now take two limits:
Low number of columns
For
For highly-encoded columns, say
For low-encoded columns, say
showing that S1 requires less memory (~40%).
Loading a high number of columns
For
For highly-encoded columns, say
I.e. S1 requires less memory (~66%).
For low-encoded columns, say
I.e. S1 requires more memory (~20%).
Conclusions
My conclusion atm is that Strategy 1 dominates in most cases and that it should be the default (i.e. the goal of this PR). IMO this is interesting because it shows that it is better to read the whole row group (after projection pushdown) into memory, decompress it, and then iterate over its individual items.
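To make the comparison concrete, here is a toy peak-memory model. It is a simplified accounting under the assumptions below, not a reconstruction of the formulas in the analysis: there are c projected columns, each with a decompressed column chunk of d bytes that deserializes into Arrow arrays totalling a bytes, and the consumer needs all columns together (the stream-of-chunks pattern). Strategy 1 keeps every decompressed chunk plus one mini-batch (a fraction b of the row group) of arrays alive; Strategy 2 ends up holding every column's full array plus one decompressed chunk. "Highly-encoded" is modelled as a much larger than d, "low-encoded" as a close to d. All names are hypothetical.

```rust
/// Hypothetical inputs for a toy peak-memory model.
struct Workload {
    columns: f64,        // c: number of projected columns
    decompressed: f64,   // d: bytes of one decompressed column chunk
    array: f64,          // a: bytes of the Arrow array(s) for one whole column chunk
    batch_fraction: f64, // b: fraction of the row group deserialized per mini-batch
}

/// Strategy 1: decompress every projected column chunk up front,
/// then deserialize one mini-batch across all columns at a time.
fn peak_strategy1(w: &Workload) -> f64 {
    w.columns * w.decompressed + w.columns * w.array * w.batch_fraction
}

/// Strategy 2: decompress and fully deserialize one column at a time.
/// At the last column, all c arrays plus that column's decompressed chunk are live.
fn peak_strategy2(w: &Workload) -> f64 {
    w.columns * w.array + w.decompressed
}

/// Ratio S1/S2 of peak memory: below 1.0 means Strategy 1 needs less.
fn ratio(w: &Workload) -> f64 {
    peak_strategy1(w) / peak_strategy2(w)
}
```

With a single column, a equal to d and b = 0.2, `ratio` gives 0.6; with 100 columns and the same per-column sizes it rises above 1, while making a ten times larger than d keeps it well below 1.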
This is only partly true; the other motivator is that the row group is typically the unit of IO and of distributed parallelism. Separate row groups are not all that dissimilar from separate files, with all the costs and benefits thereof.
Pages are significantly smaller than column chunks: row groups may be up to a GB compressed, whereas pages (other than dictionary pages) are typically closer to 8KB compressed. See the end of this document. So another benefit is reducing the amount of uncompressed data that must be buffered in the read path at any point in time.
That is because it now writes them into a different data structure called PageIndex that is slightly less useless. That being said, I'm not aware of many systems that make use of this...
Regardless of the utility of multiple pages per column chunk, if interoperability with other tooling is important, multiple small pages per column chunk is likely to be an assumption of other tooling. FWIW I agree that page pruning based on statistics is not particularly effective for most use-cases; I have a proposal for selection masks in
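Page pruning of the kind discussed here relies on per-page statistics; in the Parquet page index these live in a ColumnIndex (per-page min/max values) alongside an OffsetIndex (page locations). A minimal sketch of the pruning step itself, with a hypothetical `PageStats` struct and `pages_to_read` function rather than any library's API: only pages whose [min, max] range overlaps the predicate range are kept.

```rust
/// Hypothetical per-page statistics, modelled loosely on the min/max
/// values a Parquet ColumnIndex stores for each data page.
#[derive(Debug, Clone, Copy)]
struct PageStats {
    min: i64,
    max: i64,
}

/// Returns the indices of pages that may contain a value in `[lo, hi]`.
/// Pages whose [min, max] range does not overlap the predicate are skipped.
fn pages_to_read(stats: &[PageStats], lo: i64, hi: i64) -> Vec<usize> {
    stats
        .iter()
        .enumerate()
        .filter(|(_, s)| s.max >= lo && s.min <= hi)
        .map(|(i, _)| i)
        .collect()
}
```

This only skips pages when values are clustered (for example, sorted); with randomly ordered data almost every page's range overlaps the predicate, which is consistent with pruning rarely being effective.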
I now have a fix for this locally; I am just cleaning it up before opening a PR. It reads the pages as they are needed.
As described by @tusvold here, there are important cases where it is more advantageous to deserialize a column chunk into more than one array.
Specifically, the main issue is that a decompressed parquet column chunk is still much smaller than its corresponding arrow array. Therefore, on large column chunks, the user may prefer to deserialize only part of the decompressed column chunk into arrow while still holding the chunk in memory.
The equation is something along the lines of
To allow users to trade off memory usage and data fragmentation (of each array), we should offer something like
that when length: None we recover our single array, and when length: Some(n) we iterate over the chunk and output n items at a time.
Thanks again to @tusvold for this important insight.
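A minimal sketch of the proposed length: None / length: Some(n) behaviour, assuming the decompressed column chunk is a PLAIN-encoded INT32 buffer held in memory. `deserialize_chunk` and `decode` are hypothetical names, not arrow2's API, and `Vec<i32>` stands in for an Arrow array. Values are only decoded when each batch is requested, so the decompressed chunk stays in memory while the arrays are produced incrementally.

```rust
/// Decodes PLAIN-encoded little-endian i32 values.
fn decode(bytes: &[u8]) -> Vec<i32> {
    bytes
        .chunks_exact(4)
        .map(|b| i32::from_le_bytes([b[0], b[1], b[2], b[3]]))
        .collect()
}

/// Hypothetical deserializer for one decompressed column chunk:
/// - `length: None` yields the whole chunk as a single array;
/// - `length: Some(n)` yields arrays of at most `n` items each, decoded lazily.
fn deserialize_chunk(
    decompressed: Vec<u8>,
    length: Option<usize>,
) -> Box<dyn Iterator<Item = Vec<i32>>> {
    assert_eq!(decompressed.len() % 4, 0, "PLAIN INT32 data must be a multiple of 4 bytes");
    match length {
        None => Box::new(std::iter::once(decode(&decompressed))),
        Some(n) => {
            assert!(n > 0, "length must be at least 1");
            let mut offset = 0; // byte offset of the next value to decode
            Box::new(std::iter::from_fn(move || {
                if offset >= decompressed.len() {
                    return None;
                }
                let end = (offset + n * 4).min(decompressed.len());
                let batch = decode(&decompressed[offset..end]);
                offset = end;
                Some(batch)
            }))
        }
    }
}
```

With the values 1 to 5 encoded as bytes, `None` yields a single array of five items, while `Some(2)` yields [1, 2], [3, 4] and [5].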