Skip to content

Commit

Permalink
feat: Support directory paths in scans for Parquet, IPC and CSV (#17017)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Jun 18, 2024
1 parent 2d3b0c2 commit 306a918
Show file tree
Hide file tree
Showing 33 changed files with 420 additions and 336 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/polars-io/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ pub struct RowIndex {
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct HiveOptions {
pub enabled: bool,
pub hive_start_idx: usize,
pub schema: Option<SchemaRef>,
}

impl Default for HiveOptions {
fn default() -> Self {
Self {
enabled: true,
hive_start_idx: 0,
schema: None,
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ polars-utils = { workspace = true }
ahash = { workspace = true }
bitflags = { workspace = true }
glob = { version = "0.3" }
memchr = { workspace = true }
once_cell = { workspace = true }
pyo3 = { workspace = true, optional = true }
rayon = { workspace = true }
Expand Down
55 changes: 15 additions & 40 deletions crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::prelude::*;
#[derive(Clone)]
#[cfg(feature = "csv")]
pub struct LazyCsvReader {
path: PathBuf,
paths: Arc<[PathBuf]>,
glob: bool,
cache: bool,
Expand All @@ -35,8 +34,7 @@ impl LazyCsvReader {

pub fn new(path: impl AsRef<Path>) -> Self {
LazyCsvReader {
path: path.as_ref().to_owned(),
paths: Arc::new([]),
paths: Arc::new([path.as_ref().to_path_buf()]),
glob: true,
cache: true,
read_options: Default::default(),
Expand Down Expand Up @@ -218,15 +216,13 @@ impl LazyCsvReader {
where
F: Fn(Schema) -> PolarsResult<Schema>,
{
let mut file = if let Some(mut paths) = self.iter_paths()? {
let path = match paths.next() {
Some(globresult) => globresult?,
None => polars_bail!(ComputeError: "globbing pattern did not match any files"),
};
polars_utils::open_file(path)
} else {
polars_utils::open_file(&self.path)
}?;
let paths = self.expand_paths()?.0;
let Some(path) = paths.first() else {
polars_bail!(ComputeError: "no paths specified for this reader");
};

let mut file = polars_utils::open_file(path)?;

let reader_bytes = get_reader_bytes(&mut file).expect("could not mmap file");
let skip_rows = self.read_options.skip_rows;
let parse_options = self.read_options.get_parse_options();
Expand Down Expand Up @@ -264,25 +260,9 @@ impl LazyCsvReader {

impl LazyFileListReader for LazyCsvReader {
/// Get the final [LazyFrame].
fn finish(mut self) -> PolarsResult<LazyFrame> {
if !self.glob {
return self.finish_no_glob();
}
if let Some(paths) = self.iter_paths()? {
let paths = paths
.into_iter()
.collect::<PolarsResult<Arc<[PathBuf]>>>()?;
self.paths = paths;
}
self.finish_no_glob()
}

fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
let paths = if self.paths.is_empty() {
Arc::new([self.path])
} else {
self.paths
};
fn finish(self) -> PolarsResult<LazyFrame> {
// `expand_paths` respects globs
let paths = self.expand_paths()?.0;

let mut lf: LazyFrame =
DslBuilder::scan_csv(paths, self.read_options, self.cache, self.cloud_options)?
Expand All @@ -292,23 +272,18 @@ impl LazyFileListReader for LazyCsvReader {
Ok(lf)
}

fn glob(&self) -> bool {
self.glob
fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
unreachable!();
}

fn path(&self) -> &Path {
&self.path
fn glob(&self) -> bool {
self.glob
}

fn paths(&self) -> &[PathBuf] {
&self.paths
}

fn with_path(mut self, path: PathBuf) -> Self {
self.path = path;
self
}

fn with_paths(mut self, paths: Arc<[PathBuf]>) -> Self {
self.paths = paths;
self
Expand Down
Loading

0 comments on commit 306a918

Please sign in to comment.