diff --git a/crates/polars-io/src/cloud/adaptors.rs b/crates/polars-io/src/cloud/adaptors.rs
index d23d36d23383..a77800bbd54e 100644
--- a/crates/polars-io/src/cloud/adaptors.rs
+++ b/crates/polars-io/src/cloud/adaptors.rs
@@ -81,7 +81,10 @@ impl CloudWriter {
         let (cloud_location, object_store) =
             crate::cloud::build_object_store(uri, cloud_options, false).await?;
 
-        Self::new_with_object_store(object_store, object_path_from_str(&cloud_location.prefix)?)
+        Self::new_with_object_store(
+            object_store.to_dyn_object_store().await,
+            object_path_from_str(&cloud_location.prefix)?,
+        )
     }
 
     pub fn close(&mut self) -> PolarsResult<()> {
diff --git a/crates/polars-io/src/cloud/glob.rs b/crates/polars-io/src/cloud/glob.rs
index 83b21a9b2b73..c55d7767f39f 100644
--- a/crates/polars-io/src/cloud/glob.rs
+++ b/crates/polars-io/src/cloud/glob.rs
@@ -3,6 +3,7 @@
 use object_store::path::Path;
 use polars_core::error::to_compute_err;
 use polars_core::prelude::{polars_ensure, polars_err};
 use polars_error::PolarsResult;
+use polars_utils::pl_str::PlSmallStr;
 use regex::Regex;
 use url::Url;
 
@@ -74,13 +75,13 @@ pub(crate) fn extract_prefix_expansion(url: &str) -> PolarsResult<(String, Optio
 #[derive(PartialEq, Debug, Default)]
 pub struct CloudLocation {
     /// The scheme (s3, ...).
-    pub scheme: String,
+    pub scheme: PlSmallStr,
     /// The bucket name.
-    pub bucket: String,
+    pub bucket: PlSmallStr,
     /// The prefix inside the bucket, this will be the full key when wildcards are not used.
     pub prefix: String,
     /// The path components that need to be expanded.
-    pub expansion: Option<String>,
+    pub expansion: Option<PlSmallStr>,
 }
 
 impl CloudLocation {
@@ -102,7 +103,8 @@ impl CloudLocation {
                 .ok_or_else(
                     || polars_err!(ComputeError: "cannot parse bucket (host) from url: {}", parsed),
                 )?
-                .to_string();
+                .to_string()
+                .into();
             (bucket, key)
         };
 
@@ -114,9 +116,9 @@ impl CloudLocation {
             if is_local && key.starts_with(DELIMITER) {
                 prefix.insert(0, DELIMITER);
             }
-            (prefix, expansion)
+            (prefix, expansion.map(|x| x.into()))
         } else {
-            (key.to_string(), None)
+            (key.as_ref().into(), None)
         };
 
         Ok(CloudLocation {
@@ -155,7 +157,7 @@ impl Matcher {
     }
 
     pub(crate) fn is_matching(&self, key: &str) -> bool {
-        if !key.starts_with(&self.prefix) {
+        if !key.starts_with(self.prefix.as_str()) {
             // Prefix does not match, should not happen.
             return false;
         }
@@ -183,23 +185,35 @@ pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResu
     let matcher = &Matcher::new(
         if scheme == "file" {
            // For local paths the returned location has the leading slash stripped.
-            prefix[1..].to_string()
+            prefix[1..].into()
         } else {
             prefix.clone()
         },
         expansion.as_deref(),
     )?;
 
+    let path = Path::from(prefix.as_str());
+    let path = Some(&path);
+
     let mut locations = store
-        .list(Some(&Path::from(prefix)))
-        .try_filter_map(|x| async move {
-            let out =
-                (x.size > 0 && matcher.is_matching(x.location.as_ref())).then_some(x.location);
-            Ok(out)
+        .try_exec_rebuild_on_err(|store| {
+            let st = store.clone();
+
+            async {
+                let store = st;
+                store
+                    .list(path)
+                    .try_filter_map(|x| async move {
+                        let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
+                            .then_some(x.location);
+                        Ok(out)
+                    })
+                    .try_collect::<Vec<_>>()
+                    .await
+                    .map_err(to_compute_err)
+            }
         })
-        .try_collect::<Vec<_>>()
-        .await
-        .map_err(to_compute_err)?;
+        .await?;
 
     locations.sort_unstable();
     Ok(locations
diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs
index 0bb33b333c18..22e666a8198b 100644
--- a/crates/polars-io/src/cloud/object_store_setup.rs
+++ b/crates/polars-io/src/cloud/object_store_setup.rs
@@ -6,23 +6,22 @@ use once_cell::sync::Lazy;
 use polars_core::config;
 use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
 use polars_utils::aliases::PlHashMap;
+use polars_utils::pl_str::PlSmallStr;
 use tokio::sync::RwLock;
 use url::Url;
 
-use super::{parse_url, CloudLocation, CloudOptions, CloudType};
+use super::{parse_url, CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
 use crate::cloud::CloudConfig;
 
 /// Object stores must be cached. Every object-store will do DNS lookups and
 /// get rate limited when querying the DNS (can take up to 5s).
 /// Other reasons are connection pools that must be shared between as much as possible.
 #[allow(clippy::type_complexity)]
-static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<String, Arc<dyn ObjectStore>>>> =
+static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<String, PolarsObjectStore>>> =
     Lazy::new(Default::default);
 
-type BuildResult = PolarsResult<(CloudLocation, Arc<dyn ObjectStore>)>;
-
 #[allow(dead_code)]
-fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {
+fn err_missing_feature(feature: &str, scheme: &str) -> PolarsResult<Arc<dyn ObjectStore>> {
     polars_bail!(
         ComputeError:
         "feature '{}' must be enabled in order to use '{}' cloud urls", feature, scheme,
@@ -64,6 +63,7 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
         },
     ))
     .unwrap();
+
     format!(
         "{}://{}<\\creds\\>{}",
         url.scheme(),
@@ -77,6 +77,126 @@ pub fn object_path_from_str(path: &str) -> PolarsResult<Path
     object_store::path::Path::parse(path).map_err(to_compute_err)
 }
 
+#[derive(Debug, Clone)]
+pub(crate) struct PolarsObjectStoreBuilder {
+    url: PlSmallStr,
+    parsed_url: Url,
+    scheme: PlSmallStr,
+    cloud_type: CloudType,
+    options: Option<CloudOptions>,
+}
+
+impl PolarsObjectStoreBuilder {
+    pub(super) async fn build_impl(&self) -> PolarsResult<Arc<dyn ObjectStore>> {
+        let options = self
+            .options
+            .as_ref()
+            .unwrap_or_else(|| CloudOptions::default_static_ref());
+
+        let store = match self.cloud_type {
+            CloudType::Aws => {
+                #[cfg(feature = "aws")]
+                {
+                    let store = options.build_aws(&self.url).await?;
+                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
+                }
+                #[cfg(not(feature = "aws"))]
+                return err_missing_feature("aws", &self.scheme);
+            },
+            CloudType::Gcp => {
+                #[cfg(feature = "gcp")]
+                {
+                    let store = options.build_gcp(&self.url)?;
+                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
+                }
+                #[cfg(not(feature = "gcp"))]
+                return err_missing_feature("gcp", &self.scheme);
+            },
+            CloudType::Azure => {
+                {
+                    #[cfg(feature = "azure")]
+                    {
+                        let store = options.build_azure(&self.url)?;
+                        Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
+                    }
+                }
+                #[cfg(not(feature = "azure"))]
+                return err_missing_feature("azure", &self.scheme);
+            },
+            CloudType::File => {
+                let local = LocalFileSystem::new();
+                Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
+            },
+            CloudType::Http => {
+                {
#[cfg(feature = "http")] + { + let store = options.build_http(&self.url)?; + PolarsResult::Ok(Arc::new(store) as Arc) + } + } + #[cfg(not(feature = "http"))] + return err_missing_feature("http", &cloud_location.scheme); + }, + CloudType::Hf => panic!("impl error: unresolved hf:// path"), + }?; + + Ok(store) + } + + /// Note: Use `build_impl` for a non-caching version. + pub(super) async fn build(self) -> PolarsResult { + let opt_cache_key = match &self.cloud_type { + CloudType::Aws | CloudType::Gcp | CloudType::Azure => Some(url_and_creds_to_key( + &self.parsed_url, + self.options.as_ref(), + )), + CloudType::File | CloudType::Http | CloudType::Hf => None, + }; + + let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() { + let cache = OBJECT_STORE_CACHE.read().await; + + if let Some(store) = cache.get(cache_key) { + return Ok(store.clone()); + } + + drop(cache); + + let cache = OBJECT_STORE_CACHE.write().await; + + if let Some(store) = cache.get(cache_key) { + return Ok(store.clone()); + } + + Some(cache) + } else { + None + }; + + let store = self.build_impl().await?; + let store = PolarsObjectStore::new_from_inner(store, self); + + if let Some(mut cache) = opt_cache_write_guard { + // Clear the cache if we surpass a certain amount of buckets. + if cache.len() >= 8 { + if config::verbose() { + eprintln!( + "build_object_store: clearing store cache (cache.len(): {})", + cache.len() + ); + } + cache.clear() + } + + cache.insert(opt_cache_key.unwrap(), store.clone()); + } + + Ok(store) + } +} + /// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store. pub async fn build_object_store( url: &str, @@ -86,88 +206,21 @@ pub async fn build_object_store( )] options: Option<&CloudOptions>, glob: bool, -) -> BuildResult { +) -> PolarsResult<(CloudLocation, PolarsObjectStore)> { let parsed = parse_url(url).map_err(to_compute_err)?; let cloud_location = CloudLocation::from_url(&parsed, glob)?; + let cloud_type = CloudType::from_url(&parsed)?; - // FIXME: `credential_provider` is currently serializing the entire Python function here - // into a string with pickle for this cache key because we are using `serde_json::to_string` - let key = url_and_creds_to_key(&parsed, options); - let mut allow_cache = true; - - { - let cache = OBJECT_STORE_CACHE.read().await; - if let Some(store) = cache.get(&key) { - return Ok((cloud_location, store.clone())); - } + let store = PolarsObjectStoreBuilder { + url: url.into(), + parsed_url: parsed, + scheme: cloud_location.scheme.as_str().into(), + cloud_type, + options: options.cloned(), } + .build() + .await?; - let options = options.map(std::borrow::Cow::Borrowed).unwrap_or_default(); - - let cloud_type = CloudType::from_url(&parsed)?; - let store = match cloud_type { - CloudType::Aws => { - #[cfg(feature = "aws")] - { - let store = options.build_aws(url).await?; - Ok::<_, PolarsError>(Arc::new(store) as Arc) - } - #[cfg(not(feature = "aws"))] - return err_missing_feature("aws", &cloud_location.scheme); - }, - CloudType::Gcp => { - #[cfg(feature = "gcp")] - { - let store = options.build_gcp(url)?; - Ok::<_, PolarsError>(Arc::new(store) as Arc) - } - #[cfg(not(feature = "gcp"))] - return err_missing_feature("gcp", &cloud_location.scheme); - }, - CloudType::Azure => { - { - #[cfg(feature = "azure")] - { - let store = options.build_azure(url)?; - Ok::<_, PolarsError>(Arc::new(store) as Arc) - } - } - #[cfg(not(feature = "azure"))] - return err_missing_feature("azure", 
-        },
-        CloudType::File => {
-            allow_cache = false;
-            let local = LocalFileSystem::new();
-            Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
-        },
-        CloudType::Http => {
-            {
-                allow_cache = false;
-                #[cfg(feature = "http")]
-                {
-                    let store = options.build_http(url)?;
-                    PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
-                }
-            }
-            #[cfg(not(feature = "http"))]
-            return err_missing_feature("http", &cloud_location.scheme);
-        },
-        CloudType::Hf => panic!("impl error: unresolved hf:// path"),
-    }?;
-    if allow_cache {
-        let mut cache = OBJECT_STORE_CACHE.write().await;
-        // Clear the cache if we surpass a certain amount of buckets.
-        if cache.len() > 8 {
-            if config::verbose() {
-                eprintln!(
-                    "build_object_store: clearing store cache (cache.len(): {})",
-                    cache.len()
-                );
-            }
-            cache.clear()
-        }
-        cache.insert(key, store.clone());
-    }
 
     Ok((cloud_location, store))
 }
diff --git a/crates/polars-io/src/cloud/options.rs b/crates/polars-io/src/cloud/options.rs
index d45f2a3166a7..de5f8903eabd 100644
--- a/crates/polars-io/src/cloud/options.rs
+++ b/crates/polars-io/src/cloud/options.rs
@@ -20,7 +20,6 @@ pub use object_store::gcp::GoogleConfigKey;
 use object_store::ClientOptions;
 #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
 use object_store::{BackoffConfig, RetryConfig};
-#[cfg(feature = "aws")]
 use once_cell::sync::Lazy;
 use polars_error::*;
 #[cfg(feature = "aws")]
@@ -83,14 +82,22 @@ pub struct CloudOptions {
 
 impl Default for CloudOptions {
     fn default() -> Self {
-        Self {
+        Self::default_static_ref().clone()
+    }
+}
+
+impl CloudOptions {
+    pub fn default_static_ref() -> &'static Self {
+        static DEFAULT: Lazy<CloudOptions> = Lazy::new(|| CloudOptions {
             max_retries: 2,
             #[cfg(feature = "file_cache")]
             file_cache_ttl: get_env_file_cache_ttl(),
             config: None,
             #[cfg(feature = "cloud")]
-            credential_provider: Default::default(),
-        }
+            credential_provider: None,
+        });
+
+        &DEFAULT
     }
 }
 
@@ -131,7 +138,7 @@ where
         .collect::<Configs<T>>())
 }
 
-#[derive(PartialEq)]
+#[derive(Debug, Clone, PartialEq)]
 pub enum CloudType {
     Aws,
     Azure,
@@ -361,7 +368,7 @@ impl CloudOptions {
                 let region = std::str::from_utf8(region.as_bytes()).map_err(to_compute_err)?;
 
                 let mut bucket_region = BUCKET_REGION.lock().unwrap();
-                bucket_region.insert(bucket.into(), region.into());
+                bucket_region.insert(bucket, region.into());
                 builder = builder.with_config(AmazonS3ConfigKey::Region, region)
             }
         }
diff --git a/crates/polars-io/src/cloud/polars_object_store.rs b/crates/polars-io/src/cloud/polars_object_store.rs
index 084408e8bc41..5769cc383b3a 100644
--- a/crates/polars-io/src/cloud/polars_object_store.rs
+++ b/crates/polars-io/src/cloud/polars_object_store.rs
@@ -1,5 +1,4 @@
 use std::ops::Range;
-use std::sync::Arc;
 
 use bytes::Bytes;
 use futures::{StreamExt, TryStreamExt};
@@ -7,27 +6,134 @@ use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
 use polars_core::prelude::{InitHashMaps, PlHashMap};
 use polars_error::{to_compute_err, PolarsError, PolarsResult};
-use tokio::io::AsyncWriteExt;
+use tokio::io::{AsyncSeekExt, AsyncWriteExt};
 
 use crate::pl_async::{
     self, get_concurrency_limit, get_download_chunk_size, tune_with_concurrency_budget,
     with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
 };
 
-/// Polars specific wrapper for `Arc<dyn ObjectStore>` that limits the number of
-/// concurrent requests for the entire application.
-#[derive(Debug, Clone)]
-pub struct PolarsObjectStore(Arc<dyn ObjectStore>);
-pub type ObjectStorePath = object_store::path::Path;
+mod inner {
+    use std::future::Future;
+    use std::sync::atomic::AtomicBool;
+    use std::sync::Arc;
 
-impl PolarsObjectStore {
-    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
-        Self(store)
+    use object_store::ObjectStore;
+    use polars_core::config;
+    use polars_error::PolarsResult;
+
+    use crate::cloud::PolarsObjectStoreBuilder;
+
+    #[derive(Debug)]
+    struct Inner {
+        store: tokio::sync::Mutex<Arc<dyn ObjectStore>>,
+        builder: PolarsObjectStoreBuilder,
+    }
+
+    /// Polars wrapper around [`ObjectStore`] functionality. This struct is cheaply cloneable.
+    #[derive(Debug)]
+    pub struct PolarsObjectStore {
+        inner: Arc<Inner>,
+        /// Avoid contending the Mutex `lock()` until the first re-build.
+        initial_store: std::sync::Arc<dyn ObjectStore>,
+        /// Used for interior mutability. Doesn't need to be shared with other threads so it's not
+        /// inside `Arc<>`.
+        rebuilt: AtomicBool,
     }
 
+    impl Clone for PolarsObjectStore {
+        fn clone(&self) -> Self {
+            Self {
+                inner: self.inner.clone(),
+                initial_store: self.initial_store.clone(),
+                rebuilt: AtomicBool::new(self.rebuilt.load(std::sync::atomic::Ordering::Relaxed)),
+            }
+        }
+    }
+
+    impl PolarsObjectStore {
+        pub(crate) fn new_from_inner(
+            store: Arc<dyn ObjectStore>,
+            builder: PolarsObjectStoreBuilder,
+        ) -> Self {
+            let initial_store = store.clone();
+            Self {
+                inner: Arc::new(Inner {
+                    store: tokio::sync::Mutex::new(store),
+                    builder,
+                }),
+                initial_store,
+                rebuilt: AtomicBool::new(false),
+            }
+        }
+
+        /// Gets the underlying [`ObjectStore`] implementation.
+        pub async fn to_dyn_object_store(&self) -> Arc<dyn ObjectStore> {
+            if !self.rebuilt.load(std::sync::atomic::Ordering::Relaxed) {
+                self.initial_store.clone()
+            } else {
+                self.inner.store.lock().await.clone()
+            }
+        }
+
+        pub async fn rebuild_inner(
+            &self,
+            from_version: &Arc<dyn ObjectStore>,
+        ) -> PolarsResult<Arc<dyn ObjectStore>> {
+            let mut current_store = self.inner.store.lock().await;
+
+            self.rebuilt
+                .store(true, std::sync::atomic::Ordering::Relaxed);
+
+            // If this does not eq, then `inner` was already re-built by another thread.
+            if Arc::ptr_eq(&*current_store, from_version) {
+                *current_store = self.inner.builder.clone().build_impl().await.map_err(|e| {
+                    e.wrap_msg(|e| format!("attempt to rebuild object store failed: {}", e))
+                })?;
+            }
+
+            Ok((*current_store).clone())
+        }
+
+        pub async fn try_exec_rebuild_on_err<Fn, Fut, O>(&self, mut func: Fn) -> PolarsResult<O>
+        where
+            Fn: FnMut(&Arc<dyn ObjectStore>) -> Fut,
+            Fut: Future<Output = PolarsResult<O>>,
+        {
+            let store = self.to_dyn_object_store().await;
+
+            let out = func(&store).await;
+
+            let orig_err = match out {
+                Ok(v) => return Ok(v),
+                Err(e) => e,
+            };
+
+            if config::verbose() {
+                eprintln!(
+                    "[PolarsObjectStore]: got error: {}, will attempt re-build",
+                    &orig_err
+                );
+            }
+
+            let store = self
+                .rebuild_inner(&store)
+                .await
+                .map_err(|e| e.wrap_msg(|e| format!("{}; original error: {}", e, orig_err)))?;
+
+            func(&store).await
+        }
+    }
+}
+
+pub use inner::PolarsObjectStore;
+
+pub type ObjectStorePath = object_store::path::Path;
+
+impl PolarsObjectStore {
     /// Returns a buffered stream that downloads concurrently up to the concurrency limit.
     fn get_buffered_ranges_stream<'a, T: Iterator<Item = Range<usize>>>(
-        &'a self,
+        store: &'a dyn ObjectStore,
         path: &'a Path,
         ranges: T,
     ) -> impl StreamExt<Item = PolarsResult<Bytes>>
@@ -35,39 +141,48 @@ impl PolarsObjectStore {
+           + use<'a, T> {
         futures::stream::iter(
             ranges
-                .map(|range| async { self.0.get_range(path, range).await.map_err(to_compute_err) }),
+                .map(|range| async { store.get_range(path, range).await.map_err(to_compute_err) }),
         )
         // Add a limit locally as this gets run inside a single `tune_with_concurrency_budget`.
         .buffered(get_concurrency_limit() as usize)
     }
 
     pub async fn get_range(&self, path: &Path, range: Range<usize>) -> PolarsResult<Bytes> {
-        let parts = split_range(range.clone());
+        self.try_exec_rebuild_on_err(move |store| {
+            let range = range.clone();
+            let st = store.clone();
+
+            async {
+                let store = st;
+                let parts = split_range(range.clone());
+
+                if parts.len() == 1 {
+                    tune_with_concurrency_budget(1, || async { store.get_range(path, range).await })
+                        .await
+                        .map_err(to_compute_err)
+                } else {
+                    let parts = tune_with_concurrency_budget(
+                        parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
+                        || {
+                            Self::get_buffered_ranges_stream(&store, path, parts)
+                                .try_collect::<Vec<Bytes>>()
+                        },
+                    )
+                    .await?;
 
-        if parts.len() == 1 {
-            tune_with_concurrency_budget(1, || self.0.get_range(path, range))
-                .await
-                .map_err(to_compute_err)
-        } else {
-            let parts = tune_with_concurrency_budget(
-                parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
-                || {
-                    self.get_buffered_ranges_stream(path, parts)
-                        .try_collect::<Vec<Bytes>>()
-                },
-            )
-            .await?;
-
-            let mut combined = Vec::with_capacity(range.len());
-
-            for part in parts {
-                combined.extend_from_slice(&part)
-            }
+                    let mut combined = Vec::with_capacity(range.len());
 
-            assert_eq!(combined.len(), range.len());
+                    for part in parts {
+                        combined.extend_from_slice(&part)
+                    }
 
-            PolarsResult::Ok(Bytes::from(combined))
-        }
+                    assert_eq!(combined.len(), range.len());
+
+                    PolarsResult::Ok(Bytes::from(combined))
+                }
+            }
+        })
+        .await
     }
 
     /// Fetch byte ranges into a HashMap keyed by the range start. This will mutably sort the
@@ -87,153 +202,183 @@ impl PolarsObjectStore {
             return Ok(Default::default());
         }
 
-        let mut out = PlHashMap::with_capacity(ranges.len());
-
         ranges.sort_unstable_by_key(|x| x.start);
 
+        let ranges_len = ranges.len();
         let (merged_ranges, merged_ends): (Vec<_>, Vec<_>) = merge_ranges(ranges).unzip();
 
-        let mut stream = self.get_buffered_ranges_stream(path, merged_ranges.iter().cloned());
+        self.try_exec_rebuild_on_err(|store| {
+            let st = store.clone();
 
-        tune_with_concurrency_budget(
-            merged_ranges.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
-            || async {
-                let mut len = 0;
-                let mut current_offset = 0;
-                let mut ends_iter = merged_ends.iter();
+            async {
+                let store = st;
+                let mut out = PlHashMap::with_capacity(ranges_len);
 
-                let mut splitted_parts = vec![];
+                let mut stream =
+                    Self::get_buffered_ranges_stream(&store, path, merged_ranges.iter().cloned());
 
-                while let Some(bytes) = stream.try_next().await? {
-                    len += bytes.len();
-                    let end = *ends_iter.next().unwrap();
+                tune_with_concurrency_budget(
+                    merged_ranges.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
+                    || async {
+                        let mut len = 0;
+                        let mut current_offset = 0;
+                        let mut ends_iter = merged_ends.iter();
 
-                    if end == 0 {
-                        splitted_parts.push(bytes);
-                        continue;
-                    }
+                        let mut splitted_parts = vec![];
 
-                    let full_range = ranges[current_offset..end]
-                        .iter()
-                        .cloned()
-                        .reduce(|l, r| l.start.min(r.start)..l.end.max(r.end))
-                        .unwrap();
+                        while let Some(bytes) = stream.try_next().await? {
+                            len += bytes.len();
+                            let end = *ends_iter.next().unwrap();
 
-                    let bytes = if splitted_parts.is_empty() {
-                        bytes
-                    } else {
-                        let mut out = Vec::with_capacity(full_range.len());
+                            if end == 0 {
+                                splitted_parts.push(bytes);
+                                continue;
+                            }
 
-                        for x in splitted_parts.drain(..) {
-                            out.extend_from_slice(&x);
-                        }
+                            let full_range = ranges[current_offset..end]
+                                .iter()
+                                .cloned()
+                                .reduce(|l, r| l.start.min(r.start)..l.end.max(r.end))
+                                .unwrap();
 
-                        out.extend_from_slice(&bytes);
-                        Bytes::from(out)
-                    };
+                            let bytes = if splitted_parts.is_empty() {
+                                bytes
+                            } else {
+                                let mut out = Vec::with_capacity(full_range.len());
 
-                    assert_eq!(bytes.len(), full_range.len());
+                                for x in splitted_parts.drain(..) {
+                                    out.extend_from_slice(&x);
+                                }
 
-                    for range in &ranges[current_offset..end] {
-                        let v = out.insert(
-                            K::try_from(range.start).unwrap(),
-                            T::from(bytes.slice(
-                                range.start - full_range.start..range.end - full_range.start,
-                            )),
-                        );
+                                out.extend_from_slice(&bytes);
+                                Bytes::from(out)
+                            };
 
-                        assert!(v.is_none()); // duplicate range start
-                    }
+                            assert_eq!(bytes.len(), full_range.len());
 
-                    current_offset = end;
-                }
+                            for range in &ranges[current_offset..end] {
+                                let v = out.insert(
+                                    K::try_from(range.start).unwrap(),
+                                    T::from(bytes.slice(
+                                        range.start - full_range.start
+                                            ..range.end - full_range.start,
+                                    )),
+                                );
 
-                assert!(splitted_parts.is_empty());
+                                assert!(v.is_none()); // duplicate range start
+                            }
 
-                PolarsResult::Ok(pl_async::Size::from(len as u64))
-            },
-        )
-        .await?;
+                            current_offset = end;
+                        }
 
-        Ok(out)
+                        assert!(splitted_parts.is_empty());
+
+                        PolarsResult::Ok(pl_async::Size::from(len as u64))
+                    },
+                )
+                .await?;
+
+                Ok(out)
+            }
+        })
+        .await
     }
 
     pub async fn download(&self, path: &Path, file: &mut tokio::fs::File) -> PolarsResult<()> {
         let opt_size = self.head(path).await.ok().map(|x| x.size);
-        let parts = opt_size.map(|x| split_range(0..x)).filter(|x| x.len() > 1);
-
-        if let Some(parts) = parts {
-            tune_with_concurrency_budget(
-                parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
-                || async {
-                    let mut stream = self.get_buffered_ranges_stream(path, parts);
-                    let mut len = 0;
-                    while let Some(bytes) = stream.try_next().await? {
-                        len += bytes.len();
-                        file.write_all(&bytes).await.map_err(to_compute_err)?;
-                    }
-                    assert_eq!(len, opt_size.unwrap());
+        let initial_pos = file.stream_position().await?;
 
-                    PolarsResult::Ok(pl_async::Size::from(len as u64))
-                },
-            )
-            .await?
-        } else {
-            tune_with_concurrency_budget(1, || async {
-                let mut stream = self
-                    .0
-                    .get(path)
-                    .await
-                    .map_err(to_compute_err)?
-                    .into_stream();
-
-                let mut len = 0;
-                while let Some(bytes) = stream.try_next().await? {
-                    len += bytes.len();
-                    file.write_all(&bytes).await.map_err(to_compute_err)?;
-                }
+        self.try_exec_rebuild_on_err(|store| {
+            let st = store.clone();
 
-                PolarsResult::Ok(pl_async::Size::from(len as u64))
-            })
-            .await?
-        };
+            // Workaround for "can't move captured variable".
+            let file: &mut tokio::fs::File = unsafe { std::mem::transmute_copy(&file) };
 
-        // Dropping is delayed for tokio async files so we need to explicitly
-        // flush here (https://github.com/tokio-rs/tokio/issues/2307#issuecomment-596336451).
-        file.sync_all().await.map_err(PolarsError::from)?;
+            async {
+                file.set_len(initial_pos).await?; // Reset if this function was called again.
 
-        Ok(())
-    }
+                let store = st;
+                let parts = opt_size.map(|x| split_range(0..x)).filter(|x| x.len() > 1);
 
-    /// Fetch the metadata of the parquet file, do not memoize it.
-    pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
-        with_concurrency_budget(1, || async {
-            let head_result = self.0.head(path).await;
-
-            if head_result.is_err() {
-                // Pre-signed URLs forbid the HEAD method, but we can still retrieve the header
-                // information with a range 0-0 request.
-                let get_range_0_0_result = self
-                    .0
-                    .get_opts(
-                        path,
-                        object_store::GetOptions {
-                            range: Some((0..1).into()),
-                            ..Default::default()
+                if let Some(parts) = parts {
+                    tune_with_concurrency_budget(
+                        parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
+                        || async {
+                            let mut stream = Self::get_buffered_ranges_stream(&store, path, parts);
+                            let mut len = 0;
+                            while let Some(bytes) = stream.try_next().await? {
+                                len += bytes.len();
+                                file.write_all(&bytes).await.map_err(to_compute_err)?;
+                            }
+
+                            assert_eq!(len, opt_size.unwrap());
+
+                            PolarsResult::Ok(pl_async::Size::from(len as u64))
                         },
                     )
-                    .await;
+                    .await?
+                } else {
+                    tune_with_concurrency_budget(1, || async {
+                        let mut stream =
+                            store.get(path).await.map_err(to_compute_err)?.into_stream();
+
+                        let mut len = 0;
+                        while let Some(bytes) = stream.try_next().await? {
+                            len += bytes.len();
+                            file.write_all(&bytes).await.map_err(to_compute_err)?;
+                        }
 
-                if let Ok(v) = get_range_0_0_result {
-                    return Ok(v.meta);
-                }
+                        PolarsResult::Ok(pl_async::Size::from(len as u64))
+                    })
+                    .await?
+                };
+
+                // Dropping is delayed for tokio async files so we need to explicitly
+                // flush here (https://github.com/tokio-rs/tokio/issues/2307#issuecomment-596336451).
+                file.sync_all().await.map_err(PolarsError::from)?;
+
+                Ok(())
             }
+        })
+        .await
+    }
 
-            head_result
+    /// Fetch the metadata of the parquet file, do not memoize it.
+    pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
+        self.try_exec_rebuild_on_err(|store| {
+            let st = store.clone();
+
+            async {
+                with_concurrency_budget(1, || async {
+                    let store = st;
+                    let head_result = store.head(path).await;
+
+                    if head_result.is_err() {
+                        // Pre-signed URLs forbid the HEAD method, but we can still retrieve the header
+                        // information with a range 0-0 request.
+                        let get_range_0_0_result = store
+                            .get_opts(
+                                path,
+                                object_store::GetOptions {
+                                    range: Some((0..1).into()),
+                                    ..Default::default()
+                                },
+                            )
+                            .await;
+
+                        if let Ok(v) = get_range_0_0_result {
+                            return Ok(v.meta);
+                        }
+                    }
+
+                    head_result
+                })
+                .await
+                .map_err(to_compute_err)
+            }
         })
         .await
-        .map_err(to_compute_err)
     }
 }
diff --git a/crates/polars-io/src/file_cache/utils.rs b/crates/polars-io/src/file_cache/utils.rs
index a7fe3396ce3c..a7fe5286897d 100644
--- a/crates/polars-io/src/file_cache/utils.rs
+++ b/crates/polars-io/src/file_cache/utils.rs
@@ -8,9 +8,7 @@ use polars_error::{PolarsError, PolarsResult};
 use super::cache::{get_env_file_cache_ttl, FILE_CACHE};
 use super::entry::FileCacheEntry;
 use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
-use crate::cloud::{
-    build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
-};
+use crate::cloud::{build_object_store, object_path_from_str, CloudLocation, CloudOptions};
 use crate::path_utils::{ensure_directory_init, is_cloud_url, POLARS_TEMP_DIR_BASE_PATH};
 use crate::pl_async;
 
@@ -77,7 +75,7 @@ pub fn init_entries_from_uri_list(
             .map(|i| async move {
                 let (_, object_store) =
                     build_object_store(&uri_list[i], cloud_options, false).await?;
-                PolarsResult::Ok(PolarsObjectStore::new(object_store))
+                PolarsResult::Ok(object_store)
             }),
     )
     .await
diff --git a/crates/polars-io/src/ipc/ipc_reader_async.rs b/crates/polars-io/src/ipc/ipc_reader_async.rs
index 089dceaf9a8a..a6626bd3a04f 100644
--- a/crates/polars-io/src/ipc/ipc_reader_async.rs
+++ b/crates/polars-io/src/ipc/ipc_reader_async.rs
@@ -74,7 +74,7 @@ impl IpcReaderAsync {
         let path = object_path_from_str(&prefix)?;
 
         Ok(Self {
-            store: PolarsObjectStore::new(store),
+            store,
             cache_entry,
             path,
         })
diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs
index 053aad67464a..bf35337a313d 100644
--- a/crates/polars-io/src/parquet/read/async_impl.rs
+++ b/crates/polars-io/src/parquet/read/async_impl.rs
@@ -42,7 +42,7 @@ impl ParquetObjectStore {
         let path = object_path_from_str(&prefix)?;
 
         Ok(ParquetObjectStore {
-            store: PolarsObjectStore::new(store),
+            store,
             path,
             length: None,
             metadata,
diff --git a/crates/polars-io/src/path_utils/hugging_face.rs b/crates/polars-io/src/path_utils/hugging_face.rs
index 4ea6795dca2c..8e6f8477360e 100644
--- a/crates/polars-io/src/path_utils/hugging_face.rs
+++ b/crates/polars-io/src/path_utils/hugging_face.rs
@@ -258,7 +258,7 @@ pub(super) async fn expand_paths_hf(
             (path_parts.path.clone(), None)
         };
 
         let expansion_matcher = &if expansion.is_some() {
-            Some(Matcher::new(prefix.clone(), expansion.as_deref())?)
+            Some(Matcher::new(prefix.as_str().into(), expansion.as_deref())?)
         } else {
             None
         };
diff --git a/crates/polars-io/src/path_utils/mod.rs b/crates/polars-io/src/path_utils/mod.rs
index 6817de4a71cf..b6612e553039 100644
--- a/crates/polars-io/src/path_utils/mod.rs
+++ b/crates/polars-io/src/path_utils/mod.rs
@@ -272,22 +272,31 @@ pub fn expand_paths_hive(
     let cloud_location = &cloud_location;
 
     let mut paths = store
-        .list(Some(&prefix))
-        .try_filter_map(|x| async move {
-            let out = (x.size > 0).then(|| {
-                PathBuf::from({
-                    format_path(
-                        &cloud_location.scheme,
-                        &cloud_location.bucket,
-                        x.location.as_ref(),
-                    )
-                })
-            });
-            Ok(out)
+        .try_exec_rebuild_on_err(|store| {
+            let st = store.clone();
+
+            async {
+                let store = st;
+                store
+                    .list(Some(&prefix))
+                    .try_filter_map(|x| async move {
+                        let out = (x.size > 0).then(|| {
+                            PathBuf::from({
+                                format_path(
+                                    &cloud_location.scheme,
+                                    &cloud_location.bucket,
+                                    x.location.as_ref(),
+                                )
+                            })
+                        });
+                        Ok(out)
+                    })
+                    .try_collect::<Vec<_>>()
+                    .await
+                    .map_err(to_compute_err)
+            }
         })
-        .try_collect::<Vec<_>>()
-        .await
-        .map_err(to_compute_err)?;
+        .await?;
 
     paths.sort_unstable();
 
     (
diff --git a/crates/polars-io/src/utils/byte_source.rs b/crates/polars-io/src/utils/byte_source.rs
index af37d32b36da..06290956d63d 100644
--- a/crates/polars-io/src/utils/byte_source.rs
+++ b/crates/polars-io/src/utils/byte_source.rs
@@ -78,7 +78,6 @@ impl ObjectStoreByteSource {
         let (CloudLocation { prefix, .. }, store) =
             build_object_store(path, cloud_options, false).await?;
         let path = object_path_from_str(&prefix)?;
-        let store = PolarsObjectStore::new(store);
 
         Ok(Self { store, path })
     }
diff --git a/py-polars/tests/unit/io/cloud/test_cloud.py b/py-polars/tests/unit/io/cloud/test_cloud.py
index 54d1b5ccd6a6..949b520cbd63 100644
--- a/py-polars/tests/unit/io/cloud/test_cloud.py
+++ b/py-polars/tests/unit/io/cloud/test_cloud.py
@@ -1,3 +1,6 @@
+from __future__ import annotations
+
+import contextlib
 from functools import partial
 
 import pytest
@@ -30,3 +33,25 @@ def test_scan_nonexistent_cloud_path_17444(format: str) -> None:
     # Upon collection, it should fail
     with pytest.raises(ComputeError):
         result.collect()
+
+
+def test_scan_err_rebuild_store_19933() -> None:
+    call_count = 0
+
+    def f() -> None:
+        nonlocal call_count
+        call_count += 1
+        raise AssertionError
+
+    q = pl.scan_parquet(
+        "s3://.../...",
+        storage_options={"aws_region": "eu-west-1"},
+        credential_provider=f,  # type: ignore[arg-type]
+    )
+
+    with contextlib.suppress(Exception):
+        q.collect()
+
+    # Note: We get called 2 times per attempt
+    if call_count != 4:
+        raise AssertionError(call_count)
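
Note for reviewers: the mechanism this diff centers on is `PolarsObjectStore::try_exec_rebuild_on_err`. Every I/O entry point (`get_range`, `get_ranges`, `download`, `head`, plus the `list` calls in `glob` and `expand_paths_hive`) now runs through a closure that receives the current `Arc<dyn ObjectStore>`; on the first error the store is rebuilt from its saved builder and the closure is retried exactly once. The sketch below is a minimal, self-contained restatement of that pattern, not the polars implementation: all names (`Rebuildable`, `build`) are illustrative, and it assumes only a `tokio` dependency.

use std::future::Future;
use std::sync::Arc;

struct Rebuildable<T> {
    store: tokio::sync::Mutex<Arc<T>>,
    build: fn() -> Result<T, String>, // stand-in for the saved builder
}

impl<T> Rebuildable<T> {
    async fn rebuild(&self, from_version: &Arc<T>) -> Result<Arc<T>, String> {
        let mut current = self.store.lock().await;
        // Double-check under the lock: if another task already swapped the
        // store out, reuse its version instead of rebuilding a second time.
        if Arc::ptr_eq(&*current, from_version) {
            *current = Arc::new((self.build)()?);
        }
        Ok(current.clone())
    }

    async fn try_exec_rebuild_on_err<F, Fut, O>(&self, mut func: F) -> Result<O, String>
    where
        F: FnMut(&Arc<T>) -> Fut,
        Fut: Future<Output = Result<O, String>>,
    {
        let store = self.store.lock().await.clone();
        match func(&store).await {
            Ok(v) => Ok(v),
            Err(_) => {
                // First failure: rebuild the inner value, then retry once.
                let store = self.rebuild(&store).await?;
                func(&store).await
            },
        }
    }
}

The `Arc::ptr_eq` check is the important detail: when several tasks fail concurrently, only the first one holding the stale version pays for the rebuild, and the rest simply pick up the already-swapped store.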
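
The caching side of `PolarsObjectStoreBuilder::build` uses the classic double-checked pattern over `tokio::sync::RwLock`: check under a read lock, and only on a miss upgrade to a write lock and check again before building and inserting. A minimal sketch of that locking discipline (illustrative types, assuming `tokio`; the `format!` call stands in for `build_impl`):

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

async fn get_or_build(cache: &RwLock<HashMap<String, Arc<str>>>, key: &str) -> Arc<str> {
    // Fast path: a shared read lock is enough for a cache hit.
    if let Some(v) = cache.read().await.get(key) {
        return v.clone();
    }
    // Slow path: take the write lock, then check again, since another task
    // may have inserted the entry between the two lock acquisitions.
    let mut guard = cache.write().await;
    if let Some(v) = guard.get(key) {
        return v.clone();
    }
    let built: Arc<str> = Arc::from(format!("store for {key}")); // stand-in for build_impl()
    guard.insert(key.to_string(), built.clone());
    built
}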