Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove redundant projection attribute in IR::DataFrameScan #16952

Merged
merged 2 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/polars-core/src/datatypes/aliases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub type IdxType = UInt32Type;
#[cfg(feature = "bigidx")]
pub type IdxType = UInt64Type;

pub use smartstring::alias::String as SmartString;

/// This hashmap uses an IdHasher
pub type PlIdHashMap<K, V> = hashbrown::HashMap<K, V, IdBuildHasher>;

Expand Down
8 changes: 4 additions & 4 deletions crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ fn prepare_scan_args(
/// Producer of an in memory DataFrame
pub struct DataFrameExec {
pub(crate) df: Arc<DataFrame>,
pub(crate) selection: Option<Arc<dyn PhysicalExpr>>,
pub(crate) projection: Option<Arc<[String]>>,
pub(crate) filter: Option<Arc<dyn PhysicalExpr>>,
pub(crate) projection: Option<Vec<SmartString>>,
pub(crate) predicate_has_windows: bool,
}

Expand All @@ -72,10 +72,10 @@ impl Executor for DataFrameExec {
// projection should be before selection as those are free
// TODO: this is only the case if we don't create new columns
if let Some(projection) = &self.projection {
df = df.select(projection.as_ref())?;
df = df.select(projection.as_slice())?;
}

if let Some(selection) = &self.selection {
if let Some(selection) = &self.filter {
if self.predicate_has_windows {
state.insert_has_window_function_flag()
}
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,9 @@ fn create_physical_plan_impl(
},
DataFrameScan {
df,
projection,
filter: predicate,
schema,
output_schema,
..
} => {
let mut state = ExpressionConversionState::new(true, state.expr_depth);
Expand All @@ -369,8 +369,8 @@ fn create_physical_plan_impl(
.transpose()?;
Ok(Box::new(executors::DataFrameExec {
df,
projection,
selection,
projection: output_schema.map(|s| s.iter_names().cloned().collect()),
filter: selection,
predicate_has_windows: state.has_windows,
}))
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ fn get_pipeline_node(
df: Arc::new(DataFrame::empty()),
schema: Arc::new(Schema::new()),
output_schema: None,
projection: None,
filter: None,
});

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/cse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ fn test_cse_cache_union_projection_pd() -> PolarsResult<()> {
true
},
DataFrameScan {
projection: Some(projection),
output_schema: Some(projection),
..
} => projection.as_ref().len() <= 2,
DataFrameScan { .. } => false,
Expand Down
9 changes: 4 additions & 5 deletions crates/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,11 +480,10 @@ fn test_with_column_prune() -> PolarsResult<()> {
(&lp_arena).iter(lp).for_each(|(_, lp)| {
use IR::*;
match lp {
DataFrameScan { projection, .. } => {
let projection = projection.as_ref().unwrap();
let projection = projection.as_ref();
DataFrameScan { output_schema, .. } => {
let projection = output_schema.as_ref().unwrap();
assert_eq!(projection.len(), 1);
let name = &projection[0];
let name = projection.get_at_index(0).unwrap().0;
assert_eq!(name, "c1");
},
HStack { exprs, .. } => {
Expand All @@ -503,7 +502,7 @@ fn test_with_column_prune() -> PolarsResult<()> {
assert!((&lp_arena).iter(lp).all(|(_, lp)| {
use IR::*;

matches!(lp, IR::SimpleProjection { .. } | DataFrameScan { .. })
matches!(lp, SimpleProjection { .. } | DataFrameScan { .. })
}));
assert_eq!(
q.schema().unwrap().as_ref(),
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ where
match source {
DataFrameScan {
df,
projection,
filter: selection,
output_schema,
..
Expand All @@ -67,8 +66,9 @@ where
operator_objects.push(op)
}
// projection is free
if let Some(projection) = projection {
df = df.select(projection.as_ref())?;
if let Some(schema) = output_schema {
let columns = schema.iter_names().cloned().collect::<Vec<_>>();
df = df._select_impl_unchecked(&columns)?;
}
}
Ok(Box::new(sources::DataFrameSource::from_df(df)) as Box<dyn Source>)
Expand Down
1 change: 0 additions & 1 deletion crates/polars-plan/src/logical_plan/builder_dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,6 @@ impl DslBuilder {
df: Arc::new(df),
schema,
output_schema: None,
projection: None,
filter: None,
}
.into()
Expand Down
3 changes: 0 additions & 3 deletions crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ fn empty_df() -> IR {
df: Arc::new(Default::default()),
schema: Arc::new(Default::default()),
output_schema: None,
projection: None,
filter: None,
}
}
Expand Down Expand Up @@ -203,13 +202,11 @@ pub fn to_alp_impl(
df,
schema,
output_schema,
projection,
filter: selection,
} => IR::DataFrameScan {
df,
schema,
output_schema,
projection,
filter: selection.map(|expr| to_expr_ir(expr, expr_arena)),
},
DslPlan::Select {
Expand Down
2 changes: 0 additions & 2 deletions crates/polars-plan/src/logical_plan/conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,11 @@ impl IR {
df,
schema,
output_schema,
projection,
filter: selection,
} => DslPlan::DataFrameScan {
df,
schema,
output_schema,
projection,
filter: selection.map(|e| e.to_expr(expr_arena)),
},
IR::Select {
Expand Down
16 changes: 14 additions & 2 deletions crates/polars-plan/src/logical_plan/ir/dot.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::fmt;
use std::path::PathBuf;

use polars_core::schema::Schema;

use super::format::ExprIRSliceDisplay;
use crate::constants::UNLIMITED_CACHE;
use crate::prelude::ir::format::ColumnsDisplay;
Expand Down Expand Up @@ -227,11 +229,11 @@ impl<'a> IRDotDisplay<'a> {
},
DataFrameScan {
schema,
projection,
output_schema,
filter: selection,
..
} => {
let num_columns = NumColumns(projection.as_ref().map(|p| p.as_ref()));
let num_columns = NumColumnsSchema(output_schema.as_ref().map(|p| p.as_ref()));
let selection = selection.as_ref().map(|e| self.display_expr(e));
let selection = OptionExprIRDisplay(selection);
let total_columns = schema.len();
Expand Down Expand Up @@ -331,6 +333,7 @@ impl<'a> IRDotDisplay<'a> {
// A few utility structures for formatting
struct PathsDisplay<'a>(&'a [PathBuf]);
struct NumColumns<'a>(Option<&'a [String]>);
struct NumColumnsSchema<'a>(Option<&'a Schema>);
struct OptionExprIRDisplay<'a>(Option<ExprIRDisplay<'a>>);

impl fmt::Display for PathsDisplay<'_> {
Expand All @@ -357,6 +360,15 @@ impl fmt::Display for NumColumns<'_> {
}
}

impl fmt::Display for NumColumnsSchema<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.0 {
None => f.write_str("*"),
Some(columns) => columns.len().fmt(f),
}
}
}

impl fmt::Display for OptionExprIRDisplay<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.0 {
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-plan/src/logical_plan/ir/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,12 @@ impl<'a> IRDisplay<'a> {
},
DataFrameScan {
schema,
projection,
output_schema,
filter: selection,
..
} => {
let total_columns = schema.len();
let n_columns = if let Some(columns) = projection {
let n_columns = if let Some(columns) = output_schema {
columns.len().to_string()
} else {
"*".to_string()
Expand Down
2 changes: 0 additions & 2 deletions crates/polars-plan/src/logical_plan/ir/inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ impl IR {
df,
schema,
output_schema,
projection,
filter: selection,
} => {
let mut new_selection = None;
Expand All @@ -138,7 +137,6 @@ impl IR {
df: df.clone(),
schema: schema.clone(),
output_schema: output_schema.clone(),
projection: projection.clone(),
filter: new_selection,
}
},
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-plan/src/logical_plan/ir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ pub enum IR {
DataFrameScan {
df: Arc<DataFrame>,
schema: SchemaRef,
// schema of the projected file
// Schema of the projected file
// If `None`, no projection is applied
output_schema: Option<SchemaRef>,
projection: Option<Arc<[String]>>,
// Predicate to apply on the DataFrame
// All the columns required for the predicate are projected.
filter: Option<ExprIR>,
},
// Only selects columns (semantically only has row access).
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-plan/src/logical_plan/ir/tree_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl<'a> TreeFmtNode<'a> {
Scan { .. } => ND(wh(h, &lp.describe()), vec![]),
DataFrameScan {
schema,
projection,
output_schema,
filter: selection,
..
} => ND(
Expand All @@ -201,7 +201,7 @@ impl<'a> TreeFmtNode<'a> {
&format!(
"DF {:?}\nPROJECT {}/{} COLUMNS",
schema.iter_names().take(4).collect::<Vec<_>>(),
if let Some(columns) = projection {
if let Some(columns) = output_schema {
format!("{}", columns.len())
} else {
"*".to_string()
Expand Down
4 changes: 1 addition & 3 deletions crates/polars-plan/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ pub enum DslPlan {
schema: SchemaRef,
// schema of the projected file
output_schema: Option<SchemaRef>,
projection: Option<Arc<[String]>>,
filter: Option<Expr>,
},
/// Polars' `select` operation, this can mean projection, but also full data access.
Expand Down Expand Up @@ -188,7 +187,7 @@ impl Clone for DslPlan {
Self::Filter { input, predicate } => Self::Filter { input: input.clone(), predicate: predicate.clone() },
Self::Cache { input, id, cache_hits } => Self::Cache { input: input.clone(), id: id.clone(), cache_hits: cache_hits.clone() },
Self::Scan { paths, file_info, predicate, file_options, scan_type } => Self::Scan { paths: paths.clone(), file_info: file_info.clone(), predicate: predicate.clone(), file_options: file_options.clone(), scan_type: scan_type.clone() },
Self::DataFrameScan { df, schema, output_schema, projection, filter: selection } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), output_schema: output_schema.clone(), projection: projection.clone(), filter: selection.clone() },
Self::DataFrameScan { df, schema, output_schema, filter: selection } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), output_schema: output_schema.clone(), filter: selection.clone() },
Self::Select { expr, input, options } => Self::Select { expr: expr.clone(), input: input.clone(), options: options.clone() },
Self::GroupBy { input, keys, aggs, apply, maintain_order, options } => Self::GroupBy { input: input.clone(), keys: keys.clone(), aggs: aggs.clone(), apply: apply.clone(), maintain_order: maintain_order.clone(), options: options.clone() },
Self::Join { input_left, input_right, left_on, right_on, options } => Self::Join { input_left: input_left.clone(), input_right: input_right.clone(), left_on: left_on.clone(), right_on: right_on.clone(), options: options.clone() },
Expand All @@ -214,7 +213,6 @@ impl Default for DslPlan {
df: Arc::new(df),
schema: Arc::new(schema),
output_schema: None,
projection: None,
filter: None,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ impl OptimizationRule for CountStar {
df: Arc::new(Default::default()),
schema: Arc::new(Default::default()),
output_schema: None,
projection: None,
filter: None,
};
let placeholder_node = lp_arena.add(placeholder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,13 @@ impl<'a> PredicatePushDown<'a> {
df,
schema,
output_schema,
projection,
filter: selection,
} => {
let selection = predicate_at_scan(acc_predicates, selection, expr_arena);
let lp = DataFrameScan {
df,
schema,
output_schema,
projection,
filter: selection,
};
Ok(lp)
Expand Down Expand Up @@ -390,7 +388,6 @@ impl<'a> PredicatePushDown<'a> {
df: Arc::new(df),
schema: schema.clone(),
output_schema: None,
projection: None,
filter: None,
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,21 +357,18 @@ impl ProjectionPushDown {
filter: selection,
..
} => {
let mut projection = None;
if !acc_projections.is_empty() {
output_schema = Some(Arc::new(update_scan_schema(
&acc_projections,
expr_arena,
&schema,
false,
)?));
projection = get_scan_columns(&mut acc_projections, expr_arena, None);
}
let lp = DataFrameScan {
df,
schema,
output_schema,
projection,
filter: selection,
};
Ok(lp)
Expand Down
13 changes: 5 additions & 8 deletions crates/polars-plan/src/logical_plan/visitor/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,11 @@ impl Hash for HashableEqLP<'_> {
IR::DataFrameScan {
df,
schema: _,
output_schema: _,
projection,
output_schema,
filter: selection,
} => {
(Arc::as_ptr(df) as usize).hash(state);
projection.hash(state);
output_schema.hash(state);
hash_option_expr(selection, self.expr_arena, state);
},
IR::SimpleProjection { columns, input: _ } => {
Expand Down Expand Up @@ -279,20 +278,18 @@ impl HashableEqLP<'_> {
IR::DataFrameScan {
df: dfl,
schema: _,
output_schema: _,
projection: pl,
output_schema: s_l,
filter: sl,
},
IR::DataFrameScan {
df: dfr,
schema: _,
output_schema: _,
projection: pr,
output_schema: s_r,
filter: sr,
},
) => {
Arc::as_ptr(dfl) == Arc::as_ptr(dfr)
&& pl == pr
&& s_l == s_r
&& opt_expr_ir_eq(sl, sr, self.expr_arena)
},
(
Expand Down
Loading
Loading