
Improve ArrowWriter memory usage: Buffer Pages in ArrowWriter instead of RecordBatch (#3871) #4280

Merged (6 commits) on May 29, 2023

Conversation

tustvold (Contributor)

Which issue does this PR close?

Closes #3871

Rationale for this change

Buffering pages should be more memory efficient, and also allows for finer-grained control over row group sizing.

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added the parquet Changes to the parquet crate label May 25, 2023
@@ -437,11 +390,11 @@ struct ByteArrayEncoder {

impl ColumnValueEncoder for ByteArrayEncoder {
type T = ByteArray;
type Values = ArrayRef;
type Values = dyn Array;
tustvold (Contributor Author):

This is just a drive-by cleanup

Comment on lines 181 to 182
self.write(&a)?;
return self.write(&b);
tustvold (Contributor Author):

I was being a bit lazy here, but this recursion will only be a problem if the size of the RecordBatch is orders of magnitude larger than the row group size, which would be a pretty strange situation

Contributor:

I agree -- perhaps we can fix it if this turns out to be a problem in practice.

There is also a workaround for anyone who hits this, which is to break up their input RecordBatch (using slice()) into smaller parts
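The slicing workaround mentioned above can be sketched without pulling in the arrow crates: a small helper (the name `slice_ranges` is hypothetical, not a crate API) that computes the `(offset, length)` pairs into which a batch of `num_rows` rows would be split, each pair then being passed to `RecordBatch::slice(offset, length)` before writing.

```rust
/// Compute (offset, length) pairs for splitting a batch of `num_rows`
/// rows into slices of at most `max_rows` rows each. Each pair could
/// then be fed to `RecordBatch::slice(offset, length)`.
fn slice_ranges(num_rows: usize, max_rows: usize) -> Vec<(usize, usize)> {
    assert!(max_rows > 0);
    let mut out = Vec::new();
    let mut offset = 0;
    while offset < num_rows {
        let len = max_rows.min(num_rows - offset);
        out.push((offset, len));
        offset += len;
    }
    out
}

fn main() {
    // A 10-row batch split into slices of at most 4 rows.
    assert_eq!(slice_ranges(10, 4), vec![(0, 4), (4, 4), (8, 2)]);
}
```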

self.buffered_rows = 0;
let mut row_group_writer = self.writer.next_row_group()?;
for (chunk, close) in in_progress.close()? {
row_group_writer.append_column(&chunk, close)?;
tustvold (Contributor Author):

This is the new API from #4269

type T = ChainReader;

fn get_read(&self, start: u64) -> Result<Self::T> {
assert_eq!(start, 0);
tustvold (Contributor Author):

We are somewhat relying on an implementation detail of append_column, but given this is the same crate, I think it is fine. It will break noisily should we change it

Contributor:

Maybe you can add a // comment here explaining what this is relying on, in case anyone in the future hits this assert

@@ -162,6 +162,75 @@ impl CompressedPage {
pub fn data(&self) -> &[u8] {
self.compressed_page.buffer().data()
}

/// Returns the thrift page header
pub(crate) fn to_thrift_header(&self) -> PageHeader {
tustvold (Contributor Author):

This is just moved from SerializedPageWriter

@@ -36,7 +36,7 @@ pub trait ColumnValues {
}

#[cfg(feature = "arrow")]
impl<T: arrow_array::Array> ColumnValues for T {
impl ColumnValues for dyn arrow_array::Array {
tustvold (Contributor Author):

This trait is not public, so this isn't a breaking change

#[derive(Default)]
struct ArrowColumnChunk {
length: usize,
data: Vec<Bytes>,
tustvold (Contributor Author):

By buffering separate chunks, we should be performing no more memcpy of data than we were before 🎉
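The idea of keeping each flushed page as its own owned buffer, and only chaining them together at read time, can be sketched with just the standard library. This is an illustrative, std-only sketch of the `ChainReader` shape (the PR's real type works over `bytes::Bytes`, not `Vec<u8>`): it implements `std::io::Read` across several buffers without ever concatenating them into one contiguous allocation.

```rust
use std::io::Read;

/// Reads sequentially across several owned buffers without copying them
/// into one contiguous allocation (std-only sketch of the ChainReader idea).
struct ChainReader {
    chunks: Vec<Vec<u8>>,
    chunk: usize, // index of the chunk currently being read
    pos: usize,   // position within that chunk
}

impl ChainReader {
    fn new(chunks: Vec<Vec<u8>>) -> Self {
        Self { chunks, chunk: 0, pos: 0 }
    }
}

impl Read for ChainReader {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        while self.chunk < self.chunks.len() {
            let cur = &self.chunks[self.chunk];
            if self.pos < cur.len() {
                let n = buf.len().min(cur.len() - self.pos);
                buf[..n].copy_from_slice(&cur[self.pos..self.pos + n]);
                self.pos += n;
                return Ok(n);
            }
            // current chunk exhausted, advance to the next one
            self.chunk += 1;
            self.pos = 0;
        }
        Ok(0) // all chunks exhausted: EOF
    }
}

fn main() {
    let mut r = ChainReader::new(vec![b"page1".to_vec(), b"page2".to_vec()]);
    let mut out = String::new();
    r.read_to_string(&mut out).unwrap();
    assert_eq!(out, "page1page2");
}
```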

tustvold (Contributor Author):

Benchmarks show no discernible impact on performance

@alamb alamb changed the title Buffer Pages in ArrowWriter instead of RecordBatch (#3871) Improve ArrowWriter memory usage: Buffer Pages in ArrowWriter instead of RecordBatch (#3871) May 25, 2023
alamb (Contributor) left a comment:

Thank you very much @tustvold -- this looks really nice 👌

I am not sure about the num_values() metadata change -- I recommend we make that a separate PR.

Otherwise I think it looks awesome -- I will verify the memory usage using the reproducer in https://github.com/influxdata/influxdb_iox/pull/7855


}

impl ArrowRowGroupWriter {
fn new(
Contributor:

Given this returns Result, I think try_new(...) would be a more idiomatic name


/// A shared [`ArrowColumnChunk`]
///
/// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via
/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
Contributor:

I looked at this a bit -- is the issue that you can't get the ColumnChunk back from the GenericColumnWriter?

I wonder if we could add something like GenericColumnWriter::close_into_inner() that returned the inner writer and the ColumnCloseResult 🤔

But that would still require downcasting back so 🤷 not sure it is any better

tustvold (Contributor Author):

I tried a lot to avoid this, however, I couldn't devise any mechanism that wasn't both a breaking change and pretty grim. This was the least bad option, and doesn't commit us to anything long term

.downcast_ref::<arrow_array::Float32Array>()
.expect("Unable to get Float32 array");
write_primitive(typed, array.values(), levels)?
let array = column.as_primitive::<Float32Type>();
Contributor:

🧹 nice and tidy

alamb (Contributor) left a comment:

Looking good.

In order to get DataFusion to compile when testing this PR, I had to add some additional Send bounds: 00028c9

You can see the specific error here: https://github.com/influxdata/influxdb_iox/pull/7866#discussion_r1205910780

Is it possible to add the Send bounds (or do a better fix) in this PR?

@@ -152,43 +147,75 @@ impl<W: Write> ArrowWriter<W> {
self.writer.flushed_row_groups()
}

/// Enqueues the provided `RecordBatch` to be written
/// Returns the length in bytes of the current in progress row group
Contributor:

Suggested change:
- /// Returns the length in bytes of the current in progress row group
+ /// The bytes of memory used to store the current in progress row group.
+ ///
+ /// The writer incrementally writes each row group on a call to `write` to
+ /// a buffer in memory. This function reports on the amount of memory
+ /// in the buffer.

tustvold (Contributor Author):

#4287 adds the necessary Send bounds, although I am also exploring an approach that doesn't need them

@@ -152,43 +147,75 @@ impl<W: Write> ArrowWriter<W> {
self.writer.flushed_row_groups()
}

/// Enqueues the provided `RecordBatch` to be written
/// Returns the length in bytes of the current in progress row group
pub fn in_progress_size(&self) -> usize {
Contributor:

@tustvold -- I am having some trouble with this API when using it in IOx -- it is always returning zero (see https://github.com/influxdata/influxdb_iox/pull/7880/files#r1207315133)

I wrote a test for these APIs targeting this PR in tustvold#63 perhaps you can have a look at tell me if I am doing something wrong?

tustvold (Contributor Author):

Aah, this is an oversight on my part. The reported size will just be the size of the flushed pages, any data buffered but not yet flushed to a page will not be counted. This should be at most 1 page per column, and so should be less than the max page size (1MB) per column

Contributor:

Can we make this API return the upper bound? Otherwise I am not sure how useful it is 🤔

Contributor:

To be clear here, the use case is "I want to ensure that when writing many parquet files concurrently we don't exceed some memory limit (so that the process doing so isn't killed by k8s / the operating system)".

This doesn't need to be super accurate, but enough of an upper bound to achieve the above goal.

If it would be OK to add a 1MB overhead for each column (e.g. the PAGE_SIZE, or wherever that buffer is defined), I can try and propose a patch to do so.
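The bound being proposed here is simple arithmetic: the reported size covers only flushed pages, so at most one unflushed page (bounded by the max page size) per column needs to be added on top. A sketch, with all names illustrative rather than the crate's API:

```rust
/// Conservative upper bound on writer memory: the reported in-progress
/// size covers only flushed pages, so add up to one unflushed page
/// (at most the max page size) per column. Names are illustrative.
fn upper_bound(in_progress_size: usize, num_columns: usize, max_page_size: usize) -> usize {
    in_progress_size + num_columns * max_page_size
}

fn main() {
    // 10 columns with a 1 MiB page limit adds at most 10 MiB of slack
    // on top of 3 MiB of already-flushed pages.
    let bound = upper_bound(3 * 1024 * 1024, 10, 1024 * 1024);
    assert_eq!(bound, 13 * 1024 * 1024);
}
```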

tustvold (Contributor Author), May 27, 2023:

I need to think about how best to support an upper bound, as we don't currently expose this information from the encoders. I had assumed, perhaps incorrectly, that an order-of-magnitude value was sufficient. Ultimately, if you're at a point where an individual page matters, you're going to have problems...

If the motivation is testing, you could always artificially lower the maximum page size

tustvold (Contributor Author):

I've included tustvold#63 in 4683a30 along with a more accurate form of memory estimation - PTAL

/// of any data that has not yet been flushed to a page
pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
self.column_metrics.total_bytes_written
+ self.encoder.estimated_data_page_size() as u64
tustvold (Contributor Author), May 27, 2023:

As the encoders all buffer encoded data internally, this is a fairly accurate estimation of the memory usage.

Edit: it will still be an underestimate as the allocations may be larger than needed, but this is fairly accurate, and doesn't require any breaking changes

/// Unlike [`Self::get_total_bytes_written`] this includes an estimate
/// of any data that has not yet been flushed to a page
#[cfg(feature = "arrow")]
pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
tustvold (Contributor Author), May 27, 2023:

I've left this pub(crate) as I'm not sure about the returned value, it is potentially misleading as it doesn't include the impact of any block compression on the size of the final output

@tustvold tustvold requested a review from alamb May 29, 2023 09:50
alamb (Contributor) left a comment:

Thank you @tustvold -- I don't have time today to pull this all together and try it inside IOx, but I will do so tomorrow.

alamb (Contributor) commented May 29, 2023:

FYI @sundy-li

@tustvold tustvold merged commit ea00892 into apache:master May 29, 2023
alamb pushed a commit to alamb/arrow-rs that referenced this pull request May 30, 2023
…ad of RecordBatch (apache#3871) (apache#4280)

* Buffer Pages in ArrowWriter instead of RecordBatch (apache#3871)

* Review feedback

* Improved memory accounting

* Clippy
wjones127 added a commit to delta-io/delta-rs that referenced this pull request Jun 3, 2023
# Description

Refactors such that:

1. Runs compaction tasks in parallel, with parallelism controlled by the
user but defaulting to number of cpus. (The `num_cpu` crate is used by
`tokio`, so we already have it transitively.)
2. Turns on zstd compression by default at level 4. In a future PR, we
can make this configurable for Python and maybe benchmark different
levels.
3. Initial prep to have other types of optimize commands.

However, the writer isn't very good at writing for a target file size,
because the code that checks the size of the written file only knows the
size of the serialized row groups and not the current row group. So if
your row groups are 100MB in size, and you target 150MB, you will get
200MB files. There is upstream work in
apache/arrow-rs#4280 that will allow us to write
much more exactly sized files, so this will improve in the near future.
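The overshoot described above follows directly from checking only the serialized size. A toy model (not delta-rs code; names are illustrative) makes the arithmetic concrete: with 100 MB row groups and a 150 MB target, the size check after the first row group sees only 100 MB, so a second group is written and the file lands at 200 MB.

```rust
/// Model of a writer that decides to close a file based only on the size
/// of already-serialized row groups: the current row group's size is
/// unknown until it, too, has been serialized (illustrative sketch).
fn final_file_size(row_group_size: u64, target: u64) -> u64 {
    let mut serialized = 0;
    loop {
        serialized += row_group_size; // size only known after serializing
        if serialized >= target {
            return serialized;
        }
    }
}

fn main() {
    // 100 MB row groups, 150 MB target: the check at 100 MB is below the
    // target, so another row group is written and the file ends at 200 MB.
    assert_eq!(final_file_size(100, 150), 200);
}
```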

# Related Issue(s)

closes #1171

# Documentation

<!---
Share links to useful documentation
--->

---------

Co-authored-by: Robert Pack <[email protected]>
Successfully merging this pull request may close these issues.

Avoid Buffering Arrow Data for Entire Row Group in parquet::ArrowWriter