diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 3ddce08002c4..d3c50861c122 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -331,19 +331,16 @@ impl ObjectStore for AmazonS3 { .boxed() } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { - self.client.list(prefix).await + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + self.client.list(prefix) } - async fn list_with_offset( + fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, - ) -> Result>> { - self.client.list_with_offset(prefix, offset).await + ) -> BoxStream<'_, Result> { + self.client.list_with_offset(prefix, offset) } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 190b73bf9490..2a08c6775807 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -206,11 +206,8 @@ impl ObjectStore for MicrosoftAzure { self.client.delete_request(location, &()).await } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { - self.client.list(prefix).await + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + self.client.list(prefix) } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index 008dec679413..d3e02b412725 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -147,19 +147,16 @@ impl ObjectStore for ChunkedStore { self.inner.delete(location).await } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { - self.inner.list(prefix).await + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + self.inner.list(prefix) } - async fn list_with_offset( + fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, - ) -> Result>> { - self.inner.list_with_offset(prefix, offset).await + ) -> BoxStream<'_, Result> { + self.inner.list_with_offset(prefix, offset) } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { diff --git a/object_store/src/client/list.rs b/object_store/src/client/list.rs index b2dbee27f14d..371894dfeb71 100644 --- a/object_store/src/client/list.rs +++ b/object_store/src/client/list.rs @@ -46,16 +46,13 @@ pub trait ListClientExt { offset: Option<&Path>, ) -> BoxStream<'_, Result>; - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>>; + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result>; - async fn list_with_offset( + fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, - ) -> Result>>; + ) -> BoxStream<'_, Result>; async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result; } @@ -90,31 +87,22 @@ impl ListClientExt for T { .boxed() } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { - let stream = self - .list_paginated(prefix, false, None) + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + self.list_paginated(prefix, false, None) .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok))) .try_flatten() - .boxed(); - - Ok(stream) + .boxed() } - async fn list_with_offset( + fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, - ) -> Result>> { - let stream = self - .list_paginated(prefix, false, Some(offset)) + ) -> BoxStream<'_, Result> { + self.list_paginated(prefix, false, Some(offset)) .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok))) .try_flatten() - .boxed(); - - Ok(stream) + .boxed() } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index a75527fe7b9f..513e396cbae6 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -601,11 +601,8 @@ impl ObjectStore for GoogleCloudStorage { self.client.delete_request(location).await } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { - self.client.list(prefix).await + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + self.client.list(prefix) } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index 6ffb62358941..2fd7850b6bbf 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -34,7 +34,7 @@ use async_trait::async_trait; use bytes::Bytes; use futures::stream::BoxStream; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use snafu::{OptionExt, ResultExt, Snafu}; use tokio::io::AsyncWrite; @@ -122,14 +122,13 @@ impl ObjectStore for HttpStore { self.client.delete(location).await } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default(); - let status = self.client.list(prefix, "infinity").await?; - Ok(futures::stream::iter( - status + let prefix = prefix.cloned(); + futures::stream::once(async move { + let status = self.client.list(prefix.as_ref(), "infinity").await?; + + let iter = status .response .into_iter() .filter(|r| !r.is_dir()) @@ -138,9 +137,12 @@ impl ObjectStore for HttpStore { response.object_meta(self.client.base_url()) }) // Filter out exact prefix matches - .filter_ok(move |r| r.location.as_ref().len() > prefix_len), - ) - .boxed()) + .filter_ok(move |r| r.location.as_ref().len() > prefix_len); + + Ok::<_, crate::Error>(futures::stream::iter(iter)) + }) + .try_flatten() + .boxed() } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index b79042e3cda8..9b396444fa0d 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -95,18 +95,18 @@ //! //! ``` //! # use object_store::local::LocalFileSystem; +//! # use std::sync::Arc; +//! # use object_store::{path::Path, ObjectStore}; +//! # use futures::stream::StreamExt; //! # // use LocalFileSystem for example -//! # fn get_object_store() -> LocalFileSystem { -//! # LocalFileSystem::new_with_prefix("/tmp").unwrap() +//! # fn get_object_store() -> Arc { +//! # Arc::new(LocalFileSystem::new()) //! # } -//! +//! # //! # async fn example() { -//! use std::sync::Arc; -//! use object_store::{path::Path, ObjectStore}; -//! use futures::stream::StreamExt; -//! +//! # //! // create an ObjectStore -//! let object_store: Arc = Arc::new(get_object_store()); +//! let object_store: Arc = get_object_store(); //! //! // Recursively list all files below the 'data' path. //! // 1. On AWS S3 this would be the 'data/' prefix @@ -114,21 +114,12 @@ //! let prefix: Path = "data".try_into().unwrap(); //! //! // Get an `async` stream of Metadata objects: -//! let list_stream = object_store -//! .list(Some(&prefix)) -//! .await -//! .expect("Error listing files"); +//! let mut list_stream = object_store.list(Some(&prefix)); //! -//! // Print a line about each object based on its metadata -//! // using for_each from `StreamExt` trait. -//! list_stream -//! .for_each(move |meta| { -//! async { -//! let meta = meta.expect("Error listing"); -//! println!("Name: {}, size: {}", meta.location, meta.size); -//! } -//! }) -//! .await; +//! // Print a line about each object +//! while let Some(meta) = list_stream.next().await.transpose().unwrap() { +//! println!("Name: {}, size: {}", meta.location, meta.size); +//! } //! # } //! ``` //! @@ -147,19 +138,18 @@ //! from remote storage or files in the local filesystem as a stream. //! //! ``` +//! # use futures::TryStreamExt; //! # use object_store::local::LocalFileSystem; -//! # // use LocalFileSystem for example -//! # fn get_object_store() -> LocalFileSystem { -//! # LocalFileSystem::new_with_prefix("/tmp").unwrap() +//! # use std::sync::Arc; +//! # use object_store::{path::Path, ObjectStore}; +//! # fn get_object_store() -> Arc { +//! # Arc::new(LocalFileSystem::new()) //! # } -//! +//! # //! # async fn example() { -//! use std::sync::Arc; -//! use object_store::{path::Path, ObjectStore}; -//! use futures::stream::StreamExt; -//! +//! # //! // create an ObjectStore -//! let object_store: Arc = Arc::new(get_object_store()); +//! let object_store: Arc = get_object_store(); //! //! // Retrieve a specific file //! let path: Path = "data/file01.parquet".try_into().unwrap(); @@ -171,16 +161,11 @@ //! .unwrap() //! .into_stream(); //! -//! // Count the '0's using `map` from `StreamExt` trait +//! // Count the '0's using `try_fold` from `TryStreamExt` trait //! let num_zeros = stream -//! .map(|bytes| { -//! let bytes = bytes.unwrap(); -//! bytes.iter().filter(|b| **b == 0).count() -//! }) -//! .collect::>() -//! .await -//! .into_iter() -//! .sum::(); +//! .try_fold(0, |acc, bytes| async move { +//! Ok(acc + bytes.iter().filter(|b| **b == 0).count()) +//! }).await.unwrap(); //! //! println!("Num zeros in {} is {}", path, num_zeros); //! # } @@ -196,22 +181,19 @@ //! //! ``` //! # use object_store::local::LocalFileSystem; -//! # fn get_object_store() -> LocalFileSystem { -//! # LocalFileSystem::new_with_prefix("/tmp").unwrap() +//! # use object_store::ObjectStore; +//! # use std::sync::Arc; +//! # use bytes::Bytes; +//! # use object_store::path::Path; +//! # fn get_object_store() -> Arc { +//! # Arc::new(LocalFileSystem::new()) //! # } //! # async fn put() { -//! use object_store::ObjectStore; -//! use std::sync::Arc; -//! use bytes::Bytes; -//! use object_store::path::Path; -//! -//! let object_store: Arc = Arc::new(get_object_store()); +//! # +//! let object_store: Arc = get_object_store(); //! let path: Path = "data/file1".try_into().unwrap(); //! let bytes = Bytes::from_static(b"hello"); -//! object_store -//! .put(&path, bytes) -//! .await -//! .unwrap(); +//! object_store.put(&path, bytes).await.unwrap(); //! # } //! ``` //! @@ -220,22 +202,20 @@ //! //! ``` //! # use object_store::local::LocalFileSystem; -//! # fn get_object_store() -> LocalFileSystem { -//! # LocalFileSystem::new_with_prefix("/tmp").unwrap() +//! # use object_store::ObjectStore; +//! # use std::sync::Arc; +//! # use bytes::Bytes; +//! # use tokio::io::AsyncWriteExt; +//! # use object_store::path::Path; +//! # fn get_object_store() -> Arc { +//! # Arc::new(LocalFileSystem::new()) //! # } //! # async fn multi_upload() { -//! use object_store::ObjectStore; -//! use std::sync::Arc; -//! use bytes::Bytes; -//! use tokio::io::AsyncWriteExt; -//! use object_store::path::Path; -//! -//! let object_store: Arc = Arc::new(get_object_store()); +//! # +//! let object_store: Arc = get_object_store(); //! let path: Path = "data/large_file".try_into().unwrap(); -//! let (_id, mut writer) = object_store -//! .put_multipart(&path) -//! .await -//! .unwrap(); +//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap(); +//! //! let bytes = Bytes::from_static(b"hello"); //! writer.write_all(&bytes).await.unwrap(); //! writer.flush().await.unwrap(); @@ -439,23 +419,22 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// return Ok. If it is an error, it will be [`Error::NotFound`]. /// /// ``` + /// # use futures::{StreamExt, TryStreamExt}; /// # use object_store::local::LocalFileSystem; /// # async fn example() -> Result<(), Box> { /// # let root = tempfile::TempDir::new().unwrap(); /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap(); - /// use object_store::{ObjectStore, ObjectMeta}; - /// use object_store::path::Path; - /// use futures::{StreamExt, TryStreamExt}; - /// use bytes::Bytes; - /// + /// # use object_store::{ObjectStore, ObjectMeta}; + /// # use object_store::path::Path; + /// # use futures::{StreamExt, TryStreamExt}; + /// # use bytes::Bytes; + /// # /// // Create two objects /// store.put(&Path::from("foo"), Bytes::from("foo")).await?; /// store.put(&Path::from("bar"), Bytes::from("bar")).await?; /// /// // List object - /// let locations = store.list(None).await? - /// .map(|meta: Result| meta.map(|m| m.location)) - /// .boxed(); + /// let locations = store.list(None).map_ok(|m| m.location).boxed(); /// /// // Delete them /// store.delete_stream(locations).try_collect::>().await?; @@ -484,10 +463,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// `foo/bar_baz/x`. /// /// Note: the order of returned [`ObjectMeta`] is not guaranteed - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>>; + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result>; /// List all the objects with the given prefix and a location greater than `offset` /// @@ -495,18 +471,15 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// the number of network requests required /// /// Note: the order of returned [`ObjectMeta`] is not guaranteed - async fn list_with_offset( + fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, - ) -> Result>> { + ) -> BoxStream<'_, Result> { let offset = offset.clone(); - let stream = self - .list(prefix) - .await? + self.list(prefix) .try_filter(move |f| futures::future::ready(f.location > offset)) - .boxed(); - Ok(stream) + .boxed() } /// List objects with the given prefix and an implementation specific @@ -624,19 +597,16 @@ macro_rules! as_ref_impl { self.as_ref().delete_stream(locations) } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { - self.as_ref().list(prefix).await + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + self.as_ref().list(prefix) } - async fn list_with_offset( + fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, - ) -> Result>> { - self.as_ref().list_with_offset(prefix, offset).await + ) -> BoxStream<'_, Result> { + self.as_ref().list_with_offset(prefix, offset) } async fn list_with_delimiter( @@ -973,7 +943,6 @@ mod test_util { ) -> Result> { storage .list(prefix) - .await? .map_ok(|meta| meta.location) .try_collect::>() .await @@ -1264,11 +1233,7 @@ mod tests { ]; for (prefix, offset) in cases { - let s = storage - .list_with_offset(prefix.as_ref(), &offset) - .await - .unwrap(); - + let s = storage.list_with_offset(prefix.as_ref(), &offset); let mut actual: Vec<_> = s.map_ok(|x| x.location).try_collect().await.unwrap(); @@ -1700,12 +1665,7 @@ mod tests { } async fn delete_fixtures(storage: &DynObjectStore) { - let paths = storage - .list(None) - .await - .unwrap() - .map_ok(|meta| meta.location) - .boxed(); + let paths = storage.list(None).map_ok(|meta| meta.location).boxed(); storage .delete_stream(paths) .try_collect::>() @@ -1714,18 +1674,18 @@ mod tests { } /// Test that the returned stream does not borrow the lifetime of Path - async fn list_store<'a, 'b>( + fn list_store<'a>( store: &'a dyn ObjectStore, - path_str: &'b str, - ) -> super::Result>> { + path_str: &str, + ) -> BoxStream<'a, Result> { let path = Path::from(path_str); - store.list(Some(&path)).await + store.list(Some(&path)) } #[tokio::test] async fn test_list_lifetimes() { let store = memory::InMemory::new(); - let mut stream = list_store(&store, "path").await.unwrap(); + let mut stream = list_store(&store, "path"); assert!(stream.next().await.is_none()); } diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index a9b8c4b05020..00cbce023c3d 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -23,7 +23,7 @@ use crate::{ }; use async_trait::async_trait; use bytes::Bytes; -use futures::Stream; +use futures::{FutureExt, Stream}; use std::io::{Error, IoSlice}; use std::ops::Range; use std::pin::Pin; @@ -147,23 +147,31 @@ impl ObjectStore for LimitStore { self.inner.delete_stream(locations) } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { - let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); - let s = self.inner.list(prefix).await?; - Ok(PermitWrapper::new(s, permit).boxed()) + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + let prefix = prefix.cloned(); + let fut = Arc::clone(&self.semaphore) + .acquire_owned() + .map(move |permit| { + let s = self.inner.list(prefix.as_ref()); + PermitWrapper::new(s, permit.unwrap()) + }); + fut.into_stream().flatten().boxed() } - async fn list_with_offset( + fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, - ) -> Result>> { - let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); - let s = self.inner.list_with_offset(prefix, offset).await?; - Ok(PermitWrapper::new(s, permit).boxed()) + ) -> BoxStream<'_, Result> { + let prefix = prefix.cloned(); + let offset = offset.clone(); + let fut = Arc::clone(&self.semaphore) + .acquire_owned() + .map(move |permit| { + let s = self.inner.list_with_offset(prefix.as_ref(), &offset); + PermitWrapper::new(s, permit.unwrap()) + }); + fut.into_stream().flatten().boxed() } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { @@ -272,6 +280,8 @@ mod tests { use crate::memory::InMemory; use crate::tests::*; use crate::ObjectStore; + use futures::stream::StreamExt; + use std::pin::Pin; use std::time::Duration; use tokio::time::timeout; @@ -290,19 +300,21 @@ mod tests { let mut streams = Vec::with_capacity(max_requests); for _ in 0..max_requests { - let stream = integration.list(None).await.unwrap(); + let mut stream = integration.list(None).peekable(); + Pin::new(&mut stream).peek().await; // Ensure semaphore is acquired streams.push(stream); } let t = Duration::from_millis(20); // Expect to not be able to make another request - assert!(timeout(t, integration.list(None)).await.is_err()); + let fut = integration.list(None).collect::>(); + assert!(timeout(t, fut).await.is_err()); // Drop one of the streams streams.pop(); // Can now make another request - integration.list(None).await.unwrap(); + integration.list(None).collect::>().await; } } diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 3d4a02a1e9e9..38467c3a9e7c 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -420,14 +420,14 @@ impl ObjectStore for LocalFileSystem { .await } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { let config = Arc::clone(&self.config); let root_path = match prefix { - Some(prefix) => config.path_to_filesystem(prefix)?, + Some(prefix) => match config.path_to_filesystem(prefix) { + Ok(path) => path, + Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(), + }, None => self.config.root.to_file_path().unwrap(), }; @@ -457,36 +457,34 @@ impl ObjectStore for LocalFileSystem { // If no tokio context, return iterator directly as no // need to perform chunked spawn_blocking reads if tokio::runtime::Handle::try_current().is_err() { - return Ok(futures::stream::iter(s).boxed()); + return futures::stream::iter(s).boxed(); } // Otherwise list in batches of CHUNK_SIZE const CHUNK_SIZE: usize = 1024; let buffer = VecDeque::with_capacity(CHUNK_SIZE); - let stream = - futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move { - if buffer.is_empty() { - (s, buffer) = tokio::task::spawn_blocking(move || { - for _ in 0..CHUNK_SIZE { - match s.next() { - Some(r) => buffer.push_back(r), - None => break, - } + futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move { + if buffer.is_empty() { + (s, buffer) = tokio::task::spawn_blocking(move || { + for _ in 0..CHUNK_SIZE { + match s.next() { + Some(r) => buffer.push_back(r), + None => break, } - (s, buffer) - }) - .await?; - } - - match buffer.pop_front() { - Some(Err(e)) => Err(e), - Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))), - None => Ok(None), - } - }); + } + (s, buffer) + }) + .await?; + } - Ok(stream.boxed()) + match buffer.pop_front() { + Some(Err(e)) => Err(e), + Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))), + None => Ok(None), + } + }) + .boxed() } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { @@ -1138,21 +1136,14 @@ mod tests { let store = LocalFileSystem::new_with_prefix(root.path()).unwrap(); - // `list` must fail - match store.list(None).await { - Err(_) => { - // ok, error found - } - Ok(mut stream) => { - let mut any_err = false; - while let Some(res) = stream.next().await { - if res.is_err() { - any_err = true; - } - } - assert!(any_err); + let mut stream = store.list(None); + let mut any_err = false; + while let Some(res) = stream.next().await { + if res.is_err() { + any_err = true; } } + assert!(any_err); // `list_with_delimiter assert!(store.list_with_delimiter(None).await.is_err()); @@ -1226,13 +1217,7 @@ mod tests { prefix: Option<&Path>, expected: &[&str], ) { - let result: Vec<_> = integration - .list(prefix) - .await - .unwrap() - .try_collect() - .await - .unwrap(); + let result: Vec<_> = integration.list(prefix).try_collect().await.unwrap(); let mut strings: Vec<_> = result.iter().map(|x| x.location.as_ref()).collect(); strings.sort_unstable(); @@ -1428,8 +1413,7 @@ mod tests { std::fs::write(temp_dir.path().join(filename), "foo").unwrap(); - let list_stream = integration.list(None).await.unwrap(); - let res: Vec<_> = list_stream.try_collect().await.unwrap(); + let res: Vec<_> = integration.list(None).try_collect().await.unwrap(); assert_eq!(res.len(), 1); assert_eq!(res[0].location.as_ref(), filename); diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index f638ed6d7a55..00b330b5eb94 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -228,10 +228,7 @@ impl ObjectStore for InMemory { Ok(()) } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { let root = Path::default(); let prefix = prefix.unwrap_or(&root); @@ -256,7 +253,7 @@ impl ObjectStore for InMemory { }) .collect(); - Ok(futures::stream::iter(values).boxed()) + futures::stream::iter(values).boxed() } /// The memory implementation returns all results, as opposed to the cloud diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 39585f73b692..3776dec2e872 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -144,24 +144,21 @@ impl ObjectStore for PrefixStore { self.inner.delete(&full_path).await } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { let prefix = self.full_path(prefix.unwrap_or(&Path::default())); - let s = self.inner.list(Some(&prefix)).await?; - Ok(s.map_ok(|meta| self.strip_meta(meta)).boxed()) + let s = self.inner.list(Some(&prefix)); + s.map_ok(|meta| self.strip_meta(meta)).boxed() } - async fn list_with_offset( + fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, - ) -> Result>> { + ) -> BoxStream<'_, Result> { let offset = self.full_path(offset); let prefix = self.full_path(prefix.unwrap_or(&Path::default())); - let s = self.inner.list_with_offset(Some(&prefix), &offset).await?; - Ok(s.map_ok(|meta| self.strip_meta(meta)).boxed()) + let s = self.inner.list_with_offset(Some(&prefix), &offset); + s.map_ok(|meta| self.strip_meta(meta)).boxed() } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index 58c476ab4530..f716a11f8a05 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -233,29 +233,30 @@ impl ObjectStore for ThrottledStore { self.inner.delete(location).await } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { - sleep(self.config().wait_list_per_call).await; - - // need to copy to avoid moving / referencing `self` - let wait_list_per_entry = self.config().wait_list_per_entry; - let stream = self.inner.list(prefix).await?; - Ok(throttle_stream(stream, move |_| wait_list_per_entry)) + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result> { + let stream = self.inner.list(prefix); + futures::stream::once(async move { + let wait_list_per_entry = self.config().wait_list_per_entry; + sleep(self.config().wait_list_per_call).await; + throttle_stream(stream, move |_| wait_list_per_entry) + }) + .flatten() + .boxed() } - async fn list_with_offset( + fn list_with_offset( &self, prefix: Option<&Path>, offset: &Path, - ) -> Result>> { - sleep(self.config().wait_list_per_call).await; - - // need to copy to avoid moving / referencing `self` - let wait_list_per_entry = self.config().wait_list_per_entry; - let stream = self.inner.list_with_offset(prefix, offset).await?; - Ok(throttle_stream(stream, move |_| wait_list_per_entry)) + ) -> BoxStream<'_, Result> { + let stream = self.inner.list_with_offset(prefix, offset); + futures::stream::once(async move { + let wait_list_per_entry = self.config().wait_list_per_entry; + sleep(self.config().wait_list_per_call).await; + throttle_stream(stream, move |_| wait_list_per_entry) + }) + .flatten() + .boxed() } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { @@ -511,13 +512,7 @@ mod tests { let prefix = Path::from("foo"); // clean up store - let entries: Vec<_> = store - .list(Some(&prefix)) - .await - .unwrap() - .try_collect() - .await - .unwrap(); + let entries: Vec<_> = store.list(Some(&prefix)).try_collect().await.unwrap(); for entry in entries { store.delete(&entry.location).await.unwrap(); @@ -583,8 +578,6 @@ mod tests { let t0 = Instant::now(); store .list(Some(&prefix)) - .await - .unwrap() .try_collect::>() .await .unwrap(); diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs index f926e3b07f2a..25c469260675 100644 --- a/object_store/tests/get_range_file.rs +++ b/object_store/tests/get_range_file.rs @@ -75,10 +75,7 @@ impl ObjectStore for MyStore { todo!() } - async fn list( - &self, - _: Option<&Path>, - ) -> object_store::Result>> { + fn list(&self, _: Option<&Path>) -> BoxStream<'_, object_store::Result> { todo!() }