Preserve dictionary encoding when decoding parquet into Arrow arrays, 60x perf improvement (#171) #1180
58da0f8 to 8b0adeb
@@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
Moved from definition_levels.rs
@@ -351,39 +346,6 @@ impl PackedDecoder {
    }
}

/// Counts the number of set bits in the provided range
Moved into crate::arrow::bit_util
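For readers unfamiliar with the helper being moved, the following is a minimal sketch of counting set bits within a bit range of a packed mask. The name `count_set_bits`, its signature, and the least-significant-bit-first layout are assumptions for illustration and are not taken from the PR's actual code.

```rust
use std::ops::Range;

/// Counts the set bits of `bytes` whose bit index falls within `range`.
/// Assumes bit `i` lives in byte `i / 8` at position `i % 8`
/// (least-significant bit first).
fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
    let mut count = 0;
    let mut i = range.start;

    // Leading bits, one at a time, until we reach a byte boundary
    while i < range.end && i % 8 != 0 {
        count += ((bytes[i / 8] >> (i % 8)) & 1) as usize;
        i += 1;
    }

    // Whole bytes can use the hardware popcount
    while i + 8 <= range.end {
        count += bytes[i / 8].count_ones() as usize;
        i += 8;
    }

    // Trailing bits that do not fill a whole byte
    while i < range.end {
        count += ((bytes[i / 8] >> (i % 8)) & 1) as usize;
        i += 1;
    }

    count
}
```

Handling whole bytes with `count_ones` keeps the per-bit loops limited to the unaligned edges of the range.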
///
fn pad_nulls(
    &mut self,
    read_offset: usize,
    values_read: usize,
    levels_read: usize,
    rev_valid_position_iter: impl Iterator<Item = usize>,
    valid_mask: &[u8],
This change makes tests easier to write, is clearer I think, and may in fact optimize better.
@@ -668,15 +668,9 @@ mod tests {
        assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);

        let valid = vec![false, false, true, true, false, true, true, false, false];
        let rev_position_iter = valid
This is an example of how changing pad_nulls to take a packed bitmask makes the tests less verbose
8b0adeb to 3142ec0
#[test]
fn test_byte_array_decoder() {
    let data: Vec<_> = vec!["hello", "world", "a", "b"]
This code is extracted out into test_utils so that it can be reused
use arrow::array::{Array, StringArray};
use std::sync::Arc;

fn column() -> ColumnDescPtr {
This code is extracted out into test_utils
Unfortunately I needed to move some code around in order to write tests for this functionality, so this has ended up being another rather large diff 😞 but I've tried to comment where this has taken place so hopefully that helps.
ArrowType::Dictionary(key_type, value_type) => {
    make_reader! {
        (pages, column_desc, data_type, null_mask_only) => match (key_type.as_ref(), value_type.as_ref()) {
            (ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8) => (u8, i32),
Many of these aren't properly supported by the arrow compute kernels, but I figured better to be thorough
I went through this pretty carefully and I think I understand the code and it makes sense to me 👍 @tustvold
I also ran the datafusion suite (cargo test --all) against this branch and all the tests passed
Do you have any performance results you can share here (e.g. a benchmark of reading a dictionary encoded string column from a parquet file to an Arrow array)?
Also, I think some sort of "end to end" test is probably appropriate -- i.e. that reads a parquet file and ensures that the output columns are really dictionary encoded.
FYI @yordan-pavlov and @nevi-me and @GregBowyer
I plan to write a mini program to validate that some of our internally generated parquet files did indeed read back a dictionary array rather than a StringArray
    ArrowDataType::UInt64,
];

for key in &key_types {
this is becoming a somewhat epic single test -- I wonder if it would be valuable to split it into smaller tests that can run in parallel (as a future PR)
Yeah... Tbh there may be value in splitting it out as an actual fuzz testing suite in its own right...
let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(
    strings.iter().collect::<Vec<_>>(),
    vec![None, None, Some("world"), Some("hello")]
Since this was splitting off the first 4 elements (indices 1, 0, 3, 2 and valid f,f,t,t) I would have expected the array produced to be None, None, Some("a"), Some(""), so I clearly misunderstand something about how this code works
It reads 4 non-null values
[Some("world"), Some("hello"), Some("a"), Some(""), Some("b")]
It then pads them out based on the null mask
[None, None, Some("world"), Some("hello"), None, Some("a"), Some(""), Some("b")]
It then splits off the first four
[None, None, Some("world"), Some("hello")]
Leaving behind in the buffer
[None, Some("a"), Some(""), Some("b")]
thank you for the clarification
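The read, pad, and split-off sequence described in this thread can be sketched as follows. This is a simplified illustration, not the PR's implementation: the helper names `pad_nulls` and `split_off_front` and the `Vec<bool>` validity mask are assumptions chosen for readability, whereas the real reader works on packed bitmasks and typed buffers.

```rust
/// Spreads densely packed non-null `values` out to the positions marked
/// valid in `valid`, inserting `None` everywhere else.
/// If `values` runs out early, the remaining valid slots also become `None`.
fn pad_nulls<T: Clone>(values: &[T], valid: &[bool]) -> Vec<Option<T>> {
    let mut values = values.iter();
    valid
        .iter()
        .map(|&is_valid| if is_valid { values.next().cloned() } else { None })
        .collect()
}

/// Removes and returns the first `n` elements, leaving the rest in `buffer`.
fn split_off_front<T>(buffer: &mut Vec<T>, n: usize) -> Vec<T> {
    // `Vec::split_off` returns the tail, so swap it back into `buffer`
    // and hand the head to the caller.
    let tail = buffer.split_off(n);
    std::mem::replace(buffer, tail)
}
```

With the values from the explanation above and a mask of f,f,t,t,f,t,t,t (inferred from the padded output shown), `pad_nulls` yields the eight-element padded buffer, and `split_off_front(&mut buffer, 4)` returns the first four entries while the remaining four stay behind.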
}

#[test]
fn test_validates_keys() {
Note to other reviewers that there is a test for decoding a dictionary whose keys are too large (e.g. an index of 500 when the keys type is Int8) in byte_array_dictionary.rs
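As a rough illustration of the kind of check that test exercises, here is a minimal sketch of converting decoded indices into a narrower key type and rejecting any that do not fit. The function `validate_keys` is hypothetical and only covers the Int8 case; the PR's actual validation happens on conversion to an Arrow array and may differ.

```rust
/// Converts raw dictionary indices into `i8` keys, returning an error
/// for the first index that the key type cannot represent.
fn validate_keys(indices: &[usize]) -> Result<Vec<i8>, String> {
    indices
        .iter()
        .map(|&idx| {
            // `i8::try_from` fails for anything above 127
            i8::try_from(idx)
                .map_err(|_| format!("dictionary index {} does not fit in key type Int8", idx))
        })
        .collect()
}
```

Collecting into `Result<Vec<_>, _>` stops at the first failing index, so an index of 500 produces an error rather than a silently truncated key.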
With an end to end test I think this PR is ready to merge, but I would like to see if other reviewers would like time to review as well
I tried reading one of our internal parquet files with this branch and I got an internal error -- I will post a reproducer shortly
Yup - looks like this barfs if it encounters an empty dictionary. Will add tests and fix tomorrow 👍
For anyone following along, I shared the internal file with @tustvold. I also have some ideas to increase the coverage of the parquet reader (including the "end to end" test I mentioned reading out a file). I'll try and write it up (and maybe even code it) tomorrow
Don't panic if missing dictionary page
I've pushed a commit that fixes the handling of empty dictionaries. I vacillated between returning an error if trying to read data from a column with an empty dictionary, or just returning 0 values read. In the end I went with the latter as it isn't always known how many non-null values a column contains. If GenericColumnReader thinks there should be non-null values, as determined from the definition levels, it will return an error along the lines of "insufficient values read from column". Whilst doing this I also changed to return an error instead of panicking if there is no dictionary page present. Will work on an end-to-end test along with some benchmarks next.
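The behaviour described in that comment could look roughly like the sketch below. `DictDecoder`, its fields, and the error strings are hypothetical stand-ins, not the types in the parquet crate; the point is only the two decisions: a missing dictionary page is an error rather than a panic, and an empty dictionary reports 0 values read so the caller can decide whether that is a problem.

```rust
/// Hypothetical decoder state for one dictionary-encoded column.
struct DictDecoder {
    /// `None` until a dictionary page has been seen
    dict: Option<Vec<String>>,
    /// Encoded indices that have not been consumed yet
    indices: Vec<usize>,
}

impl DictDecoder {
    /// Appends up to `max` decoded values to `out` and returns how many were read.
    fn read(&mut self, out: &mut Vec<String>, max: usize) -> Result<usize, String> {
        // Missing dictionary page: report an error instead of panicking
        let dict = self
            .dict
            .as_ref()
            .ok_or_else(|| "missing dictionary page for column".to_string())?;

        // Empty dictionary: nothing can be decoded, so report 0 values read
        if dict.is_empty() {
            return Ok(0);
        }

        let n = max.min(self.indices.len());
        for idx in self.indices.drain(..n) {
            let value = dict
                .get(idx)
                .ok_or_else(|| format!("dictionary index {} out of range", idx))?;
            out.push(value.clone());
        }
        Ok(n)
    }
}
```

A caller that expected non-null values, for example based on definition levels, would treat a short read as its own error, which matches the "insufficient values read from column" case mentioned above.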
Tweak RecordReader buffering logic
Codecov Report
@@ Coverage Diff @@
## master #1180 +/- ##
==========================================
+ Coverage 82.67% 82.81% +0.13%
==========================================
Files 175 179 +4
Lines 51561 52164 +603
==========================================
+ Hits 42630 43199 +569
- Misses 8931 8965 +34
@tustvold do you think that benchmarks should be amended to include reading of dictionary arrays?
Added some benchmarks
So a nice 60x performance improvement 😄
@@ -341,7 +341,9 @@ where
    Some(keys) => {
        // Happy path - can just copy keys
        // Keys will be validated on conversion to arrow
        decoder.get_batch(keys.spare_capacity_mut(len))
        let keys_slice = keys.spare_capacity_mut(range.start + len);
I need to figure out why this didn't get caught by the fuzz tests
Fix in bf28e16
Marking as a draft whilst I work on some further tweaks
Fix bug in column writer with small page sizes
I'm happy with the state of this PR now. There is some further refinement I'd like to do on the traits, but I would rather do that as a separate PR as this one is getting quite large.
I have one more test I want to run locally before approving this:
Working on this now
I ran this branch against a bunch of production data we have here at Influx, using the https://github.com/alamb/pdump command built against this branch @ 8985aee and master at 0377aae
-parquet = { git = "https://github.com/apache/arrow-rs.git", rev="0377aaed5ff46214359d1b8d66c27f3afd9323c3" }
+parquet = { git = "https://github.com/tustvold/arrow-rs.git", rev="8985aeeb2ed1f55fbb061f8771ccd390df804d42" }
I dumped each file using pdump built against the two branches and compared the results using diff, which detected no differences in the output 🎉
Epic work @tustvold
I plan to wait for the weekend to see if anyone else wants a chance to review this before I merge it in
@tustvold those benchmark results look very promising indeed; the next logical question is what the impact is on the end-to-end performance of queries in DataFusion
This is highly context dependent; queries that were previously bottlenecked by parquet will of course see improvements. A simple table scan with no predicates, for example, should see most of the raw 60x performance uplift. This sort of "query" shows up in IOx when compacting or loading data from object storage into an in-memory cache. The story for more complex queries is a bit more WIP. Currently DataFusion's handling of dictionary encoded arrays isn't brilliant, with it often fully materializing dictionaries when it shouldn't need to. apache/datafusion#1475 and apache/datafusion#1610 track the process of switching DataFusion to delegate comparator selection to arrow-rs, which should help to alleviate this. TL;DR: at this point in time I'm focused on getting arrow-rs to a good place, with the necessary base primitives, and then I'll turn my attention to what DataFusion is doing with them. Who knows, someone else may even get there first 😁
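To illustrate why avoiding materialization matters, here is a small sketch contrasting the two approaches on a toy dictionary-encoded column. `DictColumn` and its methods are hypothetical and are not Arrow or DataFusion APIs; they only show that a predicate can be evaluated once per distinct value and then looked up per key, instead of expanding every row to a string first.

```rust
/// A toy dictionary-encoded string column: each row stores a key into `values`.
struct DictColumn {
    /// `None` marks a null row
    keys: Vec<Option<u32>>,
    /// The distinct values the keys point into
    values: Vec<String>,
}

impl DictColumn {
    /// Expands every row into an owned string. This is the per-row
    /// allocation and copy that dictionary preservation avoids.
    fn materialize(&self) -> Vec<Option<String>> {
        self.keys
            .iter()
            .map(|k| k.map(|k| self.values[k as usize].clone()))
            .collect()
    }

    /// Evaluates `column == needle` by comparing each distinct value once
    /// and then mapping every key through the precomputed result.
    fn eq_scalar(&self, needle: &str) -> Vec<Option<bool>> {
        let matches: Vec<bool> = self.values.iter().map(|v| v.as_str() == needle).collect();
        self.keys
            .iter()
            .map(|k| k.map(|k| matches[k as usize]))
            .collect()
    }
}
```

For a column with few distinct values and many rows, `eq_scalar` performs one string comparison per dictionary entry rather than one per row, and never copies string data.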
Nice optimization 👍
Which issue does this PR close?
Closes #171.
Rationale for this change
See ticket
What changes are included in this PR?
Adds dictionary preservation when decoding dictionary encoded parquet data into dictionary encoded arrays.
Are there any user-facing changes?
No