From f996d44322d5d1a057df92737f6ea4495fe5e776 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 17 Sep 2024 20:14:27 +0200 Subject: [PATCH] c --- crates/polars-io/src/parquet/read/reader.rs | 4 + crates/polars-lazy/src/scan/csv.rs | 2 +- crates/polars-lazy/src/scan/ipc.rs | 2 +- crates/polars-lazy/src/scan/ndjson.rs | 7 +- crates/polars-lazy/src/scan/parquet.rs | 2 +- .../src/executors/scan/parquet.rs | 24 +- .../src/executors/sources/parquet.rs | 21 +- crates/polars-plan/src/client/check.rs | 3 +- crates/polars-plan/src/plans/builder_dsl.rs | 31 +- .../src/plans/conversion/dsl_to_ir.rs | 266 +++++++----------- .../polars-plan/src/plans/conversion/mod.rs | 37 +-- .../polars-plan/src/plans/ir/scan_sources.rs | 56 +++- crates/polars-plan/src/plans/mod.rs | 25 +- .../nodes/parquet_source/metadata_fetch.rs | 12 +- .../src/nodes/parquet_source/mod.rs | 4 +- .../src/physical_plan/to_graph.rs | 2 +- py-polars/tests/unit/io/test_lazy_parquet.py | 22 ++ 17 files changed, 269 insertions(+), 251 deletions(-) diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs index 0f6f3b70b4f3..6d4a362c7198 100644 --- a/crates/polars-io/src/parquet/read/reader.rs +++ b/crates/polars-io/src/parquet/read/reader.rs @@ -138,6 +138,10 @@ impl ParquetReader { self } + pub fn set_metadata(&mut self, metadata: FileMetadataRef) { + self.metadata = Some(metadata); + } + pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> { if self.metadata.is_none() { self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?)); diff --git a/crates/polars-lazy/src/scan/csv.rs b/crates/polars-lazy/src/scan/csv.rs index 998f422820c6..7127d64e87cb 100644 --- a/crates/polars-lazy/src/scan/csv.rs +++ b/crates/polars-lazy/src/scan/csv.rs @@ -311,7 +311,7 @@ impl LazyFileListReader for LazyCsvReader { /// Get the final [LazyFrame]. fn finish(self) -> PolarsResult { let mut lf: LazyFrame = DslBuilder::scan_csv( - self.sources.to_dsl(false), + self.sources, self.read_options, self.cache, self.cloud_options, diff --git a/crates/polars-lazy/src/scan/ipc.rs b/crates/polars-lazy/src/scan/ipc.rs index a9f8c8b98b0f..8d5f9ef7708f 100644 --- a/crates/polars-lazy/src/scan/ipc.rs +++ b/crates/polars-lazy/src/scan/ipc.rs @@ -54,7 +54,7 @@ impl LazyFileListReader for LazyIpcReader { let options = IpcScanOptions {}; let mut lf: LazyFrame = DslBuilder::scan_ipc( - self.sources.to_dsl(false), + self.sources, options, args.n_rows, args.cache, diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs index cbd7bbcee8f1..a44ce9053ef5 100644 --- a/crates/polars-lazy/src/scan/ndjson.rs +++ b/crates/polars-lazy/src/scan/ndjson.rs @@ -1,6 +1,6 @@ use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::Arc; use polars_core::prelude::*; use polars_io::cloud::CloudOptions; @@ -155,10 +155,11 @@ impl LazyFileListReader for LazyJsonLineReader { }; Ok(LazyFrame::from(DslPlan::Scan { - sources: Arc::new(Mutex::new(self.sources.to_dsl(false))), - file_info: Arc::new(RwLock::new(None)), + sources: self.sources, + file_info: None, file_options, scan_type, + cached_ir: Default::default(), })) } diff --git a/crates/polars-lazy/src/scan/parquet.rs b/crates/polars-lazy/src/scan/parquet.rs index 9adb0f1838be..eb26eafb6144 100644 --- a/crates/polars-lazy/src/scan/parquet.rs +++ b/crates/polars-lazy/src/scan/parquet.rs @@ -62,7 +62,7 @@ impl LazyFileListReader for LazyParquetReader { let row_index = self.args.row_index; let mut lf: LazyFrame = DslBuilder::scan_parquet( - self.sources.to_dsl(false), + self.sources, self.args.n_rows, self.args.cache, self.args.parallel, diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index b9012344abb9..27160e2356e6 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -82,7 +82,16 @@ impl ParquetExec { .into_par_iter() .map(|&i| { let memslice = self.sources.at(i).to_memslice()?; - ParquetReader::new(std::io::Cursor::new(memslice)).num_rows() + + let mut reader = ParquetReader::new(std::io::Cursor::new(memslice)); + + if i == 0 { + if let Some(md) = self.metadata.clone() { + reader.set_metadata(md) + } + } + + reader.num_rows() }) .collect::>>()?; @@ -151,7 +160,15 @@ impl ParquetExec { let memslice = source.to_memslice()?; - let mut reader = ParquetReader::new(std::io::Cursor::new(memslice)) + let mut reader = ParquetReader::new(std::io::Cursor::new(memslice)); + + if i == 0 { + if let Some(md) = self.metadata.clone() { + reader.set_metadata(md) + } + } + + let mut reader = reader .read_parallel(parallel) .set_low_memory(self.options.low_memory) .use_statistics(self.options.use_statistics) @@ -266,6 +283,7 @@ impl ParquetExec { let mut iter = stream::iter((0..paths.len()).rev().map(|i| { let paths = paths.clone(); let cloud_options = cloud_options.clone(); + let first_metadata = first_metadata.clone(); pl_async::get_runtime().spawn(async move { PolarsResult::Ok(( @@ -273,7 +291,7 @@ impl ParquetExec { ParquetAsyncReader::from_uri( paths[i].to_str().unwrap(), cloud_options.as_ref().as_ref(), - None, + first_metadata.filter(|_| i == 0), ) .await? .num_rows() diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index 7a0dabeb10df..b5d4230cb52a 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -41,7 +41,7 @@ pub struct ParquetSource { file_options: FileScanOptions, #[allow(dead_code)] cloud_options: Option, - metadata: Option, + first_metadata: Option, file_info: FileInfo, hive_parts: Option>>, verbose: bool, @@ -61,7 +61,6 @@ impl ParquetSource { } fn init_next_reader_sync(&mut self) -> PolarsResult<()> { - self.metadata = None; self.init_reader_sync() } @@ -133,7 +132,16 @@ impl ParquetSource { let batched_reader = { let file = std::fs::File::open(path).unwrap(); - let mut reader = ParquetReader::new(file) + + let mut reader = ParquetReader::new(file); + + if index == 0 { + if let Some(md) = self.first_metadata.clone() { + reader.set_metadata(md); + } + } + + let mut reader = reader .with_projection(projection) .check_schema( self.file_info @@ -191,7 +199,7 @@ impl ParquetSource { async fn init_reader_async(&self, index: usize) -> PolarsResult { use std::sync::atomic::Ordering; - let metadata = self.metadata.clone(); + let metadata = self.first_metadata.clone().filter(|_| index == 0); let predicate = self.predicate.clone(); let cloud_options = self.cloud_options.clone(); let (path, options, file_options, projection, chunk_size, hive_partitions) = @@ -252,7 +260,7 @@ impl ParquetSource { sources: ScanSources, options: ParquetOptions, cloud_options: Option, - metadata: Option, + first_metadata: Option, file_options: FileScanOptions, file_info: FileInfo, hive_parts: Option>>, @@ -282,7 +290,7 @@ impl ParquetSource { iter, sources, cloud_options, - metadata, + first_metadata, file_info, hive_parts, verbose, @@ -293,7 +301,6 @@ impl ParquetSource { // Already start downloading when we deal with cloud urls. if run_async { source.init_next_reader()?; - source.metadata = None; } Ok(source) } diff --git a/crates/polars-plan/src/client/check.rs b/crates/polars-plan/src/client/check.rs index 84189840a3dd..f76f508643f0 100644 --- a/crates/polars-plan/src/client/check.rs +++ b/crates/polars-plan/src/client/check.rs @@ -13,8 +13,7 @@ pub(super) fn assert_cloud_eligible(dsl: &DslPlan) -> PolarsResult<()> { DslPlan::Scan { sources, scan_type, .. } => { - let sources_lock = sources.lock().unwrap(); - match &sources_lock.sources { + match sources { ScanSources::Paths(paths) => { if paths.iter().any(|p| !is_cloud_url(p)) { return ineligible_error("contains scan of local file system"); diff --git a/crates/polars-plan/src/plans/builder_dsl.rs b/crates/polars-plan/src/plans/builder_dsl.rs index 4040087833f0..5458c0442abe 100644 --- a/crates/polars-plan/src/plans/builder_dsl.rs +++ b/crates/polars-plan/src/plans/builder_dsl.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::Arc; use polars_core::prelude::*; #[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))] @@ -57,11 +57,8 @@ impl DslBuilder { }; Ok(DslPlan::Scan { - sources: Arc::new(Mutex::new(DslScanSources { - sources: ScanSources::Buffers(Arc::default()), - is_expanded: true, - })), - file_info: Arc::new(RwLock::new(Some(file_info))), + sources: ScanSources::Buffers(Arc::default()), + file_info: Some(file_info), file_options, scan_type: FileScan::Anonymous { function, @@ -70,6 +67,7 @@ impl DslBuilder { skip_rows, }), }, + cached_ir: Default::default(), } .into()) } @@ -77,7 +75,7 @@ impl DslBuilder { #[cfg(feature = "parquet")] #[allow(clippy::too_many_arguments)] pub fn scan_parquet( - sources: DslScanSources, + sources: ScanSources, n_rows: Option, cache: bool, parallel: polars_io::parquet::read::ParallelStrategy, @@ -102,8 +100,8 @@ impl DslBuilder { include_file_paths, }; Ok(DslPlan::Scan { - sources: Arc::new(Mutex::new(sources)), - file_info: Arc::new(RwLock::new(None)), + sources, + file_info: None, file_options: options, scan_type: FileScan::Parquet { options: ParquetOptions { @@ -114,6 +112,7 @@ impl DslBuilder { cloud_options, metadata: None, }, + cached_ir: Default::default(), } .into()) } @@ -121,7 +120,7 @@ impl DslBuilder { #[cfg(feature = "ipc")] #[allow(clippy::too_many_arguments)] pub fn scan_ipc( - sources: DslScanSources, + sources: ScanSources, options: IpcScanOptions, n_rows: Option, cache: bool, @@ -132,8 +131,8 @@ impl DslBuilder { include_file_paths: Option, ) -> PolarsResult { Ok(DslPlan::Scan { - sources: Arc::new(Mutex::new(sources)), - file_info: Arc::new(RwLock::new(None)), + sources, + file_info: None, file_options: FileScanOptions { with_columns: None, cache, @@ -150,6 +149,7 @@ impl DslBuilder { cloud_options, metadata: None, }, + cached_ir: Default::default(), } .into()) } @@ -157,7 +157,7 @@ impl DslBuilder { #[allow(clippy::too_many_arguments)] #[cfg(feature = "csv")] pub fn scan_csv( - sources: DslScanSources, + sources: ScanSources, read_options: CsvReadOptions, cache: bool, cloud_options: Option, @@ -183,13 +183,14 @@ impl DslBuilder { include_file_paths, }; Ok(DslPlan::Scan { - sources: Arc::new(Mutex::new(sources)), - file_info: Arc::new(RwLock::new(None)), + sources, + file_info: None, file_options: options, scan_type: FileScan::Csv { options: read_options, cloud_options, }, + cached_ir: Default::default(), } .into()) } diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index 7cbb37bcb8e7..9b363d821d05 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -1,15 +1,7 @@ -use std::path::PathBuf; - use arrow::datatypes::ArrowSchemaRef; use either::Either; use expr_expansion::{is_regex_projection, rewrite_projections}; use hive::{hive_partitions_from_paths, HivePartitions}; -#[cfg(any(feature = "ipc", feature = "parquet"))] -use polars_io::cloud::CloudOptions; -#[cfg(any(feature = "csv", feature = "json"))] -use polars_io::path_utils::expand_paths; -#[cfg(any(feature = "ipc", feature = "parquet"))] -use polars_io::path_utils::{expand_paths_hive, expanded_from_single_directory}; use super::stack_opt::ConversionOptimizer; use super::*; @@ -107,31 +99,39 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult DslPlan::Scan { sources, file_info, - mut file_options, - mut scan_type, + file_options, + scan_type, + cached_ir, } => { - let mut sources_lock = sources.lock().unwrap(); - sources_lock.expand_paths(&mut scan_type, &mut file_options)?; - let sources = sources_lock.sources.clone(); - - let file_info_read = file_info.read().unwrap(); - - // leading `_` as clippy doesn't understand that you don't want to read from a lock guard - // if you want to keep it alive. - let mut _file_info_write: Option<_>; - let mut resolved_file_info = if let Some(file_info) = &*file_info_read { - _file_info_write = None; - let out = file_info.clone(); - drop(file_info_read); - out - } else { - // Lock so that we don't resolve the same schema in parallel. - drop(file_info_read); + // Note that the first metadata can still end up being `None` later if the files were + // filtered from predicate pushdown. + let mut cached_ir = cached_ir.lock().unwrap(); - // Set write lock and keep that lock until all fields in `file_info` are resolved. - _file_info_write = Some(file_info.write().unwrap()); + if cached_ir.is_none() { + let mut file_options = file_options.clone(); + let mut scan_type = scan_type.clone(); - match &mut scan_type { + let sources = match &scan_type { + #[cfg(feature = "parquet")] + FileScan::Parquet { + ref cloud_options, .. + } => sources + .expand_paths_with_hive_update(&mut file_options, cloud_options.as_ref())?, + #[cfg(feature = "ipc")] + FileScan::Ipc { + ref cloud_options, .. + } => sources + .expand_paths_with_hive_update(&mut file_options, cloud_options.as_ref())?, + #[cfg(feature = "csv")] + FileScan::Csv { + ref cloud_options, .. + } => sources.expand_paths(&file_options, cloud_options.as_ref())?, + #[cfg(feature = "json")] + FileScan::NDJson { .. } => sources.expand_paths(&file_options, None)?, + FileScan::Anonymous { .. } => sources, + }; + + let mut file_info = match &mut scan_type { #[cfg(feature = "parquet")] FileScan::Parquet { cloud_options, @@ -181,61 +181,58 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult cloud_options.as_ref(), ) .map_err(|e| e.context(failed_here!(ndjson scan)))?, - // FileInfo should be set. - FileScan::Anonymous { .. } => unreachable!(), - } - }; + FileScan::Anonymous { .. } => { + file_info.expect("FileInfo should be set for AnonymousScan") + }, + }; - let hive_parts = if file_options.hive_options.enabled.unwrap_or(false) - && resolved_file_info.reader_schema.is_some() - { - let paths = sources - .as_paths() - .ok_or_else(|| polars_err!(nyi = "Hive-partitioning of in-memory buffers"))?; - - #[allow(unused_assignments)] - let mut owned = None; - - hive_partitions_from_paths( - paths, - file_options.hive_options.hive_start_idx, - file_options.hive_options.schema.clone(), - match resolved_file_info.reader_schema.as_ref().unwrap() { - Either::Left(v) => { - owned = Some(Schema::from_arrow_schema(v.as_ref())); - owned.as_ref().unwrap() + let hive_parts = if file_options.hive_options.enabled.unwrap_or(false) + && file_info.reader_schema.is_some() + { + let paths = sources.as_paths().ok_or_else(|| { + polars_err!(nyi = "Hive-partitioning of in-memory buffers") + })?; + + #[allow(unused_assignments)] + let mut owned = None; + + hive_partitions_from_paths( + paths, + file_options.hive_options.hive_start_idx, + file_options.hive_options.schema.clone(), + match file_info.reader_schema.as_ref().unwrap() { + Either::Left(v) => { + owned = Some(Schema::from_arrow_schema(v.as_ref())); + owned.as_ref().unwrap() + }, + Either::Right(v) => v.as_ref(), }, - Either::Right(v) => v.as_ref(), - }, - file_options.hive_options.try_parse_dates, - )? - } else { - None - }; + file_options.hive_options.try_parse_dates, + )? + } else { + None + }; - file_options.include_file_paths = - file_options.include_file_paths.filter(|_| match scan_type { - #[cfg(feature = "parquet")] - FileScan::Parquet { .. } => true, - #[cfg(feature = "ipc")] - FileScan::Ipc { .. } => true, - #[cfg(feature = "csv")] - FileScan::Csv { .. } => true, - #[cfg(feature = "json")] - FileScan::NDJson { .. } => true, - FileScan::Anonymous { .. } => false, - }); + file_options.include_file_paths = + file_options.include_file_paths.filter(|_| match scan_type { + #[cfg(feature = "parquet")] + FileScan::Parquet { .. } => true, + #[cfg(feature = "ipc")] + FileScan::Ipc { .. } => true, + #[cfg(feature = "csv")] + FileScan::Csv { .. } => true, + #[cfg(feature = "json")] + FileScan::NDJson { .. } => true, + FileScan::Anonymous { .. } => false, + }); - // Only if we have a writing file handle we must resolve hive partitions - // update schema's etc. - if let Some(lock) = &mut _file_info_write { if let Some(ref hive_parts) = hive_parts { let hive_schema = hive_parts[0].schema(); - resolved_file_info.update_schema_with_hive_schema(hive_schema.clone()); + file_info.update_schema_with_hive_schema(hive_schema.clone()); } if let Some(ref file_path_col) = file_options.include_file_paths { - let schema = Arc::make_mut(&mut resolved_file_info.schema); + let schema = Arc::make_mut(&mut file_info.schema); if schema.contains(file_path_col) { polars_bail!( @@ -251,34 +248,34 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult )?; } - **lock = Some(resolved_file_info.clone()); - } + file_options.with_columns = if file_info.reader_schema.is_some() { + maybe_init_projection_excluding_hive( + file_info.reader_schema.as_ref().unwrap(), + hive_parts.as_ref().map(|x| &x[0]), + ) + } else { + None + }; - file_options.with_columns = if resolved_file_info.reader_schema.is_some() { - maybe_init_projection_excluding_hive( - resolved_file_info.reader_schema.as_ref().unwrap(), - hive_parts.as_ref().map(|x| &x[0]), - ) - } else { - None - }; + if let Some(row_index) = &file_options.row_index { + let schema = Arc::make_mut(&mut file_info.schema); + *schema = schema + .new_inserting_at_index(0, row_index.name.clone(), IDX_DTYPE) + .unwrap(); + } - if let Some(row_index) = &file_options.row_index { - let schema = Arc::make_mut(&mut resolved_file_info.schema); - *schema = schema - .new_inserting_at_index(0, row_index.name.clone(), IDX_DTYPE) - .unwrap(); + cached_ir.replace(IR::Scan { + sources, + file_info, + hive_parts, + predicate: None, + scan_type, + output_schema: None, + file_options, + }); } - IR::Scan { - sources, - file_info: resolved_file_info, - hive_parts, - output_schema: None, - predicate: None, - scan_type, - file_options, - } + cached_ir.clone().unwrap() }, #[cfg(feature = "python")] DslPlan::PythonScan { options } => IR::PythonScan { options }, @@ -798,75 +795,6 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult Ok(ctxt.lp_arena.add(v)) } -impl DslScanSources { - /// Expand scan paths if they were not already expanded. - pub fn expand_paths( - &mut self, - scan_type: &mut FileScan, - file_options: &mut FileScanOptions, - ) -> PolarsResult<()> { - if self.is_expanded { - return Ok(()); - } - - let ScanSources::Paths(paths) = &self.sources else { - self.is_expanded = true; - return Ok(()); - }; - - let expanded_sources = match &scan_type { - #[cfg(feature = "parquet")] - FileScan::Parquet { cloud_options, .. } => { - expand_scan_paths_with_hive_update(paths, file_options, cloud_options)? - }, - #[cfg(feature = "ipc")] - FileScan::Ipc { cloud_options, .. } => { - expand_scan_paths_with_hive_update(paths, file_options, cloud_options)? - }, - #[cfg(feature = "csv")] - FileScan::Csv { cloud_options, .. } => { - expand_paths(paths, file_options.glob, cloud_options.as_ref())? - }, - #[cfg(feature = "json")] - FileScan::NDJson { cloud_options, .. } => { - expand_paths(paths, file_options.glob, cloud_options.as_ref())? - }, - FileScan::Anonymous { .. } => unreachable!(), // Invariant: Anonymous scans are already expanded. - }; - - #[allow(unreachable_code)] - { - self.sources = ScanSources::Paths(expanded_sources); - self.is_expanded = true; - - Ok(()) - } - } -} - -/// Expand scan paths and update the Hive partition information of `file_options`. -#[cfg(any(feature = "ipc", feature = "parquet"))] -fn expand_scan_paths_with_hive_update( - paths: &[PathBuf], - file_options: &mut FileScanOptions, - cloud_options: &Option, -) -> PolarsResult> { - let hive_enabled = file_options.hive_options.enabled; - let (expanded_paths, hive_start_idx) = expand_paths_hive( - paths, - file_options.glob, - cloud_options.as_ref(), - hive_enabled.unwrap_or(false), - )?; - let inferred_hive_enabled = hive_enabled - .unwrap_or_else(|| expanded_from_single_directory(paths, expanded_paths.as_ref())); - - file_options.hive_options.enabled = Some(inferred_hive_enabled); - file_options.hive_options.hive_start_idx = hive_start_idx; - - Ok(expanded_paths) -} - fn expand_filter( predicate: Expr, input: Node, diff --git a/crates/polars-plan/src/plans/conversion/mod.rs b/crates/polars-plan/src/plans/conversion/mod.rs index f05482de7670..4b93f6f392b2 100644 --- a/crates/polars-plan/src/plans/conversion/mod.rs +++ b/crates/polars-plan/src/plans/conversion/mod.rs @@ -12,7 +12,7 @@ mod ir_to_dsl; mod scans; mod stack_opt; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex}; pub use dsl_to_ir::*; pub use expr_to_ir::*; @@ -49,22 +49,27 @@ impl IR { conversion_fn(node, lp_arena).into_lp(conversion_fn, lp_arena, expr_arena) }; match lp { - IR::Scan { - sources, - file_info, - hive_parts: _, - predicate: _, - scan_type, - output_schema: _, - file_options: options, - } => DslPlan::Scan { - sources: Arc::new(Mutex::new(DslScanSources { + ir @ IR::Scan { .. } => { + let IR::Scan { sources, - is_expanded: true, - })), - file_info: Arc::new(RwLock::new(Some(file_info))), - scan_type, - file_options: options, + file_info, + hive_parts: _, + predicate: _, + scan_type, + output_schema: _, + file_options, + } = ir.clone() + else { + unreachable!() + }; + + DslPlan::Scan { + sources: sources.clone(), + file_info: Some(file_info.clone()), + scan_type: scan_type.clone(), + file_options: file_options.clone(), + cached_ir: Arc::new(Mutex::new(Some(ir))), + } }, #[cfg(feature = "python")] IR::PythonScan { options, .. } => DslPlan::PythonScan { options }, diff --git a/crates/polars-plan/src/plans/ir/scan_sources.rs b/crates/polars-plan/src/plans/ir/scan_sources.rs index 08d8cad0bf49..f6674c70fbce 100644 --- a/crates/polars-plan/src/plans/ir/scan_sources.rs +++ b/crates/polars-plan/src/plans/ir/scan_sources.rs @@ -6,10 +6,11 @@ use polars_core::error::{feature_gated, PolarsResult}; use polars_io::cloud::CloudOptions; #[cfg(feature = "cloud")] use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder}; +use polars_io::{expand_paths, expand_paths_hive, expanded_from_single_directory}; use polars_utils::mmap::MemSlice; use polars_utils::pl_str::PlSmallStr; -use super::DslScanSources; +use super::FileScanOptions; /// Set of sources to scan from /// @@ -79,17 +80,56 @@ impl PartialEq for ScanSources { impl Eq for ScanSources {} impl ScanSources { - pub fn iter(&self) -> ScanSourceIter { - ScanSourceIter { - sources: self, - offset: 0, + pub fn expand_paths( + &self, + file_options: &FileScanOptions, + #[allow(unused_variables)] cloud_options: Option<&CloudOptions>, + ) -> PolarsResult { + match self { + Self::Paths(paths) => Ok(Self::Paths(expand_paths( + paths, + file_options.glob, + cloud_options, + )?)), + v => Ok(v.clone()), } } - pub fn to_dsl(self, is_expanded: bool) -> DslScanSources { - DslScanSources { + #[cfg(any(feature = "ipc", feature = "parquet"))] + pub fn expand_paths_with_hive_update( + &self, + file_options: &mut FileScanOptions, + #[allow(unused_variables)] cloud_options: Option<&CloudOptions>, + ) -> PolarsResult { + match self { + Self::Paths(paths) => { + let hive_enabled = file_options.hive_options.enabled; + let (expanded_paths, hive_start_idx) = expand_paths_hive( + paths, + file_options.glob, + cloud_options, + hive_enabled.unwrap_or(false), + )?; + let inferred_hive_enabled = hive_enabled.unwrap_or_else(|| { + expanded_from_single_directory(paths, expanded_paths.as_ref()) + }); + + file_options.hive_options.enabled = Some(inferred_hive_enabled); + file_options.hive_options.hive_start_idx = hive_start_idx; + + Ok(Self::Paths(expanded_paths)) + }, + v => { + file_options.hive_options.enabled = Some(false); + Ok(v.clone()) + }, + } + } + + pub fn iter(&self) -> ScanSourceIter { + ScanSourceIter { sources: self, - is_expanded, + offset: 0, } } diff --git a/crates/polars-plan/src/plans/mod.rs b/crates/polars-plan/src/plans/mod.rs index 100c16625fa5..03eb06387cc6 100644 --- a/crates/polars-plan/src/plans/mod.rs +++ b/crates/polars-plan/src/plans/mod.rs @@ -1,6 +1,6 @@ use std::fmt; use std::fmt::Debug; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex}; use polars_core::prelude::*; use recursive::recursive; @@ -57,13 +57,6 @@ pub enum Context { Default, } -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[derive(Clone)] -pub struct DslScanSources { - pub sources: ScanSources, - pub is_expanded: bool, -} - #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum DslPlan { #[cfg(feature = "python")] @@ -76,15 +69,15 @@ pub enum DslPlan { /// Cache the input at this point in the LP Cache { input: Arc, id: usize }, Scan { - sources: Arc>, - // Option as this is mostly materialized on the IR phase. - // During conversion we update the value in the DSL as well - // This is to cater to use cases where parts of a `LazyFrame` - // are used as base of different queries in a loop. That way - // the expensive schema resolving is cached. - file_info: Arc>>, + sources: ScanSources, + /// Materialized at IR except for AnonymousScan. + file_info: Option, file_options: FileScanOptions, scan_type: FileScan, + /// Local use cases often repeatedly collect the same `LazyFrame` (e.g. in interactive notebook use-cases), + /// so we cache the IR conversion here, as the path expansion can be quite slow (especially for cloud paths). + #[cfg_attr(feature = "serde", serde(skip))] + cached_ir: Arc>>, }, // we keep track of the projection and selection as it is cheaper to first project and then filter /// In memory DataFrame @@ -188,7 +181,7 @@ impl Clone for DslPlan { Self::PythonScan { options } => Self::PythonScan { options: options.clone() }, Self::Filter { input, predicate } => Self::Filter { input: input.clone(), predicate: predicate.clone() }, Self::Cache { input, id } => Self::Cache { input: input.clone(), id: id.clone() }, - Self::Scan { sources, file_info, file_options, scan_type } => Self::Scan { sources: sources.clone(), file_info: file_info.clone(), file_options: file_options.clone(), scan_type: scan_type.clone() }, + Self::Scan { sources, file_info, file_options, scan_type, cached_ir } => Self::Scan { sources: sources.clone(), file_info: file_info.clone(), file_options: file_options.clone(), scan_type: scan_type.clone(), cached_ir: cached_ir.clone() }, Self::DataFrameScan { df, schema, } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), }, Self::Select { expr, input, options } => Self::Select { expr: expr.clone(), input: input.clone(), options: options.clone() }, Self::GroupBy { input, keys, aggs, apply, maintain_order, options } => Self::GroupBy { input: input.clone(), keys: keys.clone(), aggs: aggs.clone(), apply: apply.clone(), maintain_order: maintain_order.clone(), options: options.clone() }, diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs index 05d59e76f468..8bddbaa24c7b 100644 --- a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs +++ b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs @@ -56,6 +56,7 @@ impl ParquetSourceNode { let scan_sources = scan_sources.clone(); let cloud_options = cloud_options.clone(); let byte_source_builder = byte_source_builder.clone(); + let have_first_metadata = self.first_metadata.is_some(); move |path_idx: usize| { let scan_sources = scan_sources.clone(); @@ -74,7 +75,7 @@ impl ParquetSourceNode { .await?, ); - if path_idx == 0 { + if path_idx == 0 && have_first_metadata { let metadata_bytes = MemSlice::EMPTY; return Ok((0, byte_source, metadata_bytes)); } @@ -118,13 +119,12 @@ impl ParquetSourceNode { let handle = async_executor::spawn(TaskPriority::Low, async move { let (path_index, byte_source, metadata_bytes) = handle.await.unwrap()?; - let metadata = if path_index == 0 { - Arc::unwrap_or_clone(first_metadata) - } else { - polars_parquet::parquet::read::deserialize_metadata( + let metadata = match first_metadata { + Some(md) if path_index == 0 => Arc::unwrap_or_clone(md), + _ => polars_parquet::parquet::read::deserialize_metadata( metadata_bytes.as_ref(), metadata_bytes.len() * 2 + 1024, - )? + )?, }; ensure_metadata_has_projected_fields( diff --git a/crates/polars-stream/src/nodes/parquet_source/mod.rs b/crates/polars-stream/src/nodes/parquet_source/mod.rs index f874dab1f066..7c496ac42d37 100644 --- a/crates/polars-stream/src/nodes/parquet_source/mod.rs +++ b/crates/polars-stream/src/nodes/parquet_source/mod.rs @@ -42,7 +42,7 @@ pub struct ParquetSourceNode { options: ParquetOptions, cloud_options: Option, file_options: FileScanOptions, - first_metadata: Arc, + first_metadata: Option>, // Run-time vars config: Config, verbose: bool, @@ -81,7 +81,7 @@ impl ParquetSourceNode { options: ParquetOptions, cloud_options: Option, file_options: FileScanOptions, - first_metadata: Arc, + first_metadata: Option>, ) -> Self { let verbose = config::verbose(); diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index 15373db78fbe..7b11e44cc04b 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -317,7 +317,7 @@ fn to_graph_rec<'a>( options, cloud_options, file_options, - first_metadata.unwrap(), + first_metadata, ), [], ) diff --git a/py-polars/tests/unit/io/test_lazy_parquet.py b/py-polars/tests/unit/io/test_lazy_parquet.py index c7d9f3d1e7ef..f9fd0bd00991 100644 --- a/py-polars/tests/unit/io/test_lazy_parquet.py +++ b/py-polars/tests/unit/io/test_lazy_parquet.py @@ -541,3 +541,25 @@ def test_parquet_row_groups_shift_bug_18739(tmp_path: Path, streaming: bool) -> lf = pl.scan_parquet(path) assert_frame_equal(df, lf.collect(streaming=streaming)) + + +@pytest.mark.write_disk +@pytest.mark.parametrize("streaming", [True, False]) +def test_dsl2ir_cached_metadata(tmp_path: Path, streaming: bool) -> None: + df = pl.DataFrame({"x": 1}) + path = tmp_path / "1" + df.write_parquet(path) + + lf = pl.scan_parquet(path) + assert_frame_equal(lf.collect(), df) + + # Removes the metadata portion of the parquet file. + # Used to test that a reader doesn't try to read the metadata. + def remove_metadata(path: str | Path) -> None: + path = Path(path) + v = path.read_bytes() + metadata_and_footer_len = 8 + int.from_bytes(v[-8:][:4], "little") + path.write_bytes(v[:-metadata_and_footer_len] + b"PAR1") + + remove_metadata(path) + assert_frame_equal(lf.collect(streaming=streaming), df)