Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Retry with reloaded credentials on cloud error #20185

Merged
merged 6 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/polars-io/src/cloud/adaptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ impl CloudWriter {

let (cloud_location, object_store) =
crate::cloud::build_object_store(uri, cloud_options, false).await?;
Self::new_with_object_store(object_store, object_path_from_str(&cloud_location.prefix)?)
Self::new_with_object_store(
object_store.to_dyn_object_store().await,
object_path_from_str(&cloud_location.prefix)?,
)
}

pub fn close(&mut self) -> PolarsResult<()> {
Expand Down
46 changes: 30 additions & 16 deletions crates/polars-io/src/cloud/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use object_store::path::Path;
use polars_core::error::to_compute_err;
use polars_core::prelude::{polars_ensure, polars_err};
use polars_error::PolarsResult;
use polars_utils::pl_str::PlSmallStr;
use regex::Regex;
use url::Url;

Expand Down Expand Up @@ -74,13 +75,13 @@ pub(crate) fn extract_prefix_expansion(url: &str) -> PolarsResult<(String, Optio
#[derive(PartialEq, Debug, Default)]
pub struct CloudLocation {
/// The scheme (s3, ...).
pub scheme: String,
pub scheme: PlSmallStr,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive-by change: switched these fields to `PlSmallStr`, since these strings tend to be small.

/// The bucket name.
pub bucket: String,
pub bucket: PlSmallStr,
/// The prefix inside the bucket, this will be the full key when wildcards are not used.
pub prefix: String,
/// The path components that need to be expanded.
pub expansion: Option<String>,
pub expansion: Option<PlSmallStr>,
}

impl CloudLocation {
Expand All @@ -102,7 +103,8 @@ impl CloudLocation {
.ok_or_else(
|| polars_err!(ComputeError: "cannot parse bucket (host) from url: {}", parsed),
)?
.to_string();
.to_string()
.into();
(bucket, key)
};

Expand All @@ -114,9 +116,9 @@ impl CloudLocation {
if is_local && key.starts_with(DELIMITER) {
prefix.insert(0, DELIMITER);
}
(prefix, expansion)
(prefix, expansion.map(|x| x.into()))
} else {
(key.to_string(), None)
(key.as_ref().into(), None)
};

Ok(CloudLocation {
Expand Down Expand Up @@ -155,7 +157,7 @@ impl Matcher {
}

pub(crate) fn is_matching(&self, key: &str) -> bool {
if !key.starts_with(&self.prefix) {
if !key.starts_with(self.prefix.as_str()) {
// Prefix does not match, should not happen.
return false;
}
Expand Down Expand Up @@ -183,23 +185,35 @@ pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResu
let matcher = &Matcher::new(
if scheme == "file" {
// For local paths the returned location has the leading slash stripped.
prefix[1..].to_string()
prefix[1..].into()
} else {
prefix.clone()
},
expansion.as_deref(),
)?;

let path = Path::from(prefix.as_str());
let path = Some(&path);

let mut locations = store
.list(Some(&Path::from(prefix)))
.try_filter_map(|x| async move {
let out =
(x.size > 0 && matcher.is_matching(x.location.as_ref())).then_some(x.location);
Ok(out)
.try_exec_rebuild_on_err(|store| {
let st = store.clone();

async {
let store = st;
store
.list(path)
.try_filter_map(|x| async move {
let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
.then_some(x.location);
Ok(out)
})
.try_collect::<Vec<_>>()
.await
.map_err(to_compute_err)
}
})
.try_collect::<Vec<_>>()
.await
.map_err(to_compute_err)?;
.await?;

locations.sort_unstable();
Ok(locations
Expand Down
217 changes: 135 additions & 82 deletions crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,22 @@ use once_cell::sync::Lazy;
use polars_core::config;
use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
use polars_utils::aliases::PlHashMap;
use polars_utils::pl_str::PlSmallStr;
use tokio::sync::RwLock;
use url::Url;

use super::{parse_url, CloudLocation, CloudOptions, CloudType};
use super::{parse_url, CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
use crate::cloud::CloudConfig;

/// Object stores must be cached. Every object-store will do DNS lookups and
/// get rate limited when querying the DNS (can take up to 5s).
/// Other reasons are connection pools that must be shared between as much as possible.
#[allow(clippy::type_complexity)]
static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<String, Arc<dyn ObjectStore>>>> =
static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<String, PolarsObjectStore>>> =
Lazy::new(Default::default);

type BuildResult = PolarsResult<(CloudLocation, Arc<dyn ObjectStore>)>;

#[allow(dead_code)]
fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {
fn err_missing_feature(feature: &str, scheme: &str) -> PolarsResult<Arc<dyn ObjectStore>> {
polars_bail!(
ComputeError:
"feature '{}' must be enabled in order to use '{}' cloud urls", feature, scheme,
Expand Down Expand Up @@ -64,6 +63,7 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
},
))
.unwrap();

format!(
"{}://{}<\\creds\\>{}",
url.scheme(),
Expand All @@ -77,6 +77,126 @@ pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path
object_store::path::Path::parse(path).map_err(to_compute_err)
}

/// Holds everything needed to (re)construct an object store, so a store can be
/// rebuilt later — e.g. with reloaded credentials after a cloud error
/// (see `build_impl`).
#[derive(Debug, Clone)]
pub(crate) struct PolarsObjectStoreBuilder {
    // Original URL string this builder was created from.
    url: PlSmallStr,
    // Parsed form of `url`; used for deriving the cache key.
    parsed_url: Url,
    // URL scheme (e.g. "s3"). Only read from the `cfg`-gated
    // missing-feature error paths in `build_impl`, hence `allow(unused)`.
    #[allow(unused)]
    scheme: PlSmallStr,
    cloud_type: CloudType,
    // User-provided cloud options; `build_impl` falls back to
    // `CloudOptions::default_static_ref()` when `None`.
    options: Option<CloudOptions>,
}

impl PolarsObjectStoreBuilder {
    /// Construct a fresh `Arc<dyn ObjectStore>` from the stored parameters,
    /// bypassing the store cache. Use [`Self::build`] for the caching version.
    ///
    /// # Errors
    /// Returns an error if the required cloud feature is not compiled in, or
    /// if the backend-specific builder (`build_aws`/`build_gcp`/…) fails.
    pub(super) async fn build_impl(&self) -> PolarsResult<Arc<dyn ObjectStore>> {
        let options = self
            .options
            .as_ref()
            .unwrap_or_else(|| CloudOptions::default_static_ref());

        let store = match self.cloud_type {
            CloudType::Aws => {
                #[cfg(feature = "aws")]
                {
                    let store = options.build_aws(&self.url).await?;
                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
                }
                #[cfg(not(feature = "aws"))]
                return err_missing_feature("aws", &self.scheme);
            },
            CloudType::Gcp => {
                #[cfg(feature = "gcp")]
                {
                    let store = options.build_gcp(&self.url)?;
                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
                }
                #[cfg(not(feature = "gcp"))]
                return err_missing_feature("gcp", &self.scheme);
            },
            CloudType::Azure => {
                {
                    #[cfg(feature = "azure")]
                    {
                        let store = options.build_azure(&self.url)?;
                        Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
                    }
                }
                #[cfg(not(feature = "azure"))]
                return err_missing_feature("azure", &self.scheme);
            },
            CloudType::File => {
                let local = LocalFileSystem::new();
                Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
            },
            CloudType::Http => {
                {
                    #[cfg(feature = "http")]
                    {
                        let store = options.build_http(&self.url)?;
                        PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
                    }
                }
                // Fix: previously referenced `cloud_location.scheme`, which is
                // not in scope in this method (that was a local of the old free
                // function). Use `self.scheme` like the other arms.
                #[cfg(not(feature = "http"))]
                return err_missing_feature("http", &self.scheme);
            },
            CloudType::Hf => panic!("impl error: unresolved hf:// path"),
        }?;

        Ok(store)
    }

    /// Build a [`PolarsObjectStore`], consulting/populating the global store
    /// cache for AWS/GCP/Azure URLs.
    ///
    /// Note: Use `build_impl` for a non-caching version.
    pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
        // Only remote, credentialed stores are cached; file/http/hf stores are
        // always built fresh.
        let opt_cache_key = match &self.cloud_type {
            CloudType::Aws | CloudType::Gcp | CloudType::Azure => Some(url_and_creds_to_key(
                &self.parsed_url,
                self.options.as_ref(),
            )),
            CloudType::File | CloudType::Http | CloudType::Hf => None,
        };

        // Double-checked lookup: try under the read lock first, then re-check
        // after upgrading to the write lock (another task may have inserted
        // while we waited). On a miss we hold the write guard across the build
        // so concurrent callers for the same key don't build duplicate stores.
        let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
            let cache = OBJECT_STORE_CACHE.read().await;

            if let Some(store) = cache.get(cache_key) {
                return Ok(store.clone());
            }

            drop(cache);

            let cache = OBJECT_STORE_CACHE.write().await;

            if let Some(store) = cache.get(cache_key) {
                return Ok(store.clone());
            }

            Some(cache)
        } else {
            None
        };

        let store = self.build_impl().await?;
        // Keep the builder alongside the store so the store can be rebuilt
        // (e.g. with reloaded credentials) on a cloud error.
        let store = PolarsObjectStore::new_from_inner(store, self);

        if let Some(mut cache) = opt_cache_write_guard {
            // Clear the cache if we surpass a certain amount of buckets.
            if cache.len() >= 8 {
                if config::verbose() {
                    eprintln!(
                        "build_object_store: clearing store cache (cache.len(): {})",
                        cache.len()
                    );
                }
                cache.clear()
            }

            cache.insert(opt_cache_key.unwrap(), store.clone());
        }

        Ok(store)
    }
}

/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
pub async fn build_object_store(
url: &str,
Expand All @@ -86,88 +206,21 @@ pub async fn build_object_store(
)]
options: Option<&CloudOptions>,
glob: bool,
) -> BuildResult {
) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
let parsed = parse_url(url).map_err(to_compute_err)?;
let cloud_location = CloudLocation::from_url(&parsed, glob)?;
let cloud_type = CloudType::from_url(&parsed)?;

// FIXME: `credential_provider` is currently serializing the entire Python function here
// into a string with pickle for this cache key because we are using `serde_json::to_string`
let key = url_and_creds_to_key(&parsed, options);
let mut allow_cache = true;

{
let cache = OBJECT_STORE_CACHE.read().await;
if let Some(store) = cache.get(&key) {
return Ok((cloud_location, store.clone()));
}
let store = PolarsObjectStoreBuilder {
url: url.into(),
parsed_url: parsed,
scheme: cloud_location.scheme.as_str().into(),
cloud_type,
options: options.cloned(),
}
.build()
.await?;

let options = options.map(std::borrow::Cow::Borrowed).unwrap_or_default();

let cloud_type = CloudType::from_url(&parsed)?;
let store = match cloud_type {
CloudType::Aws => {
#[cfg(feature = "aws")]
{
let store = options.build_aws(url).await?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
}
#[cfg(not(feature = "aws"))]
return err_missing_feature("aws", &cloud_location.scheme);
},
CloudType::Gcp => {
#[cfg(feature = "gcp")]
{
let store = options.build_gcp(url)?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
}
#[cfg(not(feature = "gcp"))]
return err_missing_feature("gcp", &cloud_location.scheme);
},
CloudType::Azure => {
{
#[cfg(feature = "azure")]
{
let store = options.build_azure(url)?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
}
}
#[cfg(not(feature = "azure"))]
return err_missing_feature("azure", &cloud_location.scheme);
},
CloudType::File => {
allow_cache = false;
let local = LocalFileSystem::new();
Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
},
CloudType::Http => {
{
allow_cache = false;
#[cfg(feature = "http")]
{
let store = options.build_http(url)?;
PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
}
}
#[cfg(not(feature = "http"))]
return err_missing_feature("http", &cloud_location.scheme);
},
CloudType::Hf => panic!("impl error: unresolved hf:// path"),
}?;
if allow_cache {
let mut cache = OBJECT_STORE_CACHE.write().await;
// Clear the cache if we surpass a certain amount of buckets.
if cache.len() > 8 {
if config::verbose() {
eprintln!(
"build_object_store: clearing store cache (cache.len(): {})",
cache.len()
);
}
cache.clear()
}
cache.insert(key, store.clone());
}
Ok((cloud_location, store))
}

Expand Down
Loading
Loading