feat: Support cloud storage in scan_csv #16674

Merged: 20 commits, Jun 6, 2024
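For orientation before the diff: the user-facing effect of this PR is that lazy CSV scans accept cloud URIs. Below is a minimal sketch from the Rust side, assuming the `polars` crate with its lazy CSV and relevant cloud feature flags enabled; the bucket path is hypothetical and credentials are assumed to come from the environment.

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // After this PR, a cloud URI can be passed where a local path was
    // previously required; the file cache introduced below fetches the
    // object before parsing.
    let df = LazyCsvReader::new("s3://my-bucket/data.csv")
        .finish()?
        .collect()?;
    println!("{df}");
    Ok(())
}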
47 changes: 45 additions & 2 deletions Cargo.lock

(Generated file; diff not rendered.)

9 changes: 6 additions & 3 deletions crates/polars-io/Cargo.toml
@@ -20,6 +20,7 @@ ahash = { workspace = true }
arrow = { workspace = true }
async-trait = { version = "0.1.59", optional = true }
atoi_simd = { workspace = true, optional = true }
+blake3 = { version = "1.5.1", optional = true }
bytes = { version = "1.3" }
chrono = { workspace = true, optional = true }
chrono-tz = { workspace = true, optional = true }
@@ -38,16 +39,17 @@ regex = { workspace = true }
reqwest = { workspace = true, optional = true }
ryu = { workspace = true, optional = true }
serde = { workspace = true, features = ["rc"], optional = true }
-serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value"], optional = true }
+serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value", "std"], optional = true }
simd-json = { workspace = true, optional = true }
simdutf8 = { workspace = true, optional = true }
smartstring = { workspace = true }
-tokio = { workspace = true, features = ["net", "rt-multi-thread", "time", "sync"], optional = true }
+tokio = { workspace = true, features = ["fs", "net", "rt-multi-thread", "time", "sync"], optional = true }
tokio-util = { workspace = true, features = ["io", "io-util"], optional = true }
url = { workspace = true, optional = true }
zstd = { workspace = true, optional = true }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
+fs4 = { version = "0.8.3", features = ["sync"], optional = true }
home = "0.5.4"

[dev-dependencies]
@@ -107,7 +109,8 @@ async = [
"polars-error/regex",
"polars-parquet?/async",
]
-cloud = ["object_store", "async", "polars-error/object_store", "url", "serde_json", "serde"]
+cloud = ["object_store", "async", "polars-error/object_store", "url", "serde_json", "serde", "file_cache"]
+file_cache = ["async", "dep:blake3", "dep:fs4"]
aws = ["object_store/aws", "cloud", "reqwest"]
azure = ["object_store/azure", "cloud"]
gcp = ["object_store/gcp", "cloud"]
30 changes: 29 additions & 1 deletion crates/polars-io/src/cloud/polars_object_store.rs
@@ -2,12 +2,14 @@ use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
+use futures::StreamExt;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use polars_error::{to_compute_err, PolarsResult};
+use tokio::io::AsyncWriteExt;

use crate::pl_async::{
-    tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
+    self, tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
};

/// Polars specific wrapper for `Arc<dyn ObjectStore>` that limits the number of
/// concurrent requests for the entire application.
@@ -52,6 +54,32 @@ impl PolarsObjectStore {
            .map_err(to_compute_err)
    }

+    pub async fn download<F: tokio::io::AsyncWrite + std::marker::Unpin>(
+        &self,
+        path: &Path,
+        file: &mut F,
+    ) -> PolarsResult<()> {
+        tune_with_concurrency_budget(1, || async {
+            let mut stream = self
+                .0
+                .get(path)
+                .await
+                .map_err(to_compute_err)?
+                .into_stream();
+
+            let mut len = 0;
+            while let Some(bytes) = stream.next().await {
+                let bytes = bytes.map_err(to_compute_err)?;
+                len += bytes.len();
+                file.write_all(bytes.as_ref()).await.map_err(to_compute_err)?;
+            }
+
+            PolarsResult::Ok(pl_async::Size::from(len as u64))
+        })
+        .await?;
+        Ok(())
+    }
+
    /// Fetch the metadata of the object; do not memoize it.
    pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
        with_concurrency_budget(1, || self.0.head(path))
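A sketch of how the new `download` method might be driven by a caller, assuming an already-constructed `PolarsObjectStore` (construction elided). The `fs` feature added to tokio in this PR's Cargo.toml is what makes `tokio::fs::File` usable as the `AsyncWrite` sink here; the target path is hypothetical.

use object_store::path::Path;
use polars_error::{to_compute_err, PolarsResult};

// Hypothetical caller: stream a remote object into a local scratch file.
// `PolarsObjectStore` is the wrapper defined above.
async fn fetch_to_disk(store: &PolarsObjectStore, remote: &Path) -> PolarsResult<()> {
    let mut file = tokio::fs::File::create("/tmp/example.csv.part")
        .await
        .map_err(to_compute_err)?;
    store.download(remote, &mut file).await?;
    Ok(())
}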
22 changes: 19 additions & 3 deletions crates/polars-io/src/csv/read/parser.rs
@@ -3,7 +3,7 @@ use std::path::PathBuf;
use memchr::memchr2_iter;
use num_traits::Pow;
use polars_core::prelude::*;
-use polars_core::POOL;
+use polars_core::{config, POOL};
use polars_utils::index::Bounded;
use polars_utils::slice::GetSaferUnchecked;
use rayon::prelude::*;
@@ -12,6 +12,7 @@ use super::buffer::Buffer;
use super::options::{CommentPrefix, NullValuesCompiled};
use super::splitfields::SplitFields;
use super::utils::get_file_chunks;
+use crate::prelude::is_cloud_url;
use crate::utils::get_reader_bytes;

/// Read the number of rows without parsing columns
@@ -24,7 +25,22 @@ pub fn count_rows(
    eol_char: u8,
    has_header: bool,
) -> PolarsResult<usize> {
-    let mut reader = polars_utils::open_file(path)?;
+    let mut reader = if is_cloud_url(path) || config::force_async() {
+        #[cfg(feature = "cloud")]
+        {
+            crate::file_cache::FILE_CACHE
+                .get_entry(path.to_str().unwrap())
+                // Safety: This was initialized by schema inference.
+                .unwrap()
+                .try_open_assume_latest()?
+        }
+        #[cfg(not(feature = "cloud"))]
+        {
+            panic!("required feature `cloud` is not enabled")
+        }
+    } else {
+        polars_utils::open_file(path)?
+    };
    let reader_bytes = get_reader_bytes(&mut reader)?;
    const MIN_ROWS_PER_THREAD: usize = 1024;
    let max_threads = POOL.current_num_threads();
Expand All @@ -44,7 +60,7 @@ pub fn count_rows(
    })
    .unwrap_or(1);

-    let file_chunks = get_file_chunks(
+    let file_chunks: Vec<(usize, usize)> = get_file_chunks(
        &reader_bytes,
        n_threads,
        None,
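Condensed, the dispatch `count_rows` now performs looks like the following sketch (cfg gates dropped for readability). The `expect` on the cache entry relies on schema inference having called `init_entry` for this path beforehand, which is what the "Safety" comment in the diff refers to.

use std::fs::File;
use std::path::Path;

use polars_core::config;
use polars_error::PolarsResult;

// Sketch: cloud URIs, or any path when force_async is set, resolve
// through the global file cache instead of a direct open.
fn open_csv_reader(path: &Path, is_cloud: bool) -> PolarsResult<File> {
    if is_cloud || config::force_async() {
        crate::file_cache::FILE_CACHE
            .get_entry(path.to_str().unwrap())
            .expect("entry initialized during schema inference")
            .try_open_assume_latest()
    } else {
        polars_utils::open_file(path)
    }
}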
151 changes: 151 additions & 0 deletions crates/polars-io/src/file_cache/cache.rs
@@ -0,0 +1,151 @@
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use once_cell::sync::Lazy;
use polars_core::config;
use polars_error::PolarsResult;
use polars_utils::aliases::PlHashMap;

use super::entry::{FileCacheEntry, DATA_PREFIX, METADATA_PREFIX};
use super::eviction::EvictionManager;
use super::file_fetcher::FileFetcher;
use super::utils::FILE_CACHE_PREFIX;
use crate::prelude::is_cloud_url;

pub static FILE_CACHE: Lazy<FileCache> = Lazy::new(|| {
    let prefix = FILE_CACHE_PREFIX.as_ref();
    let prefix = Arc::<Path>::from(prefix);

    if config::verbose() {
        eprintln!("file cache prefix: {}", prefix.to_str().unwrap());
    }

    EvictionManager {
        prefix: prefix.clone(),
        files_to_remove: None,
        limit_since_last_access: Duration::from_secs(
            std::env::var("POLARS_FILE_CACHE_TTL")
                .map(|x| x.parse::<u64>().expect("integer"))
                .unwrap_or(60 * 60),
        ),
    }
    .run_in_background();

    FileCache::new(prefix)
});

pub struct FileCache {
    prefix: Arc<Path>,
    entries: Arc<RwLock<PlHashMap<Arc<str>, Arc<FileCacheEntry>>>>,
}

impl FileCache {
    fn new(prefix: Arc<Path>) -> Self {
        let path = &prefix
            .as_ref()
            .join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap());
        let _ = std::fs::create_dir_all(path);
        assert!(
            path.is_dir(),
            "failed to create file cache metadata directory: {}",
            path.to_str().unwrap(),
        );

        let path = &prefix
            .as_ref()
            .join(std::str::from_utf8(&[DATA_PREFIX]).unwrap());
        let _ = std::fs::create_dir_all(path);
        assert!(
            path.is_dir(),
            "failed to create file cache data directory: {}",
            path.to_str().unwrap(),
        );

        Self {
            prefix,
            entries: Default::default(),
        }
    }

    /// If `uri` is a local path, it must be an absolute path.
    pub fn init_entry<F: Fn() -> PolarsResult<Arc<dyn FileFetcher>>>(
        &self,
        uri: Arc<str>,
        get_file_fetcher: F,
    ) -> PolarsResult<Arc<FileCacheEntry>> {
        let verbose = config::verbose();

        #[cfg(debug_assertions)]
        {
            // Local paths must be absolute or else the cache would be wrong.
            if !crate::utils::is_cloud_url(uri.as_ref()) {
                let path = Path::new(uri.as_ref());
                assert_eq!(path, std::fs::canonicalize(path).unwrap().as_path());
            }
        }

        {
            let entries = self.entries.read().unwrap();

            if let Some(entry) = entries.get(uri.as_ref()) {
                if verbose {
                    eprintln!(
                        "[file_cache] init_entry: return existing entry for uri = {}",
                        uri.clone()
                    );
                }
                return Ok(entry.clone());
            }
        }

        let uri_hash = blake3::hash(uri.as_bytes())
            .to_hex()
            .get(..32)
            .unwrap()
            .to_string();

        {
            let mut entries = self.entries.write().unwrap();

            // May have been raced
            if let Some(entry) = entries.get(uri.as_ref()) {
                if verbose {
                    eprintln!("[file_cache] init_entry: return existing entry for uri = {} (lost init race)", uri.clone());
                }
                return Ok(entry.clone());
            }

            if verbose {
                eprintln!(
                    "[file_cache] init_entry: creating new entry for uri = {}, hash = {}",
                    uri.clone(),
                    uri_hash.clone()
                );
            }

            let entry = Arc::new(FileCacheEntry::new(
                uri.clone(),
                uri_hash,
                self.prefix.clone(),
                get_file_fetcher()?,
            ));
            entries.insert_unique_unchecked(uri, entry.clone());
            Ok(entry.clone())
        }
    }

    /// This function can accept relative local paths.
    pub fn get_entry(&self, uri: &str) -> Option<Arc<FileCacheEntry>> {
        if is_cloud_url(uri) {
            self.entries.read().unwrap().get(uri).map(Arc::clone)
        } else {
            let uri = std::fs::canonicalize(uri).unwrap();
            self.entries
                .read()
                .unwrap()
                .get(uri.to_str().unwrap())
                .map(Arc::clone)
        }
    }
}
Loading