Skip to content

Commit

Permalink
fix: Don't apply blocking layer when service support blocking (#3050)
Browse files Browse the repository at this point in the history
* fix: Don't apply blocking layer when service support blocking

Signed-off-by: Xuanwo <[email protected]>

* Update core/src/layers/blocking.rs

Co-authored-by: Suyan <[email protected]>

* Fix build

Signed-off-by: Xuanwo <[email protected]>

* Fix typo

Signed-off-by: Xuanwo <[email protected]>

* Fix multipart upload

Signed-off-by: Xuanwo <[email protected]>

* Fix clippy

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Suyan <[email protected]>
  • Loading branch information
Xuanwo and suyanhanx authored Sep 13, 2023
1 parent e895dce commit 56d7119
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 11 deletions.
8 changes: 3 additions & 5 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ use crate::raw::oio::ReadExt;
use crate::raw::*;
use crate::*;

/// Add blocking API support for every operations.
/// Add blocking API support for non-blocking services.
///
/// # Blocking API
/// # Notes
///
/// - This layer is auto-added to the operator if it's accessor doesn't support blocking APIs.
///
/// Tracking issue: #2678
/// Please only enable this layer when the underlying service does not support blocking.
#[derive(Debug, Clone)]
pub struct BlockingLayer {
handle: Handle,
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
&err,
dur,
&[
("operation", WriteOperation::Abort.into_static()),
("operation", WriteOperation::Close.into_static()),
("path", &self.path),
],
);
Expand Down
6 changes: 5 additions & 1 deletion core/src/raw/oio/write/multipart_upload_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ where
let (w, part) = ready!(fut.as_mut().poll(cx));
self.state = State::Idle(Some(w));
self.parts.push(part?);

// Replace the cache when last write succeeded
let size = bs.remaining();
let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
Expand Down Expand Up @@ -264,8 +265,11 @@ where
State::Close(fut) => {
let (w, res) = futures::ready!(fut.as_mut().poll(cx));
self.state = State::Idle(Some(w));
// We should check res first before clean up cache.
res?;

self.cache = None;
return Poll::Ready(res);
return Poll::Ready(Ok(()));
}
State::Init(_) => unreachable!(
"MultipartUploadWriter must not go into State::Init during poll_close"
Expand Down
5 changes: 4 additions & 1 deletion core/src/raw/oio/write/range_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,11 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
State::Abort(fut) => {
let (w, res) = ready!(fut.poll_unpin(cx));
self.state = State::Idle(Some(w));
// We should check res first before clean up cache.
res?;

self.buffer = None;
return Poll::Ready(res);
return Poll::Ready(Ok(()));
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions core/tests/behavior/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,17 @@ pub fn init_service<B: Builder>() -> Option<Operator> {
op.layer(ChaosLayer::new(0.1))
};

let _guard = RUNTIME.enter();
let op = op
.layer(BlockingLayer::create().expect("blocking layer must be created"))
let mut op = op
.layer(LoggingLayer::default().with_backtrace_output(true))
.layer(TimeoutLayer::new())
.layer(RetryLayer::new())
.finish();

if !op.info().full_capability().blocking {
let _guard = RUNTIME.enter();
op = op.layer(BlockingLayer::create().expect("blocking layer must be created"))
}

Some(op)
}

Expand Down

0 comments on commit 56d7119

Please sign in to comment.