Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add COW conversion for Buffer and PrimitiveArray and unary_mut #3115

Merged
merged 11 commits into from
Nov 17, 2022

Conversation

viirya
Copy link
Member

@viirya viirya commented Nov 15, 2022

Which issue does this PR close?

Part of #1981.

Rationale for this change

I've played around with the code to add some initial APIs for copy-on-write support. The COW support can benefit for our use cases to reduce memory allocation cost. I'd like to add some initial support to move it forward.

What changes are included in this PR?

This change is not fully COW support but adds some initial APIs for that.

  • Add COW conversion from Buffer to MutableBuffer
  • Add COW conversion from PrimitiveArray to corresponding builder
  • Add unary_mut to PrimitiveArray for mutating its content

Are there any user-facing changes?

@github-actions github-actions bot added the arrow Changes to the arrow crate label Nov 15, 2022
/// Downcasts an Arc-ed sized `Array` to Arc-ed underlying arrow type.
#[allow(dead_code)]
pub(crate) fn downcast_array<T: Send + Sync + 'static>(
array: Arc<dyn SizedArray>,
Copy link
Contributor

@tustvold tustvold Nov 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you go from an ArrayRef to Arc<dyn SizedArray>?

Edit: Is the intention to change ArrayRef to be Arc<dyn SizedArray>? 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot make AsDynAny work with Arc<dyn Array>. We have implemented Array for reference type &'a T. Any must be 'static and and it must be Sized as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, got what you mean. Hmm, good question. I currently don't make it work with ArrayRef.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proposal in #3117

@tustvold
Copy link
Contributor

tustvold commented Nov 15, 2022

FYI there is some overlap with #2902

@viirya viirya changed the title Add some initial APIs for copy-on-write support Add COW conversion for Buffer and PrimitiveArray and unary_mut Nov 15, 2022
let offset = self.offset;
let length = self.length;
Arc::try_unwrap(self.data)
.map(|bytes| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to verify that bytes has deallocation of Deallocation::Arrow, otherwise this is not well formed

let length = self.length;
Arc::try_unwrap(self.data)
.map(|bytes| {
// The pointer of underlying buffer should not be offset.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a somewhat annoying limitation, I wonder if there is some way to avoid it 🤔

Perhaps we could push the offset into Bytes 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the limitation, I assume that a non-zero offset means the Bytes is shared/sliced from others. So it is disallowed to be mutable here.

Wondering pushing the offset into Bytes can change it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah ideally we'd do something like make MutableArray hold a Bytes, and then push an offset into Bytes. We could even extend the Allocation trait to allow reallocation of custom allocated data. Not sure how much, if any, of that you would be interested in doing 😅 was more just an observation

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the limitation, I assume that a non-zero offset means the Bytes is shared/sliced from others. So it is disallowed to be mutable here.

Just question: i think Arc::try_unwrap has check the exactly one strong reference.
And all sliced use the clone(), So this is impossible panic right? 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clone on an Arc increments the strong count, it does not perform a deep copy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tustvold Sorry another question: I think in datafusion most PrimitiveArray data have only one reference during query. So base on this, most unary change using unary_mut like +1, will avoid memcpy than before using unary ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would likely need to update the kernels to make use of this, but theoretically. Whether this will make a major difference in practice I'm not sure, sorts, aggregates and joins, tend to dominate queries in my experience

/// assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None]));
/// # }
/// ```
pub fn unary_mut<F>(self, op: F) -> Result<PrimitiveArray<T>, ArrowError>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should return Self as an error to match the other similar methods?

@@ -42,6 +42,15 @@ impl NullBufferBuilder {
}
}

pub fn new_from_buffer(buffer: Option<MutableBuffer>, capacity: usize) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the option here is necessary, as if there is no MutableBuffer, callers can just call the regular constructor?

@@ -92,6 +92,15 @@ impl MutableBuffer {
}
}

/// Allocates a new [MutableBuffer] from given pointer `ptr`, `capacity`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be "safer" for the to be from_bytes and contain the necessary logic to make that well-formed, i.e. check the deallocation is as expected, etc...

@@ -387,6 +387,10 @@ impl ArrayData {
&self.buffers[..]
}

pub fn get_buffers(self) -> Vec<Buffer> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about child_data, null_buffers, etc... ?

I wonder if a more general pattern would be to use ArrayData::buffers, clone the desired buffers and then drop the ArrayData?

let null_count = self.null_count();
let null_buffer = data.null_buffer().map(|b| b.bit_slice(data.offset(), len));

let buffer = self.data.buffers()[0].clone();
Copy link
Contributor

@tustvold tustvold Nov 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need call slice_with_length to apply the offset, something like

let element_len = std::mem::size_of::<T::Native>();
let buffer = self.data.buffers()[0].slice_with_length(data.offset() * element_len, len * element_len)

Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really exciting, left some further comments. I had envisaged that into_builder and related APIs would keep the existing values. Effectively they are a way to go from the immutable Array representation back to the mutable Builder representation?

let len = self.len();
let null_bit_buffer = self.data.null_buffer().cloned();

let buffer = self.data.buffers()[0].clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing here, I think this needs to call slice_with_length

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed to slice_with_length but I'm wondering what's difference than clone directly here?

arrow-array/src/array/primitive_array.rs Show resolved Hide resolved

/// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
/// Returns `Err` if this is shared or its allocation is from an external source.
pub fn into_mutable(self, len: usize) -> Result<MutableBuffer, Self> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this take a len? How does this differ from Buffer::length?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as #3115 (comment)

@@ -92,6 +93,23 @@ impl MutableBuffer {
}
}

/// Allocates a new [MutableBuffer] from given `Bytes`.
pub(crate) fn from_bytes(bytes: Bytes, len: usize) -> Result<Self, Bytes> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does the provided len different from Bytes::len?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's due to different thoughts about the API. Previously I made the builder starting from length 0. But for unary_mut I want to make a slice with same length as the array before. So this is parameterized.

Now it is changed for your suggestion so this doesn't take a len parameter now.

pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
Self {
buffer,
len: 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
len: 0,
len: buffer.len() / std::mem::size_of::<T::Native>(),

?

@@ -33,6 +33,10 @@ impl BooleanBufferBuilder {
Self { buffer, len: 0 }
}

pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this need to be provided with the length in bits?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, added.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, missed it.

builder.append_null();
builder.append_value(2);

let expected: Int32Array = vec![Some(4), None, Some(2)].into_iter().collect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have expected into_builder to keep the current values?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, currently the returned builder sets length to 0 so it will overwrite the existing values. Per the latest suggestion, it makes more sense to have the same length as the array. And we can ask for mutable slice if we want to mutate the existing values.

@viirya
Copy link
Member Author

viirya commented Nov 16, 2022

This is really exciting, left some further comments. I had envisaged that into_builder and related APIs would keep the existing values. Effectively they are a way to go from the immutable Array representation back to the mutable Builder representation?

Yes, they can. unary_mut demonstrates it. You can give it a op which returns original values for all or partial indexes to keep the existing values.

For into_builder, PrimitiveBuilder currently doesn't offer the capacity to advance without setting its value buffer. For keeping the existing values, we may need to add an API for advancing without changing buffer values.

Thanks for your review!

@tustvold
Copy link
Contributor

tustvold commented Nov 16, 2022

currently doesn't offer the capacity to advance without setting its value buffer

This is by design, as it would allow for unitialised memory. My suggestion is that into_builder would create a builder with the same length as the array from which it was created, i.e. array.into_builder().finish() would be a no-op. Does that make sense?

You would then be able to implement things like unary_mut by doing something like

let mut builder = array.into_builder()?;
builder.as_mut_slice().for_each(op);
builder.finish()

@viirya
Copy link
Member Author

viirya commented Nov 16, 2022

This is by design, as it would allow for unitialised memory. My suggestion is that into_builder would create a builder with the same length as the array from which it was created, i.e. array.into_builder().finish() would be a no-op. Does that make sense?

You would then be able to implement things like unary_mut by doing something like

let mut builder = array.into_builder()?;
builder.as_mut_slice().for_each(op);
builder.finish()

This can work as like currently unary_mut does that iterating existing values as slice and mutating values as like.

One minor disadvantage of making the same length for the builder is, you cannot use builder's APIs (append_value , etc.) to simply overwrite the existing values.

Calling the builder's APIs like append_value will continue from the end of the existing array instead of mutating the existing values.

The builder API cannot read existing values. So it cannot be used with an op which reads existing values and mutates that.

From the point of view, the suggestion of as_mut_slice makes sense to me.

@tustvold
Copy link
Contributor

Aah yes, my mistake I thought I had already added that, it seems I didn't get further than adding as_slice_mut to BufferBuilder

Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a test of the null behaviour would be good, along with potentially one of the slicing behaviour (which I think will currently fail, which is fine).

arrow-array/src/array/primitive_array.rs Show resolved Hide resolved
Comment on lines 533 to 536
let null_bit_buffer = self.data.null_buffer().cloned();

let buffer = self.data.buffers()[0]
.slice_with_length(0, len * std::mem::size_of::<T::Native>());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I suggested using slice_with_length was because the null buffer was being sliced. This is no longer the case so we could not do this, ultimately until into_mutable supports slices this is all moot anyway

.data
.null_buffer()
.cloned()
.and_then(|b| b.into_mutable().ok());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be split into the part that clones the null buffer and into_mutable after data is dropped.

I also think this method should fail if the null buffer cannot be converted, where I think it currently just loses the null buffer

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, that's why the doc test is failed now. Looking into how to make it work.

@@ -33,6 +33,10 @@ impl BooleanBufferBuilder {
Self { buffer, len: 0 }
}

pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you?

@@ -42,6 +42,16 @@ impl NullBufferBuilder {
}
}

/// Creates a new builder from a `MutableBuffer`.
pub fn new_from_buffer(buffer: MutableBuffer, len: usize, capacity: usize) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to verify that len is less than capacity. And I think capacity must be buffer.len() * 8

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think capacity should not be passed in. Simply take buffer.len() * 8 now.

.map(|buffer| {
NullBufferBuilder::new_from_buffer(buffer, values_builder.len(), capacity)
})
.unwrap_or_else(|| NullBufferBuilder::new(capacity));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct, I think it needs to be created with a length of values_builder.len(), as opposed to a capacity based off values_buffer

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, good catch!

@viirya
Copy link
Member Author

viirya commented Nov 16, 2022

I think a test of the null behaviour would be good, along with potentially one of the slicing behaviour (which I think will currently fail, which is fine).

This has a null test now. Not sure what slicing behavior you thought? Slicing array and build builder from it?

Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some minor things 👍

/// data buffer is not shared by others.
pub fn into_builder(self) -> Result<PrimitiveBuilder<T>, Self> {
let len = self.len();
let null_bit_buffer = self.data.null_buffer().cloned();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to bit_slice this to be consistent with the values buffer

@@ -33,6 +33,10 @@ impl BooleanBufferBuilder {
Self { buffer, len: 0 }
}

pub fn new_from_buffer(buffer: MutableBuffer, len: usize) -> Self {
Self { buffer, len }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to assert that len <= buffer.len() * 8

let length = self.length;
Arc::try_unwrap(self.data)
.map(|bytes| {
// The pointer of underlying buffer should not be offset.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would likely need to update the kernels to make use of this, but theoretically. Whether this will make a major difference in practice I'm not sure, sorts, aggregates and joins, tend to dominate queries in my experience

};

let try_mutable_buffers =
if let Err(mutable_null_buffer) = try_mutable_null_buffer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A match expression might be clearer

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, this was I took from the clippy error suggestion. 😄
I will update it to a match.

@viirya viirya merged commit 5bce104 into apache:master Nov 17, 2022
@viirya
Copy link
Member Author

viirya commented Nov 17, 2022

Thanks @tustvold

@ursabot
Copy link

ursabot commented Nov 17, 2022

Benchmark runs are scheduled for baseline = 2a065be and contender = 5bce104. 5bce104 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
arrow Changes to the arrow crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants