Skip to content

Commit

Permalink
refactor: Extract scan as a new API and remove ListStyle (#1324)
Browse files Browse the repository at this point in the history
* Add new API

Signed-off-by: Xuanwo <[email protected]>

* Save work

Signed-off-by: Xuanwo <[email protected]>

* Refactor

Signed-off-by: Xuanwo <[email protected]>

* Make all tests happy

Signed-off-by: Xuanwo <[email protected]>

* Format code

Signed-off-by: Xuanwo <[email protected]>

* fix docs

Signed-off-by: Xuanwo <[email protected]>

* FIx test

Signed-off-by: Xuanwo <[email protected]>

* Make MSRV happy

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Feb 11, 2023
1 parent 92a2700 commit ddb3a4f
Show file tree
Hide file tree
Showing 33 changed files with 660 additions and 349 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ reqwest = { version = "0.11.13", features = [
rocksdb = { version = "0.19", default-features = false, optional = true }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sled = {version = "0.34.7", optional = true }
sled = { version = "0.34.7", optional = true }
suppaftp = { version = "4.5", default-features = false, features = [
"async-secure",
"async-rustls",
Expand Down
8 changes: 8 additions & 0 deletions src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,17 @@ impl<A: Accessor> LayeredAccessor for ChaosAccessor<A> {
self.inner.list(path, args).await
}

async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
self.inner.scan(path, args).await
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
self.inner.blocking_list(path, args)
}

fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
self.inner.blocking_scan(path, args)
}
}

/// ChaosReader will inject error into read operations.
Expand Down
154 changes: 87 additions & 67 deletions src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,93 +168,105 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
}
}

async fn complete_pager(
async fn complete_list(
&self,
path: &str,
args: OpList,
) -> Result<(RpList, CompletePager<A, A::Pager>)> {
if !self.meta.capabilities().contains(AccessorCapability::List) {
return Err(
let (can_list, can_scan) = (
self.meta.capabilities().contains(AccessorCapability::List),
self.meta.capabilities().contains(AccessorCapability::Scan),
);

if can_list {
let (rp, p) = self.inner.list(path, args).await?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else if can_scan {
let (_, p) = self.inner.scan(path, OpScan::new()).await?;
let p = to_hierarchy_pager(p, path);
Ok((RpList::default(), CompletePager::NeedHierarchy(p)))
} else {
Err(
Error::new(ErrorKind::Unsupported, "operation is not supported")
.with_context("service", self.meta.scheme())
.with_operation("list"),
);
)
}
}

let (can_flat, can_hierarchy) = (
self.meta.hints().contains(AccessorHint::ListFlat),
self.meta.hints().contains(AccessorHint::ListHierarchy),
fn complete_blocking_list(
&self,
path: &str,
args: OpList,
) -> Result<(RpList, CompletePager<A, A::BlockingPager>)> {
let (can_list, can_scan) = (
self.meta.capabilities().contains(AccessorCapability::List),
self.meta.capabilities().contains(AccessorCapability::Scan),
);

match args.style() {
ListStyle::Flat => {
if can_flat {
let (rp, p) = self.inner.list(path, args).await?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else if can_hierarchy {
// TODO: Make this size configure.
let p = to_flat_pager(self.inner.clone(), path, 256);
Ok((RpList::default(), CompletePager::NeedFlat(p)))
} else {
unreachable!("service that support list can't neither can flat nor can hierarchy, must be a bug")
}
}
ListStyle::Hierarchy => {
if can_hierarchy {
let (rp, p) = self.inner.list(path, args).await?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else if can_flat {
let (rp, p) = self.inner.list(path, OpList::new(ListStyle::Flat)).await?;
let p = to_hierarchy_pager(p, path);
Ok((rp, CompletePager::NeedHierarchy(p)))
} else {
unreachable!("service that support list can't neither can flat nor can hierarchy, must be a bug")
}
}
if can_list {
let (rp, p) = self.inner.blocking_list(path, args)?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else if can_scan {
let (_, p) = self.inner.blocking_scan(path, OpScan::new())?;
let p = to_hierarchy_pager(p, path);
Ok((RpList::default(), CompletePager::NeedHierarchy(p)))
} else {
Err(
Error::new(ErrorKind::Unsupported, "operation is not supported")
.with_context("service", self.meta.scheme())
.with_operation("list"),
)
}
}

fn complete_blocking_pager(
async fn complete_scan(
&self,
path: &str,
args: OpList,
) -> Result<(RpList, CompletePager<A, A::BlockingPager>)> {
if !self.meta.capabilities().contains(AccessorCapability::List) {
return Err(
args: OpScan,
) -> Result<(RpScan, CompletePager<A, A::Pager>)> {
let (can_list, can_scan) = (
self.meta.capabilities().contains(AccessorCapability::List),
self.meta.capabilities().contains(AccessorCapability::Scan),
);

if can_scan {
let (rp, p) = self.inner.scan(path, args).await?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else if can_list {
let p = to_flat_pager(self.inner.clone(), path, args.limit().unwrap_or(1000));
Ok((RpScan::default(), CompletePager::NeedFlat(p)))
} else {
Err(
Error::new(ErrorKind::Unsupported, "operation is not supported")
.with_context("service", self.meta.scheme())
.with_operation("blocking_list"),
);
.with_operation("scan"),
)
}
}

let (can_flat, can_hierarchy) = (
self.meta.hints().contains(AccessorHint::ListFlat),
self.meta.hints().contains(AccessorHint::ListHierarchy),
fn complete_blocking_scan(
&self,
path: &str,
args: OpScan,
) -> Result<(RpScan, CompletePager<A, A::BlockingPager>)> {
let (can_list, can_scan) = (
self.meta.capabilities().contains(AccessorCapability::List),
self.meta.capabilities().contains(AccessorCapability::Scan),
);

match args.style() {
ListStyle::Flat => {
if can_flat {
let (rp, p) = self.inner.blocking_list(path, args)?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else {
// TODO: Make this size configure.
let p = to_flat_pager(self.inner.clone(), path, 256);
Ok((RpList::default(), CompletePager::NeedFlat(p)))
}
}
ListStyle::Hierarchy => {
if can_hierarchy {
let (rp, p) = self.inner.blocking_list(path, args)?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else {
let (rp, p) = self
.inner
.blocking_list(path, OpList::new(ListStyle::Flat))?;
let p = to_hierarchy_pager(p, path);
Ok((rp, CompletePager::NeedHierarchy(p)))
}
}
if can_scan {
let (rp, p) = self.inner.blocking_scan(path, args)?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else if can_list {
let p = to_flat_pager(self.inner.clone(), path, args.limit().unwrap_or(1000));
Ok((RpScan::default(), CompletePager::NeedFlat(p)))
} else {
Err(
Error::new(ErrorKind::Unsupported, "operation is not supported")
.with_context("service", self.meta.scheme())
.with_operation("scan"),
)
}
}
}
Expand All @@ -280,11 +292,19 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
self.complete_pager(path, args).await
self.complete_list(path, args).await
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
self.complete_blocking_pager(path, args)
self.complete_blocking_list(path, args)
}

async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
self.complete_scan(path, args).await
}

fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
self.complete_blocking_scan(path, args)
}
}

Expand Down
26 changes: 26 additions & 0 deletions src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,20 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
.map(|(rp, s)| (rp, ConcurrentLimitWrapper::new(s, permit)))
}

async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
let permit = self
.semaphore
.clone()
.acquire_owned()
.await
.expect("semaphore must be valid");

self.inner
.scan(path, args)
.await
.map(|(rp, s)| (rp, ConcurrentLimitWrapper::new(s, permit)))
}

async fn create_multipart(
&self,
path: &str,
Expand Down Expand Up @@ -272,6 +286,18 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
.blocking_list(path, args)
.map(|(rp, it)| (rp, ConcurrentLimitWrapper::new(it, permit)))
}

fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
let permit = self
.semaphore
.clone()
.try_acquire_owned()
.expect("semaphore must be valid");

self.inner
.blocking_scan(path, args)
.map(|(rp, it)| (rp, ConcurrentLimitWrapper::new(it, permit)))
}
}

pub struct ConcurrentLimitWrapper<R> {
Expand Down
41 changes: 41 additions & 0 deletions src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,27 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
.await
}

async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
self.inner
.scan(path, args)
.map_ok(|(rp, os)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: os,
},
)
})
.map_err(|err| {
err.with_operation(Operation::Scan.into_static())
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
}

fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
self.inner.presign(path, args).map_err(|err| {
err.with_operation(Operation::Presign.into_static())
Expand Down Expand Up @@ -274,6 +295,26 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
.with_context("path", path)
})
}

fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
self.inner
.blocking_scan(path, args)
.map(|(rp, os)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: os,
},
)
})
.map_err(|err| {
err.with_operation(Operation::BlockingScan.into_static())
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
}
}

pub struct ErrorContextWrapper<T> {
Expand Down
50 changes: 36 additions & 14 deletions src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,10 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> {
/// Add list capabilities for underlying storage services.
fn metadata(&self) -> AccessorMetadata {
let mut meta = self.inner.metadata();
meta.set_capabilities(meta.capabilities() | AccessorCapability::List);
meta.set_hints(meta.hints() | AccessorHint::ListFlat | AccessorHint::ListHierarchy);
meta.set_capabilities(
meta.capabilities() | AccessorCapability::List | AccessorCapability::Scan,
);
meta.set_hints(meta.hints());

meta
}
Expand All @@ -155,36 +157,56 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> {
self.inner.read(path, args).await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Pager)> {
let mut path = path;
if path == "/" {
path = ""
}

let children = match args.style() {
ListStyle::Flat => self.children_flat(path),
ListStyle::Hierarchy => self.children_hierarchy(path),
};
Ok((
RpList::default(),
ImmutableDir::new(self.children_hierarchy(path)),
))
}

async fn scan(&self, path: &str, _: OpScan) -> Result<(RpScan, Self::Pager)> {
let mut path = path;
if path == "/" {
path = ""
}

Ok((RpList::default(), ImmutableDir::new(children)))
Ok((
RpScan::default(),
ImmutableDir::new(self.children_flat(path)),
))
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.inner.blocking_read(path, args)
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingPager)> {
let mut path = path;
if path == "/" {
path = ""
}

let children = match args.style() {
ListStyle::Flat => self.children_flat(path),
ListStyle::Hierarchy => self.children_hierarchy(path),
};
Ok((
RpList::default(),
ImmutableDir::new(self.children_hierarchy(path)),
))
}

fn blocking_scan(&self, path: &str, _: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
let mut path = path;
if path == "/" {
path = ""
}

Ok((RpList::default(), ImmutableDir::new(children)))
Ok((
RpScan::default(),
ImmutableDir::new(self.children_flat(path)),
))
}
}

Expand Down
Loading

2 comments on commit ddb3a4f

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-auoa5irec-databend.vercel.app

Built with commit ddb3a4f.
This pull request is being automatically deployed with vercel-action

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Rust Benchmark'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: ddb3a4f Previous: 92a2700 Ratio
service_memory_read_full/16.0 MiB 4210583 ns/iter (± 41510) 2102570 ns/iter (± 36612) 2.00

This comment was automatically generated by workflow using github-action-benchmark.

CC: @Xuanwo

Please sign in to comment.