feat: Remove 'FileCacher' optimization (pola-rs#15357)
ritchie46 authored Mar 28, 2024
1 parent 7d4edb9 commit 4b0c86f
Showing 14 changed files with 7 additions and 640 deletions.
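The fingerprint type that keyed the removed cache is not itself shown in this diff, but its shape can be read off the call sites deleted below in the CSV, IPC, and Parquet executors. A hedged reconstruction — field types inferred from usage, not copied from the source:

```rust
use std::path::PathBuf;
use std::sync::Arc;

// Stand-in for polars' expression type; the real field held the
// pushed-down predicate as an `Option<Expr>`.
type Expr = String;

/// Sketch of the removed `FileFingerPrint`, inferred from call sites such as
/// `paths: Arc::new([self.path.clone()])` and
/// `slice: (self.options.skip_rows, self.file_options.n_rows)` below.
struct FileFingerPrint {
    /// Path(s) of the scanned file(s).
    paths: Arc<[PathBuf]>,
    /// Pushed-down predicate, if any; scans with different predicates
    /// must not share a cached read.
    predicate: Option<Expr>,
    /// (rows to skip, optional row limit) applied to the scan.
    slice: (usize, Option<usize>),
}
```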
36 changes: 2 additions & 34 deletions crates/polars-lazy/src/frame/mod.rs
@@ -633,38 +633,12 @@ impl LazyFrame {
         mut self,
         check_sink: bool,
     ) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
-        let file_caching = self.opt_state.file_caching && !self.opt_state.streaming;
         let mut expr_arena = Arena::with_capacity(256);
         let mut lp_arena = Arena::with_capacity(128);
         let mut scratch = vec![];
         let lp_top =
             self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;
 
-        let finger_prints = if file_caching {
-            #[cfg(any(
-                feature = "ipc",
-                feature = "parquet",
-                feature = "csv",
-                feature = "json"
-            ))]
-            {
-                let mut fps = Vec::with_capacity(8);
-                collect_fingerprints(lp_top, &mut fps, &lp_arena, &expr_arena);
-                Some(fps)
-            }
-            #[cfg(not(any(
-                feature = "ipc",
-                feature = "parquet",
-                feature = "csv",
-                feature = "json"
-            )))]
-            {
-                None
-            }
-        } else {
-            None
-        };
-
         // sink should be replaced
         let no_file_sink = if check_sink {
             !matches!(lp_arena.get(lp_top), ALogicalPlan::Sink { .. })
@@ -673,7 +647,7 @@
         };
         let physical_plan = create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?;
 
-        let state = ExecutionState::with_finger_prints(finger_prints);
+        let state = ExecutionState::new();
         Ok((state, physical_plan, no_file_sink))
     }

@@ -696,13 +670,7 @@
     /// ```
     pub fn collect(self) -> PolarsResult<DataFrame> {
         let (mut state, mut physical_plan, _) = self.prepare_collect(false)?;
-        let out = physical_plan.execute(&mut state);
-        #[cfg(debug_assertions)]
-        {
-            #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
-            state.file_cache.assert_empty();
-        }
-        out
+        physical_plan.execute(&mut state)
     }
 
     /// Profile a LazyFrame.
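With the cache gone, `collect` simply executes the physical plan against a fresh `ExecutionState`, and the debug-only `file_cache.assert_empty()` check disappears with it. Observable behavior is unchanged for users — a minimal sketch using the public polars API (assuming the `lazy` feature; this example is not part of the diff):

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Nothing is read or computed until `collect` is called.
    let out = df!("a" => [1i64, 2, 3], "b" => [4i64, 5, 6])?
        .lazy()
        .filter(col("a").gt(lit(1)))
        .select([col("b").sum()])
        // `collect` optimizes the plan, builds the physical plan, and runs it
        // with `ExecutionState::new()` — no fingerprints are collected anymore.
        .collect()?;
    println!("{out}");
    Ok(())
}
```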
21 changes: 1 addition & 20 deletions crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
@@ -52,16 +52,6 @@ impl CsvExec {

 impl Executor for CsvExec {
     fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
-        #[allow(clippy::useless_asref)]
-        let finger_print = FileFingerPrint {
-            paths: Arc::new([self.path.clone()]),
-            predicate: self
-                .predicate
-                .as_ref()
-                .map(|ae| ae.as_expression().unwrap().clone()),
-            slice: (self.options.skip_rows, self.file_options.n_rows),
-        };
-
         let profile_name = if state.has_node_timer() {
             let mut ids = vec![self.path.to_string_lossy().into()];
             if self.predicate.is_some() {
@@ -73,15 +63,6 @@
             Cow::Borrowed("")
         };
 
-        state.record(
-            || {
-                state
-                    .file_cache
-                    .read(finger_print, self.file_options.file_counter, &mut || {
-                        self.read()
-                    })
-            },
-            profile_name,
-        )
+        state.record(|| self.read(), profile_name)
     }
 }
21 changes: 1 addition & 20 deletions crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
@@ -268,16 +268,6 @@ fn finish_index_and_dfs(

 impl Executor for IpcExec {
     fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
-        let finger_print = FileFingerPrint {
-            paths: Arc::clone(&self.paths),
-            #[allow(clippy::useless_asref)]
-            predicate: self
-                .predicate
-                .as_ref()
-                .map(|ae| ae.as_expression().unwrap().clone()),
-            slice: (0, self.file_options.n_rows),
-        };
-
         let profile_name = if state.has_node_timer() {
             let mut ids = vec![self.paths[0].to_string_lossy().into()];
             if self.predicate.is_some() {
@@ -289,15 +279,6 @@
             Cow::Borrowed("")
         };
 
-        state.record(
-            || {
-                state
-                    .file_cache
-                    .read(finger_print, self.file_options.file_counter, &mut || {
-                        self.read(state.verbose())
-                    })
-            },
-            profile_name,
-        )
+        state.record(|| self.read(state.verbose()), profile_name)
     }
 }
2 changes: 0 additions & 2 deletions crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
@@ -24,8 +24,6 @@ use polars_io::predicates::PhysicalIoExpr;
 #[cfg(any(feature = "parquet", feature = "csv", feature = "ipc", feature = "cse"))]
 use polars_io::prelude::*;
 use polars_plan::global::_set_n_rows_for_scan;
-#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc", feature = "cse"))]
-use polars_plan::logical_plan::FileFingerPrint;
 #[cfg(feature = "ipc")]
 pub(crate) use support::ConsecutiveCountState;

21 changes: 1 addition & 20 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -379,16 +379,6 @@ impl ParquetExec {

 impl Executor for ParquetExec {
     fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
-        let finger_print = FileFingerPrint {
-            paths: self.paths.clone(),
-            #[allow(clippy::useless_asref)]
-            predicate: self
-                .predicate
-                .as_ref()
-                .map(|ae| ae.as_expression().unwrap().clone()),
-            slice: (0, self.file_options.n_rows),
-        };
-
         let profile_name = if state.has_node_timer() {
             let mut ids = vec![self.paths[0].to_string_lossy().into()];
             if self.predicate.is_some() {
@@ -400,15 +390,6 @@
             Cow::Borrowed("")
         };
 
-        state.record(
-            || {
-                state
-                    .file_cache
-                    .read(finger_print, self.file_options.file_counter, &mut || {
-                        self.read()
-                    })
-            },
-            profile_name,
-        )
+        state.record(|| self.read(), profile_name)
     }
 }
68 changes: 0 additions & 68 deletions crates/polars-lazy/src/physical_plan/file_cache.rs

This file was deleted.
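The deleted module's body is not reproduced on this page, but its contract can be inferred from the removed call sites (`state.file_cache.read(finger_print, self.file_options.file_counter, &mut || self.read())`) and from the `assert_empty` check dropped from `collect`: read each fingerprinted file once, share the result across all union branches scanning the same file, and evict the entry once the last branch has taken it. A self-contained toy sketch of that contract — illustrative semantics only, not the deleted code:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Toy stand-in: the real key was `FileFingerPrint` and the value a
/// `DataFrame`; `String` keeps the sketch dependency-free.
#[derive(Clone, Default)]
struct FileCache {
    inner: Arc<Mutex<HashMap<String, (usize, Arc<String>)>>>,
}

impl FileCache {
    /// `file_count` is the number of scans sharing this fingerprint
    /// (assumed >= 1). Only the first caller actually runs `read`.
    fn read(
        &self,
        fingerprint: String,
        file_count: usize,
        read: &mut dyn FnMut() -> String,
    ) -> Arc<String> {
        let mut guard = self.inner.lock().unwrap();
        let entry = guard
            .entry(fingerprint.clone())
            .or_insert_with(|| (file_count, Arc::new(read())));
        entry.0 -= 1;
        let df = Arc::clone(&entry.1);
        if entry.0 == 0 {
            // Last consumer: evict so memory is released promptly.
            guard.remove(&fingerprint);
        }
        df
    }

    /// Mirrors the debug check removed from `collect`.
    fn assert_empty(&self) {
        assert!(self.inner.lock().unwrap().is_empty());
    }
}

fn main() {
    let cache = FileCache::default();
    let mut reads = 0;
    // Two branches scan the same file: one physical read, shared result.
    for _ in 0..2 {
        let df = cache.read("data.csv|pred|slice".into(), 2, &mut || {
            reads += 1;
            "contents".to_string()
        });
        assert_eq!(*df, "contents");
    }
    assert_eq!(reads, 1);
    cache.assert_empty();
}
```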

7 changes: 0 additions & 7 deletions crates/polars-lazy/src/physical_plan/mod.rs
@@ -2,13 +2,6 @@ pub mod executors;
 #[cfg(any(feature = "list_eval", feature = "pivot"))]
 pub(crate) mod exotic;
 pub mod expressions;
-#[cfg(any(
-    feature = "ipc",
-    feature = "parquet",
-    feature = "csv",
-    feature = "json"
-))]
-mod file_cache;
 mod node_timer;
 pub mod planner;
 pub(crate) mod state;
66 changes: 1 addition & 65 deletions crates/polars-lazy/src/physical_plan/state.rs
@@ -7,21 +7,7 @@ use once_cell::sync::OnceCell;
 use polars_core::config::verbose;
 use polars_core::prelude::*;
 use polars_ops::prelude::ChunkJoinOptIds;
-#[cfg(any(
-    feature = "parquet",
-    feature = "csv",
-    feature = "ipc",
-    feature = "json"
-))]
-use polars_plan::logical_plan::FileFingerPrint;
-
-#[cfg(any(
-    feature = "ipc",
-    feature = "parquet",
-    feature = "csv",
-    feature = "json"
-))]
-use super::file_cache::FileCache;
 
 use crate::physical_plan::node_timer::NodeTimer;
 
 pub type JoinTuplesCache = Arc<Mutex<PlHashMap<String, ChunkJoinOptIds>>>;
@@ -75,14 +61,6 @@ type CachedValue = Arc<(AtomicI64, OnceCell<DataFrame>)>;
 pub struct ExecutionState {
     // cached by a `.cache` call and kept in memory for the duration of the plan.
     df_cache: Arc<Mutex<PlHashMap<usize, CachedValue>>>,
-    // cache file reads until all branches got their file, then we delete it
-    #[cfg(any(
-        feature = "ipc",
-        feature = "parquet",
-        feature = "csv",
-        feature = "json"
-    ))]
-    pub(crate) file_cache: FileCache,
     pub(super) schema_cache: RwLock<Option<SchemaRef>>,
     /// Used by Window Expression to prevent redundant grouping
     pub(super) group_tuples: GroupsProxyCache,
@@ -105,13 +83,6 @@ impl ExecutionState {
         Self {
             df_cache: Default::default(),
             schema_cache: Default::default(),
-            #[cfg(any(
-                feature = "ipc",
-                feature = "parquet",
-                feature = "csv",
-                feature = "json"
-            ))]
-            file_cache: FileCache::new(None),
             group_tuples: Default::default(),
             join_tuples: Default::default(),
             branch_idx: 0,
@@ -163,13 +134,6 @@ impl ExecutionState {
     pub(super) fn split(&self) -> Self {
         Self {
             df_cache: self.df_cache.clone(),
-            #[cfg(any(
-                feature = "ipc",
-                feature = "parquet",
-                feature = "csv",
-                feature = "json"
-            ))]
-            file_cache: self.file_cache.clone(),
             schema_cache: Default::default(),
             group_tuples: Default::default(),
             join_tuples: Default::default(),
Expand All @@ -185,13 +149,6 @@ impl ExecutionState {
     pub(super) fn clone(&self) -> Self {
         Self {
             df_cache: self.df_cache.clone(),
-            #[cfg(any(
-                feature = "ipc",
-                feature = "parquet",
-                feature = "csv",
-                feature = "json"
-            ))]
-            file_cache: self.file_cache.clone(),
             schema_cache: self.schema_cache.read().unwrap().clone().into(),
             group_tuples: self.group_tuples.clone(),
             join_tuples: self.join_tuples.clone(),
Expand All @@ -203,27 +160,6 @@ impl ExecutionState {
         }
     }
 
-    #[cfg(not(any(
-        feature = "parquet",
-        feature = "csv",
-        feature = "ipc",
-        feature = "json"
-    )))]
-    pub(crate) fn with_finger_prints(_finger_prints: Option<usize>) -> Self {
-        Self::new()
-    }
-    #[cfg(any(
-        feature = "parquet",
-        feature = "csv",
-        feature = "ipc",
-        feature = "json"
-    ))]
-    pub(crate) fn with_finger_prints(finger_prints: Option<Vec<FileFingerPrint>>) -> Self {
-        let mut new = Self::new();
-        new.file_cache = FileCache::new(finger_prints);
-        new
-    }
-
     pub(crate) fn set_schema(&self, schema: SchemaRef) {
         let mut lock = self.schema_cache.write().unwrap();
         *lock = Some(schema);
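What remains of `ExecutionState` after the purge is plan-level state only. A trimmed, self-contained sketch of the resulting shape — the field set is taken from the context lines above; the type aliases are dependency-free stand-ins, and everything else is assumption:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

// Stand-ins for `SchemaRef`, `CachedValue`, `GroupsProxyCache`, and
// `JoinTuplesCache` so the sketch compiles on its own.
type SchemaRef = Arc<String>;
type CachedValue = Arc<String>;
type GroupsProxyCache = Arc<Mutex<HashMap<String, Vec<u32>>>>;
type JoinTuplesCache = Arc<Mutex<HashMap<String, Vec<u32>>>>;

pub struct ExecutionState {
    // Filled by `.cache()` nodes; kept for the duration of the plan.
    df_cache: Arc<Mutex<HashMap<usize, CachedValue>>>,
    schema_cache: RwLock<Option<SchemaRef>>,
    // Used by window expressions to prevent redundant grouping.
    group_tuples: GroupsProxyCache,
    join_tuples: JoinTuplesCache,
    branch_idx: usize,
}

impl ExecutionState {
    // `with_finger_prints` is gone; every construction path is `new`.
    pub fn new() -> Self {
        Self {
            df_cache: Default::default(),
            schema_cache: Default::default(),
            group_tuples: Default::default(),
            join_tuples: Default::default(),
            branch_idx: 0,
        }
    }

    // `split` starts a branch with fresh node-local caches but shared
    // `.cache()` results — previously it also carried the file cache along.
    fn split(&self) -> Self {
        Self {
            df_cache: self.df_cache.clone(),
            ..Self::new()
        }
    }
}
```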
10 changes: 0 additions & 10 deletions crates/polars-plan/src/logical_plan/file_scan.rs
@@ -109,16 +109,6 @@ impl FileScan {
         }
     }
 
-    #[cfg(any(feature = "ipc", feature = "parquet", feature = "csv", feature = "cse"))]
-    pub(crate) fn skip_rows(&self) -> usize {
-        #[allow(unreachable_patterns)]
-        match self {
-            #[cfg(feature = "csv")]
-            Self::Csv { options } => options.skip_rows,
-            _ => 0,
-        }
-    }
-
     pub(crate) fn sort_projection(&self, _file_options: &FileScanOptions) -> bool {
         match self {
             #[cfg(feature = "csv")]
8 changes: 0 additions & 8 deletions crates/polars-plan/src/logical_plan/mod.rs
@@ -55,14 +55,6 @@ use serde::{Deserialize, Serialize};
 use strum_macros::IntoStaticStr;
 
 use self::tree_format::{TreeFmtNode, TreeFmtVisitor};
-#[cfg(any(
-    feature = "ipc",
-    feature = "parquet",
-    feature = "csv",
-    feature = "cse",
-    feature = "json"
-))]
-pub use crate::logical_plan::optimizer::file_caching::FileFingerPrint;
 
 pub type ColumnName = Arc<str>;

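Dropping this re-export is the externally visible part of the change: any downstream crate that named the fingerprint type through `polars_plan` stops compiling. A hypothetical illustration, using the path from the deleted `pub use` above:

```rust
// Before this commit:
// use polars_plan::logical_plan::FileFingerPrint;
//
// After: the import fails — the type and its `file_caching` module are gone,
// and there is no direct replacement; executors now call their `read()`
// directly, as the scan diffs above show.
```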