diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs index a6d0b5360a94..f42b0ec48111 100644 --- a/crates/polars-lazy/src/scan/ndjson.rs +++ b/crates/polars-lazy/src/scan/ndjson.rs @@ -1,5 +1,6 @@ use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; +use std::sync::RwLock; use polars_core::prelude::*; use polars_io::RowIndex; @@ -123,7 +124,7 @@ impl LazyFileListReader for LazyJsonLineReader { Ok(LazyFrame::from(DslPlan::Scan { paths, - file_info: None, + file_info: Arc::new(RwLock::new(None)), hive_parts: None, predicate: None, file_options, @@ -157,7 +158,7 @@ impl LazyFileListReader for LazyJsonLineReader { Ok(LazyFrame::from(DslPlan::Scan { paths: self.paths, - file_info: None, + file_info: Arc::new(RwLock::new(None)), hive_parts: None, predicate: None, file_options, diff --git a/crates/polars-plan/src/plans/builder_dsl.rs b/crates/polars-plan/src/plans/builder_dsl.rs index 39c24fb711e8..c180842daff5 100644 --- a/crates/polars-plan/src/plans/builder_dsl.rs +++ b/crates/polars-plan/src/plans/builder_dsl.rs @@ -1,3 +1,5 @@ +use std::sync::RwLock; + use polars_core::prelude::*; #[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))] use polars_io::cloud::CloudOptions; @@ -57,7 +59,7 @@ impl DslBuilder { Ok(DslPlan::Scan { paths: Arc::new([]), - file_info: Some(file_info), + file_info: Arc::new(RwLock::new(Some(file_info))), hive_parts: None, predicate: None, file_options, @@ -103,7 +105,7 @@ impl DslBuilder { }; Ok(DslPlan::Scan { paths, - file_info: None, + file_info: Arc::new(RwLock::new(None)), hive_parts: None, predicate: None, file_options: options, @@ -137,7 +139,7 @@ impl DslBuilder { Ok(DslPlan::Scan { paths, - file_info: None, + file_info: Arc::new(RwLock::new(None)), hive_parts: None, file_options: FileScanOptions { with_columns: None, @@ -192,7 +194,7 @@ impl DslBuilder { }; Ok(DslPlan::Scan { paths, - file_info: None, + file_info: Arc::new(RwLock::new(None)), hive_parts: None, file_options: options, predicate: None, diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index 65b8e9473ca2..70630923091e 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -117,9 +117,23 @@ pub fn to_alp_impl( FileScan::Anonymous { .. } => paths, }; - let mut file_info = if let Some(file_info) = file_info { - file_info + let file_info_read = file_info.read().unwrap(); + + // leading `_` as clippy doesn't understand that you don't want to read from a lock guard + // if you want to keep it alive. + let mut _file_info_write: Option<_>; + let mut resolved_file_info = if let Some(file_info) = &*file_info_read { + _file_info_write = None; + let out = file_info.clone(); + drop(file_info_read); + out } else { + // Lock so that we don't resolve the same schema in parallel. + drop(file_info_read); + + // Set write lock and keep that lock until all fields in `file_info` are resolved. + _file_info_write = Some(file_info.write().unwrap()); + match &mut scan_type { #[cfg(feature = "parquet")] FileScan::Parquet { @@ -166,7 +180,7 @@ pub fn to_alp_impl( let hive_parts = if hive_parts.is_some() { hive_parts } else if file_options.hive_options.enabled.unwrap() - && file_info.reader_schema.is_some() + && resolved_file_info.reader_schema.is_some() { #[allow(unused_assignments)] let mut owned = None; @@ -175,7 +189,7 @@ pub fn to_alp_impl( paths.as_ref(), file_options.hive_options.hive_start_idx, file_options.hive_options.schema.clone(), - match file_info.reader_schema.as_ref().unwrap() { + match resolved_file_info.reader_schema.as_ref().unwrap() { Either::Left(v) => { owned = Some(Schema::from(v)); owned.as_ref().unwrap() @@ -188,31 +202,37 @@ pub fn to_alp_impl( None }; - if let Some(ref hive_parts) = hive_parts { - let hive_schema = hive_parts[0].schema(); - file_info.update_schema_with_hive_schema(hive_schema.clone()); - } + // Only if we have a writing file handle we must resolve hive partitions + // update schema's etc. + if let Some(lock) = &mut _file_info_write { + if let Some(ref hive_parts) = hive_parts { + let hive_schema = hive_parts[0].schema(); + resolved_file_info.update_schema_with_hive_schema(hive_schema.clone()); + } - if let Some(ref file_path_col) = file_options.include_file_paths { - let schema = Arc::make_mut(&mut file_info.schema); + if let Some(ref file_path_col) = file_options.include_file_paths { + let schema = Arc::make_mut(&mut resolved_file_info.schema); + + if schema.contains(file_path_col) { + polars_bail!( + Duplicate: r#"column name for file paths "{}" conflicts with column name from file"#, + file_path_col + ); + } - if schema.contains(file_path_col) { - polars_bail!( - Duplicate: r#"column name for file paths "{}" conflicts with column name from file"#, - file_path_col - ); + schema.insert_at_index( + schema.len(), + file_path_col.as_ref().into(), + DataType::String, + )?; } - schema.insert_at_index( - schema.len(), - file_path_col.as_ref().into(), - DataType::String, - )?; + **lock = Some(resolved_file_info.clone()); } - file_options.with_columns = if file_info.reader_schema.is_some() { + file_options.with_columns = if resolved_file_info.reader_schema.is_some() { maybe_init_projection_excluding_hive( - file_info.reader_schema.as_ref().unwrap(), + resolved_file_info.reader_schema.as_ref().unwrap(), hive_parts.as_ref().map(|x| &x[0]), ) } else { @@ -220,7 +240,7 @@ pub fn to_alp_impl( }; if let Some(row_index) = &file_options.row_index { - let schema = Arc::make_mut(&mut file_info.schema); + let schema = Arc::make_mut(&mut resolved_file_info.schema); *schema = schema .new_inserting_at_index(0, row_index.name.as_ref().into(), IDX_DTYPE) .unwrap(); @@ -228,7 +248,7 @@ pub fn to_alp_impl( IR::Scan { paths, - file_info, + file_info: resolved_file_info, hive_parts, output_schema: None, predicate: predicate.map(|expr| to_expr_ir(expr, expr_arena)), diff --git a/crates/polars-plan/src/plans/conversion/mod.rs b/crates/polars-plan/src/plans/conversion/mod.rs index d0d6a41e9fb9..50c28984b188 100644 --- a/crates/polars-plan/src/plans/conversion/mod.rs +++ b/crates/polars-plan/src/plans/conversion/mod.rs @@ -8,6 +8,7 @@ mod scans; mod stack_opt; use std::borrow::Cow; +use std::sync::RwLock; pub use dsl_to_ir::*; pub use expr_to_ir::*; @@ -52,7 +53,7 @@ impl IR { file_options: options, } => DslPlan::Scan { paths, - file_info: Some(file_info), + file_info: Arc::new(RwLock::new(Some(file_info))), hive_parts, predicate: predicate.map(|e| e.to_expr(expr_arena)), scan_type, diff --git a/crates/polars-plan/src/plans/mod.rs b/crates/polars-plan/src/plans/mod.rs index c92f3b8bc48b..1b55b6c3ba81 100644 --- a/crates/polars-plan/src/plans/mod.rs +++ b/crates/polars-plan/src/plans/mod.rs @@ -1,7 +1,7 @@ use std::fmt; use std::fmt::Debug; use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use hive::HivePartitions; use polars_core::prelude::*; @@ -80,7 +80,11 @@ pub enum DslPlan { Scan { paths: Arc<[PathBuf]>, // Option as this is mostly materialized on the IR phase. - file_info: Option, + // During conversion we update the value in the DSL as well + // This is to cater to use cases where parts of a `LazyFrame` + // are used as base of different queries in a loop. That way + // the expensive schema resolving is cached. + file_info: Arc>>, hive_parts: Option>, predicate: Option, file_options: FileScanOptions,