Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Avoid copy if input is larger than buffer_size #3016

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions core/src/raw/oio/write/exact_buf_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ impl<W: oio::Write> ExactBufWriter<W> {
Self {
inner,
buffer_size,
buffer: Buffer::Filling(BytesMut::new()),
buffer: Buffer::Empty,
}
}
}

enum Buffer {
Empty,
Filling(BytesMut),
Consuming(Bytes),
}
Expand All @@ -65,6 +66,19 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
async fn write(&mut self, mut bs: Bytes) -> Result<u64> {
loop {
match &mut self.buffer {
Buffer::Empty => {
if bs.len() >= self.buffer_size {
bs.truncate(self.buffer_size);
self.buffer = Buffer::Consuming(bs);
return Ok(self.buffer_size as u64);
}

let size = bs.len() as u64;
let mut fill = BytesMut::with_capacity(bs.len());
fill.extend_from_slice(&bs);
self.buffer = Buffer::Filling(fill);
return Ok(size);
}
Buffer::Filling(fill) => {
if fill.len() >= self.buffer_size {
self.buffer = Buffer::Consuming(fill.split().freeze());
Expand All @@ -84,23 +98,28 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
let n = self.inner.write(consume.clone()).await?;
consume.advance(n as usize);
}
self.buffer = Buffer::Filling(BytesMut::new());
self.buffer = Buffer::Empty;
}
}
}
}

async fn copy_from(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> {
async fn copy_from(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
loop {
match &mut self.buffer {
Buffer::Empty => {
self.buffer = Buffer::Filling(BytesMut::new());
}
Buffer::Filling(fill) => {
if fill.len() >= self.buffer_size {
self.buffer = Buffer::Consuming(fill.split().freeze());
continue;
}

// Reserve to buffer size.
fill.reserve(self.buffer_size - fill.len());
// Reserve to enough size.
if size > fill.remaining_mut() as u64 {
fill.reserve(self.buffer_size - fill.len());
}
let dst = fill.spare_capacity_mut();
let dst_len = dst.len();
let mut buf = ReadBuf::uninit(dst);
Expand Down Expand Up @@ -130,13 +149,14 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
}

async fn abort(&mut self) -> Result<()> {
self.buffer = Buffer::Filling(BytesMut::new());
self.buffer = Buffer::Empty;
self.inner.abort().await
}

async fn close(&mut self) -> Result<()> {
loop {
match &mut self.buffer {
Buffer::Empty => break,
Buffer::Filling(fill) => {
self.buffer = Buffer::Consuming(fill.split().freeze());
continue;
Expand All @@ -149,6 +169,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
let n = self.inner.write(consume.clone()).await?;
consume.advance(n as usize);
}
self.buffer = Buffer::Empty;
break;
}
}
Expand Down