diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs new file mode 100644 index 000000000000..88bf67be5653 --- /dev/null +++ b/core/src/layers/blocking.rs @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use bytes; +use bytes::Bytes; +use tokio::runtime::Handle; + +use crate::raw::oio::ReadExt; +use crate::raw::*; +use crate::*; + +/// Add blocking API support for every operations. +/// +/// # Blocking API +/// +/// - This layer is auto-added to the operator if it's accessor doesn't support blocking APIs. +/// +/// Tracking issue: #2678 +#[derive(Debug, Clone)] +pub struct BlockingLayer { + handle: Handle, +} + +impl BlockingLayer { + pub fn create() -> Result { + Ok(Self { + handle: Handle::try_current()?, + }) + } +} + +impl Layer for BlockingLayer { + type LayeredAccessor = BlockingAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccessor { + BlockingAccessor { + inner: inner, + handle: self.handle.clone(), + } + } +} + +#[derive(Clone, Debug)] +pub struct BlockingAccessor { + inner: A, + + handle: Handle, +} + +#[async_trait] +impl LayeredAccessor for BlockingAccessor { + type Inner = A; + type Reader = A::Reader; + type BlockingReader = BlockingWrapper; + type Writer = A::Writer; + type BlockingWriter = BlockingWrapper; + type Appender = A::Appender; + type Pager = A::Pager; + type BlockingPager = BlockingWrapper; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + fn metadata(&self) -> AccessorInfo { + let mut info = self.inner.info(); + let cap = info.capability_mut(); + cap.blocking = true; + info + } + + async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { + self.inner.create_dir(path, args).await + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + self.inner.read(path, args).await + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + self.inner.write(path, args).await + } + + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + self.inner.append(path, args).await + } + + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { + self.inner.copy(from, to, args).await + } + + async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result { + self.inner.rename(from, to, args).await + } + + async fn stat(&self, path: &str, args: OpStat) -> Result { + self.inner.stat(path, args).await + } + + async fn delete(&self, path: &str, args: OpDelete) -> Result { + self.inner.delete(path, args).await + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { + self.inner.list(path, args).await + } + + async fn presign(&self, path: &str, args: OpPresign) -> Result { + self.inner.presign(path, args).await + } + + async fn batch(&self, args: OpBatch) -> Result { + self.inner.batch(args).await + } + + fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { + self.handle.block_on(self.inner.create_dir(path, args)) + } + + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { + self.handle.block_on(async { + let (rp, reader) = self.inner.read(path, args).await?; + let blocking_reader = Self::BlockingReader::new(self.handle.clone(), reader); + + Ok((rp, blocking_reader)) + }) + } + + fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + self.handle.block_on(async { + let (rp, writer) = self.inner.write(path, args).await?; + let blocking_writer = Self::BlockingWriter::new(self.handle.clone(), writer); + Ok((rp, blocking_writer)) + }) + } + + fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result { + self.handle.block_on(self.inner.copy(from, to, args)) + } + + fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result { + self.handle.block_on(self.inner.rename(from, to, args)) + } + + fn blocking_stat(&self, path: &str, args: OpStat) -> Result { + self.handle.block_on(self.inner.stat(path, args)) + } + + fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { + self.handle.block_on(self.inner.delete(path, args)) + } + + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { + self.handle.block_on(async { + let (rp, pager) = self.inner.list(path, args).await?; + let blocking_pager = Self::BlockingPager::new(self.handle.clone(), pager); + Ok((rp, blocking_pager)) + }) + } +} + +pub struct BlockingWrapper { + handle: Handle, + inner: I, +} + +impl BlockingWrapper { + fn new(handle: Handle, inner: I) -> Self { + Self { handle, inner } + } +} + +impl oio::BlockingRead for BlockingWrapper { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.handle + .block_on(oio::ReadExt::read(&mut self.inner, buf)) + } + + fn seek(&mut self, pos: std::io::SeekFrom) -> Result { + self.handle.block_on(self.inner.seek(pos)) + } + + fn next(&mut self) -> Option> { + self.handle.block_on(self.inner.next()) + } +} + +impl oio::BlockingWrite for BlockingWrapper { + fn write(&mut self, bs: Bytes) -> Result<()> { + self.handle.block_on(oio::Write::write(&mut self.inner, bs)) + } + + fn close(&mut self) -> Result<()> { + self.handle.block_on(oio::Write::close(&mut self.inner)) + } +} + +impl oio::BlockingPage for BlockingWrapper { + fn next(&mut self) -> Result>> { + self.handle.block_on(self.inner.next()) + } +} diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index da6adf369194..2c738fcdd7a6 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -29,6 +29,9 @@ pub use logging::LoggingLayer; mod timeout; pub use timeout::TimeoutLayer; +mod blocking; +pub use blocking::BlockingLayer; + #[cfg(feature = "layers-chaos")] mod chaos; #[cfg(feature = "layers-chaos")] diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 6b8414b7e822..e2139135125e 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -370,7 +370,6 @@ impl OperatorBuilder { /// Finish the building to construct an Operator. pub fn finish(self) -> Operator { let ob = self.layer(TypeEraseLayer); - Operator::from_inner(Arc::new(ob.accessor) as FusedAccessor) } } diff --git a/core/tests/behavior/blocking_copy.rs b/core/tests/behavior/blocking_copy.rs index 43d0faec5cec..a5d99670571a 100644 --- a/core/tests/behavior/blocking_copy.rs +++ b/core/tests/behavior/blocking_copy.rs @@ -22,10 +22,17 @@ use crate::*; pub fn behavior_blocking_copy_tests(op: &Operator) -> Vec { let cap = op.info().capability(); - if !(cap.read && cap.write && cap.copy && cap.blocking) { + if !(cap.read && cap.write && cap.copy) { return vec![]; } + let mut op = op.clone(); + if !cap.blocking { + op = op.layer(layers::BlockingLayer::with_handle(RUNTIME.handle().clone())); + } + + let op = &op; + blocking_trials!( op, test_blocking_copy_file, diff --git a/core/tests/behavior/blocking_list.rs b/core/tests/behavior/blocking_list.rs index 745396848035..08e20e0c378c 100644 --- a/core/tests/behavior/blocking_list.rs +++ b/core/tests/behavior/blocking_list.rs @@ -26,10 +26,17 @@ use crate::*; pub fn behavior_blocking_list_tests(op: &Operator) -> Vec { let cap = op.info().capability(); - if !(cap.read && cap.write && cap.copy && cap.blocking && cap.list) { + if !(cap.read && cap.write && cap.copy && cap.list) { return vec![]; } + let mut op = op.clone(); + if !cap.blocking { + op = op.layer(layers::BlockingLayer::with_handle(RUNTIME.handle().clone())); + } + + let op = &op; + blocking_trials!( op, test_blocking_list_dir, diff --git a/core/tests/behavior/blocking_read_only.rs b/core/tests/behavior/blocking_read_only.rs index 847fa63e4d5b..040019f66527 100644 --- a/core/tests/behavior/blocking_read_only.rs +++ b/core/tests/behavior/blocking_read_only.rs @@ -24,10 +24,16 @@ use crate::*; pub fn behavior_blocking_read_only_tests(op: &Operator) -> Vec { let cap = op.info().capability(); - if !(cap.read && !cap.write && cap.blocking) { + if !(cap.read && !cap.write) { return vec![]; } + let mut op = op.clone(); + if !cap.blocking { + op = op.layer(layers::BlockingLayer::with_handle(RUNTIME.handle().clone())); + } + + let op = &op; blocking_trials!( op, test_blocking_read_only_stat_file_and_dir, diff --git a/core/tests/behavior/blocking_rename.rs b/core/tests/behavior/blocking_rename.rs index 4506c2c5c0ed..746dc3031159 100644 --- a/core/tests/behavior/blocking_rename.rs +++ b/core/tests/behavior/blocking_rename.rs @@ -22,10 +22,17 @@ use crate::*; pub fn behavior_blocking_rename_tests(op: &Operator) -> Vec { let cap = op.info().capability(); - if !(cap.read && cap.write && cap.copy && cap.blocking && cap.rename) { + if !(cap.read && cap.write && cap.copy && cap.rename) { return vec![]; } + let mut op = op.clone(); + if !cap.blocking { + op = op.layer(layers::BlockingLayer::with_handle(RUNTIME.handle().clone())); + } + + let op = &op; + blocking_trials!( op, test_blocking_rename_file, diff --git a/core/tests/behavior/blocking_write.rs b/core/tests/behavior/blocking_write.rs index 3775509c25e8..985d4eb269a9 100644 --- a/core/tests/behavior/blocking_write.rs +++ b/core/tests/behavior/blocking_write.rs @@ -28,10 +28,17 @@ use crate::*; pub fn behavior_blocking_write_tests(op: &Operator) -> Vec { let cap = op.info().capability(); - if !(cap.read && cap.write && cap.blocking) { + if !(cap.read && cap.write) { return vec![]; } + let mut op = op.clone(); + if !cap.blocking { + op = op.layer(layers::BlockingLayer::with_handle(RUNTIME.handle().clone())); + } + + let op = &op; + blocking_trials!( op, test_blocking_create_dir, diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs index ca8b2e61f7cf..c4df30722a11 100644 --- a/core/tests/behavior/main.rs +++ b/core/tests/behavior/main.rs @@ -78,6 +78,7 @@ fn behavior_test() -> Vec { trials.extend(behavior_blocking_read_only_tests(&operator)); trials.extend(behavior_blocking_rename_tests(&operator)); trials.extend(behavior_blocking_write_tests(&operator)); + // Async tests trials.extend(behavior_append_tests(&operator)); trials.extend(behavior_copy_tests(&operator));