From 153385eafafe235266a3f80e3aaf79d44c140b1f Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 11 Jul 2024 16:11:31 +0800 Subject: [PATCH 1/4] Remove returning n in write Signed-off-by: Xuanwo --- core/src/layers/complete.rs | 7 +- core/src/layers/concurrent_limit.rs | 4 +- core/src/layers/error_context.rs | 14 ++- core/src/layers/logging.rs | 25 +++--- core/src/layers/retry.rs | 4 +- core/src/layers/timeout.rs | 2 +- core/src/raw/adapters/kv/backend.rs | 10 +-- core/src/raw/adapters/typed_kv/backend.rs | 10 +-- core/src/raw/enum_utils.rs | 4 +- core/src/raw/oio/write/api.rs | 30 ++----- core/src/raw/oio/write/append_write.rs | 8 +- core/src/raw/oio/write/block_write.rs | 10 +-- core/src/raw/oio/write/multipart_write.rs | 10 +-- core/src/raw/oio/write/one_shot_write.rs | 5 +- core/src/raw/oio/write/position_write.rs | 10 +-- core/src/raw/oio/write/range_write.rs | 10 +-- .../types/blocking_write/blocking_writer.rs | 8 +- core/src/types/blocking_write/std_writer.rs | 10 +-- core/src/types/context/write.rs | 89 +++++++------------ core/src/types/write/buffer_sink.rs | 12 ++- core/src/types/write/writer.rs | 14 +-- 21 files changed, 114 insertions(+), 182 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index c2156a0418e6..68b0340aa71a 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -654,7 +654,7 @@ impl oio::Write for CompleteWriter where W: oio::Write, { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; @@ -689,13 +689,12 @@ impl oio::BlockingWrite for CompleteWriter where W: oio::BlockingWrite, { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let w = self.inner.as_mut().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "writer has been closed or aborted") })?; - let n = w.write(bs)?; - Ok(n) + w.write(bs) } fn close(&mut self) -> Result<()> { diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index a1a61ad01dc8..87ad19b50c92 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -262,7 +262,7 @@ impl oio::BlockingRead for ConcurrentLimitWrapper { } impl oio::Write for ConcurrentLimitWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs).await } @@ -276,7 +276,7 @@ impl oio::Write for ConcurrentLimitWrapper { } impl oio::BlockingWrite for ConcurrentLimitWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 86ae9dba80cd..cabe84b0538e 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -385,14 +385,13 @@ impl oio::BlockingRead for ErrorContextWrapper { } impl oio::Write for ErrorContextWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let size = bs.len(); self.inner .write(bs) .await - .map(|n| { - self.processed += n as u64; - n + .map(|_| { + self.processed += size as u64; }) .map_err(|err| { err.with_operation(WriteOperation::Write) @@ -423,13 +422,12 @@ impl oio::Write for ErrorContextWrapper { } impl oio::BlockingWrite for ErrorContextWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let size = bs.len(); self.inner .write(bs) - .map(|n| { - self.processed += n as u64; - n + .map(|_| { + self.processed += size as u64; }) .map_err(|err| { err.with_operation(WriteOperation::BlockingWrite) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 1e0d80d26475..507745c6d147 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1072,21 +1072,20 @@ impl LoggingWriter { } impl oio::Write for LoggingWriter { - async fn write(&mut self, bs: Buffer) -> Result { - match self.inner.write(bs.clone()).await { - Ok(n) => { - self.written += n as u64; + async fn write(&mut self, bs: Buffer) -> Result<()> { + let size = bs.len(); + match self.inner.write(bs).await { + Ok(_) => { trace!( target: LOGGING_TARGET, - "service={} operation={} path={} written={}B -> input data {}B, write {}B", + "service={} operation={} path={} written={}B -> data write {}B", self.ctx.scheme, WriteOperation::Write, self.path, self.written, - bs.len(), - n, + size, ); - Ok(n) + Ok(()) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { @@ -1170,21 +1169,19 @@ impl oio::Write for LoggingWriter { } impl oio::BlockingWrite for LoggingWriter { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { match self.inner.write(bs.clone()) { - Ok(n) => { - self.written += n as u64; + Ok(_) => { trace!( target: LOGGING_TARGET, - "service={} operation={} path={} written={}B -> input data {}B, write {}B", + "service={} operation={} path={} written={}B -> data write {}B", self.ctx.scheme, WriteOperation::BlockingWrite, self.path, self.written, bs.len(), - n ); - Ok(n) + Ok(()) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index c48307fdbc62..5bc37e614549 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -626,7 +626,7 @@ impl oio::BlockingRead for RetryWrapp } impl oio::Write for RetryWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { use backon::RetryableWithContext; let inner = self.take_inner()?; @@ -694,7 +694,7 @@ impl oio::Write for RetryWrapper { } impl oio::BlockingWrite for RetryWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { { || self.inner.as_mut().unwrap().write(bs.clone()) } .retry(&self.builder) .when(|e| e.is_temporary()) diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 246049dfbf24..1cbc0c5ac106 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -350,7 +350,7 @@ impl oio::Read for TimeoutWrapper { } impl oio::Write for TimeoutWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let fut = self.inner.write(bs); Self::io_timeout(self.timeout, WriteOperation::Write.into_static(), fut).await } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index bb08e4cb19f6..625e7ea9829d 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -242,10 +242,9 @@ impl KvWriter { unsafe impl Sync for KvWriter {} impl oio::Write for KvWriter { - async fn write(&mut self, bs: Buffer) -> Result { - let ret = bs.len(); + async fn write(&mut self, bs: Buffer) -> Result<()> { self.buffer.push(bs); - Ok(ret) + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -260,10 +259,9 @@ impl oio::Write for KvWriter { } impl oio::BlockingWrite for KvWriter { - fn write(&mut self, bs: Buffer) -> Result { - let ret = bs.len(); + fn write(&mut self, bs: Buffer) -> Result<()> { self.buffer.push(bs); - Ok(ret) + Ok(()) } fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index ecce2eb8792f..fd6271691b16 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -275,12 +275,11 @@ impl KvWriter { } impl oio::Write for KvWriter { - async fn write(&mut self, bs: Buffer) -> Result { - let size = bs.len(); + async fn write(&mut self, bs: Buffer) -> Result<()> { let mut buf = self.buf.take().unwrap_or_default(); buf.push(bs); self.buf = Some(buf); - Ok(size) + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -303,12 +302,11 @@ impl oio::Write for KvWriter { } impl oio::BlockingWrite for KvWriter { - fn write(&mut self, bs: Buffer) -> Result { - let size = bs.len(); + fn write(&mut self, bs: Buffer) -> Result<()> { let mut buf = self.buf.take().unwrap_or_default(); buf.push(bs); self.buf = Some(buf); - Ok(size) + Ok(()) } fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs index c22411904dc9..111da78be090 100644 --- a/core/src/raw/enum_utils.rs +++ b/core/src/raw/enum_utils.rs @@ -70,7 +70,7 @@ impl oio::BlockingRead for TwoWa } impl oio::Write for TwoWays { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { match self { Self::One(v) => v.write(bs).await, Self::Two(v) => v.write(bs).await, @@ -129,7 +129,7 @@ impl o impl oio::Write for ThreeWays { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { match self { Self::One(v) => v.write(bs).await, Self::Two(v) => v.write(bs).await, diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index e6c7c05918e2..4ec53adab67d 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -77,31 +77,19 @@ pub trait Write: Unpin + Send + Sync { /// /// # Behavior /// - /// - `Ok(n)` means `n` bytes has been written successfully. + /// - `Ok(())` means all bytes has been written successfully. /// - `Err(err)` means error happens and no bytes has been written. - /// - /// It's possible that `n < bs.len()`, caller should pass the remaining bytes - /// repeatedly until all bytes has been written. - #[cfg(not(target_arch = "wasm32"))] - fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend; - #[cfg(target_arch = "wasm32")] - fn write(&mut self, bs: Buffer) -> impl Future>; + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend; /// Close the writer and make sure all data has been flushed. - #[cfg(not(target_arch = "wasm32"))] fn close(&mut self) -> impl Future> + MaybeSend; - #[cfg(target_arch = "wasm32")] - fn close(&mut self) -> impl Future>; /// Abort the pending writer. - #[cfg(not(target_arch = "wasm32"))] fn abort(&mut self) -> impl Future> + MaybeSend; - #[cfg(target_arch = "wasm32")] - fn abort(&mut self) -> impl Future>; } impl Write for () { - async fn write(&mut self, _: Buffer) -> Result { + async fn write(&mut self, _: Buffer) -> Result<()> { unimplemented!("write is required to be implemented for oio::Write") } @@ -121,7 +109,7 @@ impl Write for () { } pub trait WriteDyn: Unpin + Send + Sync { - fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture>; + fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture>; fn close_dyn(&mut self) -> BoxedFuture>; @@ -129,7 +117,7 @@ pub trait WriteDyn: Unpin + Send + Sync { } impl WriteDyn for T { - fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture> { + fn write_dyn(&mut self, bs: Buffer) -> BoxedFuture> { Box::pin(self.write(bs)) } @@ -143,7 +131,7 @@ impl WriteDyn for T { } impl Write for Box { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { self.deref_mut().write_dyn(bs).await } @@ -170,14 +158,14 @@ pub trait BlockingWrite: Send + Sync + 'static { /// /// It's possible that `n < bs.len()`, caller should pass the remaining bytes /// repeatedly until all bytes has been written. - fn write(&mut self, bs: Buffer) -> Result; + fn write(&mut self, bs: Buffer) -> Result<()>; /// Close the writer and make sure all data has been flushed. fn close(&mut self) -> Result<()>; } impl BlockingWrite for () { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let _ = bs; unimplemented!("write is required to be implemented for oio::BlockingWrite") @@ -195,7 +183,7 @@ impl BlockingWrite for () { /// /// To make BlockingWriter work as expected, we must add this impl. impl BlockingWrite for Box { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { (**self).write(bs) } diff --git a/core/src/raw/oio/write/append_write.rs b/core/src/raw/oio/write/append_write.rs index 2f48b683079c..06c72cc5e2cc 100644 --- a/core/src/raw/oio/write/append_write.rs +++ b/core/src/raw/oio/write/append_write.rs @@ -80,7 +80,7 @@ impl oio::Write for AppendWriter where W: AppendWrite, { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let offset = match self.offset { Some(offset) => offset, None => { @@ -91,12 +91,10 @@ where }; let size = bs.len(); - self.inner - .append(offset, size as u64, Buffer::from(bs.to_bytes())) - .await?; + self.inner.append(offset, size as u64, bs).await?; // Update offset after succeed. self.offset = Some(offset + size as u64); - Ok(size) + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/block_write.rs b/core/src/raw/oio/write/block_write.rs index 99c76562ca85..cd0ec43b45a1 100644 --- a/core/src/raw/oio/write/block_write.rs +++ b/core/src/raw/oio/write/block_write.rs @@ -162,10 +162,10 @@ impl oio::Write for BlockWriter where W: BlockWrite, { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { if !self.started && self.cache.is_none() { - let size = self.fill_cache(bs); - return Ok(size); + self.fill_cache(bs); + return Ok(()); } // The block upload process has been started. @@ -181,8 +181,8 @@ where }) .await?; self.cache = None; - let size = self.fill_cache(bs); - Ok(size) + self.fill_cache(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/multipart_write.rs b/core/src/raw/oio/write/multipart_write.rs index 0d893d7cb320..44a33c7a4ba0 100644 --- a/core/src/raw/oio/write/multipart_write.rs +++ b/core/src/raw/oio/write/multipart_write.rs @@ -203,14 +203,14 @@ impl oio::Write for MultipartWriter where W: MultipartWrite, { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let upload_id = match self.upload_id.clone() { Some(v) => v, None => { // Fill cache with the first write. if self.cache.is_none() { - let size = self.fill_cache(bs); - return Ok(size); + self.fill_cache(bs); + return Ok(()); } let upload_id = self.w.initiate_part().await?; @@ -234,8 +234,8 @@ where .await?; self.cache = None; self.next_part_number += 1; - let size = self.fill_cache(bs); - Ok(size) + self.fill_cache(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index cd056c14614d..938973c33a71 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -50,16 +50,15 @@ impl OneShotWriter { } impl oio::Write for OneShotWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { match &self.buffer { Some(_) => Err(Error::new( ErrorKind::Unsupported, "OneShotWriter doesn't support multiple write", )), None => { - let size = bs.len(); self.buffer = Some(bs); - Ok(size) + Ok(()) } } } diff --git a/core/src/raw/oio/write/position_write.rs b/core/src/raw/oio/write/position_write.rs index 3dbf5c93ef2f..5aa5ff329432 100644 --- a/core/src/raw/oio/write/position_write.rs +++ b/core/src/raw/oio/write/position_write.rs @@ -124,10 +124,10 @@ impl PositionWriter { } impl oio::Write for PositionWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { if self.cache.is_none() { - let size = self.fill_cache(bs); - return Ok(size); + let _ = self.fill_cache(bs); + return Ok(()); } let bytes = self.cache.clone().expect("pending write must exist"); @@ -144,8 +144,8 @@ impl oio::Write for PositionWriter { .await?; self.cache = None; self.next_offset += length; - let size = self.fill_cache(bs); - Ok(size) + let _ = self.fill_cache(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index 67ae619dd924..f44f06ad9cbd 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -155,14 +155,14 @@ impl RangeWriter { } impl oio::Write for RangeWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let location = match self.location.clone() { Some(location) => location, None => { // Fill cache with the first write. if self.cache.is_none() { - let size = self.fill_cache(bs); - return Ok(size); + self.fill_cache(bs); + return Ok(()); } let location = self.w.initiate_range().await?; @@ -187,8 +187,8 @@ impl oio::Write for RangeWriter { .await?; self.cache = None; self.next_offset += length; - let size = self.fill_cache(bs); - Ok(size) + self.fill_cache(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/types/blocking_write/blocking_writer.rs b/core/src/types/blocking_write/blocking_writer.rs index 489cae502a74..d97cabb1472a 100644 --- a/core/src/types/blocking_write/blocking_writer.rs +++ b/core/src/types/blocking_write/blocking_writer.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use bytes::Buf; use std::sync::Arc; use crate::raw::*; @@ -68,12 +67,7 @@ impl BlockingWriter { /// } /// ``` pub fn write(&mut self, bs: impl Into) -> Result<()> { - let mut bs = bs.into(); - while !bs.is_empty() { - let n = self.inner.write(bs.clone())?; - bs.advance(n); - } - Ok(()) + self.inner.write(bs.into()) } /// Close the writer and make sure all data have been committed. diff --git a/core/src/types/blocking_write/std_writer.rs b/core/src/types/blocking_write/std_writer.rs index 5b18467e369a..fe918b43bf87 100644 --- a/core/src/types/blocking_write/std_writer.rs +++ b/core/src/types/blocking_write/std_writer.rs @@ -81,10 +81,9 @@ impl Write for StdWriter { } let bs = self.buf.get().expect("frozen buffer must be valid"); - let n = w - .write(Buffer::from(bs)) + w.write(Buffer::from(bs)) .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; - self.buf.advance(n); + self.buf.clean(); } } @@ -103,10 +102,9 @@ impl Write for StdWriter { return Ok(()); }; - let n = w - .write(Buffer::from(bs)) + w.write(Buffer::from(bs)) .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; - self.buf.advance(n); + self.buf.clean(); } } } diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs index 557341da3dc6..92248762cf87 100644 --- a/core/src/types/context/write.rs +++ b/core/src/types/context/write.rs @@ -18,7 +18,6 @@ use crate::raw::oio::Write; use crate::raw::*; use crate::*; -use bytes::Buf; use std::sync::Arc; /// WriteContext holds the immutable context for give write operation. @@ -134,15 +133,14 @@ impl WriteGenerator { impl WriteGenerator { /// Write the entire buffer into writer. - pub async fn write(&mut self, mut bs: Buffer) -> Result { + pub async fn write(&mut self, mut bs: Buffer) -> Result<()> { let Some(chunk_size) = self.chunk_size else { return self.w.write_dyn(bs).await; }; if self.buffer.len() + bs.len() < chunk_size { - let size = bs.len(); self.buffer.push(bs); - return Ok(size); + return Ok(()); } // Condition: @@ -151,13 +149,10 @@ impl WriteGenerator { // Action: // - write buffer + bs directly. if !self.exact { - let fill_size = bs.len(); self.buffer.push(bs); - let mut buf = self.buffer.take().collect(); - let written = self.w.write_dyn(buf.clone()).await?; - buf.advance(written); - self.buffer.push(buf); - return Ok(fill_size); + let buf = self.buffer.take().collect(); + self.w.write_dyn(buf).await?; + return Ok(()); } // Condition: @@ -167,10 +162,8 @@ impl WriteGenerator { // Action: // - write existing buffer in chunk_size to make more rooms for writing data. if self.buffer.len() >= chunk_size { - let mut buf = self.buffer.take().collect(); - let written = self.w.write_dyn(buf.clone()).await?; - buf.advance(written); - self.buffer.push(buf); + let buf = self.buffer.take().collect(); + self.w.write_dyn(buf).await?; } // Condition @@ -180,9 +173,8 @@ impl WriteGenerator { // - write bs to buffer with remaining size. let remaining = chunk_size - self.buffer.len(); bs.truncate(remaining); - let n = bs.len(); self.buffer.push(bs); - Ok(n) + Ok(()) } /// Finish the write process. @@ -192,8 +184,8 @@ impl WriteGenerator { break; } - let written = self.w.write_dyn(self.buffer.clone().collect()).await?; - self.buffer.advance(written); + self.w.write_dyn(self.buffer.clone().collect()).await?; + self.buffer.clear(); } self.w.close().await @@ -223,15 +215,14 @@ impl WriteGenerator { impl WriteGenerator { /// Write the entire buffer into writer. - pub fn write(&mut self, mut bs: Buffer) -> Result { + pub fn write(&mut self, mut bs: Buffer) -> Result<()> { let Some(chunk_size) = self.chunk_size else { return self.w.write(bs); }; if self.buffer.len() + bs.len() < chunk_size { - let size = bs.len(); self.buffer.push(bs); - return Ok(size); + return Ok(()); } // Condition: @@ -240,13 +231,10 @@ impl WriteGenerator { // Action: // - write buffer + bs directly. if !self.exact { - let fill_size = bs.len(); self.buffer.push(bs); - let mut buf = self.buffer.take().collect(); - let written = self.w.write(buf.clone())?; - buf.advance(written); - self.buffer.push(buf); - return Ok(fill_size); + let buf = self.buffer.take().collect(); + self.w.write(buf)?; + return Ok(()); } // Condition: @@ -256,10 +244,8 @@ impl WriteGenerator { // Action: // - write existing buffer in chunk_size to make more rooms for writing data. if self.buffer.len() >= chunk_size { - let mut buf = self.buffer.take().collect(); - let written = self.w.write(buf.clone())?; - buf.advance(written); - self.buffer.push(buf); + let buf = self.buffer.take().collect(); + self.w.write(buf)?; } // Condition @@ -269,9 +255,8 @@ impl WriteGenerator { // - write bs to buffer with remaining size. let remaining = chunk_size - self.buffer.len(); bs.truncate(remaining); - let n = bs.len(); self.buffer.push(bs); - Ok(n) + Ok(()) } /// Finish the write process. @@ -281,8 +266,8 @@ impl WriteGenerator { break; } - let written = self.w.write(self.buffer.clone().collect())?; - self.buffer.advance(written); + self.w.write(self.buffer.clone().collect())?; + self.buffer.clear(); } self.w.close() @@ -343,10 +328,7 @@ mod tests { let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), true); let mut bs = Bytes::from(expected.clone()); - while !bs.is_empty() { - let n = w.write(bs.clone().into()).await?; - bs.advance(n); - } + w.write(bs.clone().into()).await?; w.close().await?; @@ -375,10 +357,7 @@ mod tests { rng.fill_bytes(&mut expected); let bs = Bytes::from(expected.clone()); - // The MockWriter always returns the first chunk size. - let n = w.write(bs.into()).await?; - assert_eq!(expected.len(), n); - + w.write(bs.into()).await?; w.close().await?; let buf = buf.lock().await; @@ -413,14 +392,13 @@ mod tests { // content > chunk size. let content = new_content(15); - assert_eq!(15, w.write(content.into()).await?); + w.write(content.into()).await?; // content < chunk size. let content = new_content(5); - assert_eq!(5, w.write(content.into()).await?); + w.write(content.into()).await?; // content > chunk size, but 5 bytes in queue. let content = new_content(15); - // The MockWriter can send all 15 bytes together, so we can only advance 5 bytes. - assert_eq!(15, w.write(content.clone().into()).await?); + w.write(content.clone().into()).await?; w.close().await?; @@ -456,16 +434,16 @@ mod tests { // content > chunk size. let content = new_content(15); - assert_eq!(15, w.write(content.into()).await?); + w.write(content.into()).await?; // content < chunk size. let content = new_content(5); - assert_eq!(5, w.write(content.into()).await?); + w.write(content.into()).await?; // content < chunk size. let content = new_content(3); - assert_eq!(3, w.write(content.into()).await?); + w.write(content.into()).await?; // content > chunk size, but can send all chunks in the queue. let content = new_content(15); - assert_eq!(15, w.write(content.clone().into()).await?); + w.write(content.clone().into()).await?; w.close().await?; @@ -539,10 +517,10 @@ mod tests { // content < chunk size. let content = new_content(5); - assert_eq!(5, w.write(content.into()).await?); + w.write(content.into()).await?; // Non-contiguous buffer. let content = Buffer::from(vec![new_content(3), new_content(2)]); - assert_eq!(5, w.write(content).await?); + w.write(content).await?; w.close().await?; @@ -584,10 +562,7 @@ mod tests { expected.extend_from_slice(&content); let mut bs = Bytes::from(content.clone()); - while !bs.is_empty() { - let n = writer.write(bs.clone().into()).await?; - bs.advance(n); - } + writer.write(bs.clone().into()).await?; } writer.close().await?; diff --git a/core/src/types/write/buffer_sink.rs b/core/src/types/write/buffer_sink.rs index 46d911253002..cc42e09090bb 100644 --- a/core/src/types/write/buffer_sink.rs +++ b/core/src/types/write/buffer_sink.rs @@ -20,8 +20,6 @@ use std::task::ready; use std::task::Context; use std::task::Poll; -use bytes::Buf; - use crate::raw::*; use crate::*; @@ -35,7 +33,7 @@ pub struct BufferSink { enum State { Idle(Option>), - Writing(BoxedStaticFuture<(WriteGenerator, Result)>), + Writing(BoxedStaticFuture<(WriteGenerator, Result<()>)>), Closing(BoxedStaticFuture<(WriteGenerator, Result<()>)>), } @@ -92,8 +90,8 @@ impl futures::Sink for BufferSink { let (w, res) = ready!(fut.as_mut().poll(cx)); this.state = State::Idle(Some(w)); match res { - Ok(n) => { - this.buf.advance(n); + Ok(_) => { + this.buf = Buffer::new(); } Err(err) => return Poll::Ready(Err(err)), } @@ -139,8 +137,8 @@ impl futures::Sink for BufferSink { let (w, res) = ready!(fut.as_mut().poll(cx)); this.state = State::Idle(Some(w)); match res { - Ok(n) => { - this.buf.advance(n); + Ok(_) => { + this.buf = Buffer::new(); } Err(err) => return Poll::Ready(Err(err)), } diff --git a/core/src/types/write/writer.rs b/core/src/types/write/writer.rs index dc81d3d71647..44755ed37cba 100644 --- a/core/src/types/write/writer.rs +++ b/core/src/types/write/writer.rs @@ -136,12 +136,7 @@ impl Writer { /// } /// ``` pub async fn write(&mut self, bs: impl Into) -> Result<()> { - let mut bs = bs.into(); - while !bs.is_empty() { - let n = self.inner.write(bs.clone()).await?; - bs.advance(n); - } - Ok(()) + self.inner.write(bs.into()).await } /// Write [`bytes::Buf`] into inner writer. @@ -153,11 +148,8 @@ impl Writer { /// Optimize this function to avoid unnecessary copy. pub async fn write_from(&mut self, bs: impl Buf) -> Result<()> { let mut bs = bs; - let mut bs = Buffer::from(bs.copy_to_bytes(bs.remaining())); - while !bs.is_empty() { - let n = self.inner.write(bs.clone()).await?; - bs.advance(n); - } + let bs = Buffer::from(bs.copy_to_bytes(bs.remaining())); + self.inner.write(bs).await?; Ok(()) } From 911ef20b98f9a49040991e3d40399efac4deac36 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 11 Jul 2024 16:47:23 +0800 Subject: [PATCH 2/4] Fix build Signed-off-by: Xuanwo --- core/src/layers/async_backtrace.rs | 4 +- core/src/layers/await_tree.rs | 4 +- core/src/layers/blocking.rs | 2 +- core/src/layers/dtrace.rs | 14 ++-- core/src/layers/metrics.rs | 17 +++-- core/src/layers/minitrace.rs | 4 +- core/src/layers/oteltrace.rs | 4 +- core/src/layers/prometheus.rs | 20 +++-- core/src/layers/prometheus_client.rs | 16 ++-- core/src/layers/retry.rs | 4 +- core/src/layers/throttle.rs | 4 +- core/src/layers/tracing.rs | 4 +- core/src/services/aliyun_drive/writer.rs | 6 +- core/src/services/alluxio/writer.rs | 7 +- core/src/services/compfs/writer.rs | 24 +++--- core/src/services/fs/writer.rs | 19 +++-- core/src/services/ftp/writer.rs | 25 ++++--- core/src/services/ghac/writer.rs | 4 +- core/src/services/hdfs/writer.rs | 18 ++++- core/src/services/hdfs_native/writer.rs | 2 +- core/src/services/sftp/writer.rs | 13 +++- core/src/types/context/write.rs | 94 ++---------------------- 22 files changed, 133 insertions(+), 176 deletions(-) diff --git a/core/src/layers/async_backtrace.rs b/core/src/layers/async_backtrace.rs index 6d77591ca003..290171c29c60 100644 --- a/core/src/layers/async_backtrace.rs +++ b/core/src/layers/async_backtrace.rs @@ -169,7 +169,7 @@ impl oio::BlockingRead for AsyncBacktraceWrapper { impl oio::Write for AsyncBacktraceWrapper { #[async_backtrace::framed] - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs).await } @@ -185,7 +185,7 @@ impl oio::Write for AsyncBacktraceWrapper { } impl oio::BlockingWrite for AsyncBacktraceWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs index 7bb20d42fb49..58fd73e8fe4f 100644 --- a/core/src/layers/await_tree.rs +++ b/core/src/layers/await_tree.rs @@ -191,7 +191,7 @@ impl oio::BlockingRead for AwaitTreeWrapper { } impl oio::Write for AwaitTreeWrapper { - fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { self.inner .write(bs) .instrument_await(format!("opendal::{}", WriteOperation::Write.into_static())) @@ -211,7 +211,7 @@ impl oio::Write for AwaitTreeWrapper { } impl oio::BlockingWrite for AwaitTreeWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 70830981c831..7293ced965af 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -288,7 +288,7 @@ impl oio::BlockingRead for BlockingWrapper { } impl oio::BlockingWrite for BlockingWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { self.handle.block_on(self.inner.write(bs)) } diff --git a/core/src/layers/dtrace.rs b/core/src/layers/dtrace.rs index c71277ed9a6d..51b001313b58 100644 --- a/core/src/layers/dtrace.rs +++ b/core/src/layers/dtrace.rs @@ -379,15 +379,14 @@ impl oio::BlockingRead for DtraceLayerWrapper { } impl oio::Write for DtraceLayerWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let c_path = CString::new(self.path.clone()).unwrap(); probe_lazy!(opendal, writer_write_start, c_path.as_ptr()); self.inner .write(bs) .await - .map(|n| { - probe_lazy!(opendal, writer_write_ok, c_path.as_ptr(), n); - n + .map(|_| { + probe_lazy!(opendal, writer_write_ok, c_path.as_ptr()); }) .map_err(|err| { probe_lazy!(opendal, writer_write_error, c_path.as_ptr()); @@ -427,14 +426,13 @@ impl oio::Write for DtraceLayerWrapper { } impl oio::BlockingWrite for DtraceLayerWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let c_path = CString::new(self.path.clone()).unwrap(); probe_lazy!(opendal, blocking_writer_write_start, c_path.as_ptr()); self.inner .write(bs) - .map(|n| { - probe_lazy!(opendal, blocking_writer_write_ok, c_path.as_ptr(), n); - n + .map(|_| { + probe_lazy!(opendal, blocking_writer_write_ok, c_path.as_ptr()); }) .map_err(|err| { probe_lazy!(opendal, blocking_writer_write_error, c_path.as_ptr()); diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 47d7a8a936ce..decbad81a8d3 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -785,17 +785,17 @@ impl oio::BlockingRead for MetricWrapper { } impl oio::Write for MetricWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let start = Instant::now(); + let size = bs.len(); self.inner .write(bs) .await - .map(|n| { - self.bytes_counter.increment(n as u64); + .map(|_| { + self.bytes_counter.increment(size as u64); self.requests_duration_seconds .record(start.elapsed().as_secs_f64()); - n }) .map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); @@ -819,12 +819,13 @@ impl oio::Write for MetricWrapper { } impl oio::BlockingWrite for MetricWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { + let size = bs.len(); + self.inner .write(bs) - .map(|n| { - self.bytes_counter.increment(n as u64); - n + .map(|_| { + self.bytes_counter.increment(size as u64); }) .map_err(|err| { self.handle.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 983d18b6b8fd..bca4a2e584df 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -306,7 +306,7 @@ impl oio::BlockingRead for MinitraceWrapper { } impl oio::Write for MinitraceWrapper { - fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { let _g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static()); self.inner.write(bs) @@ -326,7 +326,7 @@ impl oio::Write for MinitraceWrapper { } impl oio::BlockingWrite for MinitraceWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let _g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static()); self.inner.write(bs) diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index 6fb5d582ba13..b04ad22fddba 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -284,7 +284,7 @@ impl oio::BlockingRead for OtelTraceWrapper { } impl oio::Write for OtelTraceWrapper { - fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { self.inner.write(bs) } @@ -298,7 +298,7 @@ impl oio::Write for OtelTraceWrapper { } impl oio::BlockingWrite for OtelTraceWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index ee740d55c428..a4359ead7261 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -742,7 +742,9 @@ impl oio::BlockingRead for PrometheusMetricWrapper { } impl oio::Write for PrometheusMetricWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { + let size = bs.len(); + let labels = self.stats.generate_metric_label( self.scheme.into_static(), WriteOperation::Write.into_static(), @@ -758,12 +760,12 @@ impl oio::Write for PrometheusMetricWrapper { timer.observe_duration(); match res { - Ok(n) => { + Ok(_) => { self.stats .bytes_total .with_label_values(&labels) - .observe(n as f64); - Ok(n) + .observe(size as f64); + Ok(()) } Err(err) => { self.stats.increment_errors_total(self.op, err.kind()); @@ -822,7 +824,9 @@ impl oio::Write for PrometheusMetricWrapper { } impl oio::BlockingWrite for PrometheusMetricWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { + let size = bs.len(); + let labels = self.stats.generate_metric_label( self.scheme.into_static(), Operation::BlockingWrite.into_static(), @@ -838,12 +842,12 @@ impl oio::BlockingWrite for PrometheusMetricWrapper { timer.observe_duration(); match res { - Ok(n) => { + Ok(_) => { self.stats .bytes_total .with_label_values(&labels) - .observe(n as f64); - Ok(n) + .observe(size as f64); + Ok(()) } Err(err) => { self.stats.increment_errors_total(self.op, err.kind()); diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 463e40927508..58fef7e89450 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -627,24 +627,24 @@ impl oio::BlockingRead for PrometheusMetricWrapper { } impl oio::Write for PrometheusMetricWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let start = Instant::now(); + let size = bs.len(); self.inner .write(bs) .await - .map(|n| { + .map(|_| { self.metrics.observe_bytes_total( self.scheme, WriteOperation::Write.into_static(), - n, + size, ); self.metrics.observe_request_duration( self.scheme, WriteOperation::Write.into_static(), start.elapsed(), ); - n }) .map_err(|err| { self.metrics.increment_errors_total( @@ -704,23 +704,23 @@ impl oio::Write for PrometheusMetricWrapper { } impl oio::BlockingWrite for PrometheusMetricWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let start = Instant::now(); + let size = bs.len(); self.inner .write(bs) - .map(|n| { + .map(|_| { self.metrics.observe_bytes_total( self.scheme, WriteOperation::BlockingWrite.into_static(), - n, + size, ); self.metrics.observe_request_duration( self.scheme, WriteOperation::BlockingWrite.into_static(), start.elapsed(), ); - n }) .map_err(|err| { self.metrics.increment_errors_total( diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 5bc37e614549..4d42098394a6 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -938,8 +938,8 @@ mod tests { struct MockWriter {} impl oio::Write for MockWriter { - async fn write(&mut self, bs: Buffer) -> Result { - Ok(bs.len()) + async fn write(&mut self, _: Buffer) -> Result<()> { + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index 3b66e327f39e..f73f33d3caa6 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -191,7 +191,7 @@ impl oio::BlockingRead for ThrottleWrapper { } impl oio::Write for ThrottleWrapper { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); loop { @@ -226,7 +226,7 @@ impl oio::Write for ThrottleWrapper { } impl oio::BlockingWrite for ThrottleWrapper { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { let buf_length = NonZeroU32::new(bs.len() as u32).unwrap(); loop { diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 4ba829a6ec7a..4a2dc4bc006f 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -286,7 +286,7 @@ impl oio::Write for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { self.inner.write(bs) } @@ -312,7 +312,7 @@ impl oio::BlockingWrite for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, bs: Buffer) -> Result<()> { self.inner.write(bs) } diff --git a/core/src/services/aliyun_drive/writer.rs b/core/src/services/aliyun_drive/writer.rs index 30ca2ef94e2e..461764541fe4 100644 --- a/core/src/services/aliyun_drive/writer.rs +++ b/core/src/services/aliyun_drive/writer.rs @@ -51,7 +51,7 @@ impl AliyunDriveWriter { } impl oio::Write for AliyunDriveWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let (upload_id, file_id) = match (self.upload_id.as_ref(), self.file_id.as_ref()) { (Some(upload_id), Some(file_id)) => (upload_id, file_id), _ => { @@ -94,8 +94,6 @@ impl oio::Write for AliyunDriveWriter { return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url")); }; - let size = bs.len(); - if let Err(err) = self.core.upload(upload_url, bs).await { if err.kind() != ErrorKind::AlreadyExists { return Err(err); @@ -104,7 +102,7 @@ impl oio::Write for AliyunDriveWriter { self.part_number += 1; - Ok(size) + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/alluxio/writer.rs b/core/src/services/alluxio/writer.rs index e5ca807b609c..f452b6b10bbd 100644 --- a/core/src/services/alluxio/writer.rs +++ b/core/src/services/alluxio/writer.rs @@ -43,7 +43,7 @@ impl AlluxioWriter { } impl oio::Write for AlluxioWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let stream_id = match self.stream_id { Some(stream_id) => stream_id, None => { @@ -52,9 +52,8 @@ impl oio::Write for AlluxioWriter { stream_id } }; - self.core - .write(stream_id, Buffer::from(bs.to_bytes())) - .await + self.core.write(stream_id, bs).await?; + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/compfs/writer.rs b/core/src/services/compfs/writer.rs index 2e12ea2dc77f..749cab10ffcc 100644 --- a/core/src/services/compfs/writer.rs +++ b/core/src/services/compfs/writer.rs @@ -15,13 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::{io::Cursor, sync::Arc}; - -use compio::{buf::buf_try, fs::File, io::AsyncWrite}; - use super::core::CompfsCore; use crate::raw::*; use crate::*; +use compio::io::AsyncWriteExt; +use compio::{buf::buf_try, fs::File}; +use std::{io::Cursor, sync::Arc}; #[derive(Debug)] pub struct CompfsWriter { @@ -36,17 +35,22 @@ impl CompfsWriter { } impl oio::Write for CompfsWriter { - async fn write(&mut self, bs: Buffer) -> Result { + /// FIXME + /// + /// the write_all doesn't work correctly if `bs` is non-contiguous. + /// + /// The IoBuf::buf_len() only returns the length of the current buffer. + async fn write(&mut self, bs: Buffer) -> Result<()> { let mut file = self.file.clone(); - let n = self - .core + self.core .exec(move || async move { - let (n, _) = buf_try!(@try file.write(bs).await); - Ok(n) + buf_try!(@try file.write_all(bs).await); + Ok(()) }) .await?; - Ok(n) + + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index b9e84d736c75..d0d1bc66741c 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -53,11 +53,15 @@ impl FsWriter { unsafe impl Sync for FsWriter {} impl oio::Write for FsWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, mut bs: Buffer) -> Result<()> { let f = self.f.as_mut().expect("FsWriter must be initialized"); - // TODO: use write_vectored instead. - f.write(bs.chunk()).await.map_err(new_std_io_error) + while bs.has_remaining() { + let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -88,10 +92,15 @@ impl oio::Write for FsWriter { } impl oio::BlockingWrite for FsWriter { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, mut bs: Buffer) -> Result<()> { let f = self.f.as_mut().expect("FsWriter must be initialized"); - f.write(bs.chunk()).map_err(new_std_io_error) + while bs.has_remaining() { + let n = f.write(bs.chunk()).map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index ec21609c1862..f7f6864dd9c2 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -53,7 +53,7 @@ impl FtpWriter { } impl oio::Write for FtpWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, mut bs: Buffer) -> Result<()> { let path = if let Some(tmp_path) = &self.tmp_path { tmp_path } else { @@ -69,17 +69,20 @@ impl oio::Write for FtpWriter { )); } - let size = self - .data_stream - .as_mut() - .unwrap() - .write(bs.chunk()) - .await - .map_err(|err| { - Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err) - })?; + while bs.has_remaining() { + let n = self + .data_stream + .as_mut() + .unwrap() + .write(bs.chunk()) + .await + .map_err(|err| { + Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err) + })?; + bs.advance(n); + } - Ok(size) + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 00bce7fac34a..3b58dd0fc2d1 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -38,7 +38,7 @@ impl GhacWriter { } impl oio::Write for GhacWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { let size = bs.len(); let offset = self.size; @@ -61,7 +61,7 @@ impl oio::Write for GhacWriter { } self.size += size as u64; - Ok(size) + Ok(()) } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 96708c1ab1ac..0f60014f410d 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -53,10 +53,15 @@ impl HdfsWriter { } impl oio::Write for HdfsWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, mut bs: Buffer) -> Result<()> { let f = self.f.as_mut().expect("HdfsWriter must be initialized"); - f.write(bs.chunk()).await.map_err(new_std_io_error) + while bs.has_remaining() { + let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -82,9 +87,14 @@ impl oio::Write for HdfsWriter { } impl oio::BlockingWrite for HdfsWriter { - fn write(&mut self, bs: Buffer) -> Result { + fn write(&mut self, mut bs: Buffer) -> Result<()> { let f = self.f.as_mut().expect("HdfsWriter must be initialized"); - f.write(bs.chunk()).map_err(new_std_io_error) + while bs.has_remaining() { + let n = f.write(bs.chunk()).map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/hdfs_native/writer.rs b/core/src/services/hdfs_native/writer.rs index e6fb0205e4f2..4cab45b3be46 100644 --- a/core/src/services/hdfs_native/writer.rs +++ b/core/src/services/hdfs_native/writer.rs @@ -31,7 +31,7 @@ impl HdfsNativeWriter { } impl oio::Write for HdfsNativeWriter { - async fn write(&mut self, _bs: Buffer) -> Result { + async fn write(&mut self, _bs: Buffer) -> Result<()> { todo!() } diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 69bff86245c2..9b86c789577a 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -39,8 +39,17 @@ impl SftpWriter { } impl oio::Write for SftpWriter { - async fn write(&mut self, bs: Buffer) -> Result { - self.file.write(bs.chunk()).await.map_err(new_std_io_error) + async fn write(&mut self, mut bs: Buffer) -> Result<()> { + while bs.has_remaining() { + let n = self + .file + .write(bs.chunk()) + .await + .map_err(new_std_io_error)?; + bs.advance(n); + } + + Ok(()) } async fn close(&mut self) -> Result<()> { diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs index 92248762cf87..af875c14ed2f 100644 --- a/core/src/types/context/write.rs +++ b/core/src/types/context/write.rs @@ -278,7 +278,7 @@ impl WriteGenerator { mod tests { use super::*; use crate::raw::oio::Write; - use bytes::Buf; + use bytes::BufMut; use bytes::Bytes; use log::debug; use pretty_assertions::assert_eq; @@ -294,13 +294,12 @@ mod tests { } impl Write for MockWriter { - async fn write(&mut self, bs: Buffer) -> Result { + async fn write(&mut self, bs: Buffer) -> Result<()> { debug!("test_fuzz_exact_buf_writer: flush size: {}", &bs.len()); - let chunk = bs.chunk(); let mut buf = self.buf.lock().await; - buf.extend_from_slice(chunk); - Ok(chunk.len()) + buf.put(bs); + Ok(()) } async fn close(&mut self) -> Result<()> { @@ -327,8 +326,8 @@ mod tests { let buf = Arc::new(Mutex::new(vec![])); let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), true); - let mut bs = Bytes::from(expected.clone()); - w.write(bs.clone().into()).await?; + let bs = Bytes::from(expected.clone()); + w.write(bs.into()).await?; w.close().await?; @@ -456,83 +455,6 @@ mod tests { Ok(()) } - struct PartialWriter { - buf: Arc>>, - } - - impl Write for PartialWriter { - async fn write(&mut self, mut bs: Buffer) -> Result { - let mut buf = self.buf.lock().await; - - if Buffer::count(&bs) > 1 { - // Always leaves last buffer for non-contiguous buffer. - let mut written = 0; - while Buffer::count(&bs) > 1 { - let chunk = bs.chunk(); - buf.extend_from_slice(chunk); - written += chunk.len(); - bs.advance(chunk.len()); - } - Ok(written) - } else { - let chunk = bs.chunk(); - buf.extend_from_slice(chunk); - Ok(chunk.len()) - } - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - } - - #[tokio::test] - async fn test_inexact_buf_writer_partial_send() -> Result<()> { - let _ = tracing_subscriber::fmt() - .pretty() - .with_test_writer() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .try_init(); - - let buf = Arc::new(Mutex::new(vec![])); - let mut w = WriteGenerator::new( - Box::new(PartialWriter { buf: buf.clone() }), - Some(10), - false, - ); - - let mut rng = thread_rng(); - let mut expected = vec![]; - - let mut new_content = |size| { - let mut content = vec![0; size]; - rng.fill_bytes(&mut content); - expected.extend_from_slice(&content); - Bytes::from(content) - }; - - // content < chunk size. - let content = new_content(5); - w.write(content.into()).await?; - // Non-contiguous buffer. - let content = Buffer::from(vec![new_content(3), new_content(2)]); - w.write(content).await?; - - w.close().await?; - - let buf = buf.lock().await; - assert_eq!(buf.len(), expected.len()); - assert_eq!( - format!("{:x}", Sha256::digest(&*buf)), - format!("{:x}", Sha256::digest(&expected)) - ); - Ok(()) - } - #[tokio::test] async fn test_fuzz_exact_buf_writer() -> Result<()> { let _ = tracing_subscriber::fmt() @@ -561,8 +483,8 @@ mod tests { expected.extend_from_slice(&content); - let mut bs = Bytes::from(content.clone()); - writer.write(bs.clone().into()).await?; + let bs = Bytes::from(content.clone()); + writer.write(bs.into()).await?; } writer.close().await?; From 75ff58d477750a421639debfdf8dd0cf637c93a3 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 11 Jul 2024 17:15:30 +0800 Subject: [PATCH 3/4] Fix tests Signed-off-by: Xuanwo --- .../types/blocking_write/blocking_writer.rs | 8 +- core/src/types/context/write.rs | 74 ++++++++++++------- core/src/types/write/buffer_sink.rs | 12 +-- core/src/types/write/writer.rs | 8 +- 4 files changed, 68 insertions(+), 34 deletions(-) diff --git a/core/src/types/blocking_write/blocking_writer.rs b/core/src/types/blocking_write/blocking_writer.rs index d97cabb1472a..489cae502a74 100644 --- a/core/src/types/blocking_write/blocking_writer.rs +++ b/core/src/types/blocking_write/blocking_writer.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use bytes::Buf; use std::sync::Arc; use crate::raw::*; @@ -67,7 +68,12 @@ impl BlockingWriter { /// } /// ``` pub fn write(&mut self, bs: impl Into) -> Result<()> { - self.inner.write(bs.into()) + let mut bs = bs.into(); + while !bs.is_empty() { + let n = self.inner.write(bs.clone())?; + bs.advance(n); + } + Ok(()) } /// Close the writer and make sure all data have been committed. diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs index af875c14ed2f..6e2464cfceb7 100644 --- a/core/src/types/context/write.rs +++ b/core/src/types/context/write.rs @@ -133,14 +133,17 @@ impl WriteGenerator { impl WriteGenerator { /// Write the entire buffer into writer. - pub async fn write(&mut self, mut bs: Buffer) -> Result<()> { + pub async fn write(&mut self, mut bs: Buffer) -> Result { let Some(chunk_size) = self.chunk_size else { - return self.w.write_dyn(bs).await; + let size = bs.len(); + self.w.write_dyn(bs).await?; + return Ok(size); }; if self.buffer.len() + bs.len() < chunk_size { + let size = bs.len(); self.buffer.push(bs); - return Ok(()); + return Ok(size); } // Condition: @@ -149,10 +152,11 @@ impl WriteGenerator { // Action: // - write buffer + bs directly. if !self.exact { + let fill_size = bs.len(); self.buffer.push(bs); let buf = self.buffer.take().collect(); self.w.write_dyn(buf).await?; - return Ok(()); + return Ok(fill_size); } // Condition: @@ -173,8 +177,9 @@ impl WriteGenerator { // - write bs to buffer with remaining size. let remaining = chunk_size - self.buffer.len(); bs.truncate(remaining); + let n = bs.len(); self.buffer.push(bs); - Ok(()) + Ok(n) } /// Finish the write process. @@ -184,8 +189,8 @@ impl WriteGenerator { break; } - self.w.write_dyn(self.buffer.clone().collect()).await?; - self.buffer.clear(); + let buf = self.buffer.take().collect(); + self.w.write_dyn(buf).await?; } self.w.close().await @@ -215,14 +220,17 @@ impl WriteGenerator { impl WriteGenerator { /// Write the entire buffer into writer. - pub fn write(&mut self, mut bs: Buffer) -> Result<()> { + pub fn write(&mut self, mut bs: Buffer) -> Result { let Some(chunk_size) = self.chunk_size else { - return self.w.write(bs); + let size = bs.len(); + self.w.write(bs)?; + return Ok(size); }; if self.buffer.len() + bs.len() < chunk_size { + let size = bs.len(); self.buffer.push(bs); - return Ok(()); + return Ok(size); } // Condition: @@ -231,10 +239,11 @@ impl WriteGenerator { // Action: // - write buffer + bs directly. if !self.exact { + let fill_size = bs.len(); self.buffer.push(bs); let buf = self.buffer.take().collect(); self.w.write(buf)?; - return Ok(()); + return Ok(fill_size); } // Condition: @@ -255,8 +264,9 @@ impl WriteGenerator { // - write bs to buffer with remaining size. let remaining = chunk_size - self.buffer.len(); bs.truncate(remaining); + let n = bs.len(); self.buffer.push(bs); - Ok(()) + Ok(n) } /// Finish the write process. @@ -266,8 +276,8 @@ impl WriteGenerator { break; } - self.w.write(self.buffer.clone().collect())?; - self.buffer.clear(); + let buf = self.buffer.take().collect(); + self.w.write(buf)?; } self.w.close() @@ -278,8 +288,8 @@ impl WriteGenerator { mod tests { use super::*; use crate::raw::oio::Write; - use bytes::BufMut; use bytes::Bytes; + use bytes::{Buf, BufMut}; use log::debug; use pretty_assertions::assert_eq; use rand::thread_rng; @@ -326,8 +336,11 @@ mod tests { let buf = Arc::new(Mutex::new(vec![])); let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), true); - let bs = Bytes::from(expected.clone()); - w.write(bs.into()).await?; + let mut bs = Bytes::from(expected.clone()); + while !bs.is_empty() { + let n = w.write(bs.clone().into()).await?; + bs.advance(n); + } w.close().await?; @@ -356,7 +369,10 @@ mod tests { rng.fill_bytes(&mut expected); let bs = Bytes::from(expected.clone()); - w.write(bs.into()).await?; + // The MockWriter always returns the first chunk size. + let n = w.write(bs.into()).await?; + assert_eq!(expected.len(), n); + w.close().await?; let buf = buf.lock().await; @@ -391,13 +407,14 @@ mod tests { // content > chunk size. let content = new_content(15); - w.write(content.into()).await?; + assert_eq!(15, w.write(content.into()).await?); // content < chunk size. let content = new_content(5); - w.write(content.into()).await?; + assert_eq!(5, w.write(content.into()).await?); // content > chunk size, but 5 bytes in queue. let content = new_content(15); - w.write(content.clone().into()).await?; + // The MockWriter can send all 15 bytes together, so we can only advance 5 bytes. + assert_eq!(15, w.write(content.clone().into()).await?); w.close().await?; @@ -433,16 +450,16 @@ mod tests { // content > chunk size. let content = new_content(15); - w.write(content.into()).await?; + assert_eq!(15, w.write(content.into()).await?); // content < chunk size. let content = new_content(5); - w.write(content.into()).await?; + assert_eq!(5, w.write(content.into()).await?); // content < chunk size. let content = new_content(3); - w.write(content.into()).await?; + assert_eq!(3, w.write(content.into()).await?); // content > chunk size, but can send all chunks in the queue. let content = new_content(15); - w.write(content.clone().into()).await?; + assert_eq!(15, w.write(content.clone().into()).await?); w.close().await?; @@ -483,8 +500,11 @@ mod tests { expected.extend_from_slice(&content); - let bs = Bytes::from(content.clone()); - writer.write(bs.into()).await?; + let mut bs = Bytes::from(content.clone()); + while !bs.is_empty() { + let n = writer.write(bs.clone().into()).await?; + bs.advance(n); + } } writer.close().await?; diff --git a/core/src/types/write/buffer_sink.rs b/core/src/types/write/buffer_sink.rs index cc42e09090bb..46d911253002 100644 --- a/core/src/types/write/buffer_sink.rs +++ b/core/src/types/write/buffer_sink.rs @@ -20,6 +20,8 @@ use std::task::ready; use std::task::Context; use std::task::Poll; +use bytes::Buf; + use crate::raw::*; use crate::*; @@ -33,7 +35,7 @@ pub struct BufferSink { enum State { Idle(Option>), - Writing(BoxedStaticFuture<(WriteGenerator, Result<()>)>), + Writing(BoxedStaticFuture<(WriteGenerator, Result)>), Closing(BoxedStaticFuture<(WriteGenerator, Result<()>)>), } @@ -90,8 +92,8 @@ impl futures::Sink for BufferSink { let (w, res) = ready!(fut.as_mut().poll(cx)); this.state = State::Idle(Some(w)); match res { - Ok(_) => { - this.buf = Buffer::new(); + Ok(n) => { + this.buf.advance(n); } Err(err) => return Poll::Ready(Err(err)), } @@ -137,8 +139,8 @@ impl futures::Sink for BufferSink { let (w, res) = ready!(fut.as_mut().poll(cx)); this.state = State::Idle(Some(w)); match res { - Ok(_) => { - this.buf = Buffer::new(); + Ok(n) => { + this.buf.advance(n); } Err(err) => return Poll::Ready(Err(err)), } diff --git a/core/src/types/write/writer.rs b/core/src/types/write/writer.rs index 44755ed37cba..eaf8b7251771 100644 --- a/core/src/types/write/writer.rs +++ b/core/src/types/write/writer.rs @@ -136,7 +136,13 @@ impl Writer { /// } /// ``` pub async fn write(&mut self, bs: impl Into) -> Result<()> { - self.inner.write(bs.into()).await + let mut bs = bs.into(); + while !bs.is_empty() { + let n = self.inner.write(bs.clone()).await?; + bs.advance(n); + } + + Ok(()) } /// Write [`bytes::Buf`] into inner writer. From 80eef8b356474ea422f19e491381a70090c626f0 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 11 Jul 2024 18:31:30 +0800 Subject: [PATCH 4/4] Fix write Signed-off-by: Xuanwo --- core/src/types/blocking_write/std_writer.rs | 5 +++-- core/src/types/write/writer.rs | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/types/blocking_write/std_writer.rs b/core/src/types/blocking_write/std_writer.rs index fe918b43bf87..91accbde5357 100644 --- a/core/src/types/blocking_write/std_writer.rs +++ b/core/src/types/blocking_write/std_writer.rs @@ -81,9 +81,10 @@ impl Write for StdWriter { } let bs = self.buf.get().expect("frozen buffer must be valid"); - w.write(Buffer::from(bs)) + let n = w + .write(Buffer::from(bs)) .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; - self.buf.clean(); + self.buf.advance(n); } } diff --git a/core/src/types/write/writer.rs b/core/src/types/write/writer.rs index eaf8b7251771..114ec886aa0e 100644 --- a/core/src/types/write/writer.rs +++ b/core/src/types/write/writer.rs @@ -155,8 +155,7 @@ impl Writer { pub async fn write_from(&mut self, bs: impl Buf) -> Result<()> { let mut bs = bs; let bs = Buffer::from(bs.copy_to_bytes(bs.remaining())); - self.inner.write(bs).await?; - Ok(()) + self.write(bs).await } /// Abort the writer and clean up all written data.