diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 7dd2d189ca62..701191490aa3 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -25,13 +25,11 @@ use crate::raw::oio::ReadExt; use crate::raw::*; use crate::*; -/// Add blocking API support for every operations. +/// Add blocking API support for non-blocking services. /// -/// # Blocking API +/// # Notes /// -/// - This layer is auto-added to the operator if it's accessor doesn't support blocking APIs. -/// -/// Tracking issue: #2678 +/// Please only enable this layer when the underlying service does not support blocking. #[derive(Debug, Clone)] pub struct BlockingLayer { handle: Handle, diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 7b2549dc006a..e99737dd0a84 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -998,7 +998,7 @@ impl oio::Write for RetryWrapper { &err, dur, &[ - ("operation", WriteOperation::Abort.into_static()), + ("operation", WriteOperation::Close.into_static()), ("path", &self.path), ], ); diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 6edeb96a88fe..4cf597e599a4 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -195,6 +195,7 @@ where let (w, part) = ready!(fut.as_mut().poll(cx)); self.state = State::Idle(Some(w)); self.parts.push(part?); + // Replace the cache when last write succeeded let size = bs.remaining(); let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size)); @@ -264,8 +265,11 @@ where State::Close(fut) => { let (w, res) = futures::ready!(fut.as_mut().poll(cx)); self.state = State::Idle(Some(w)); + // We should check res first before clean up cache. + res?; + self.cache = None; - return Poll::Ready(res); + return Poll::Ready(Ok(())); } State::Init(_) => unreachable!( "MultipartUploadWriter must not go into State::Init during poll_close" diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index 9ae017a5e718..6dd9c23b2050 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -274,8 +274,11 @@ impl oio::Write for RangeWriter { State::Abort(fut) => { let (w, res) = ready!(fut.poll_unpin(cx)); self.state = State::Idle(Some(w)); + // We should check res first before clean up cache. + res?; + self.buffer = None; - return Poll::Ready(res); + return Poll::Ready(Ok(())); } } } diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs index 3f197869e3a8..b5bd0970d871 100644 --- a/core/tests/behavior/utils.rs +++ b/core/tests/behavior/utils.rs @@ -82,14 +82,17 @@ pub fn init_service() -> Option { op.layer(ChaosLayer::new(0.1)) }; - let _guard = RUNTIME.enter(); - let op = op - .layer(BlockingLayer::create().expect("blocking layer must be created")) + let mut op = op .layer(LoggingLayer::default().with_backtrace_output(true)) .layer(TimeoutLayer::new()) .layer(RetryLayer::new()) .finish(); + if !op.info().full_capability().blocking { + let _guard = RUNTIME.enter(); + op = op.layer(BlockingLayer::create().expect("blocking layer must be created")) + } + Some(op) }