diff --git a/Cargo.toml b/Cargo.toml index 88697fe6076a..ba1aef13c567 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,7 +122,7 @@ reqwest = { version = "0.11.13", features = [ rocksdb = { version = "0.19", default-features = false, optional = true } serde = { version = "1", features = ["derive"] } serde_json = "1" -sled = {version = "0.34.7", optional = true } +sled = { version = "0.34.7", optional = true } suppaftp = { version = "4.5", default-features = false, features = [ "async-secure", "async-rustls", diff --git a/src/layers/chaos.rs b/src/layers/chaos.rs index 7206e4384c92..c2303d976f46 100644 --- a/src/layers/chaos.rs +++ b/src/layers/chaos.rs @@ -126,9 +126,17 @@ impl LayeredAccessor for ChaosAccessor { self.inner.list(path, args).await } + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + self.inner.scan(path, args).await + } + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { self.inner.blocking_list(path, args) } + + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + self.inner.blocking_scan(path, args) + } } /// ChaosReader will inject error into read operations. diff --git a/src/layers/complete.rs b/src/layers/complete.rs index 7194ea1fe27e..16cdfe641041 100644 --- a/src/layers/complete.rs +++ b/src/layers/complete.rs @@ -168,93 +168,105 @@ impl CompleteReaderAccessor { } } - async fn complete_pager( + async fn complete_list( &self, path: &str, args: OpList, ) -> Result<(RpList, CompletePager)> { - if !self.meta.capabilities().contains(AccessorCapability::List) { - return Err( + let (can_list, can_scan) = ( + self.meta.capabilities().contains(AccessorCapability::List), + self.meta.capabilities().contains(AccessorCapability::Scan), + ); + + if can_list { + let (rp, p) = self.inner.list(path, args).await?; + Ok((rp, CompletePager::AlreadyComplete(p))) + } else if can_scan { + let (_, p) = self.inner.scan(path, OpScan::new()).await?; + let p = to_hierarchy_pager(p, path); + Ok((RpList::default(), CompletePager::NeedHierarchy(p))) + } else { + Err( Error::new(ErrorKind::Unsupported, "operation is not supported") .with_context("service", self.meta.scheme()) .with_operation("list"), - ); + ) } + } - let (can_flat, can_hierarchy) = ( - self.meta.hints().contains(AccessorHint::ListFlat), - self.meta.hints().contains(AccessorHint::ListHierarchy), + fn complete_blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, CompletePager)> { + let (can_list, can_scan) = ( + self.meta.capabilities().contains(AccessorCapability::List), + self.meta.capabilities().contains(AccessorCapability::Scan), ); - match args.style() { - ListStyle::Flat => { - if can_flat { - let (rp, p) = self.inner.list(path, args).await?; - Ok((rp, CompletePager::AlreadyComplete(p))) - } else if can_hierarchy { - // TODO: Make this size configure. - let p = to_flat_pager(self.inner.clone(), path, 256); - Ok((RpList::default(), CompletePager::NeedFlat(p))) - } else { - unreachable!("service that support list can't neither can flat nor can hierarchy, must be a bug") - } - } - ListStyle::Hierarchy => { - if can_hierarchy { - let (rp, p) = self.inner.list(path, args).await?; - Ok((rp, CompletePager::AlreadyComplete(p))) - } else if can_flat { - let (rp, p) = self.inner.list(path, OpList::new(ListStyle::Flat)).await?; - let p = to_hierarchy_pager(p, path); - Ok((rp, CompletePager::NeedHierarchy(p))) - } else { - unreachable!("service that support list can't neither can flat nor can hierarchy, must be a bug") - } - } + if can_list { + let (rp, p) = self.inner.blocking_list(path, args)?; + Ok((rp, CompletePager::AlreadyComplete(p))) + } else if can_scan { + let (_, p) = self.inner.blocking_scan(path, OpScan::new())?; + let p = to_hierarchy_pager(p, path); + Ok((RpList::default(), CompletePager::NeedHierarchy(p))) + } else { + Err( + Error::new(ErrorKind::Unsupported, "operation is not supported") + .with_context("service", self.meta.scheme()) + .with_operation("list"), + ) } } - fn complete_blocking_pager( + async fn complete_scan( &self, path: &str, - args: OpList, - ) -> Result<(RpList, CompletePager)> { - if !self.meta.capabilities().contains(AccessorCapability::List) { - return Err( + args: OpScan, + ) -> Result<(RpScan, CompletePager)> { + let (can_list, can_scan) = ( + self.meta.capabilities().contains(AccessorCapability::List), + self.meta.capabilities().contains(AccessorCapability::Scan), + ); + + if can_scan { + let (rp, p) = self.inner.scan(path, args).await?; + Ok((rp, CompletePager::AlreadyComplete(p))) + } else if can_list { + let p = to_flat_pager(self.inner.clone(), path, args.limit().unwrap_or(1000)); + Ok((RpScan::default(), CompletePager::NeedFlat(p))) + } else { + Err( Error::new(ErrorKind::Unsupported, "operation is not supported") .with_context("service", self.meta.scheme()) - .with_operation("blocking_list"), - ); + .with_operation("scan"), + ) } + } - let (can_flat, can_hierarchy) = ( - self.meta.hints().contains(AccessorHint::ListFlat), - self.meta.hints().contains(AccessorHint::ListHierarchy), + fn complete_blocking_scan( + &self, + path: &str, + args: OpScan, + ) -> Result<(RpScan, CompletePager)> { + let (can_list, can_scan) = ( + self.meta.capabilities().contains(AccessorCapability::List), + self.meta.capabilities().contains(AccessorCapability::Scan), ); - match args.style() { - ListStyle::Flat => { - if can_flat { - let (rp, p) = self.inner.blocking_list(path, args)?; - Ok((rp, CompletePager::AlreadyComplete(p))) - } else { - // TODO: Make this size configure. - let p = to_flat_pager(self.inner.clone(), path, 256); - Ok((RpList::default(), CompletePager::NeedFlat(p))) - } - } - ListStyle::Hierarchy => { - if can_hierarchy { - let (rp, p) = self.inner.blocking_list(path, args)?; - Ok((rp, CompletePager::AlreadyComplete(p))) - } else { - let (rp, p) = self - .inner - .blocking_list(path, OpList::new(ListStyle::Flat))?; - let p = to_hierarchy_pager(p, path); - Ok((rp, CompletePager::NeedHierarchy(p))) - } - } + if can_scan { + let (rp, p) = self.inner.blocking_scan(path, args)?; + Ok((rp, CompletePager::AlreadyComplete(p))) + } else if can_list { + let p = to_flat_pager(self.inner.clone(), path, args.limit().unwrap_or(1000)); + Ok((RpScan::default(), CompletePager::NeedFlat(p))) + } else { + Err( + Error::new(ErrorKind::Unsupported, "operation is not supported") + .with_context("service", self.meta.scheme()) + .with_operation("scan"), + ) } } } @@ -280,11 +292,19 @@ impl LayeredAccessor for CompleteReaderAccessor { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - self.complete_pager(path, args).await + self.complete_list(path, args).await } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { - self.complete_blocking_pager(path, args) + self.complete_blocking_list(path, args) + } + + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + self.complete_scan(path, args).await + } + + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + self.complete_blocking_scan(path, args) } } diff --git a/src/layers/concurrent_limit.rs b/src/layers/concurrent_limit.rs index f7d81df66708..c12d5ad8a11a 100644 --- a/src/layers/concurrent_limit.rs +++ b/src/layers/concurrent_limit.rs @@ -151,6 +151,20 @@ impl LayeredAccessor for ConcurrentLimitAccessor { .map(|(rp, s)| (rp, ConcurrentLimitWrapper::new(s, permit))) } + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + let permit = self + .semaphore + .clone() + .acquire_owned() + .await + .expect("semaphore must be valid"); + + self.inner + .scan(path, args) + .await + .map(|(rp, s)| (rp, ConcurrentLimitWrapper::new(s, permit))) + } + async fn create_multipart( &self, path: &str, @@ -272,6 +286,18 @@ impl LayeredAccessor for ConcurrentLimitAccessor { .blocking_list(path, args) .map(|(rp, it)| (rp, ConcurrentLimitWrapper::new(it, permit))) } + + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + let permit = self + .semaphore + .clone() + .try_acquire_owned() + .expect("semaphore must be valid"); + + self.inner + .blocking_scan(path, args) + .map(|(rp, it)| (rp, ConcurrentLimitWrapper::new(it, permit))) + } } pub struct ConcurrentLimitWrapper { diff --git a/src/layers/error_context.rs b/src/layers/error_context.rs index 8aaf60d508da..e317ba9d9fec 100644 --- a/src/layers/error_context.rs +++ b/src/layers/error_context.rs @@ -141,6 +141,27 @@ impl LayeredAccessor for ErrorContextAccessor { .await } + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + self.inner + .scan(path, args) + .map_ok(|(rp, os)| { + ( + rp, + ErrorContextWrapper { + scheme: self.meta.scheme(), + path: path.to_string(), + inner: os, + }, + ) + }) + .map_err(|err| { + err.with_operation(Operation::Scan.into_static()) + .with_context("service", self.meta.scheme()) + .with_context("path", path) + }) + .await + } + fn presign(&self, path: &str, args: OpPresign) -> Result { self.inner.presign(path, args).map_err(|err| { err.with_operation(Operation::Presign.into_static()) @@ -274,6 +295,26 @@ impl LayeredAccessor for ErrorContextAccessor { .with_context("path", path) }) } + + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + self.inner + .blocking_scan(path, args) + .map(|(rp, os)| { + ( + rp, + ErrorContextWrapper { + scheme: self.meta.scheme(), + path: path.to_string(), + inner: os, + }, + ) + }) + .map_err(|err| { + err.with_operation(Operation::BlockingScan.into_static()) + .with_context("service", self.meta.scheme()) + .with_context("path", path) + }) + } } pub struct ErrorContextWrapper { diff --git a/src/layers/immutable_index.rs b/src/layers/immutable_index.rs index 2010529347d7..9818c19627a3 100644 --- a/src/layers/immutable_index.rs +++ b/src/layers/immutable_index.rs @@ -145,8 +145,10 @@ impl LayeredAccessor for ImmutableIndexAccessor { /// Add list capabilities for underlying storage services. fn metadata(&self) -> AccessorMetadata { let mut meta = self.inner.metadata(); - meta.set_capabilities(meta.capabilities() | AccessorCapability::List); - meta.set_hints(meta.hints() | AccessorHint::ListFlat | AccessorHint::ListHierarchy); + meta.set_capabilities( + meta.capabilities() | AccessorCapability::List | AccessorCapability::Scan, + ); + meta.set_hints(meta.hints()); meta } @@ -155,36 +157,56 @@ impl LayeredAccessor for ImmutableIndexAccessor { self.inner.read(path, args).await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Pager)> { let mut path = path; if path == "/" { path = "" } - let children = match args.style() { - ListStyle::Flat => self.children_flat(path), - ListStyle::Hierarchy => self.children_hierarchy(path), - }; + Ok(( + RpList::default(), + ImmutableDir::new(self.children_hierarchy(path)), + )) + } + + async fn scan(&self, path: &str, _: OpScan) -> Result<(RpScan, Self::Pager)> { + let mut path = path; + if path == "/" { + path = "" + } - Ok((RpList::default(), ImmutableDir::new(children))) + Ok(( + RpScan::default(), + ImmutableDir::new(self.children_flat(path)), + )) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { self.inner.blocking_read(path, args) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { + fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingPager)> { let mut path = path; if path == "/" { path = "" } - let children = match args.style() { - ListStyle::Flat => self.children_flat(path), - ListStyle::Hierarchy => self.children_hierarchy(path), - }; + Ok(( + RpList::default(), + ImmutableDir::new(self.children_hierarchy(path)), + )) + } + + fn blocking_scan(&self, path: &str, _: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + let mut path = path; + if path == "/" { + path = "" + } - Ok((RpList::default(), ImmutableDir::new(children))) + Ok(( + RpScan::default(), + ImmutableDir::new(self.children_flat(path)), + )) } } diff --git a/src/layers/logging.rs b/src/layers/logging.rs index 03f94ca9fb53..e9420254e913 100644 --- a/src/layers/logging.rs +++ b/src/layers/logging.rs @@ -461,6 +461,53 @@ impl LayeredAccessor for LoggingAccessor { .await } + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + debug!( + target: LOGGING_TARGET, + "service={} operation={} path={} -> started", + self.scheme, + Operation::Scan, + path + ); + + self.inner + .scan(path, args) + .map(|v| match v { + Ok((rp, v)) => { + debug!( + target: LOGGING_TARGET, + "service={} operation={} path={} -> start scanning", + self.scheme, + Operation::Scan, + path + ); + let streamer = LoggingPager::new( + self.scheme, + path, + v, + self.error_level, + self.failure_level, + ); + Ok((rp, streamer)) + } + Err(err) => { + if let Some(lvl) = self.err_level(&err) { + log!( + target: LOGGING_TARGET, + lvl, + "service={} operation={} path={} -> {}: {err:?}", + self.scheme, + Operation::Scan, + path, + self.err_status(&err) + ); + } + Err(err) + } + }) + .await + } + fn presign(&self, path: &str, args: OpPresign) -> Result { debug!( target: LOGGING_TARGET, @@ -936,6 +983,45 @@ impl LayeredAccessor for LoggingAccessor { err }) } + + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + debug!( + target: LOGGING_TARGET, + "service={} operation={} path={} -> started", + self.scheme, + Operation::BlockingScan, + path + ); + + self.inner + .blocking_scan(path, args) + .map(|(rp, v)| { + debug!( + target: LOGGING_TARGET, + "service={} operation={} path={} -> start scanning", + self.scheme, + Operation::BlockingScan, + path + ); + let li = + LoggingPager::new(self.scheme, path, v, self.error_level, self.failure_level); + (rp, li) + }) + .map_err(|err| { + if let Some(lvl) = self.err_level(&err) { + log!( + target: LOGGING_TARGET, + lvl, + "service={} operation={} path={} -> {}: {err:?}", + self.scheme, + Operation::BlockingScan, + path, + self.err_status(&err) + ); + } + err + }) + } } /// `LoggingReader` is a wrapper of `BytesReader`, with logging functionality. diff --git a/src/layers/metrics.rs b/src/layers/metrics.rs index 6ac293ff4c7b..079589b0b518 100644 --- a/src/layers/metrics.rs +++ b/src/layers/metrics.rs @@ -160,6 +160,9 @@ struct MetricsHandler { requests_total_list: Counter, requests_duration_seconds_list: Histogram, + requests_total_scan: Counter, + requests_duration_seconds_scan: Histogram, + requests_total_presign: Counter, requests_duration_seconds_presign: Histogram, @@ -195,6 +198,9 @@ struct MetricsHandler { requests_total_blocking_list: Counter, requests_duration_seconds_blocking_list: Histogram, + + requests_total_blocking_scan: Counter, + requests_duration_seconds_blocking_scan: Histogram, } impl MetricsHandler { @@ -289,6 +295,17 @@ impl MetricsHandler { LABEL_OPERATION => Operation::List.into_static(), ), + requests_total_scan: register_counter!( + METRIC_REQUESTS_TOTAL, + LABEL_SERVICE => service, + LABEL_OPERATION => Operation::Scan.into_static(), + ), + requests_duration_seconds_scan: register_histogram!( + METRIC_REQUESTS_DURATION_SECONDS, + LABEL_SERVICE => service, + LABEL_OPERATION => Operation::Scan.into_static(), + ), + requests_total_presign: register_counter!( METRIC_REQUESTS_TOTAL, LABEL_SERVICE => service, @@ -425,6 +442,17 @@ impl MetricsHandler { LABEL_SERVICE => service, LABEL_OPERATION => Operation::BlockingList.into_static(), ), + + requests_total_blocking_scan: register_counter!( + METRIC_REQUESTS_TOTAL, + LABEL_SERVICE => service, + LABEL_OPERATION => Operation::BlockingScan.into_static(), + ), + requests_duration_seconds_blocking_scan: register_histogram!( + METRIC_REQUESTS_DURATION_SECONDS, + LABEL_SERVICE => service, + LABEL_OPERATION => Operation::BlockingScan.into_static(), + ), } } @@ -614,6 +642,25 @@ impl LayeredAccessor for MetricsAccessor { .await } + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + self.handle.requests_total_scan.increment(1); + + let start = Instant::now(); + + self.inner + .scan(path, args) + .inspect_ok(|_| { + let dur = start.elapsed().as_secs_f64(); + + self.handle.requests_duration_seconds_scan.record(dur); + }) + .inspect_err(|e| { + self.handle + .increment_errors_total(Operation::Scan, e.kind()); + }) + .await + } + fn presign(&self, path: &str, args: OpPresign) -> Result { self.handle.requests_total_presign.increment(1); @@ -870,6 +917,24 @@ impl LayeredAccessor for MetricsAccessor { e }) } + + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + self.handle.requests_total_blocking_scan.increment(1); + + let start = Instant::now(); + let result = self.inner.blocking_scan(path, args); + let dur = start.elapsed().as_secs_f64(); + + self.handle + .requests_duration_seconds_blocking_scan + .record(dur); + + result.map_err(|e| { + self.handle + .increment_errors_total(Operation::BlockingScan, e.kind()); + e + }) + } } pub struct MetricReader { diff --git a/src/layers/retry.rs b/src/layers/retry.rs index 3d1ea2fa1730..536295204590 100644 --- a/src/layers/retry.rs +++ b/src/layers/retry.rs @@ -238,6 +238,26 @@ impl LayeredAccessor for RetryAccessor { .await } + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + { || self.inner.scan(path, args.clone()) } + .retry(&self.builder) + .when(|e| e.is_temporary()) + .notify(|err, dur| { + warn!( + target: "opendal::service", + "operation={} -> retry after {}s: error={:?}", + Operation::Scan, dur.as_secs_f64(), err) + }) + .map(|v| { + v.map(|(l, p)| { + let pager = RetryPager::new(p, path, self.builder.clone()); + (l, pager) + }) + .map_err(|e| e.set_persistent()) + }) + .await + } + async fn create_multipart( &self, path: &str, @@ -385,6 +405,24 @@ impl LayeredAccessor for RetryAccessor { }) .map_err(|e| e.set_persistent()) } + + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + { || self.inner.blocking_scan(path, args.clone()) } + .retry(&self.builder) + .when(|e| e.is_temporary()) + .notify(|err, dur| { + warn!( + target: "opendal::service", + "operation={} -> retry after {}s: error={:?}", + Operation::BlockingScan, dur.as_secs_f64(), err) + }) + .call() + .map(|(rp, p)| { + let p = RetryPager::new(p, path, self.builder.clone()); + (rp, p) + }) + .map_err(|e| e.set_persistent()) + } } /// TODO: Refactor me to replace duplicated code. @@ -782,7 +820,7 @@ mod tests { fn metadata(&self) -> AccessorMetadata { let mut am = AccessorMetadata::default(); am.set_capabilities(AccessorCapability::List); - am.set_hints(AccessorHint::ReadStreamable | AccessorHint::ListHierarchy); + am.set_hints(AccessorHint::ReadStreamable); am } diff --git a/src/layers/tracing.rs b/src/layers/tracing.rs index 3b0aa63f0d47..34017f2de480 100644 --- a/src/layers/tracing.rs +++ b/src/layers/tracing.rs @@ -185,6 +185,14 @@ impl LayeredAccessor for TracingAccessor { .await } + #[tracing::instrument(level = "debug", skip(self))] + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + self.inner + .scan(path, args) + .map(|v| v.map(|(rp, s)| (rp, TracingWrapper::new(Span::current(), s)))) + .await + } + #[tracing::instrument(level = "debug", skip(self))] fn presign(&self, path: &str, args: OpPresign) -> Result { self.inner.presign(path, args) @@ -266,6 +274,13 @@ impl LayeredAccessor for TracingAccessor { .blocking_list(path, args) .map(|(rp, it)| (rp, TracingWrapper::new(Span::current(), it))) } + + #[tracing::instrument(level = "debug", skip(self))] + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + self.inner + .blocking_scan(path, args) + .map(|(rp, it)| (rp, TracingWrapper::new(Span::current(), it))) + } } pub struct TracingWrapper { diff --git a/src/layers/type_eraser.rs b/src/layers/type_eraser.rs index a02aee7848aa..3af3647ca7f7 100644 --- a/src/layers/type_eraser.rs +++ b/src/layers/type_eraser.rs @@ -80,4 +80,17 @@ impl LayeredAccessor for TypeEraseAccessor { .blocking_list(path, args) .map(|(rp, p)| (rp, Box::new(p) as output::BlockingPager)) } + + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + self.inner + .scan(path, args) + .await + .map(|(rp, p)| (rp, Box::new(p) as output::Pager)) + } + + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + self.inner + .blocking_scan(path, args) + .map(|(rp, p)| (rp, Box::new(p) as output::BlockingPager)) + } } diff --git a/src/object/object.rs b/src/object/object.rs index 074aa81609ac..ca24b12c954d 100644 --- a/src/object/object.rs +++ b/src/object/object.rs @@ -1011,10 +1011,7 @@ impl Object { .with_context("path", self.path())); } - let (_, pager) = self - .acc - .list(self.path(), OpList::new(ListStyle::Hierarchy)) - .await?; + let (_, pager) = self.acc.list(self.path(), OpList::new()).await?; Ok(ObjectLister::new(self.acc.clone(), pager)) } @@ -1061,9 +1058,7 @@ impl Object { .with_context("path", self.path())); } - let (_, pager) = self - .acc - .blocking_list(self.path(), OpList::new(ListStyle::Hierarchy))?; + let (_, pager) = self.acc.blocking_list(self.path(), OpList::new())?; Ok(BlockingObjectLister::new(self.acc.clone(), pager)) } @@ -1111,10 +1106,7 @@ impl Object { .with_context("path", self.path())); } - let (_, pager) = self - .acc - .list(self.path(), OpList::new(ListStyle::Flat)) - .await?; + let (_, pager) = self.acc.scan(self.path(), OpScan::new()).await?; Ok(ObjectLister::new(self.acc.clone(), pager)) } @@ -1156,14 +1148,12 @@ impl Object { ErrorKind::ObjectNotADirectory, "the path trying to list is not a directory", ) - .with_operation("Object::blocking_list") + .with_operation("Object::blocking_scan") .with_context("service", self.accessor().metadata().scheme().into_static()) .with_context("path", self.path())); } - let (_, pager) = self - .acc - .blocking_list(self.path(), OpList::new(ListStyle::Flat))?; + let (_, pager) = self.acc.blocking_scan(self.path(), OpScan::new())?; Ok(BlockingObjectLister::new(self.acc.clone(), pager)) } diff --git a/src/ops.rs b/src/ops.rs index 74ba11cb9e65..03fd1d795ce6 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -55,10 +55,8 @@ impl OpDelete { } /// Args for `list` operation. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct OpList { - /// The style passed to underlying services to specify the list style. - style: ListStyle, /// The limit passed to underlying service to specify the max results /// that could return. limit: Option, @@ -66,13 +64,8 @@ pub struct OpList { impl OpList { /// Create a new `OpList`. - pub fn new(style: ListStyle) -> Self { - Self { style, limit: None } - } - - /// Get the style from list. - pub fn style(&self) -> ListStyle { - self.style + pub fn new() -> Self { + Self::default() } /// Change the limit of this list operation. @@ -96,6 +89,32 @@ pub enum ListStyle { Hierarchy, } +/// Args for `scan` operation. +#[derive(Debug, Default, Clone)] +pub struct OpScan { + /// The limit passed to underlying service to specify the max results + /// that could return. + limit: Option, +} + +impl OpScan { + /// Create a new `OpList`. + pub fn new() -> Self { + Self::default() + } + + /// Change the limit of this list operation. + pub fn with_limit(mut self, limit: usize) -> Self { + self.limit = Some(limit); + self + } + + /// Get the limit of list operation. + pub fn limit(&self) -> Option { + self.limit + } +} + /// Args for `create_multipart` operation. #[derive(Debug, Clone, Default)] pub struct OpCreateMultipart {} diff --git a/src/raw/accessor.rs b/src/raw/accessor.rs index 2a089539c8e4..6dfef28a593f 100644 --- a/src/raw/accessor.rs +++ b/src/raw/accessor.rs @@ -32,25 +32,6 @@ use crate::*; /// /// # Operations /// -/// | Name | Capability | -/// | ---- | ---------- | -/// | [`metadata`][Accessor::metadata] | - | -/// | [`create`][Accessor::create] | - | -/// | [`read`][Accessor::read] | - | -/// | [`write`][Accessor::write] | - | -/// | [`delete`][Accessor::delete] | - | -/// | [`list`][Accessor::list] | - | -/// | [`presign`][Accessor::presign] | `Presign` | -/// | [`create_multipart`][Accessor::create_multipart] | `Multipart` | -/// | [`write_multipart`][Accessor::write_multipart] | `Multipart` | -/// | [`complete_multipart`][Accessor::complete_multipart] | `Multipart` | -/// | [`abort_multipart`][Accessor::abort_multipart] | `Multipart` | -/// | [`blocking_create`][Accessor::blocking_create] | `Blocking` | -/// | [`blocking_read`][Accessor::blocking_read] | `Blocking` | -/// | [`blocking_write`][Accessor::blocking_write] | `Blocking` | -/// | [`blocking_delete`][Accessor::blocking_delete] | `Blocking` | -/// | [`blocking_list`][Accessor::blocking_list] | `Blocking` | -/// /// - Path in args will all be normalized into the same style, services /// should handle them based on services' requirement. /// - Path that ends with `/` means it's Dir, otherwise, it's File. @@ -77,12 +58,23 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { type BlockingPager: output::BlockingPage; /// Invoke the `metadata` operation to get metadata of accessor. - fn metadata(&self) -> AccessorMetadata { - unimplemented!("metadata() is required to be implemented") - } + /// + /// # Notes + /// + /// This function is required to be implemented. + /// + /// By returning AccessorMetadata, underlying services can declare + /// some useful information about it self. + /// + /// - scheme: declare the scheme of backend. + /// - capabilities: declare the capabilities of current backend. + /// - hints: declare the hints of current backend + fn metadata(&self) -> AccessorMetadata; /// Invoke the `create` operation on the specified path /// + /// Require [`AccessorCapability::Write`] + /// /// # Behavior /// /// - Input path MUST match with ObjectMode, DON'T NEED to check object mode. @@ -100,6 +92,8 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// Invoke the `read` operation on the specified path, returns a /// [`ObjectReader`][crate::ObjectReader] if operate successful. /// + /// Require [`AccessorCapability::Read`] + /// /// # Behavior /// /// - Input path MUST be file path, DON'T NEED to check object mode. @@ -116,6 +110,8 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// Invoke the `write` operation on the specified path, returns a /// written size if operate successful. /// + /// Require [`AccessorCapability::Write`] + /// /// # Behavior /// /// - Input path MUST be file path, DON'T NEED to check object mode. @@ -130,6 +126,8 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// Invoke the `stat` operation on the specified path. /// + /// Require [`AccessorCapability::Read`] + /// /// # Behavior /// /// - `stat` empty path means stat backend's root path. @@ -146,6 +144,8 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// Invoke the `delete` operation on the specified path. /// + /// Require [`AccessorCapability::Write`] + /// /// # Behavior /// /// - `delete` is an idempotent operation, it's safe to call `Delete` on the same path multiple times. @@ -161,6 +161,8 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// Invoke the `list` operation on the specified path. /// + /// Require [`AccessorCapability::List`] + /// /// # Behavior /// /// - Input path MUST be dir path, DON'T NEED to check object mode. @@ -174,11 +176,24 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { )) } + /// Invoke the `scan` operation on the specified path. + /// + /// Require [`AccessorCapability::Scan`] + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + let (_, _) = (path, args); + + Err(Error::new( + ErrorKind::Unsupported, + "operation is not supported", + )) + } + /// Invoke the `presign` operation on the specified path. /// + /// Require [`AccessorCapability::Presign`] + /// /// # Behavior /// - /// - Require capability: `Presign` /// - This API is optional, return [`std::io::ErrorKind::Unsupported`] if not supported. fn presign(&self, path: &str, args: OpPresign) -> Result { let (_, _) = (path, args); @@ -191,9 +206,10 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// Invoke the `create_multipart` operation on the specified path. /// + /// Require [`AccessorCapability::Multipart`] + /// /// # Behavior /// - /// - Require capability: `Multipart` /// - This op returns a `upload_id` which is required to for following APIs. async fn create_multipart( &self, @@ -210,9 +226,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// Invoke the `write_multipart` operation on the specified path. /// - /// # Behavior - /// - /// - Require capability: `Multipart` + /// Require [`AccessorCapability::Multipart`] async fn write_multipart( &self, path: &str, @@ -229,9 +243,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// Invoke the `complete_multipart` operation on the specified path. /// - /// # Behavior - /// - /// - Require capability: `Multipart` + /// Require [`AccessorCapability::Multipart`] async fn complete_multipart( &self, path: &str, @@ -247,9 +259,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// Invoke the `abort_multipart` operation on the specified path. /// - /// # Behavior - /// - /// - Require capability: `Multipart` + /// Require [`AccessorCapability::Multipart`] async fn abort_multipart( &self, path: &str, @@ -267,9 +277,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// /// This operation is the blocking version of [`Accessor::create`] /// - /// # Behavior - /// - /// - Require capability: `Blocking` + /// Require [`AccessorCapability::Write`] and [`AccessorCapability::Blocking`] fn blocking_create(&self, path: &str, args: OpCreate) -> Result { let (_, _) = (path, args); @@ -283,9 +291,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// /// This operation is the blocking version of [`Accessor::read`] /// - /// # Behavior - /// - /// - Require capability: `Blocking` + /// Require [`AccessorCapability::Read`] and [`AccessorCapability::Blocking`] fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { let (_, _) = (path, args); @@ -299,9 +305,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// /// This operation is the blocking version of [`Accessor::write`] /// - /// # Behavior - /// - /// - Require capability: `Blocking` + /// Require [`AccessorCapability::Write`] and [`AccessorCapability::Blocking`] fn blocking_write( &self, path: &str, @@ -320,9 +324,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// /// This operation is the blocking version of [`Accessor::stat`] /// - /// # Behavior - /// - /// - Require capability: `Blocking` + /// Require [`AccessorCapability::Read`] and [`AccessorCapability::Blocking`] fn blocking_stat(&self, path: &str, args: OpStat) -> Result { let (_, _) = (path, args); @@ -336,9 +338,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// /// This operation is the blocking version of [`Accessor::delete`] /// - /// # Behavior - /// - /// - Require capability: `Blocking` + /// Require [`AccessorCapability::Write`] and [`AccessorCapability::Blocking`] fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { let (_, _) = (path, args); @@ -352,9 +352,10 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// /// This operation is the blocking version of [`Accessor::list`] /// + /// Require [`AccessorCapability::List`] and [`AccessorCapability::Blocking`] + /// /// # Behavior /// - /// - Require capability: `Blocking` /// - List non-exist dir should return Empty. fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { let (_, _) = (path, args); @@ -364,6 +365,18 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { "operation is not supported", )) } + + /// Invoke the `blocking_scan` operation on the specified path. + /// + /// Require [`AccessorCapability::Scan`] and [`AccessorCapability::Blocking`] + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + let (_, _) = (path, args); + + Err(Error::new( + ErrorKind::Unsupported, + "operation is not supported", + )) + } } /// All functions in `Accessor` only requires `&self`, so it's safe to implement @@ -398,6 +411,9 @@ impl Accessor for Arc { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { self.as_ref().list(path, args).await } + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + self.as_ref().scan(path, args).await + } fn presign(&self, path: &str, args: OpPresign) -> Result { self.as_ref().presign(path, args) @@ -456,6 +472,10 @@ impl Accessor for Arc { fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { self.as_ref().blocking_list(path, args) } + + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + self.as_ref().blocking_scan(path, args) + } } /// FusedAccessor is the type erased accessor with `Box`. @@ -554,6 +574,8 @@ flags! { Write, /// Add this capability if service supports `list` List, + /// Add this capability if service supports `scan` + Scan, /// Add this capability if service supports `presign` Presign, /// Add this capability if service supports `multipart` @@ -578,9 +600,5 @@ flags! { /// /// It's better to use stream to reading data. ReadStreamable, - /// List flat means this underlying can list with flat style. - ListFlat, - /// List hierarchy means this underlying can list with hierarchy style. - ListHierarchy, } } diff --git a/src/raw/io/output/into_reader/by_range.rs b/src/raw/io/output/into_reader/by_range.rs index 02a18244d6a5..35a4dcec233a 100644 --- a/src/raw/io/output/into_reader/by_range.rs +++ b/src/raw/io/output/into_reader/by_range.rs @@ -320,6 +320,13 @@ mod tests { type Pager = (); type BlockingPager = (); + fn metadata(&self) -> AccessorMetadata { + let mut am = AccessorMetadata::default(); + am.set_capabilities(AccessorCapability::Read); + + am + } + async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let bs = args.range().apply_on_bytes(self.data.clone()); diff --git a/src/raw/io/output/page.rs b/src/raw/io/output/page.rs index be3153d5d189..a80f44f22252 100644 --- a/src/raw/io/output/page.rs +++ b/src/raw/io/output/page.rs @@ -17,10 +17,8 @@ use async_trait::async_trait; use super::Entry; use crate::*; -/// Page trait is used by [`Accessor`] to implement `list` operation. -/// -/// `list` will return a boxed `Page` which allow users to call `next_page` -/// to fecth a new page of [`Entry`]. +/// Page trait is used by [`crate::raw::Accessor`] to implement `list` +/// or `scan` operation. #[async_trait] pub trait Page: Send + Sync + 'static { /// Fetch a new page of [`Entry`] diff --git a/src/raw/io/output/to_flat_pager.rs b/src/raw/io/output/to_flat_pager.rs index 25bcd60aa479..f593d0f29e21 100644 --- a/src/raw/io/output/to_flat_pager.rs +++ b/src/raw/io/output/to_flat_pager.rs @@ -27,11 +27,11 @@ pub fn to_flat_pager(acc: A, path: &str, size: usize) -> ToFlatP { let meta = acc.metadata(); debug_assert!( - !meta.hints().contains(AccessorHint::ListFlat), - "service already supports list flat, call to_flat_pager must be a mistake" + !meta.capabilities().contains(AccessorCapability::Scan), + "service already supports scan, call to_flat_pager must be a mistake" ); debug_assert!( - meta.hints().contains(AccessorHint::ListHierarchy), + meta.capabilities().contains(AccessorCapability::List), "service doesn't support list hierarchy, it must be a bug" ); } @@ -101,10 +101,7 @@ where async fn next_page(&mut self) -> Result>> { loop { if let Some(de) = self.dirs.pop_back() { - let (_, op) = self - .acc - .list(de.path(), OpList::new(ListStyle::Hierarchy)) - .await?; + let (_, op) = self.acc.list(de.path(), OpList::new()).await?; self.pagers.push((op, de, vec![])) } @@ -161,9 +158,7 @@ where fn next_page(&mut self) -> Result>> { loop { if let Some(de) = self.dirs.pop_back() { - let (_, op) = self - .acc - .blocking_list(de.path(), OpList::new(ListStyle::Hierarchy))?; + let (_, op) = self.acc.blocking_list(de.path(), OpList::new())?; self.pagers.push((op, de, vec![])) } @@ -253,7 +248,7 @@ mod tests { fn metadata(&self) -> AccessorMetadata { let mut am = AccessorMetadata::default(); - am.set_hints(AccessorHint::ListHierarchy); + am.set_capabilities(AccessorCapability::List); am } diff --git a/src/raw/layer.rs b/src/raw/layer.rs index 0ce2d378aedc..70633bab29f3 100644 --- a/src/raw/layer.rs +++ b/src/raw/layer.rs @@ -84,6 +84,14 @@ use crate::*; /// fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { /// self.inner.blocking_list(path, args) /// } +/// +/// async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { +/// self.inner.scan(path, args).await +/// } +/// +/// fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { +/// self.inner.blocking_scan(path, args) +/// } /// } /// /// /// The public struct that exposed to users. @@ -144,6 +152,8 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)>; + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)>; + fn presign(&self, path: &str, args: OpPresign) -> Result { self.inner().presign(path, args) } @@ -205,6 +215,8 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static { } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)>; + + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)>; } #[async_trait] @@ -242,6 +254,10 @@ impl Accessor for L { (self as &L).list(path, args).await } + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + (self as &L).scan(path, args).await + } + fn presign(&self, path: &str, args: OpPresign) -> Result { (self as &L).presign(path, args) } @@ -307,6 +323,10 @@ impl Accessor for L { fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { (self as &L).blocking_list(path, args) } + + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + (self as &L).blocking_scan(path, args) + } } #[cfg(test)] diff --git a/src/raw/operation.rs b/src/raw/operation.rs index d8d5de4c82ff..71aefbe407b3 100644 --- a/src/raw/operation.rs +++ b/src/raw/operation.rs @@ -33,6 +33,8 @@ pub enum Operation { Delete, /// Operation for [`crate::raw::Accessor::list`] List, + /// Operation for [`crate::raw::Accessor::scan`] + Scan, /// Operation for [`crate::raw::Accessor::presign`] Presign, /// Operation for [`crate::raw::Accessor::create_multipart`] @@ -55,6 +57,8 @@ pub enum Operation { BlockingDelete, /// Operation for [`crate::raw::Accessor::blocking_list`] BlockingList, + /// Operation for [`crate::raw::Accessor::blocking_scan`] + BlockingScan, } impl Operation { @@ -86,6 +90,7 @@ impl From for &'static str { Operation::Stat => "stat", Operation::Delete => "delete", Operation::List => "list", + Operation::Scan => "scan", Operation::Presign => "presign", Operation::CreateMultipart => "create_multipart", Operation::WriteMultipart => "write_multipart", @@ -97,6 +102,7 @@ impl From for &'static str { Operation::BlockingStat => "blocking_stat", Operation::BlockingDelete => "blocking_delete", Operation::BlockingList => "blocking_list", + Operation::BlockingScan => "blocking_scan", } } } diff --git a/src/raw/rps.rs b/src/raw/rps.rs index e234af6084f5..3fa1be21f6df 100644 --- a/src/raw/rps.rs +++ b/src/raw/rps.rs @@ -28,6 +28,10 @@ pub struct RpDelete {} #[derive(Debug, Clone, Default)] pub struct RpList {} +/// Reply for `scan` operation. +#[derive(Debug, Clone, Default)] +pub struct RpScan {} + /// Reply for `create_multipart` operation. #[derive(Debug, Clone, Default)] pub struct RpCreateMultipart { diff --git a/src/services/azblob/backend.rs b/src/services/azblob/backend.rs index 23783bc5f6a8..b39785f88637 100644 --- a/src/services/azblob/backend.rs +++ b/src/services/azblob/backend.rs @@ -419,16 +419,15 @@ impl Accessor for AzblobBackend { type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { + use AccessorCapability::*; + use AccessorHint::*; + let mut am = AccessorMetadata::default(); am.set_scheme(Scheme::Azblob) .set_root(&self.root) .set_name(&self.container) - .set_capabilities( - AccessorCapability::Read | AccessorCapability::Write | AccessorCapability::List, - ) - .set_hints( - AccessorHint::ReadStreamable | AccessorHint::ListFlat | AccessorHint::ListHierarchy, - ); + .set_capabilities(Read | Write | List | Scan) + .set_hints(ReadStreamable); am } @@ -520,21 +519,28 @@ impl Accessor for AzblobBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - let delimiter = match args.style() { - ListStyle::Flat => "".to_string(), - ListStyle::Hierarchy => "/".to_string(), - }; - let op = DirStream::new( Arc::new(self.clone()), self.root.clone(), path.to_string(), - delimiter, + "/".to_string(), args.limit(), ); Ok((RpList::default(), op)) } + + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + let op = DirStream::new( + Arc::new(self.clone()), + self.root.clone(), + path.to_string(), + "".to_string(), + args.limit(), + ); + + Ok((RpScan::default(), op)) + } } impl AzblobBackend { diff --git a/src/services/azdfs/backend.rs b/src/services/azdfs/backend.rs index 435c148863bf..a8e7a7f39c54 100644 --- a/src/services/azdfs/backend.rs +++ b/src/services/azdfs/backend.rs @@ -311,7 +311,7 @@ impl Accessor for AzdfsBackend { .set_capabilities( AccessorCapability::Read | AccessorCapability::Write | AccessorCapability::List, ) - .set_hints(AccessorHint::ReadStreamable | AccessorHint::ListHierarchy); + .set_hints(AccessorHint::ReadStreamable); am } @@ -423,13 +423,6 @@ impl Accessor for AzdfsBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - if !matches!(args.style(), ListStyle::Hierarchy) { - return Err(Error::new( - ErrorKind::Unsupported, - "list with flat style is not supported", - )); - } - let op = DirStream::new( Arc::new(self.clone()), self.root.clone(), diff --git a/src/services/fs/backend.rs b/src/services/fs/backend.rs index ee933a0e1102..ea2d66606cd4 100644 --- a/src/services/fs/backend.rs +++ b/src/services/fs/backend.rs @@ -304,7 +304,7 @@ impl Accessor for FsBackend { | AccessorCapability::List | AccessorCapability::Blocking, ) - .set_hints(AccessorHint::ReadSeekable | AccessorHint::ListHierarchy); + .set_hints(AccessorHint::ReadSeekable); am } @@ -511,13 +511,6 @@ impl Accessor for FsBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - if !matches!(args.style(), ListStyle::Hierarchy) { - return Err(Error::new( - ErrorKind::Unsupported, - "list with flat style is not supported", - )); - } - let p = self.root.join(path.trim_end_matches('/')); let f = match tokio::fs::read_dir(&p).await { @@ -722,13 +715,6 @@ impl Accessor for FsBackend { } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { - if !matches!(args.style(), ListStyle::Hierarchy) { - return Err(Error::new( - ErrorKind::Unsupported, - "list with flat style is not supported", - )); - } - let p = self.root.join(path.trim_end_matches('/')); let f = match std::fs::read_dir(p) { diff --git a/src/services/ftp/backend.rs b/src/services/ftp/backend.rs index ecc0f3e8bc63..91d5f16fc265 100644 --- a/src/services/ftp/backend.rs +++ b/src/services/ftp/backend.rs @@ -321,8 +321,7 @@ impl Accessor for FtpBackend { .set_root(&self.root) .set_capabilities( AccessorCapability::Read | AccessorCapability::Write | AccessorCapability::List, - ) - .set_hints(AccessorHint::ListHierarchy); + ); am } @@ -451,13 +450,6 @@ impl Accessor for FtpBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - if !matches!(args.style(), ListStyle::Hierarchy) { - return Err(Error::new( - ErrorKind::Unsupported, - "list with flat style is not supported", - )); - } - let mut ftp_stream = self.ftp_connect(Operation::List).await?; let pathname = if path == "/" { None } else { Some(path) }; diff --git a/src/services/gcs/backend.rs b/src/services/gcs/backend.rs index 5a21bed6894f..a4b898f54074 100644 --- a/src/services/gcs/backend.rs +++ b/src/services/gcs/backend.rs @@ -350,16 +350,15 @@ impl Accessor for GcsBackend { type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { + use AccessorCapability::*; + use AccessorHint::*; + let mut am = AccessorMetadata::default(); am.set_scheme(Scheme::Gcs) .set_root(&self.root) .set_name(&self.bucket) - .set_capabilities( - AccessorCapability::Read | AccessorCapability::Write | AccessorCapability::List, - ) - .set_hints( - AccessorHint::ReadStreamable | AccessorHint::ListFlat | AccessorHint::ListHierarchy, - ); + .set_capabilities(Read | Write | List | Scan) + .set_hints(ReadStreamable); am } @@ -467,20 +466,16 @@ impl Accessor for GcsBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - let delimiter = match args.style() { - ListStyle::Flat => "", - ListStyle::Hierarchy => "/", - }; - Ok(( RpList::default(), - DirStream::new( - Arc::new(self.clone()), - &self.root, - path, - delimiter, - args.limit(), - ), + DirStream::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), + )) + } + + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + Ok(( + RpScan::default(), + DirStream::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), )) } } diff --git a/src/services/hdfs/backend.rs b/src/services/hdfs/backend.rs index 3c88b6eb402c..caa8a08eb017 100644 --- a/src/services/hdfs/backend.rs +++ b/src/services/hdfs/backend.rs @@ -235,7 +235,7 @@ impl Accessor for HdfsBackend { | AccessorCapability::List | AccessorCapability::Blocking, ) - .set_hints(AccessorHint::ReadSeekable | AccessorHint::ListHierarchy); + .set_hints(AccessorHint::ReadSeekable); am } @@ -393,13 +393,6 @@ impl Accessor for HdfsBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - if !matches!(args.style(), ListStyle::Hierarchy) { - return Err(Error::new( - ErrorKind::Unsupported, - "list with flat style is not supported", - )); - } - let p = build_rooted_abs_path(&self.root, path); let f = match self.client.read_dir(&p) { @@ -574,13 +567,6 @@ impl Accessor for HdfsBackend { } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { - if !matches!(args.style(), ListStyle::Hierarchy) { - return Err(Error::new( - ErrorKind::Unsupported, - "list with flat style is not supported", - )); - } - let p = build_rooted_abs_path(&self.root, path); let f = match self.client.read_dir(&p) { diff --git a/src/services/ipfs/backend.rs b/src/services/ipfs/backend.rs index 2fa6410cd245..792e1e2e2762 100644 --- a/src/services/ipfs/backend.rs +++ b/src/services/ipfs/backend.rs @@ -217,7 +217,7 @@ impl Accessor for IpfsBackend { ma.set_scheme(Scheme::Ipfs) .set_root(&self.root) .set_capabilities(AccessorCapability::Read | AccessorCapability::List) - .set_hints(AccessorHint::ReadStreamable | AccessorHint::ListHierarchy); + .set_hints(AccessorHint::ReadStreamable); ma } @@ -383,14 +383,7 @@ impl Accessor for IpfsBackend { } } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - if !matches!(args.style(), ListStyle::Hierarchy) { - return Err(Error::new( - ErrorKind::Unsupported, - "list with flat style is not supported", - )); - } - + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Pager)> { Ok(( RpList::default(), DirStream::new(Arc::new(self.clone()), path), diff --git a/src/services/ipmfs/backend.rs b/src/services/ipmfs/backend.rs index e9021564395e..2547556c37d3 100644 --- a/src/services/ipmfs/backend.rs +++ b/src/services/ipmfs/backend.rs @@ -71,7 +71,7 @@ impl Accessor for IpmfsBackend { .set_capabilities( AccessorCapability::Read | AccessorCapability::Write | AccessorCapability::List, ) - .set_hints(AccessorHint::ReadStreamable | AccessorHint::ListHierarchy); + .set_hints(AccessorHint::ReadStreamable); am } @@ -170,14 +170,7 @@ impl Accessor for IpmfsBackend { } } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - if !matches!(args.style(), ListStyle::Hierarchy) { - return Err(Error::new( - ErrorKind::Unsupported, - "list with flat style is not supported", - )); - } - + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Pager)> { Ok(( RpList::default(), DirStream::new(Arc::new(self.clone()), &self.root, path), diff --git a/src/services/obs/backend.rs b/src/services/obs/backend.rs index 5ea90848715e..19476c073878 100644 --- a/src/services/obs/backend.rs +++ b/src/services/obs/backend.rs @@ -307,16 +307,15 @@ impl Accessor for ObsBackend { type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { + use AccessorCapability::*; + use AccessorHint::*; + let mut am = AccessorMetadata::default(); am.set_scheme(Scheme::Obs) .set_root(&self.root) .set_name(&self.bucket) - .set_capabilities( - AccessorCapability::Read | AccessorCapability::Write | AccessorCapability::List, - ) - .set_hints( - AccessorHint::ReadStreamable | AccessorHint::ListFlat | AccessorHint::ListHierarchy, - ); + .set_capabilities(Read | Write | List | Scan) + .set_hints(ReadStreamable); am } @@ -410,20 +409,16 @@ impl Accessor for ObsBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - let delimiter = match args.style() { - ListStyle::Flat => "", - ListStyle::Hierarchy => "/", - }; - Ok(( RpList::default(), - DirStream::new( - Arc::new(self.clone()), - &self.root, - path, - delimiter, - args.limit(), - ), + DirStream::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), + )) + } + + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + Ok(( + RpScan::default(), + DirStream::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), )) } } diff --git a/src/services/oss/backend.rs b/src/services/oss/backend.rs index baf117cdd3db..28b24eff6a89 100644 --- a/src/services/oss/backend.rs +++ b/src/services/oss/backend.rs @@ -391,19 +391,16 @@ impl Accessor for OssBackend { type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { + use AccessorCapability::*; + use AccessorHint::*; + let mut am = AccessorMetadata::default(); am.set_scheme(Scheme::Oss) .set_root(&self.root) .set_name(&self.bucket) - .set_capabilities( - AccessorCapability::Read - | AccessorCapability::Write - | AccessorCapability::List - | AccessorCapability::Presign, - ) - .set_hints( - AccessorHint::ReadStreamable | AccessorHint::ListFlat | AccessorHint::ListHierarchy, - ); + .set_capabilities(Read | Write | List | Scan | Presign) + .set_hints(ReadStreamable); + am } @@ -490,20 +487,16 @@ impl Accessor for OssBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - let delimiter = match args.style() { - ListStyle::Flat => "", - ListStyle::Hierarchy => "/", - }; - Ok(( RpList::default(), - DirStream::new( - Arc::new(self.clone()), - &self.root, - path, - delimiter, - args.limit(), - ), + DirStream::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), + )) + } + + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + Ok(( + RpScan::default(), + DirStream::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), )) } diff --git a/src/services/s3/backend.rs b/src/services/s3/backend.rs index cacc6a9e3f64..79fa8246da2a 100644 --- a/src/services/s3/backend.rs +++ b/src/services/s3/backend.rs @@ -1111,20 +1111,15 @@ impl Accessor for S3Backend { type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { + use AccessorCapability::*; + use AccessorHint::*; + let mut am = AccessorMetadata::default(); am.set_scheme(Scheme::S3) .set_root(&self.root) .set_name(&self.bucket) - .set_capabilities( - AccessorCapability::Read - | AccessorCapability::Write - | AccessorCapability::List - | AccessorCapability::Presign - | AccessorCapability::Multipart, - ) - .set_hints( - AccessorHint::ReadStreamable | AccessorHint::ListFlat | AccessorHint::ListHierarchy, - ); + .set_capabilities(Read | Write | List | Scan | Presign | Multipart) + .set_hints(ReadStreamable); am } @@ -1215,20 +1210,16 @@ impl Accessor for S3Backend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - let delimiter = match args.style() { - ListStyle::Flat => "", - ListStyle::Hierarchy => "/", - }; - Ok(( RpList::default(), - DirStream::new( - Arc::new(self.clone()), - &self.root, - path, - delimiter, - args.limit(), - ), + DirStream::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), + )) + } + + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + Ok(( + RpScan::default(), + DirStream::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), )) } diff --git a/src/services/sled/backend.rs b/src/services/sled/backend.rs index 2b85877c92ac..c0a24205f30f 100644 --- a/src/services/sled/backend.rs +++ b/src/services/sled/backend.rs @@ -12,15 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{ - raw::{adapters::kv, *}, - Builder, Error, ErrorKind, Scheme, *, -}; +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; + use async_trait::async_trait; -use std::{ - collections::HashMap, - fmt::{Debug, Formatter}, -}; + +use crate::raw::adapters::kv; +use crate::raw::*; +use crate::Builder; +use crate::Error; +use crate::ErrorKind; +use crate::Scheme; +use crate::*; /// Sled service support. /// diff --git a/src/services/webhdfs/backend.rs b/src/services/webhdfs/backend.rs index c73210d03aa6..852735fddfa1 100644 --- a/src/services/webhdfs/backend.rs +++ b/src/services/webhdfs/backend.rs @@ -547,7 +547,7 @@ impl Accessor for WebhdfsBackend { .set_capabilities( AccessorCapability::Read | AccessorCapability::Write | AccessorCapability::List, ) - .set_hints(AccessorHint::ReadStreamable | AccessorHint::ListHierarchy); + .set_hints(AccessorHint::ReadStreamable); am } @@ -675,14 +675,7 @@ impl Accessor for WebhdfsBackend { } } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - if !matches!(args.style(), ListStyle::Hierarchy) { - return Err(Error::new( - ErrorKind::Unsupported, - "list with flat style is not supported", - )); - } - + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Pager)> { let path = path.trim_end_matches('/'); let req = self.webhdfs_list_status_req(path)?;