feat: Support cloud storage in scan_csv #16674

Merged (20 commits) on Jun 6, 2024
Changes shown from 10 commits
47 changes: 45 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 4 additions & 2 deletions crates/polars-io/Cargo.toml
@@ -20,6 +20,7 @@ ahash = { workspace = true }
arrow = { workspace = true }
async-trait = { version = "0.1.59", optional = true }
atoi_simd = { workspace = true, optional = true }
+blake3 = "1.5.1"
Member commented:
Can we make this an optional dependency? Only activated when we activate the caching?

@nameexhaustion (Collaborator, Author) commented on Jun 5, 2024:
I put them behind a new file_cache feature flag (although maybe we could also just use the cloud feature flag if you want, let me know)
*edit: nvm, I think I prefer the new feature flag because it also specifies "dep:blake3", "dep:fs4"
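For context, the pattern the author describes looks roughly like this (a hedged sketch, not the PR's exact manifest; only the `dep:blake3`/`dep:fs4` entries are taken from the comment above):

```toml
[dependencies]
# Both crates compile only when the feature below is enabled.
blake3 = { version = "1.5.1", optional = true }
fs4 = { version = "0.8.3", features = ["sync"], optional = true }

[features]
# The `dep:` syntax activates the optional dependencies without creating
# implicit features named `blake3`/`fs4`.
file_cache = ["dep:blake3", "dep:fs4"]
```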

bytes = { version = "1.3" }
chrono = { workspace = true, optional = true }
chrono-tz = { workspace = true, optional = true }
@@ -38,17 +39,18 @@ regex = { workspace = true }
reqwest = { workspace = true, optional = true }
ryu = { workspace = true, optional = true }
serde = { workspace = true, features = ["rc"], optional = true }
-serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value"], optional = true }
+serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value", "std"], optional = true }
simd-json = { workspace = true, optional = true }
simdutf8 = { workspace = true, optional = true }
smartstring = { workspace = true }
-tokio = { workspace = true, features = ["net", "rt-multi-thread", "time", "sync"], optional = true }
+tokio = { workspace = true, features = ["fs", "net", "rt-multi-thread", "time", "sync"], optional = true }
tokio-util = { workspace = true, features = ["io", "io-util"], optional = true }
url = { workspace = true, optional = true }
zstd = { workspace = true, optional = true }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
home = "0.5.4"
+fs4 = { version = "0.8.3", features = ["sync"] }
nameexhaustion marked this conversation as resolved.

[dev-dependencies]
tempfile = "3"
30 changes: 29 additions & 1 deletion crates/polars-io/src/cloud/polars_object_store.rs
@@ -2,12 +2,14 @@ use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
+use futures::StreamExt;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use polars_error::{to_compute_err, PolarsResult};
+use tokio::io::AsyncWriteExt;

use crate::pl_async::{
-    tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
+    self, tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
};

/// Polars specific wrapper for `Arc<dyn ObjectStore>` that limits the number of
@@ -52,6 +54,32 @@ impl PolarsObjectStore {
        .map_err(to_compute_err)
    }

+    pub async fn download<F: tokio::io::AsyncWrite + std::marker::Unpin>(
+        &self,
+        path: &Path,
+        file: &mut F,
+    ) -> PolarsResult<()> {
+        tune_with_concurrency_budget(1, || async {
+            let mut stream = self
+                .0
+                .get(path)
+                .await
+                .map_err(to_compute_err)?
+                .into_stream();
+
+            let mut len = 0;
+            while let Some(bytes) = stream.next().await {
+                let bytes = bytes.map_err(to_compute_err)?;
+                len += bytes.len();
+                // Use `write_all` rather than `write`: a bare `write` may write
+                // only part of the buffer and silently drop the rest.
+                file.write_all(bytes.as_ref()).await.map_err(to_compute_err)?;
+            }
+
+            PolarsResult::Ok(pl_async::Size::from(len as u64))
+        })
+        .await?;
+        Ok(())
+    }
+
    /// Fetch the metadata of the parquet file, do not memoize it.
    pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
        with_concurrency_budget(1, || self.0.head(path))
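As a usage illustration (assumed, not part of this PR: `fetch_to_disk` and its signature are hypothetical), the new `download` method pairs naturally with a `tokio::fs::File` target, which is also why this PR adds the `fs` feature to the tokio dependency:

```rust
use object_store::path::Path;
use polars_error::{to_compute_err, PolarsResult};
use tokio::io::AsyncWriteExt;

// Hypothetical helper: stream a remote object into a local file.
async fn fetch_to_disk(
    store: &PolarsObjectStore,
    remote: &Path,
    local: &std::path::Path,
) -> PolarsResult<()> {
    let mut file = tokio::fs::File::create(local)
        .await
        .map_err(to_compute_err)?;
    store.download(remote, &mut file).await?;
    // Flush so the bytes are visible to readers that open the file next.
    file.flush().await.map_err(to_compute_err)?;
    Ok(())
}
```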
24 changes: 21 additions & 3 deletions crates/polars-io/src/csv/read/parser.rs
@@ -3,7 +3,7 @@ use std::path::PathBuf;
use memchr::memchr2_iter;
use num_traits::Pow;
use polars_core::prelude::*;
-use polars_core::POOL;
+use polars_core::{config, POOL};
use polars_utils::index::Bounded;
use polars_utils::slice::GetSaferUnchecked;
use rayon::prelude::*;
@@ -12,6 +12,9 @@ use super::buffer::Buffer;
use super::options::{CommentPrefix, NullValuesCompiled};
use super::splitfields::SplitFields;
use super::utils::get_file_chunks;
+#[cfg(feature = "cloud")]
+use crate::file_cache::FILE_CACHE;
+use crate::prelude::is_cloud_url;
use crate::utils::get_reader_bytes;

/// Read the number of rows without parsing columns
@@ -24,7 +27,22 @@ pub fn count_rows(
    eol_char: u8,
    has_header: bool,
) -> PolarsResult<usize> {
-    let mut reader = polars_utils::open_file(path)?;
+    let mut reader = if is_cloud_url(path) || config::force_async() {
+        #[cfg(feature = "cloud")]
+        {
+            FILE_CACHE
+                .get_entry(path.to_str().unwrap())
+                // Safety: This was initialized by schema inference.
+                .unwrap()
+                .try_open_assume_latest()?
+        }
+        #[cfg(not(feature = "cloud"))]
+        {
+            panic!("required feature `cloud` is not enabled")
+        }
+    } else {
+        polars_utils::open_file(path)?
+    };
    let reader_bytes = get_reader_bytes(&mut reader)?;
    const MIN_ROWS_PER_THREAD: usize = 1024;
    let max_threads = POOL.current_num_threads();
@@ -44,7 +62,7 @@ })
        .unwrap_or(1);
.unwrap_or(1);

-    let file_chunks = get_file_chunks(
+    let file_chunks: Vec<(usize, usize)> = get_file_chunks(
        &reader_bytes,
        n_threads,
        None,
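End to end, the feature this PR enables looks roughly like the following (a sketch under assumptions: the bucket and object are made up, and the `cloud` feature plus credentials in the environment are required):

```rust
use polars_error::PolarsResult;
use polars_lazy::prelude::*;

fn main() -> PolarsResult<()> {
    // The CSV is fetched once into the file cache; subsequent operations,
    // including the row counting above, reuse the cached local copy.
    let df = LazyCsvReader::new("s3://my-bucket/data.csv")
        .finish()?
        .collect()?;
    println!("{}", df);
    Ok(())
}
```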
153 changes: 153 additions & 0 deletions crates/polars-io/src/file_cache/cache.rs
@@ -0,0 +1,153 @@
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;

use once_cell::sync::Lazy;
use polars_core::config;
use polars_error::PolarsResult;
use polars_utils::aliases::PlHashMap;

use super::entry::FileCacheEntry;
use super::eviction::EvictionManager;
use super::file_fetcher::FileFetcher;
use crate::file_cache::entry::{DATA_PREFIX, METADATA_PREFIX};
use crate::prelude::is_cloud_url;

pub static FILE_CACHE: Lazy<FileCache> = Lazy::new(|| {
    let prefix = std::env::var("POLARS_TEMP_DIR")
        .unwrap_or_else(|_| std::env::temp_dir().to_string_lossy().into_owned());
    let prefix = PathBuf::from(prefix).join("polars/file-cache/");
    let prefix = Arc::<Path>::from(prefix.as_path());

    if config::verbose() {
        eprintln!("file cache prefix: {}", prefix.to_str().unwrap());
    }

    EvictionManager {
        prefix: prefix.clone(),
        files_to_remove: None,
        limit_since_last_access: Duration::from_secs(
            std::env::var("POLARS_FILE_CACHE_TTL")
                .map(|x| x.parse::<u64>().expect("integer"))
                .unwrap_or(60 * 60),
        ),
    }
    .run_in_background();

    FileCache::new(prefix)
});
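Two environment variables drive the initializer above; note they are read only once, when the `Lazy` static is first dereferenced. A small sketch (the values are examples):

```rust
// Assumption from the initializer above: set these before the first scan,
// because FILE_CACHE reads them only once.
std::env::set_var("POLARS_TEMP_DIR", "/mnt/scratch"); // cache root: /mnt/scratch/polars/file-cache/
std::env::set_var("POLARS_FILE_CACHE_TTL", "1800"); // evict entries unused for 30 minutes (default: 3600)
```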

pub struct FileCache {
    prefix: Arc<Path>,
    entries: Arc<RwLock<PlHashMap<Arc<str>, Arc<FileCacheEntry>>>>,
}

impl FileCache {
    fn new(prefix: Arc<Path>) -> Self {
        let path = &prefix
            .as_ref()
            .join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap());
        let _ = std::fs::create_dir_all(path);
        assert!(
            path.is_dir(),
            "failed to create file cache metadata directory: {}",
            path.to_str().unwrap(),
        );

        let path = &prefix
            .as_ref()
            .join(std::str::from_utf8(&[DATA_PREFIX]).unwrap());
        let _ = std::fs::create_dir_all(path);
        assert!(
            path.is_dir(),
            "failed to create file cache data directory: {}",
            path.to_str().unwrap(),
        );

        Self {
            prefix,
            entries: Default::default(),
        }
    }

    /// If `uri` is a local path, it must be an absolute path.
    pub fn init_entry<F: Fn() -> PolarsResult<Arc<dyn FileFetcher>>>(
        &self,
        uri: Arc<str>,
        get_file_fetcher: F,
    ) -> PolarsResult<Arc<FileCacheEntry>> {
        let verbose = config::verbose();

        #[cfg(debug_assertions)]
        {
            // Local paths must be absolute or else the cache would be wrong.
            if !crate::utils::is_cloud_url(uri.as_ref()) {
                let path = Path::new(uri.as_ref());
                assert_eq!(path, std::fs::canonicalize(path).unwrap().as_path());
            }
        }

        {
            let entries = self.entries.read().unwrap();

            if let Some(entry) = entries.get(uri.as_ref()) {
                if verbose {
                    eprintln!(
                        "[file_cache] init_entry: return existing entry for uri = {}",
                        uri.clone()
                    );
                }
                return Ok(entry.clone());
            }
        }

        let uri_hash = blake3::hash(uri.as_bytes())
            .to_hex()
            .get(..32)
            .unwrap()
            .to_string();

        {
            let mut entries = self.entries.write().unwrap();

            // May have been raced
            if let Some(entry) = entries.get(uri.as_ref()) {
                if verbose {
                    eprintln!("[file_cache] init_entry: return existing entry for uri = {} (lost init race)", uri.clone());
                }
                return Ok(entry.clone());
            }

            if verbose {
                eprintln!(
                    "[file_cache] init_entry: creating new entry for uri = {}, hash = {}",
                    uri.clone(),
                    uri_hash.clone()
                );
            }

            let entry = Arc::new(FileCacheEntry::new(
                uri.clone(),
                uri_hash,
                self.prefix.clone(),
                get_file_fetcher()?,
            ));
            entries.insert_unique_unchecked(uri, entry.clone());
            Ok(entry.clone())
        }
    }
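`init_entry` is a textbook double-checked lookup over the `RwLock`: an optimistic check under the shared read lock, then a re-check under the exclusive write lock, because another thread may have inserted the entry between the two acquisitions. A minimal standalone sketch of the same pattern (hypothetical names, `std::collections::HashMap` instead of `PlHashMap`):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

fn get_or_init(map: &RwLock<HashMap<String, Arc<str>>>, key: &str) -> Arc<str> {
    // Fast path: most calls find an existing entry under the read lock.
    if let Some(v) = map.read().unwrap().get(key) {
        return v.clone();
    }
    // Slow path: take the write lock and re-check, since another thread
    // may have inserted the entry while we waited.
    let mut guard = map.write().unwrap();
    guard
        .entry(key.to_string())
        .or_insert_with(|| Arc::from(key))
        .clone()
}
```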

    /// This function can accept relative local paths.
    pub fn get_entry(&self, uri: &str) -> Option<Arc<FileCacheEntry>> {
        if is_cloud_url(uri) {
            self.entries.read().unwrap().get(uri).map(Arc::clone)
        } else {
            let uri = std::fs::canonicalize(uri).unwrap();
            self.entries
                .read()
                .unwrap()
                .get(uri.to_str().unwrap())
                .map(Arc::clone)
        }
    }
}
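Putting the lookup paths together, a consumer would look roughly like this (a hedged sketch: the URI is made up, and the `PolarsResult<std::fs::File>` return of `try_open_assume_latest` is inferred from its use in `count_rows` above):

```rust
use polars_error::PolarsResult;

use crate::file_cache::FILE_CACHE;

fn open_cached(uri: &str) -> PolarsResult<std::fs::File> {
    // Cloud URIs are keyed verbatim; local paths are canonicalized first,
    // which is why init_entry insists on absolute local paths.
    let entry = FILE_CACHE
        .get_entry(uri)
        .expect("entry should have been initialized, e.g. by schema inference");
    entry.try_open_assume_latest()
}
```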