-
Notifications
You must be signed in to change notification settings - Fork 847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add COW conversion for Buffer and PrimitiveArray and unary_mut #3115
Conversation
arrow-array/src/array/mod.rs
Outdated
/// Downcasts an Arc-ed sized `Array` to Arc-ed underlying arrow type. | ||
#[allow(dead_code)] | ||
pub(crate) fn downcast_array<T: Send + Sync + 'static>( | ||
array: Arc<dyn SizedArray>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you go from an ArrayRef
to Arc<dyn SizedArray>
?
Edit: Is the intention to change ArrayRef
to be Arc<dyn SizedArray>
? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cannot make AsDynAny
work with Arc<dyn Array>
. We have implemented Array
for reference type &'a T
. Any
must be 'static
and and it must be Sized
as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, got what you mean. Hmm, good question. I currently don't make it work with ArrayRef
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Proposal in #3117
FYI there is some overlap with #2902 |
arrow-buffer/src/buffer/immutable.rs
Outdated
let offset = self.offset; | ||
let length = self.length; | ||
Arc::try_unwrap(self.data) | ||
.map(|bytes| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to verify that bytes
has deallocation
of Deallocation::Arrow
, otherwise this is not well formed
let length = self.length; | ||
Arc::try_unwrap(self.data) | ||
.map(|bytes| { | ||
// The pointer of underlying buffer should not be offset. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a somewhat annoying limitation, I wonder if there is some way to avoid it 🤔
Perhaps we could push the offset into Bytes 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the limitation, I assume that a non-zero offset means the Bytes is shared/sliced from others. So it is disallowed to be mutable here.
Wondering pushing the offset into Bytes can change it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah ideally we'd do something like make MutableArray hold a Bytes, and then push an offset into Bytes. We could even extend the Allocation trait to allow reallocation of custom allocated data. Not sure how much, if any, of that you would be interested in doing 😅 was more just an observation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the limitation, I assume that a non-zero offset means the Bytes is shared/sliced from others. So it is disallowed to be mutable here.
Just question: i think Arc::try_unwrap
has check the exactly one strong reference.
And all sliced
use the clone()
, So this is impossible panic right? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clone on an Arc increments the strong count, it does not perform a deep copy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tustvold Sorry another question: I think in datafusion most PrimitiveArray
data have only one reference during query. So base on this, most unary change using unary_mut
like +1, will avoid memcpy
than before using unary
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would likely need to update the kernels to make use of this, but theoretically. Whether this will make a major difference in practice I'm not sure, sorts, aggregates and joins, tend to dominate queries in my experience
/// assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None])); | ||
/// # } | ||
/// ``` | ||
pub fn unary_mut<F>(self, op: F) -> Result<PrimitiveArray<T>, ArrowError> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this should return Self
as an error to match the other similar methods?
@@ -42,6 +42,15 @@ impl NullBufferBuilder { | |||
} | |||
} | |||
|
|||
pub fn new_from_buffer(buffer: Option<MutableBuffer>, capacity: usize) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the option here is necessary, as if there is no MutableBuffer, callers can just call the regular constructor?
arrow-buffer/src/buffer/mutable.rs
Outdated
@@ -92,6 +92,15 @@ impl MutableBuffer { | |||
} | |||
} | |||
|
|||
/// Allocates a new [MutableBuffer] from given pointer `ptr`, `capacity`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be "safer" for the to be from_bytes
and contain the necessary logic to make that well-formed, i.e. check the deallocation is as expected, etc...
arrow-data/src/data.rs
Outdated
@@ -387,6 +387,10 @@ impl ArrayData { | |||
&self.buffers[..] | |||
} | |||
|
|||
pub fn get_buffers(self) -> Vec<Buffer> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about child_data, null_buffers, etc... ?
I wonder if a more general pattern would be to use ArrayData::buffers
, clone the desired buffers and then drop the ArrayData?
let null_count = self.null_count(); | ||
let null_buffer = data.null_buffer().map(|b| b.bit_slice(data.offset(), len)); | ||
|
||
let buffer = self.data.buffers()[0].clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need call slice_with_length
to apply the offset, something like
let element_len = std::mem::size_of::<T::Native>();
let buffer = self.data.buffers()[0].slice_with_length(data.offset() * element_len, len * element_len)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really exciting, left some further comments. I had envisaged that into_builder
and related APIs would keep the existing values. Effectively they are a way to go from the immutable Array
representation back to the mutable Builder
representation?
let len = self.len(); | ||
let null_bit_buffer = self.data.null_buffer().cloned(); | ||
|
||
let buffer = self.data.buffers()[0].clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same thing here, I think this needs to call slice_with_length
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed to slice_with_length
but I'm wondering what's difference than clone
directly here?
arrow-buffer/src/buffer/immutable.rs
Outdated
|
||
/// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared. | ||
/// Returns `Err` if this is shared or its allocation is from an external source. | ||
pub fn into_mutable(self, len: usize) -> Result<MutableBuffer, Self> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this take a len? How does this differ from Buffer::length
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as #3115 (comment)
arrow-buffer/src/buffer/mutable.rs
Outdated
@@ -92,6 +93,23 @@ impl MutableBuffer { | |||
} | |||
} | |||
|
|||
/// Allocates a new [MutableBuffer] from given `Bytes`. | |||
pub(crate) fn from_bytes(bytes: Bytes, len: usize) -> Result<Self, Bytes> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does the provided len
different from Bytes::len
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's due to different thoughts about the API. Previously I made the builder starting from length 0. But for unary_mut
I want to make a slice with same length as the array before. So this is parameterized.
Now it is changed for your suggestion so this doesn't take a len
parameter now.
pub fn new_from_buffer(buffer: MutableBuffer) -> Self { | ||
Self { | ||
buffer, | ||
len: 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
len: 0, | |
len: buffer.len() / std::mem::size_of::<T::Native>(), |
?
@@ -33,6 +33,10 @@ impl BooleanBufferBuilder { | |||
Self { buffer, len: 0 } | |||
} | |||
|
|||
pub fn new_from_buffer(buffer: MutableBuffer) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this need to be provided with the length in bits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, missed it.
builder.append_null(); | ||
builder.append_value(2); | ||
|
||
let expected: Int32Array = vec![Some(4), None, Some(2)].into_iter().collect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have expected into_builder
to keep the current values?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, currently the returned builder sets length to 0
so it will overwrite the existing values. Per the latest suggestion, it makes more sense to have the same length as the array. And we can ask for mutable slice if we want to mutate the existing values.
Yes, they can. For Thanks for your review! |
This is by design, as it would allow for unitialised memory. My suggestion is that You would then be able to implement things like
|
This can work as like currently One minor disadvantage of making the same length for the builder is, you cannot use builder's APIs ( Calling the builder's APIs like The builder API cannot read existing values. So it cannot be used with an From the point of view, the suggestion of |
Aah yes, my mistake I thought I had already added that, it seems I didn't get further than adding |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a test of the null behaviour would be good, along with potentially one of the slicing behaviour (which I think will currently fail, which is fine).
let null_bit_buffer = self.data.null_buffer().cloned(); | ||
|
||
let buffer = self.data.buffers()[0] | ||
.slice_with_length(0, len * std::mem::size_of::<T::Native>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason I suggested using slice_with_length was because the null buffer was being sliced. This is no longer the case so we could not do this, ultimately until into_mutable
supports slices this is all moot anyway
.data | ||
.null_buffer() | ||
.cloned() | ||
.and_then(|b| b.into_mutable().ok()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs to be split into the part that clones the null buffer and into_mutable
after data is dropped.
I also think this method should fail if the null buffer cannot be converted, where I think it currently just loses the null buffer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, that's why the doc test is failed now. Looking into how to make it work.
@@ -33,6 +33,10 @@ impl BooleanBufferBuilder { | |||
Self { buffer, len: 0 } | |||
} | |||
|
|||
pub fn new_from_buffer(buffer: MutableBuffer) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you?
@@ -42,6 +42,16 @@ impl NullBufferBuilder { | |||
} | |||
} | |||
|
|||
/// Creates a new builder from a `MutableBuffer`. | |||
pub fn new_from_buffer(buffer: MutableBuffer, len: usize, capacity: usize) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs to verify that len
is less than capacity. And I think capacity must be buffer.len() * 8
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think capacity
should not be passed in. Simply take buffer.len() * 8
now.
.map(|buffer| { | ||
NullBufferBuilder::new_from_buffer(buffer, values_builder.len(), capacity) | ||
}) | ||
.unwrap_or_else(|| NullBufferBuilder::new(capacity)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is correct, I think it needs to be created with a length of values_builder.len()
, as opposed to a capacity based off values_buffer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, good catch!
This has a null test now. Not sure what slicing behavior you thought? Slicing array and build builder from it? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just some minor things 👍
/// data buffer is not shared by others. | ||
pub fn into_builder(self) -> Result<PrimitiveBuilder<T>, Self> { | ||
let len = self.len(); | ||
let null_bit_buffer = self.data.null_buffer().cloned(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to bit_slice this to be consistent with the values buffer
@@ -33,6 +33,10 @@ impl BooleanBufferBuilder { | |||
Self { buffer, len: 0 } | |||
} | |||
|
|||
pub fn new_from_buffer(buffer: MutableBuffer, len: usize) -> Self { | |||
Self { buffer, len } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to assert that len <= buffer.len() * 8
let length = self.length; | ||
Arc::try_unwrap(self.data) | ||
.map(|bytes| { | ||
// The pointer of underlying buffer should not be offset. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would likely need to update the kernels to make use of this, but theoretically. Whether this will make a major difference in practice I'm not sure, sorts, aggregates and joins, tend to dominate queries in my experience
}; | ||
|
||
let try_mutable_buffers = | ||
if let Err(mutable_null_buffer) = try_mutable_null_buffer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A match expression might be clearer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, this was I took from the clippy error suggestion. 😄
I will update it to a match.
Thanks @tustvold |
Benchmark runs are scheduled for baseline = 2a065be and contender = 5bce104. 5bce104 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Part of #1981.
Rationale for this change
I've played around with the code to add some initial APIs for copy-on-write support. The COW support can benefit for our use cases to reduce memory allocation cost. I'd like to add some initial support to move it forward.
What changes are included in this PR?
This change is not fully COW support but adds some initial APIs for that.
Buffer
toMutableBuffer
PrimitiveArray
to corresponding builderunary_mut
toPrimitiveArray
for mutating its contentAre there any user-facing changes?