-
Notifications
You must be signed in to change notification settings - Fork 847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement faster arrow array reader #384
Changes from all commits
96fd31f
442db1e
98ccedf
22aacc1
1246517
aeab3d3
9ebf874
d5173db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,7 @@ use std::mem; | |
|
||
use super::{ | ||
data::{into_buffers, new_buffers}, | ||
ArrayData, | ||
ArrayData, ArrayDataBuilder, | ||
}; | ||
use crate::array::StringOffsetSizeTrait; | ||
|
||
|
@@ -63,7 +63,7 @@ struct _MutableArrayData<'a> { | |
} | ||
|
||
impl<'a> _MutableArrayData<'a> { | ||
fn freeze(self, dictionary: Option<ArrayData>) -> ArrayData { | ||
fn freeze(self, dictionary: Option<ArrayData>) -> ArrayDataBuilder { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @jorgecarleitao are you fine with returning a builder here? |
||
let buffers = into_buffers(&self.data_type, self.buffer1, self.buffer2); | ||
|
||
let child_data = match self.data_type { | ||
|
@@ -76,19 +76,19 @@ impl<'a> _MutableArrayData<'a> { | |
child_data | ||
} | ||
}; | ||
ArrayData::new( | ||
self.data_type, | ||
self.len, | ||
Some(self.null_count), | ||
if self.null_count > 0 { | ||
Some(self.null_buffer.into()) | ||
} else { | ||
None | ||
}, | ||
0, | ||
buffers, | ||
child_data, | ||
) | ||
|
||
let mut array_data_builder = ArrayDataBuilder::new(self.data_type) | ||
.offset(0) | ||
.len(self.len) | ||
.null_count(self.null_count) | ||
.buffers(buffers) | ||
.child_data(child_data); | ||
if self.null_count > 0 { | ||
array_data_builder = | ||
array_data_builder.null_bit_buffer(self.null_buffer.into()); | ||
} | ||
|
||
array_data_builder | ||
} | ||
} | ||
|
||
|
@@ -552,8 +552,13 @@ impl<'a> MutableArrayData<'a> { | |
.map(|array| build_extend_null_bits(array, use_nulls)) | ||
.collect(); | ||
|
||
let null_bytes = bit_util::ceil(array_capacity, 8); | ||
let null_buffer = MutableBuffer::from_len_zeroed(null_bytes); | ||
let null_buffer = if use_nulls { | ||
let null_bytes = bit_util::ceil(array_capacity, 8); | ||
MutableBuffer::from_len_zeroed(null_bytes) | ||
} else { | ||
// create 0 capacity mutable buffer with the intention that it won't be used | ||
MutableBuffer::with_capacity(0) | ||
}; | ||
|
||
let extend_values = match &data_type { | ||
DataType::Dictionary(_, _) => { | ||
|
@@ -605,13 +610,40 @@ impl<'a> MutableArrayData<'a> { | |
|
||
/// Extends this [MutableArrayData] with null elements, disregarding the bound arrays | ||
pub fn extend_nulls(&mut self, len: usize) { | ||
// TODO: null_buffer should probably be extended here as well | ||
// otherwise is_valid() could later panic | ||
// add test to confirm | ||
self.data.null_count += len; | ||
(self.extend_nulls)(&mut self.data, len); | ||
self.data.len += len; | ||
} | ||
|
||
/// Returns the current length | ||
#[inline] | ||
pub fn len(&self) -> usize { | ||
self.data.len | ||
} | ||
|
||
/// Returns true if len is 0 | ||
#[inline] | ||
pub fn is_empty(&self) -> bool { | ||
self.data.len == 0 | ||
} | ||
|
||
/// Returns the current null count | ||
#[inline] | ||
pub fn null_count(&self) -> usize { | ||
self.data.null_count | ||
} | ||
|
||
/// Creates a [ArrayData] from the pushed regions up to this point, consuming `self`. | ||
pub fn freeze(self) -> ArrayData { | ||
self.data.freeze(self.dictionary).build() | ||
} | ||
|
||
/// Creates a [ArrayDataBuilder] from the pushed regions up to this point, consuming `self`. | ||
/// This is useful for extending the default behavior of MutableArrayData. | ||
pub fn into_builder(self) -> ArrayDataBuilder { | ||
self.data.freeze(self.dictionary) | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,10 +44,10 @@ enum State { | |
/// slots of a [BooleanArray] are true. Each interval corresponds to a contiguous region of memory to be | ||
/// "taken" from an array to be filtered. | ||
#[derive(Debug)] | ||
pub(crate) struct SlicesIterator<'a> { | ||
pub struct SlicesIterator<'a> { | ||
iter: Enumerate<BitChunkIterator<'a>>, | ||
state: State, | ||
filter_count: usize, | ||
filter: &'a BooleanArray, | ||
remainder_mask: u64, | ||
remainder_len: usize, | ||
chunk_len: usize, | ||
|
@@ -59,19 +59,14 @@ pub(crate) struct SlicesIterator<'a> { | |
} | ||
|
||
impl<'a> SlicesIterator<'a> { | ||
pub(crate) fn new(filter: &'a BooleanArray) -> Self { | ||
pub fn new(filter: &'a BooleanArray) -> Self { | ||
let values = &filter.data_ref().buffers()[0]; | ||
|
||
// this operation is performed before iteration | ||
// because it is fast and allows reserving all the needed memory | ||
let filter_count = values.count_set_bits_offset(filter.offset(), filter.len()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't this mean that the count is done multiple times now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good question @Dandandan , in my opinion calculating There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe adding a docstring to the new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done - added docstring for |
||
|
||
let chunks = values.bit_chunks(filter.offset(), filter.len()); | ||
|
||
Self { | ||
iter: chunks.iter().enumerate(), | ||
state: State::Chunks, | ||
filter_count, | ||
filter, | ||
remainder_len: chunks.remainder_len(), | ||
chunk_len: chunks.chunk_len(), | ||
remainder_mask: chunks.remainder_bits(), | ||
|
@@ -83,6 +78,12 @@ impl<'a> SlicesIterator<'a> { | |
} | ||
} | ||
|
||
/// Counts the number of set bits in the filter array. | ||
fn filter_count(&self) -> usize { | ||
let values = self.filter.values(); | ||
values.count_set_bits_offset(self.filter.offset(), self.filter.len()) | ||
} | ||
|
||
#[inline] | ||
fn current_start(&self) -> usize { | ||
self.current_chunk * 64 + self.current_bit | ||
|
@@ -193,7 +194,7 @@ impl<'a> Iterator for SlicesIterator<'a> { | |
/// Therefore, it is considered undefined behavior to pass `filter` with null values. | ||
pub fn build_filter(filter: &BooleanArray) -> Result<Filter> { | ||
let iter = SlicesIterator::new(filter); | ||
let filter_count = iter.filter_count; | ||
let filter_count = iter.filter_count(); | ||
let chunks = iter.collect::<Vec<_>>(); | ||
|
||
Ok(Box::new(move |array: &ArrayData| { | ||
|
@@ -253,7 +254,8 @@ pub fn filter(array: &Array, predicate: &BooleanArray) -> Result<ArrayRef> { | |
} | ||
|
||
let iter = SlicesIterator::new(predicate); | ||
match iter.filter_count { | ||
let filter_count = iter.filter_count(); | ||
match filter_count { | ||
0 => { | ||
// return empty | ||
Ok(new_empty_array(array.data_type())) | ||
|
@@ -266,7 +268,7 @@ pub fn filter(array: &Array, predicate: &BooleanArray) -> Result<ArrayRef> { | |
_ => { | ||
// actually filter | ||
let mut mutable = | ||
MutableArrayData::new(vec![array.data_ref()], false, iter.filter_count); | ||
MutableArrayData::new(vec![array.data_ref()], false, filter_count); | ||
iter.for_each(|(start, end)| mutable.extend(0, start, end)); | ||
let data = mutable.freeze(); | ||
Ok(make_array(data)) | ||
|
@@ -599,7 +601,7 @@ mod tests { | |
let filter = BooleanArray::from(filter_values); | ||
|
||
let iter = SlicesIterator::new(&filter); | ||
let filter_count = iter.filter_count; | ||
let filter_count = iter.filter_count(); | ||
let chunks = iter.collect::<Vec<_>>(); | ||
|
||
assert_eq!(chunks, vec![(1, 2)]); | ||
|
@@ -612,7 +614,7 @@ mod tests { | |
let filter = BooleanArray::from(filter_values); | ||
|
||
let iter = SlicesIterator::new(&filter); | ||
let filter_count = iter.filter_count; | ||
let filter_count = iter.filter_count(); | ||
let chunks = iter.collect::<Vec<_>>(); | ||
|
||
assert_eq!(chunks, vec![(0, 1), (2, 64)]); | ||
|
@@ -625,7 +627,7 @@ mod tests { | |
let filter = BooleanArray::from(filter_values); | ||
|
||
let iter = SlicesIterator::new(&filter); | ||
let filter_count = iter.filter_count; | ||
let filter_count = iter.filter_count(); | ||
let chunks = iter.collect::<Vec<_>>(); | ||
|
||
assert_eq!(chunks, vec![(1, 62), (63, 124), (125, 130)]); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true } | |
base64 = { version = "0.13", optional = true } | ||
clap = { version = "2.33.3", optional = true } | ||
serde_json = { version = "1.0", features = ["preserve_order"], optional = true } | ||
rand = "0.8" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice if a new dependency was not needed for the main crate (it seems like it is only needed for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also don't like having to add this new dependency, but couldn't get the benchmarks to compile without it; I am more than happy to remove or restrict if someone knows how There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The only way I can think of is to move There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will try this over the weekend |
||
|
||
[dev-dependencies] | ||
criterion = "0.3" | ||
|
@@ -76,3 +77,7 @@ required-features = ["cli"] | |
[[bench]] | ||
name = "arrow_writer" | ||
harness = false | ||
|
||
[[bench]] | ||
name = "arrow_array_reader" | ||
harness = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're intentionally missing this function here because we were avoiding a situation where a user could specify a null count != the actual count in the null buffer. Is there a way of avoiding it @yordan-pavlov?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without this `null_count` method, `count_set_bits_offset` would be called unnecessarily (because we already know the null count) a second time in `ArrayData::new` when `value_array_data: ArrayData` is created.