Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add native & full capability #2874

Merged
merged 4 commits into from
Aug 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/oay/src/services/s3/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn handle_list_objects(
) -> Result<OkResponse, ErrorResponse> {
debug!("got params: {:?}", params);

if !state.op.info().capability().list_with_start_after {
if !state.op.info().full_capability().list_with_start_after {
return Err(ErrorResponse {
code: StatusCode::NOT_IMPLEMENTED,
err: Error {
Expand Down
7 changes: 3 additions & 4 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,9 @@ impl<A: Accessor> LayeredAccessor for BlockingAccessor<A> {
}

fn metadata(&self) -> AccessorInfo {
let mut info = self.inner.info();
let cap = info.capability_mut();
cap.blocking = true;
info
let mut meta = self.inner.info();
meta.full_capability_mut().blocking = true;
meta
}

async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
Expand Down
57 changes: 33 additions & 24 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,8 @@ impl<A: Accessor> Layer<A> for CompleteLayer {
type LayeredAccessor = CompleteReaderAccessor<A>;

fn layer(&self, inner: A) -> Self::LayeredAccessor {
let meta = inner.info();
CompleteReaderAccessor {
meta,
meta: inner.info(),
inner: Arc::new(inner),
}
}
Expand All @@ -157,7 +156,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
path: &str,
args: OpRead,
) -> Result<(RpRead, CompleteReader<A, A::Reader>)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.read {
return new_capability_unsupported_error(Operation::Read);
}
Expand Down Expand Up @@ -210,7 +209,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
path: &str,
args: OpRead,
) -> Result<(RpRead, CompleteReader<A, A::BlockingReader>)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.read || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingRead);
}
Expand Down Expand Up @@ -266,7 +265,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
path: &str,
args: OpList,
) -> Result<(RpList, CompletePager<A, A::Pager>)> {
let cap = self.meta.capability();
let cap = self.meta.full_capability();
if !cap.list {
return Err(
Error::new(ErrorKind::Unsupported, "operation is not supported")
Expand Down Expand Up @@ -315,7 +314,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
path: &str,
args: OpList,
) -> Result<(RpList, CompletePager<A, A::BlockingPager>)> {
let cap = self.meta.capability();
let cap = self.meta.full_capability();
if !cap.list {
return Err(
Error::new(ErrorKind::Unsupported, "operation is not supported")
Expand Down Expand Up @@ -376,6 +375,16 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
&self.inner
}

fn metadata(&self) -> AccessorInfo {
let mut meta = self.meta.clone();
let cap = meta.full_capability_mut();
if cap.read {
cap.read_can_next = true;
cap.read_can_seek = true;
}
meta
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
self.complete_reader(path, args).await
}
Expand All @@ -385,7 +394,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.stat {
return new_capability_unsupported_error(Operation::Stat);
}
Expand All @@ -399,7 +408,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.stat || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingStat);
}
Expand All @@ -413,7 +422,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.write {
return new_capability_unsupported_error(Operation::Write);
}
Expand All @@ -426,7 +435,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.write || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingWrite);
}
Expand All @@ -438,7 +447,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.append {
return new_capability_unsupported_error(Operation::Append);
}
Expand All @@ -450,7 +459,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.create_dir {
return new_capability_unsupported_error(Operation::CreateDir);
}
Expand All @@ -459,7 +468,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.create_dir || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingCreateDir);
}
Expand All @@ -468,7 +477,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.delete {
return new_capability_unsupported_error(Operation::Delete);
}
Expand All @@ -477,7 +486,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.delete || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingDelete);
}
Expand All @@ -486,7 +495,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.copy {
return new_capability_unsupported_error(Operation::Copy);
}
Expand All @@ -495,7 +504,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.copy || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingCopy);
}
Expand All @@ -504,7 +513,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.rename {
return new_capability_unsupported_error(Operation::Rename);
}
Expand All @@ -513,7 +522,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.rename || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingRename);
}
Expand All @@ -522,7 +531,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.list {
return new_capability_unsupported_error(Operation::List);
}
Expand All @@ -531,7 +540,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.list || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingList);
}
Expand All @@ -540,7 +549,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.presign {
return new_capability_unsupported_error(Operation::Presign);
}
Expand All @@ -549,7 +558,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.batch {
return new_capability_unsupported_error(Operation::Batch);
}
Expand Down Expand Up @@ -952,7 +961,7 @@ mod tests {

fn info(&self) -> AccessorInfo {
let mut info = AccessorInfo::default();
info.set_capability(self.capability);
info.set_full_capability(self.capability);

info
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> {
fn metadata(&self) -> AccessorInfo {
let mut meta = self.inner.info();

let cap = meta.capability_mut();
let cap = meta.full_capability_mut();
cap.list = true;
cap.list_with_delimiter_slash = true;
cap.list_without_delimiter = true;
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl LayeredAccessor for MadsimAccessor {
let mut info = AccessorInfo::default();
info.set_name("madsim");

info.set_capability(Capability {
info.set_full_capability(Capability {
read: true,
write: true,
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ mod tests {

fn info(&self) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.set_capability(Capability {
am.set_full_capability(Capability {
read: true,
list: true,
list_with_delimiter_slash: true,
Expand Down
29 changes: 18 additions & 11 deletions core/src/raw/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ impl Accessor for () {
scheme: Scheme::Custom("dummy"),
root: "".to_string(),
name: "dummy".to_string(),
capability: Capability::default(),
native_capability: Capability::default(),
full_capability: Capability::default(),
}
}
}
Expand Down Expand Up @@ -509,7 +510,8 @@ pub struct AccessorInfo {
root: String,
name: String,

capability: Capability,
native_capability: Capability,
full_capability: Capability,
}

impl AccessorInfo {
Expand Down Expand Up @@ -553,19 +555,24 @@ impl AccessorInfo {
self
}

/// Get backend's capabilities.
pub fn capability(&self) -> Capability {
self.capability
/// Get backend's native capabilities.
pub fn native_capability(&self) -> Capability {
self.native_capability
}

/// Get backend's capabilities.
pub fn capability_mut(&mut self) -> &mut Capability {
&mut self.capability
/// Get service's full capabilities.
pub fn full_capability(&self) -> Capability {
self.full_capability
}

/// Set capabilities for backend.
pub fn set_capability(&mut self, capability: Capability) -> &mut Self {
self.capability = capability;
/// Get service's full capabilities.
pub fn full_capability_mut(&mut self) -> &mut Capability {
&mut self.full_capability
}

/// Set full capabilities for service.
pub fn set_full_capability(&mut self, capability: Capability) -> &mut Self {
self.full_capability = capability;
self
}
}
2 changes: 1 addition & 1 deletion core/src/raw/adapters/kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl From<Metadata> for AccessorInfo {
let mut am = AccessorInfo::default();
am.set_name(m.name());
am.set_scheme(m.scheme());
am.set_capability(m.capabilities());
am.set_full_capability(m.capabilities());

am
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<S: Adapter> Accessor for Backend<S> {
let mut am: AccessorInfo = self.kv.metadata().into();
am.set_root(&self.root);

let cap = am.capability_mut();
let cap = am.full_capability_mut();
if cap.read {
cap.read_can_seek = true;
cap.read_can_next = true;
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/adapters/typed_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<S: Adapter> Accessor for Backend<S> {
am.set_scheme(kv_info.scheme());
am.set_name(kv_info.name());
let kv_cap = kv_info.capabilities();
let cap = am.capability_mut();
let cap = am.full_capability_mut();
if kv_cap.get {
cap.read = true;
cap.read_can_seek = true;
Expand Down
6 changes: 3 additions & 3 deletions core/src/raw/oio/page/into_flat_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn into_flat_page<A: Accessor, P>(acc: A, path: &str, size: usize) -> FlatPa
{
let meta = acc.info();
debug_assert!(
meta.capability().list_with_delimiter_slash,
meta.full_capability().list_with_delimiter_slash,
"service doesn't support list hierarchy, it must be a bug"
);
}
Expand Down Expand Up @@ -254,8 +254,8 @@ mod tests {

fn info(&self) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.capability_mut().list = true;
am.capability_mut().list_with_delimiter_slash = true;
am.full_capability_mut().list = true;
am.full_capability_mut().list_with_delimiter_slash = true;

am
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/oio/read/into_seekable_read_by_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ mod tests {

fn info(&self) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.set_capability(Capability {
am.set_full_capability(Capability {
read: true,
..Default::default()
});
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ impl Accessor for AzblobBackend {
am.set_scheme(Scheme::Azblob)
.set_root(&self.core.root)
.set_name(&self.core.container)
.set_capability(Capability {
.set_full_capability(Capability {
stat: true,
stat_with_if_match: true,
stat_with_if_none_match: true,
Expand Down
Loading