Skip to content

Commit

Permalink
perf: Cache schema resolve back to DSL (#17610)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jul 13, 2024
1 parent 6816707 commit 4f0fe6c
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 33 deletions.
5 changes: 3 additions & 2 deletions crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::sync::RwLock;

use polars_core::prelude::*;
use polars_io::RowIndex;
Expand Down Expand Up @@ -123,7 +124,7 @@ impl LazyFileListReader for LazyJsonLineReader {

Ok(LazyFrame::from(DslPlan::Scan {
paths,
file_info: None,
file_info: Arc::new(RwLock::new(None)),
hive_parts: None,
predicate: None,
file_options,
Expand Down Expand Up @@ -157,7 +158,7 @@ impl LazyFileListReader for LazyJsonLineReader {

Ok(LazyFrame::from(DslPlan::Scan {
paths: self.paths,
file_info: None,
file_info: Arc::new(RwLock::new(None)),
hive_parts: None,
predicate: None,
file_options,
Expand Down
10 changes: 6 additions & 4 deletions crates/polars-plan/src/plans/builder_dsl.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::RwLock;

use polars_core::prelude::*;
#[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
use polars_io::cloud::CloudOptions;
Expand Down Expand Up @@ -57,7 +59,7 @@ impl DslBuilder {

Ok(DslPlan::Scan {
paths: Arc::new([]),
file_info: Some(file_info),
file_info: Arc::new(RwLock::new(Some(file_info))),
hive_parts: None,
predicate: None,
file_options,
Expand Down Expand Up @@ -103,7 +105,7 @@ impl DslBuilder {
};
Ok(DslPlan::Scan {
paths,
file_info: None,
file_info: Arc::new(RwLock::new(None)),
hive_parts: None,
predicate: None,
file_options: options,
Expand Down Expand Up @@ -137,7 +139,7 @@ impl DslBuilder {

Ok(DslPlan::Scan {
paths,
file_info: None,
file_info: Arc::new(RwLock::new(None)),
hive_parts: None,
file_options: FileScanOptions {
with_columns: None,
Expand Down Expand Up @@ -192,7 +194,7 @@ impl DslBuilder {
};
Ok(DslPlan::Scan {
paths,
file_info: None,
file_info: Arc::new(RwLock::new(None)),
hive_parts: None,
file_options: options,
predicate: None,
Expand Down
68 changes: 44 additions & 24 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,23 @@ pub fn to_alp_impl(
FileScan::Anonymous { .. } => paths,
};

let mut file_info = if let Some(file_info) = file_info {
file_info
let file_info_read = file_info.read().unwrap();

// The leading `_` silences clippy, which doesn't understand that we want to
// keep the lock guard alive without ever reading from it.
let mut _file_info_write: Option<_>;
let mut resolved_file_info = if let Some(file_info) = &*file_info_read {
_file_info_write = None;
let out = file_info.clone();
drop(file_info_read);
out
} else {
// Lock so that we don't resolve the same schema in parallel.
drop(file_info_read);

// Set write lock and keep that lock until all fields in `file_info` are resolved.
_file_info_write = Some(file_info.write().unwrap());

match &mut scan_type {
#[cfg(feature = "parquet")]
FileScan::Parquet {
Expand Down Expand Up @@ -166,7 +180,7 @@ pub fn to_alp_impl(
let hive_parts = if hive_parts.is_some() {
hive_parts
} else if file_options.hive_options.enabled.unwrap()
&& file_info.reader_schema.is_some()
&& resolved_file_info.reader_schema.is_some()
{
#[allow(unused_assignments)]
let mut owned = None;
Expand All @@ -175,7 +189,7 @@ pub fn to_alp_impl(
paths.as_ref(),
file_options.hive_options.hive_start_idx,
file_options.hive_options.schema.clone(),
match file_info.reader_schema.as_ref().unwrap() {
match resolved_file_info.reader_schema.as_ref().unwrap() {
Either::Left(v) => {
owned = Some(Schema::from(v));
owned.as_ref().unwrap()
Expand All @@ -188,47 +202,53 @@ pub fn to_alp_impl(
None
};

if let Some(ref hive_parts) = hive_parts {
let hive_schema = hive_parts[0].schema();
file_info.update_schema_with_hive_schema(hive_schema.clone());
}
// Only if we hold the write guard must we resolve hive partitions,
// update schemas, etc.
if let Some(lock) = &mut _file_info_write {
if let Some(ref hive_parts) = hive_parts {
let hive_schema = hive_parts[0].schema();
resolved_file_info.update_schema_with_hive_schema(hive_schema.clone());
}

if let Some(ref file_path_col) = file_options.include_file_paths {
let schema = Arc::make_mut(&mut file_info.schema);
if let Some(ref file_path_col) = file_options.include_file_paths {
let schema = Arc::make_mut(&mut resolved_file_info.schema);

if schema.contains(file_path_col) {
polars_bail!(
Duplicate: r#"column name for file paths "{}" conflicts with column name from file"#,
file_path_col
);
}

if schema.contains(file_path_col) {
polars_bail!(
Duplicate: r#"column name for file paths "{}" conflicts with column name from file"#,
file_path_col
);
schema.insert_at_index(
schema.len(),
file_path_col.as_ref().into(),
DataType::String,
)?;
}

schema.insert_at_index(
schema.len(),
file_path_col.as_ref().into(),
DataType::String,
)?;
**lock = Some(resolved_file_info.clone());
}

file_options.with_columns = if file_info.reader_schema.is_some() {
file_options.with_columns = if resolved_file_info.reader_schema.is_some() {
maybe_init_projection_excluding_hive(
file_info.reader_schema.as_ref().unwrap(),
resolved_file_info.reader_schema.as_ref().unwrap(),
hive_parts.as_ref().map(|x| &x[0]),
)
} else {
None
};

if let Some(row_index) = &file_options.row_index {
let schema = Arc::make_mut(&mut file_info.schema);
let schema = Arc::make_mut(&mut resolved_file_info.schema);
*schema = schema
.new_inserting_at_index(0, row_index.name.as_ref().into(), IDX_DTYPE)
.unwrap();
}

IR::Scan {
paths,
file_info,
file_info: resolved_file_info,
hive_parts,
output_schema: None,
predicate: predicate.map(|expr| to_expr_ir(expr, expr_arena)),
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-plan/src/plans/conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod scans;
mod stack_opt;

use std::borrow::Cow;
use std::sync::RwLock;

pub use dsl_to_ir::*;
pub use expr_to_ir::*;
Expand Down Expand Up @@ -52,7 +53,7 @@ impl IR {
file_options: options,
} => DslPlan::Scan {
paths,
file_info: Some(file_info),
file_info: Arc::new(RwLock::new(Some(file_info))),
hive_parts,
predicate: predicate.map(|e| e.to_expr(expr_arena)),
scan_type,
Expand Down
8 changes: 6 additions & 2 deletions crates/polars-plan/src/plans/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use hive::HivePartitions;
use polars_core::prelude::*;
Expand Down Expand Up @@ -80,7 +80,11 @@ pub enum DslPlan {
Scan {
paths: Arc<[PathBuf]>,
// `Option` inside, as this is mostly materialized in the IR phase.
file_info: Option<FileInfo>,
// During conversion we update the value in the DSL as well.
// This caters to use cases where parts of a `LazyFrame`
// are used as the base of different queries in a loop. That way
// the expensive schema resolution is cached.
file_info: Arc<RwLock<Option<FileInfo>>>,
hive_parts: Option<Arc<[HivePartitions]>>,
predicate: Option<Expr>,
file_options: FileScanOptions,
Expand Down

0 comments on commit 4f0fe6c

Please sign in to comment.