-
Notifications
You must be signed in to change notification settings - Fork 847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement faster arrow array reader #384
Conversation
Codecov Report
@@ Coverage Diff @@
## master #384 +/- ##
==========================================
- Coverage 82.71% 82.65% -0.06%
==========================================
Files 163 164 +1
Lines 44795 45468 +673
==========================================
+ Hits 37051 37581 +530
- Misses 7744 7887 +143
Continue to review full report at Codecov.
|
|
||
// this operation is performed before iteration | ||
// because it is fast and allows reserving all the needed memory | ||
let filter_count = values.count_set_bits_offset(filter.offset(), filter.len()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't this mean that the count is done multiple times now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good question @Dandandan , in my opinion calculating filter_count
should not be done in the SlicesIterator
because it's not used here. It's just a convenience for many of the clients of the SlicesIterator
. Also having filter_count
calculated in SlicesIterator::new
is inflexible and in the use case of the new ArrowArrayReader
would have meant that counting would be performed twice unnecessarily. That's why I have moved it to a filter_count()
method instead - keep this convenience for users of SlicesIterator
, but make it more flexible and allow more use-cases. Where I have had to change existing code, I was careful to only invoke filter_count()
a single time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe adding a docstring to the new filter_count()
would be good enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done - added docstring for filter_count()
Thanks @yordan-pavlov -- I will try and set time aside tomorrow to review this PR. Sorry for the delay |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments from an initial review. No immediate action needed on them though
} | ||
} | ||
|
||
use arrow::datatypes::ArrowPrimitiveType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reminder to move all imports to the top
@@ -506,6 +506,11 @@ impl ArrayDataBuilder { | |||
self | |||
} | |||
|
|||
pub fn null_count(mut self, null_count: usize) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're intentionally missing this function here because we were avoiding a situation where a user could specify a null count != the actual count in the null buffer. Is there a way of avoiding it @yordan-pavlov?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
without this null_count
method, count_set_bits_offset
would be called unnecessarily (because we already know the null count) a second time in ArrayData::new
when value_array_data: ArrayData
is created
@@ -63,7 +63,7 @@ struct _MutableArrayData<'a> { | |||
} | |||
|
|||
impl<'a> _MutableArrayData<'a> { | |||
fn freeze(self, dictionary: Option<ArrayData>) -> ArrayData { | |||
fn freeze(self, dictionary: Option<ArrayData>) -> ArrayDataBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jorgecarleitao are you fine with returning a builder here?
consume_source_item: fn(source_item: Source, state: &mut State) -> Target, | ||
} | ||
|
||
impl<Source, Target, State> UnzipIter<Source, Target, State> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the approach of unzipping the iterator into 3 iterators. My first pass review was to look at the implementation, but not yet the finer details.
This looks great, I like the approach; and I think it won't be difficult to implement it for lists.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went though this PR @yordan-pavlov really nice. 👍 👍
The high level approach looks clear and solid from my perspective. I had some minor structural things that would like to be be improved such as avoiding a new dependency in the parquet
crate and removing commented out code, but I also wouldn't be opposed to merging this as is.
I think @nevi-me is the expert here. If he is cool with this approach then so am I 👍
cc @carols10cents since you and @shepmaster worked on the array reader a bit as well.
|
||
// this operation is performed before iteration | ||
// because it is fast and allows reserving all the needed memory | ||
let filter_count = values.count_set_bits_offset(filter.offset(), filter.len()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe adding a docstring to the new filter_count()
would be good enough
@@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true } | |||
base64 = { version = "0.13", optional = true } | |||
clap = { version = "2.33.3", optional = true } | |||
serde_json = { version = "1.0", features = ["preserve_order"], optional = true } | |||
rand = "0.8" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice if a new dependency was not needed for the main crate (it seems like it is only needed for test_util
-- so perhaps we could mark test_util as [#cfg(test)]
or something -- though I suspect this doesn't actually add any new dependency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also don't like having to add this new dependency, but couldn't get the benchmarks to compile without it; I am more than happy to remove or restrict if someone knows how
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only way I can think of is to move test_util
to a new crate (and then add it as a dev dependency)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will try this over the weekend
// primitive / int32 benchmarks | ||
// ============================= | ||
let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(schema.clone(), mandatory_int32_column_desc.clone(), 0.0); | ||
// group.bench_function("clone benchmark data", |b| b.iter(|| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a reason thus bench is commented out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was curious what's the cost of just cloning the benchmark data; I left it commented out in case someone else is curious about this as well, but I am happy to remove it
parquet/src/arrow/array_reader.rs
Outdated
Utf8Converter, | ||
>::new( | ||
page_iterator, | ||
use crate::arrow::arrow_array_reader::{StringArrayConverter, ArrowArrayReader}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the change to move these use
statements from the top of the module to here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the only reason for the local use
statement is because currently ArrowArrayReader
is (intentionally) only used here for strings; once it's used for more types it would make sense to move most / all of these use statements to the top.
@@ -22,6 +22,4 @@ pub mod bit_util; | |||
mod bit_packing; | |||
pub mod cursor; | |||
pub mod hash_util; | |||
|
|||
#[cfg(test)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This chang means that test_common
becomes part of the public parquet
API
Was this needed to use test_common
stuff in the benchmarks? Maybe it might make sense (as a follow on PR) to move test_common
into its own (unpublished) crate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I had to make this change to make test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder}
available in the benchmark crate; I don't like making this public either, but haven't been able to find a way to only make it available to tests and benches; if anyone knows how this could be done I am more than happy to change it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have changed this to
pub(crate) mod test_common;
pub use self::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
in order to limit new public types to only InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder
which are used in benchmarks. I noticed that this approach is already used here https://github.com/apache/arrow-rs/blob/master/parquet/src/lib.rs#L45 and thought this would be a much simpler solution compared to a new library crate.
@alamb let me know what you think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks reasonable to me. Thank you @yordan-pavlov
@nevi-me is this something you can take on reviewing / approving? I am not very familiar with this code -- it looked good to me but I don't feel super confident of approving it. However, if you don't have the time I will do the best I can |
I'll complete my review over the weekend, I like the approach; the I don't think I'll have any major items to raise though |
🙏 Thank you @nevi-me ! |
FYI I plan to make a release candidate for Arrow 4.3 on Thursday or Friday this week and release early next week. So if we want to try and get this PR into 4.3 that is the schedule. It is large enough, however, that delaying until 4.4 and giving it some more bake time is not a bad idea either |
04448d1
to
d5173db
Compare
thanks for the heads up @alamb, I have rebased and cleaned up the code in preparation for merging, but still waiting for review by @nevi-me and @jorgecarleitao |
Utf8Converter, | ||
>::new( | ||
page_iterator, | ||
use crate::arrow::arrow_array_reader::{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: please move these imports to the top of the file, for consistency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yordan-pavlov @alamb I'm happy with merging this, so that we can follow up with documentation (and I'd like to try out list write support)
Merging this in and we can figure out if we want to try and put it into arrow 4.4.0. Thanks @yordan-pavlov ! |
And thanks @nevi-me for the epic review |
Which issue does this PR close?
Closes #200.
Rationale for this change
This PR attempts to implement a new, more efficient and also more generic
ArrowArrayReader
, as a replacement to both thePrimitiveArrayReader
andComplexObjectArrayReader
that exist today. The basic idea behind the newArrowArrayReader
is to copy contiguous byte slices from parquet page buffers to arrow array buffers as directly as possible, while avoiding unnecessary memory allocation as much as possible. While for primitive types such as Int32, the performance improvements are small in most cases, for complex types such as strings the performance improvements can be significant (up to 6 times faster). See benchmark results below.
I did try initially to use iterators end-to-end as suggested by the linked issue, but this required a more complex and less efficient implementation which was ultimately slower. This is why in this PR iterators are only used to map parquet pages to implementations of the
ValueDecoder
trait trait which know how to read / decode byte slices for batches of values.What changes are included in this PR?
This PR implements the new
ArrowArrayReader
and converters for strings and primitive types, but is only used / enabled for strings. The plan is to enable / use the newArrowArrayReader
for more types in subsequent PRs. Also note thatValueDecoder
s for onlyPLAIN
andRLE_DICTIONARY
encodings are currently implemented.Are there any user-facing changes?
There are some non-breaking changes to
MutableArrayData
andSlicesIterator
, @jorgecarleitao let me know what you think about those.Here are the benchmark results:
read Int32Array, plain encoded, mandatory, no NULLs - old: time: [9.0238 us 9.1121 us 9.2100 us]
read Int32Array, plain encoded, mandatory, no NULLs - new: time: [6.9506 us 7.1606 us 7.4062 us]
read Int32Array, plain encoded, optional, no NULLs - old: time: [247.66 us 252.08 us 257.12 us]
read Int32Array, plain encoded, optional, no NULLs - new: time: [40.322 us 40.736 us 41.215 us]
read Int32Array, plain encoded, optional, half NULLs - old: time: [434.25 us 438.25 us 442.92 us]
read Int32Array, plain encoded, optional, half NULLs - new: time: [326.37 us 331.68 us 337.07 us]
read Int32Array, dictionary encoded, mandatory, no NULLs - old: time: [38.876 us 39.698 us 40.805 us]
read Int32Array, dictionary encoded, mandatory, no NULLs - new: time: [150.62 us 152.38 us 154.29 us]
read Int32Array, dictionary encoded, optional, no NULLs - old: time: [265.18 us 267.54 us 270.16 us]
read Int32Array, dictionary encoded, optional, no NULLs - new: time: [167.54 us 169.15 us 170.99 us]
read Int32Array, dictionary encoded, optional, half NULLs - old: time: [442.66 us 446.42 us 450.47 us]
read Int32Array, dictionary encoded, optional, half NULLs - new: time: [418.46 us 421.81 us 425.37 us]
read StringArray, plain encoded, mandatory, no NULLs - old: time: [1.6670 ms 1.6773 ms 1.6895 ms]
read StringArray, plain encoded, mandatory, no NULLs - new: time: [264.44 us 269.63 us 275.39 us]
read StringArray, plain encoded, optional, no NULLs - old: time: [1.8602 ms 1.8753 ms 1.8913 ms]
read StringArray, plain encoded, optional, no NULLs - new: time: [363.59 us 367.03 us 370.63 us]
read StringArray, plain encoded, optional, half NULLs - old: time: [1.5216 ms 1.5346 ms 1.5486 ms]
read StringArray, plain encoded, optional, half NULLs - new: time: [514.01 us 518.68 us 524.09 us]
read StringArray, dictionary encoded, mandatory, no NULLs - old: time: [1.4903 ms 1.5129 ms 1.5358 ms]
read StringArray, dictionary encoded, mandatory, no NULLs - new: time: [218.30 us 220.54 us 223.17 us]
read StringArray, dictionary encoded, optional, no NULLs - old: time: [1.5652 ms 1.5776 ms 1.5912 ms]
read StringArray, dictionary encoded, optional, no NULLs - new: time: [249.53 us 254.14 us 258.99 us]
read StringArray, dictionary encoded, optional, half NULLs - old: time: [1.3585 ms 1.3945 ms 1.4318 ms]
read StringArray, dictionary encoded, optional, half NULLs - new: time: [496.27 us 508.28 us 522.43 us]
@nevi-me @alamb @Dandandan let me know what you think.