Improve ArrowWriter memory usage: Buffer Pages in ArrowWriter instead of RecordBatch (#3871) #4280

Conversation
@@ -437,11 +390,11 @@ struct ByteArrayEncoder {

impl ColumnValueEncoder for ByteArrayEncoder {
    type T = ByteArray;
-   type Values = ArrayRef;
+   type Values = dyn Array;
This is just a drive-by cleanup
self.write(&a)?;
return self.write(&b);
I was being a bit lazy here, but this recursion will only be a problem if the size of the RecordBatch is orders of magnitude larger than the row group size, which would be a pretty strange situation
I agree -- perhaps we can fix it if this turns out to be a problem in practice.
There is also a workaround for anyone who hits this, which is to break up their input RecordBatch (using slice()) into smaller parts
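The slicing workaround can be sketched as follows. `chunk_ranges` is a hypothetical helper (not part of the crate) that computes the `(offset, length)` pairs one would pass to `RecordBatch::slice`:

```rust
/// Hypothetical helper: split `num_rows` rows into chunks of at most
/// `max_rows` rows, returning the (offset, length) pairs to pass to
/// `RecordBatch::slice`.
fn chunk_ranges(num_rows: usize, max_rows: usize) -> Vec<(usize, usize)> {
    assert!(max_rows > 0, "max_rows must be non-zero");
    (0..num_rows)
        .step_by(max_rows)
        .map(|offset| (offset, max_rows.min(num_rows - offset)))
        .collect()
}
```

A caller would then write each slice in turn, e.g. `for (offset, len) in chunk_ranges(batch.num_rows(), max_rows) { writer.write(&batch.slice(offset, len))?; }`, keeping each individual write well below the row group size.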
self.buffered_rows = 0;
let mut row_group_writer = self.writer.next_row_group()?;
for (chunk, close) in in_progress.close()? {
    row_group_writer.append_column(&chunk, close)?;
This is the new API from #4269
type T = ChainReader;

fn get_read(&self, start: u64) -> Result<Self::T> {
    assert_eq!(start, 0);
We are somewhat relying on an implementation detail of append_column, but given this is the same crate, I think it is fine. It will break noisily should we change it
Maybe you can add a // comment here explaining what this is relying on, in case anyone in the future hits this assert
@@ -162,6 +162,75 @@ impl CompressedPage {

pub fn data(&self) -> &[u8] {
    self.compressed_page.buffer().data()
}

/// Returns the thrift page header
pub(crate) fn to_thrift_header(&self) -> PageHeader {
This is just moved from SerializedPageWriter
@@ -36,7 +36,7 @@ pub trait ColumnValues {
}

#[cfg(feature = "arrow")]
-impl<T: arrow_array::Array> ColumnValues for T {
+impl ColumnValues for dyn arrow_array::Array {
This trait is not public, so this isn't a breaking change
#[derive(Default)]
struct ArrowColumnChunk {
    length: usize,
    data: Vec<Bytes>,
By buffering separate chunks, we should be performing no more memcpy of data than we were before 🎉
Benchmarks show no discernible impact on performance.
Thank you very much @tustvold -- this looks really nice 👌
I am not sure about the num_values() metadata change -- I recommend we make that a separate PR.
Otherwise I think it looks awesome -- I will verify the memory usage using the reproducer in https://github.com/influxdata/influxdb_iox/pull/7855
}

impl ArrowRowGroupWriter {
    fn new(
Given this returns Result, I think try_new(...) would be a more idiomatic name
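For illustration, the convention being suggested (the type and field names here are hypothetical, not the actual PR code): a constructor that can fail returns `Result` and is conventionally named `try_new`, mirroring `TryFrom::try_from`:

```rust
/// Hypothetical example of the naming convention: fallible constructors
/// return `Result` and are named `try_new` rather than `new`.
struct RowGroupConfig {
    max_row_group_size: usize,
}

impl RowGroupConfig {
    /// Fails rather than panicking on invalid input, so the name
    /// signals fallibility to the caller.
    fn try_new(max_row_group_size: usize) -> Result<Self, String> {
        if max_row_group_size == 0 {
            return Err("row group size must be non-zero".to_string());
        }
        Ok(Self { max_row_group_size })
    }
}
```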
/// A shared [`ArrowColumnChunk`]
///
/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
I looked at this a bit -- is the issue that you can't get the ColumnChunk back from the GenericColumnWriter?
I wonder if we could add something like GenericColumnWriter::close_into_inner() that returned the inner writer and the ColumnCloseResult 🤔
But that would still require downcasting back so 🤷 not sure it is any better
I tried a lot to avoid this, however, I couldn't devise any mechanism that wasn't both a breaking change and pretty grim. This was the least bad option, and doesn't commit us to anything long term
-    .downcast_ref::<arrow_array::Float32Array>()
-    .expect("Unable to get Float32 array");
-write_primitive(typed, array.values(), levels)?
+let array = column.as_primitive::<Float32Type>();
🧹 nice and tidy
Looking good.
In order to get DataFusion to compile when testing this PR, I had to add some additional Send bounds: 00028c9
You can see the specific error here: https://github.com/influxdata/influxdb_iox/pull/7866#discussion_r1205910780
Is it possible to add the Send bounds (or do a better fix) in this PR?
@@ -152,43 +147,75 @@ impl<W: Write> ArrowWriter<W> {

self.writer.flushed_row_groups()
}

/// Enqueues the provided `RecordBatch` to be written
/// Returns the length in bytes of the current in progress row group
Suggested change:
-/// Returns the length in bytes of the current in progress row group
+/// The bytes of memory used to store the current in progress row group.
+///
+/// The writer incrementally writes each row group on a call to `write` to
+/// a buffer in memory. This function reports on the amount of memory
+/// in the buffer.
#4287 adds the necessary Send bounds, although I am also exploring an approach that doesn't need them
@@ -152,43 +147,75 @@ impl<W: Write> ArrowWriter<W> {

self.writer.flushed_row_groups()
}

/// Enqueues the provided `RecordBatch` to be written
/// Returns the length in bytes of the current in progress row group
pub fn in_progress_size(&self) -> usize {
@tustvold -- I am having some trouble with this API when using it in IOx -- it is always returning zero (see https://github.com/influxdata/influxdb_iox/pull/7880/files#r1207315133)
I wrote a test for these APIs targeting this PR in tustvold#63 -- perhaps you can have a look and tell me if I am doing something wrong?
Aah, this is an oversight on my part. The reported size will just be the size of the flushed pages, any data buffered but not yet flushed to a page will not be counted. This should be at most 1 page per column, and so should be less than the max page size (1MB) per column
Can we make this API return the upper bound? Otherwise I am not sure how useful it is 🤔
To be clear here, the use case is "I want to ensure that when writing many parquet files concurrently we don't exceed some memory limit (so that the process doing so isn't killed by k8s / the operating system)".
This doesn't need to be super accurate, but enough of an upper bound to achieve the above goal.
If it would be ok to add a 1MB overhead for each column (e.g. the PAGE_SIZE, or wherever that buffer is defined), I can try and propose a patch to do so.
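The proposed bound could be computed along these lines -- a sketch under the stated assumption of at most one unflushed page per column; the function name and parameters are hypothetical, not crate API:

```rust
/// Hypothetical conservative upper bound on in-progress memory: the size
/// of pages already flushed, plus one maximum-size unflushed page per
/// column (e.g. max_page_size = 1 MiB).
fn in_progress_upper_bound(
    flushed_pages_size: usize,
    num_columns: usize,
    max_page_size: usize,
) -> usize {
    flushed_pages_size + num_columns * max_page_size
}
```

The bound deliberately overestimates: it trades accuracy for a guarantee that actual usage never exceeds the reported figure, which is what a memory-limiting scheduler needs.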
I need to think about how best to support an upper bound, we don't currently expose this information from the encoders. I had assumed, perhaps incorrectly, that an order of magnitude value was sufficient. Ultimately if you're at a point where an individual page matters, you're going to have problems...
If the motivation is testing, you could always artificially lower the maximum page size
I've included tustvold#63 in 4683a30 along with a more accurate form of memory estimation - PTAL
/// of any data that has not yet been flushed to a page
pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
    self.column_metrics.total_bytes_written
        + self.encoder.estimated_data_page_size() as u64
As the encoders all buffer encoded data internally, this is a fairly accurate estimation of the memory usage.
Edit: it will still be an underestimate as the allocations may be larger than needed, but this is fairly accurate, and doesn't require any breaking changes
/// Unlike [`Self::get_total_bytes_written`] this includes an estimate
/// of any data that has not yet been flushed to a page
#[cfg(feature = "arrow")]
pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
I've left this pub(crate) as I'm not sure about the returned value, it is potentially misleading as it doesn't include the impact of any block compression on the size of the final output
Thank you @tustvold -- I don't have time today to pull this all together and try it inside IOx, but I will do so tomorrow.
FYI @sundy-li
Which issue does this PR close?
Closes #3871
Rationale for this change
Buffering pages should be more memory efficient, and also allows for finer-grained control over row group sizing.
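The finer-grained control follows from the fact that rows are buffered as encoded pages rather than whole batches, so a row group can be closed exactly at the configured limit regardless of how the incoming batches are sized. This toy model (not the actual writer code) illustrates the flush behaviour:

```rust
/// Toy model of the flush logic: rows from incoming batches accumulate in
/// a buffer, a row group is closed whenever the buffer reaches
/// `max_row_group_size`, and any remainder is flushed at the end (on close).
fn row_group_sizes(batch_sizes: &[usize], max_row_group_size: usize) -> Vec<usize> {
    let mut groups = Vec::new();
    let mut buffered = 0;
    for &size in batch_sizes {
        let mut remaining = size;
        // Close full row groups, splitting the batch across group
        // boundaries where necessary.
        while buffered + remaining >= max_row_group_size {
            let take = max_row_group_size - buffered;
            groups.push(max_row_group_size);
            remaining -= take;
            buffered = 0;
        }
        buffered += remaining;
    }
    if buffered > 0 {
        groups.push(buffered);
    }
    groups
}
```

With batch-level buffering, a group could only be closed at batch boundaries; with page-level buffering, every group except the last lands exactly on the configured size.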
What changes are included in this PR?
Are there any user-facing changes?