Skip to content

Commit

Permalink
refactor: Implement query based object metadata cache (#1395)
Browse files Browse the repository at this point in the history
* Don't allow serde metadata

Signed-off-by: Xuanwo <[email protected]>

* Avoid copy path

Signed-off-by: Xuanwo <[email protected]>

* Remove metadata related operation on object

Signed-off-by: Xuanwo <[email protected]>

* Remove not needed clone

Signed-off-by: Xuanwo <[email protected]>

* Save work

Signed-off-by: Xuanwo <[email protected]>

* Refactor stat and metadata

Signed-off-by: Xuanwo <[email protected]>

* Remove not needed with complete call

Signed-off-by: Xuanwo <[email protected]>

* Set complete to true for dir

Signed-off-by: Xuanwo <[email protected]>

* Reduce the extra call

Signed-off-by: Xuanwo <[email protected]>

* Format code

Signed-off-by: Xuanwo <[email protected]>

* Fix test

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Feb 21, 2023
1 parent 8523be3 commit 1dc5521
Show file tree
Hide file tree
Showing 38 changed files with 447 additions and 477 deletions.
6 changes: 3 additions & 3 deletions binaries/oay/src/services/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Service {
.op
.object(&percent_decode(req.path().as_bytes()).decode_utf8_lossy());

let meta = o.metadata().await?;
let meta = o.stat().await?;

let (size, r) = if let Some(range) = req.headers().get(header::RANGE) {
let br = BytesRange::from_str(range.to_str().map_err(|e| {
Expand All @@ -103,7 +103,7 @@ impl Service {
}

async fn put(&self, req: HttpRequest, mut body: web::Payload) -> Result<HttpResponse> {
let o = self
let mut o = self
.op
.object(&percent_decode(req.path().as_bytes()).decode_utf8_lossy());

Expand Down Expand Up @@ -167,7 +167,7 @@ impl Service {
let o = self
.op
.object(&percent_decode(req.path().as_bytes()).decode_utf8_lossy());
let meta = o.metadata().await?;
let meta = o.stat().await?;

Ok(HttpResponse::Ok().body(SizedStream::new(
meta.content_length(),
Expand Down
4 changes: 2 additions & 2 deletions binaries/oli/src/commands/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ pub async fn main(args: Option<ArgMatches>) -> Result<()> {
.get_one::<String>("destination")
.ok_or_else(|| anyhow!("missing target"))?;
let (dst_op, dst_path) = parse_location(dst)?;
let dst_o = dst_op.object(dst_path);
let mut dst_o = dst_op.object(dst_path);

let size = src_o.metadata().await?.content_length();
let size = src_o.stat().await?.content_length();
let reader = src_o.reader().await?;
dst_o.write_from(size, reader).await?;
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions bindings/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl std::fmt::Display for OpendalStore {
#[async_trait]
impl ObjectStore for OpendalStore {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let o = self.inner.object(location.as_ref());
let mut o = self.inner.object(location.as_ref());
Ok(o.write(bytes)
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?)
Expand Down Expand Up @@ -106,7 +106,7 @@ impl ObjectStore for OpendalStore {
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let o = self.inner.object(location.as_ref());
let meta = o
.metadata()
.stat()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;

Expand All @@ -127,7 +127,7 @@ impl ObjectStore for OpendalStore {
}

async fn delete(&self, location: &Path) -> Result<()> {
let o = self.inner.object(location.as_ref());
let mut o = self.inner.object(location.as_ref());
o.delete()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! use opendal::ErrorKind;
//! # #[tokio::main]
//! # async fn test(op: Operator) -> Result<()> {
//! if let Err(e) = op.object("test_file").metadata().await {
//! if let Err(e) = op.object("test_file").stat().await {
//! if e.kind() == ErrorKind::ObjectNotFound {
//! println!("object not exist")
//! }
Expand Down
13 changes: 13 additions & 0 deletions src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,19 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
self.complete_blocking_reader(path, args)
}

/// Forward the stat request for `path` to the inner accessor, then apply
/// `with_complete()` to the metadata carried by the returned `RpStat`.
///
/// Errors from the inner accessor are returned to the caller unchanged.
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
    let reply = self.inner.stat(path, args).await?;

    Ok(reply.map_metadata(|meta| meta.with_complete()))
}

/// Blocking counterpart of `stat`: forward the request for `path` to the
/// inner accessor, then apply `with_complete()` to the metadata carried by
/// the returned `RpStat`.
///
/// Errors from the inner accessor are returned to the caller unchanged.
fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
    let reply = self.inner.blocking_stat(path, args)?;

    Ok(reply.map_metadata(|meta| meta.with_complete()))
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
self.complete_list(path, args).await
}
Expand Down
35 changes: 25 additions & 10 deletions src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,17 @@ mod tests {
let mut map = HashMap::new();
let mut set = HashSet::new();
let mut ds = op.object("").list().await?;
while let Some(entry) = ds.try_next().await? {
while let Some(mut entry) = ds.try_next().await? {
debug!("got entry: {}", entry.path());
assert!(
set.insert(entry.path().to_string()),
"duplicated value: {}",
entry.path()
);
map.insert(entry.path().to_string(), entry.mode().await?);
map.insert(
entry.path().to_string(),
entry.metadata(ObjectMetadataKey::Mode).await?.mode(),
);
}

assert_eq!(map["file"], ObjectMode::FILE);
Expand Down Expand Up @@ -324,14 +327,17 @@ mod tests {
let mut ds = op.object("/").scan().await?;
let mut set = HashSet::new();
let mut map = HashMap::new();
while let Some(entry) = ds.try_next().await? {
while let Some(mut entry) = ds.try_next().await? {
debug!("got entry: {}", entry.path());
assert!(
set.insert(entry.path().to_string()),
"duplicated value: {}",
entry.path()
);
map.insert(entry.path().to_string(), entry.mode().await?);
map.insert(
entry.path().to_string(),
entry.metadata(ObjectMetadataKey::Mode).await?.mode(),
);
}

debug!("current files: {:?}", map);
Expand Down Expand Up @@ -366,13 +372,16 @@ mod tests {
let mut map = HashMap::new();
let mut set = HashSet::new();
let mut ds = op.object("/").list().await?;
while let Some(entry) = ds.try_next().await? {
while let Some(mut entry) = ds.try_next().await? {
assert!(
set.insert(entry.path().to_string()),
"duplicated value: {}",
entry.path()
);
map.insert(entry.path().to_string(), entry.mode().await?);
map.insert(
entry.path().to_string(),
entry.metadata(ObjectMetadataKey::Mode).await?.mode(),
);
}

assert_eq!(map.len(), 1);
Expand All @@ -382,13 +391,16 @@ mod tests {
let mut map = HashMap::new();
let mut set = HashSet::new();
let mut ds = op.object("dataset/stateful/").list().await?;
while let Some(entry) = ds.try_next().await? {
while let Some(mut entry) = ds.try_next().await? {
assert!(
set.insert(entry.path().to_string()),
"duplicated value: {}",
entry.path()
);
map.insert(entry.path().to_string(), entry.mode().await?);
map.insert(
entry.path().to_string(),
entry.metadata(ObjectMetadataKey::Mode).await?.mode(),
);
}

assert_eq!(
Expand Down Expand Up @@ -430,13 +442,16 @@ mod tests {

let mut map = HashMap::new();
let mut set = HashSet::new();
while let Some(entry) = ds.try_next().await? {
while let Some(mut entry) = ds.try_next().await? {
assert!(
set.insert(entry.path().to_string()),
"duplicated value: {}",
entry.path()
);
map.insert(entry.path().to_string(), entry.mode().await?);
map.insert(
entry.path().to_string(),
entry.metadata(ObjectMetadataKey::Mode).await?.mode(),
);
}

debug!("current files: {:?}", map);
Expand Down
10 changes: 5 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
//! .finish();
//!
//! // Create object handler.
//! let o = op.object("test_file");
//! let mut o = op.object("test_file");
//!
//! // Write data
//! o.write("Hello, World!").await?;
Expand All @@ -49,7 +49,7 @@
//! let bs = o.read().await?;
//!
//! // Fetch metadata
//! let meta = o.metadata().await?;
//! let meta = o.stat().await?;
//! let mode = meta.mode();
//! let length = meta.content_length();
//!
Expand Down Expand Up @@ -79,6 +79,7 @@ mod object;
pub use object::Object;
pub use object::ObjectLister;
pub use object::ObjectMetadata;
pub use object::ObjectMetadataKey;
pub use object::ObjectMode;
pub use object::ObjectMultipart;
pub use object::ObjectPart;
Expand Down Expand Up @@ -116,9 +117,8 @@ mod tests {
assert_eq!(88, size_of::<AccessorMetadata>());
assert_eq!(16, size_of::<Operator>());
assert_eq!(112, size_of::<BatchOperator>());
assert_eq!(208, size_of::<output::Entry>());
assert_eq!(48, size_of::<Object>());
assert_eq!(184, size_of::<ObjectMetadata>());
assert_eq!(32, size_of::<Object>());
assert_eq!(192, size_of::<ObjectMetadata>());
assert_eq!(1, size_of::<ObjectMode>());
assert_eq!(64, size_of::<ObjectMultipart>());
assert_eq!(32, size_of::<ObjectPart>());
Expand Down
10 changes: 1 addition & 9 deletions src/object/blocking_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@

use std::io;
use std::io::SeekFrom;
use std::sync::Arc;

use bytes::Bytes;
use parking_lot::Mutex;

use crate::error::Error;
use crate::error::Result;
use crate::ops::OpRead;
use crate::raw::*;
use crate::ErrorKind;
use crate::ObjectMetadata;

/// BlockingObjectReader is the public API for users.
pub struct BlockingObjectReader {
Expand All @@ -39,12 +36,7 @@ impl BlockingObjectReader {
///
/// We don't want to expose those details to users so keep this function
/// in crate only.
pub(crate) fn create(
acc: FusedAccessor,
path: &str,
_meta: Arc<Mutex<ObjectMetadata>>,
op: OpRead,
) -> Result<Self> {
pub(crate) fn create(acc: FusedAccessor, path: &str, op: OpRead) -> Result<Self> {
let acc_meta = acc.metadata();

let r = if acc_meta.hints().contains(AccessorHint::ReadSeekable) {
Expand Down
Loading

2 comments on commit 1dc5521

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-7thnf2iir-databend.vercel.app

Built with commit 1dc5521.
This pull request is being automatically deployed with vercel-action

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Rust Benchmark'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 1dc5521 Previous: 8523be3 Ratio
service_fs_read_full/4.00 KiB 106819 ns/iter (± 32418) 42975 ns/iter (± 21102) 2.49
service_memory_read_full/256 KiB 29706 ns/iter (± 1354) 13336 ns/iter (± 11) 2.23

This comment was automatically generated by workflow using github-action-benchmark.

CC: @Xuanwo

Please sign in to comment.