Skip to content

Commit

Permalink
perf: Fix quadratic in binview growable same source
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 18, 2024
1 parent 2db0ba6 commit 3a65f6d
Showing 1 changed file with 80 additions and 45 deletions.
125 changes: 80 additions & 45 deletions crates/polars-arrow/src/array/growable/binview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::array::Array;
use crate::bitmap::MutableBitmap;
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;
use crate::legacy::utils::CustomIterTools;

struct BufferKey<'a> {
inner: &'a Buffer<u8>,
Expand Down Expand Up @@ -43,6 +44,8 @@ pub struct GrowableBinaryViewArray<'a, T: ViewType + ?Sized> {
// See: #14201
buffers: PlIndexSet<BufferKey<'a>>,
total_bytes_len: usize,
// Borrow as this can happen `N` times.
same_buffers: Option<&'a Arc<[Buffer<u8>]>>,
}

impl<'a, T: ViewType + ?Sized> GrowableBinaryViewArray<'a, T> {
Expand All @@ -62,43 +65,71 @@ impl<'a, T: ViewType + ?Sized> GrowableBinaryViewArray<'a, T> {
use_validity = true;
};

// We deduplicate the individual buffers in `buffers`, and the `data_buffers` groups in
// `processed_buffer_groups`. As one `data_buffers` group can hold M buffers, this prevents
// N * M complexity. See: #15615
let mut processed_buffer_groups = PlHashSet::new();
let mut buffers = PlIndexSet::new();

for array in arrays.iter() {
let data_buffers = array.data_buffers();
if processed_buffer_groups.insert(data_buffers.as_ptr() as usize) {
buffers.extend(data_buffers.iter().map(|buf| BufferKey { inner: buf }))
// Fast path.
// This happens in group-bys,
// and it saves us from inserting `M` buffers into `buffers`.
// See: #15615
let all_same_buffer = arrays
.iter()
.map(|array| array.data_buffers().as_ptr())
.all_equal()
&& !arrays.is_empty();
if all_same_buffer {
let buffers = arrays[0].data_buffers();
Self {
arrays,
data_type,
validity: prepare_validity(use_validity, capacity),
views: Vec::with_capacity(capacity),
buffers: Default::default(),
total_bytes_len: 0,
same_buffers: Some(buffers),
}
} else {
// We deduplicate the individual buffers in `buffers`, and the `data_buffers` groups in
// `processed_buffer_groups`. As one `data_buffers` group can hold M buffers, this prevents
// N * M complexity. See: #15615
let mut processed_buffer_groups = PlHashSet::new();
let mut buffers = PlIndexSet::new();
for array in arrays.iter() {
let data_buffers = array.data_buffers();
if processed_buffer_groups.insert(data_buffers.as_ptr() as usize) {
buffers.extend(data_buffers.iter().map(|buf| BufferKey { inner: buf }))
}
}
}

Self {
arrays,
data_type,
validity: prepare_validity(use_validity, capacity),
views: Vec::with_capacity(capacity),
buffers,
total_bytes_len: 0,
Self {
arrays,
data_type,
validity: prepare_validity(use_validity, capacity),
views: Vec::with_capacity(capacity),
buffers,
total_bytes_len: 0,
same_buffers: None,
}
}
}

fn to(&mut self) -> BinaryViewArrayGeneric<T> {
let views = std::mem::take(&mut self.views);
let buffers = std::mem::take(&mut self.buffers);
let mut total_buffer_len = 0;
let buffers = Arc::from(
buffers
.into_iter()
.map(|buf| {
let buf = buf.inner;
total_buffer_len += buf.len();
buf.clone()
})
.collect::<Vec<_>>(),
);

let buffers = if let Some(buffers) = self.same_buffers {
buffers.clone()
} else {
Arc::from(
buffers
.into_iter()
.map(|buf| {
let buf = buf.inner;
total_buffer_len += buf.len();
buf.clone()
})
.collect::<Vec<_>>(),
)
};

let validity = self.validity.take();

unsafe {
Expand All @@ -117,28 +148,32 @@ impl<'a, T: ViewType + ?Sized> GrowableBinaryViewArray<'a, T> {
/// # Safety
/// doesn't check bounds
pub unsafe fn extend_unchecked(&mut self, index: usize, start: usize, len: usize) {
let array = *self.arrays.get_unchecked(index);
let local_buffers = array.data_buffers();
if self.same_buffers.is_none() {
let array = *self.arrays.get_unchecked(index);
let local_buffers = array.data_buffers();

extend_validity(&mut self.validity, array, start, len);
extend_validity(&mut self.validity, array, start, len);

let range = start..start + len;
let range = start..start + len;

self.views
.extend(array.views().get_unchecked(range).iter().map(|view| {
let mut view = *view;
let len = view.length as usize;
self.total_bytes_len += len;
self.views
.extend(array.views().get_unchecked(range).iter().map(|view| {
let mut view = *view;
let len = view.length as usize;
self.total_bytes_len += len;

if len > 12 {
let buffer = local_buffers.get_unchecked_release(view.buffer_idx as usize);
let key = BufferKey { inner: buffer };
let idx = self.buffers.get_full(&key).unwrap_unchecked_release().0;
if len > 12 {
let buffer = local_buffers.get_unchecked_release(view.buffer_idx as usize);
let key = BufferKey { inner: buffer };
let idx = self.buffers.get_full(&key).unwrap_unchecked_release().0;

view.buffer_idx = idx as u32;
}
view
}));
view.buffer_idx = idx as u32;
}
view
}));
} else {
self.extend_unchecked_no_buffers(index, start, len)
}
}

#[inline]
Expand Down

0 comments on commit 3a65f6d

Please sign in to comment.