Improve ArrowWriter memory usage: Buffer Pages in ArrowWriter instead of RecordBatch (#3871) #4280

Merged: 6 commits, May 29, 2023
50 changes: 48 additions & 2 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -147,13 +147,13 @@ impl<W: Write + Send> ArrowWriter<W> {
         self.writer.flushed_row_groups()
     }

-    /// Returns the length in bytes of the current in progress row group
+    /// Returns the estimated length in bytes of the current in progress row group
     pub fn in_progress_size(&self) -> usize {
(alamb marked this conversation as resolved)
alamb (Contributor):
@tustvold -- I am having some trouble with this API when using it in IOx -- it is always returning zero (see https://github.com/influxdata/influxdb_iox/pull/7880/files#r1207315133)

I wrote a test for these APIs targeting this PR in tustvold#63; perhaps you can have a look and tell me if I am doing something wrong?

tustvold (Contributor Author):

Aah, this is an oversight on my part. The reported size will just be the size of the flushed pages; any data buffered but not yet flushed to a page will not be counted. This should be at most one page per column, and so should be less than the max page size (1MB) per column (so, for example, a 100-column file would under-report by at most ~100MB).

alamb (Contributor):

Can we make this API return the upper bound? Otherwise I am not sure how useful it is 🤔

alamb (Contributor):

To be clear here, the use case is: "I want to ensure that when writing many parquet files concurrently we don't exceed some memory limit (so that the process doing so isn't killed by k8s / the operating system)."

This doesn't need to be super accurate, just enough of an upper bound to achieve the above goal.

If it would be ok to add a 1MB overhead for each column (e.g. the PAGE_SIZE, or wherever that buffer is defined), I can try and propose a patch to do so.
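
A minimal sketch of that pattern, for illustration only (the 16 MiB budget, the batches iterator, and the Result-returning context are assumptions, not part of this PR):

// Flush the in-progress row group whenever the estimated buffered size
// crosses a budget, bounding peak memory for this writer.
const BUDGET_BYTES: usize = 16 * 1024 * 1024; // illustrative limit

for batch in batches {
    writer.write(&batch)?;
    if writer.in_progress_size() > BUDGET_BYTES {
        // Closing out the current row group frees its buffered pages
        writer.flush()?;
    }
}
writer.close()?;

The same check extends to many concurrent writers by summing their in_progress_size() values against a shared limit.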

tustvold (Contributor Author), May 27, 2023:

I need to think about how best to support an upper bound; we don't currently expose this information from the encoders. I had assumed, perhaps incorrectly, that an order-of-magnitude value was sufficient. Ultimately, if you're at a point where an individual page matters, you're going to have problems...

If the motivation is testing, you could always artificially lower the maximum page size.
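
For example, a test could shrink the page size so data is flushed to pages after only a few values, making in_progress_size() move even on tiny batches. A sketch, assuming the WriterProperties builder's page-size knob (the exact method name may vary across parquet versions):

// Force tiny pages so buffered data flushes to pages almost immediately
let props = WriterProperties::builder()
    .set_data_page_size_limit(128) // bytes; artificially small, for testing only
    .build();
let mut writer = ArrowWriter::try_new(vec![], batch.schema(), Some(props)).unwrap();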

tustvold (Contributor Author):

I've included tustvold#63 in 4683a30, along with a more accurate form of memory estimation. PTAL.

         match &self.in_progress {
             Some(in_progress) => in_progress
                 .writers
                 .iter()
-                .map(|(x, _)| x.lock().unwrap().length)
+                .map(|(_, x)| x.get_estimated_total_bytes() as usize)
                 .sum(),
             None => 0,
         }
@@ -354,6 +354,16 @@ enum ArrowColumnWriter {
     Column(ColumnWriter<'static>),
 }

+impl ArrowColumnWriter {
+    /// Returns the estimated total bytes for this column writer
+    fn get_estimated_total_bytes(&self) -> u64 {
+        match self {
+            ArrowColumnWriter::ByteArray(c) => c.get_estimated_total_bytes(),
+            ArrowColumnWriter::Column(c) => c.get_estimated_total_bytes(),
+        }
+    }
+}
+
 /// Encodes [`RecordBatch`] to a parquet row group
 struct ArrowRowGroupWriter {
     writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>,
@@ -2531,4 +2541,40 @@ mod tests {
         assert_ne!(back.schema(), batch.schema());
         assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
     }
+
+    #[test]
+    fn in_progress_accounting() {
+        // define schema
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+
+        // create some data
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+
+        // build a record batch
+        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+
+        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
+
+        // starts empty
+        assert_eq!(writer.in_progress_size(), 0);
+        assert_eq!(writer.in_progress_rows(), 0);
+        writer.write(&batch).unwrap();
+
+        // updated on write
+        let initial_size = writer.in_progress_size();
+        assert!(initial_size > 0);
+        assert_eq!(writer.in_progress_rows(), 5);
+
+        // updated on second write
+        writer.write(&batch).unwrap();
+        assert!(writer.in_progress_size() > initial_size);
+        assert_eq!(writer.in_progress_rows(), 10);
+
+        // cleared on flush
+        writer.flush().unwrap();
+        assert_eq!(writer.in_progress_size(), 0);
+        assert_eq!(writer.in_progress_rows(), 0);
+
+        writer.close().unwrap();
+    }
 }
41 changes: 31 additions & 10 deletions parquet/src/column/writer/mod.rs
@@ -43,6 +43,21 @@ use crate::util::memory::ByteBufferPtr;

 pub(crate) mod encoder;

+macro_rules! downcast_writer {
+    ($e:expr, $i:ident, $b:expr) => {
+        match $e {
+            Self::BoolColumnWriter($i) => $b,
+            Self::Int32ColumnWriter($i) => $b,
+            Self::Int64ColumnWriter($i) => $b,
+            Self::Int96ColumnWriter($i) => $b,
+            Self::FloatColumnWriter($i) => $b,
+            Self::DoubleColumnWriter($i) => $b,
+            Self::ByteArrayColumnWriter($i) => $b,
+            Self::FixedLenByteArrayColumnWriter($i) => $b,
+        }
+    };
+}

 /// Column writer for a Parquet type.
 pub enum ColumnWriter<'a> {
     BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
@@ -56,18 +71,14 @@ pub enum ColumnWriter<'a> {
 }

 impl<'a> ColumnWriter<'a> {
+    /// Returns the estimated total bytes for this column writer
+    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
+        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
+    }
+
     /// Close this [`ColumnWriter`]
     pub fn close(self) -> Result<ColumnCloseResult> {
-        match self {
-            Self::BoolColumnWriter(typed) => typed.close(),
-            Self::Int32ColumnWriter(typed) => typed.close(),
-            Self::Int64ColumnWriter(typed) => typed.close(),
-            Self::Int96ColumnWriter(typed) => typed.close(),
-            Self::FloatColumnWriter(typed) => typed.close(),
-            Self::DoubleColumnWriter(typed) => typed.close(),
-            Self::ByteArrayColumnWriter(typed) => typed.close(),
-            Self::FixedLenByteArrayColumnWriter(typed) => typed.close(),
-        }
+        downcast_writer!(self, typed, typed.close())
     }
 }
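
For illustration, the macro expansion is purely mechanical: downcast_writer!(self, typed, typed.get_estimated_total_bytes()) expands to the same shape as the match deleted from close() above, i.e.

match self {
    Self::BoolColumnWriter(typed) => typed.get_estimated_total_bytes(),
    Self::Int32ColumnWriter(typed) => typed.get_estimated_total_bytes(),
    // ...the remaining six variants dispatch identically
}

so each new variant-agnostic method no longer repeats the eight-arm match by hand.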

@@ -441,6 +452,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         self.column_metrics.total_bytes_written
     }

+    /// Returns the estimated total bytes for this column writer
+    ///
+    /// Unlike [`Self::get_total_bytes_written`], this includes an estimate
+    /// of any data that has not yet been flushed to a page
+    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
tustvold (Contributor Author), May 27, 2023:

I've left this pub(crate) as I'm not sure about the returned value; it is potentially misleading, as it doesn't include the impact of any block compression on the size of the final output.

+        self.column_metrics.total_bytes_written
+            + self.encoder.estimated_data_page_size() as u64
tustvold (Contributor Author), May 27, 2023:

As the encoders all buffer encoded data internally, this is a fairly accurate estimate of the memory usage.

Edit: it will still be an underestimate, as the allocations may be larger than needed, but it is fairly accurate and doesn't require any breaking changes.

+            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
+    }

     /// Returns total number of rows written by this column writer so far.
     /// This value is also returned when column writer is closed.
     pub fn get_total_rows_written(&self) -> u64 {