Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): eliminate ProjectionExprs and handle CSE by stacking extra columns #16682

Merged
merged 5 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ impl LazyFrame {
ProjectionOptions {
run_parallel: true,
duplicate_check: true,
should_broadcast: true,
},
)
}
Expand All @@ -881,6 +882,7 @@ impl LazyFrame {
ProjectionOptions {
run_parallel: false,
duplicate_check: true,
should_broadcast: true,
},
)
}
Expand Down Expand Up @@ -1281,6 +1283,7 @@ impl LazyFrame {
ProjectionOptions {
run_parallel: false,
duplicate_check: true,
should_broadcast: true,
},
)
.build();
Expand Down Expand Up @@ -1308,6 +1311,7 @@ impl LazyFrame {
ProjectionOptions {
run_parallel: true,
duplicate_check: true,
should_broadcast: true,
},
)
}
Expand All @@ -1320,6 +1324,7 @@ impl LazyFrame {
ProjectionOptions {
run_parallel: false,
duplicate_check: true,
should_broadcast: true,
},
)
}
Expand Down
17 changes: 2 additions & 15 deletions crates/polars-lazy/src/physical_plan/executors/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use super::*;
/// and multiple PhysicalExpressions (which create the output Series)
pub struct ProjectionExec {
pub(crate) input: Box<dyn Executor>,
pub(crate) cse_exprs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) has_windows: bool,
pub(crate) input_schema: SchemaRef,
Expand All @@ -33,7 +32,6 @@ impl ProjectionExec {
let iter = chunks.into_par_iter().map(|mut df| {
let selected_cols = evaluate_physical_expressions(
&mut df,
&self.cse_exprs,
&self.expr,
state,
self.has_windows,
Expand All @@ -50,7 +48,6 @@ impl ProjectionExec {
#[allow(clippy::let_and_return)]
let selected_cols = evaluate_physical_expressions(
&mut df,
&self.cse_exprs,
&self.expr,
state,
self.has_windows,
Expand Down Expand Up @@ -79,11 +76,7 @@ impl Executor for ProjectionExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
if self.cse_exprs.is_empty() {
eprintln!("run ProjectionExec");
} else {
eprintln!("run ProjectionExec with {} CSE", self.cse_exprs.len())
};
eprintln!("run ProjectionExec");
}
}
let df = self.input.execute(state)?;
Expand All @@ -92,13 +85,7 @@ impl Executor for ProjectionExec {
let by = self
.expr
.iter()
.map(|s| {
profile_name(
s.as_ref(),
self.input_schema.as_ref(),
!self.cse_exprs.is_empty(),
)
})
.map(|s| profile_name(s.as_ref(), self.input_schema.as_ref()))
.collect::<PolarsResult<Vec<_>>>()?;
let name = comma_delimited("select".to_string(), &by);
Cow::Owned(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,10 @@ use super::*;
/// Returns the name used to label the expression `s`, taken from the field that
/// `s.to_field(input_schema)` resolves to. If the field lookup fails, its error is
/// returned to the caller.
///
/// NOTE(review): this listing is a diff with the +/- markers stripped, so removed and
/// added lines appear side by side. The `has_cse` parameter and the
/// `match (has_cse, ...)` arms are the removed version: when the lookup failed and
/// `has_cse` was true, it fell back to the first leaf column name of the expression.
/// The plain `match s.to_field(input_schema)` is the replacement.
pub(super) fn profile_name(
s: &dyn PhysicalExpr,
input_schema: &Schema,
has_cse: bool,
) -> PolarsResult<SmartString> {
match (has_cse, s.to_field(input_schema)) {
(false, Err(e)) => Err(e),
(true, Err(_)) => Ok(expr_to_leaf_column_names_iter(s.as_expression().unwrap())
.map(|n| n.as_ref().into())
.next()
.unwrap()),
(_, Ok(fld)) => Ok(fld.name),
match s.to_field(input_schema) {
Err(e) => Err(e),
Ok(fld) => Ok(fld.name),
}
}

Expand Down Expand Up @@ -214,7 +209,6 @@ fn run_exprs_seq(

pub(super) fn evaluate_physical_expressions(
df: &mut DataFrame,
cse_exprs: &[Arc<dyn PhysicalExpr>],
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
has_windows: bool,
Expand All @@ -228,36 +222,7 @@ pub(super) fn evaluate_physical_expressions(
run_exprs_seq
};

let cse_expr_runner = if has_windows {
execute_projection_cached_window_fns
} else if run_parallel && cse_exprs.len() > 1 {
run_exprs_par
} else {
run_exprs_seq
};

let selected_columns = if !cse_exprs.is_empty() {
let tmp_cols = cse_expr_runner(df, cse_exprs, state)?;
if has_windows {
state.clear_window_expr_cache();
}

let width = df.width();

// put the cse expressions at the end
unsafe {
df.hstack_mut_unchecked(&tmp_cols);
}
let result = expr_runner(df, exprs, state)?;
// restore original df
unsafe {
df.get_columns_mut().truncate(width);
}

result
} else {
expr_runner(df, exprs, state)?
};
let selected_columns = expr_runner(df, exprs, state)?;

if has_windows {
state.clear_window_expr_cache();
Expand Down
59 changes: 42 additions & 17 deletions crates/polars-lazy/src/physical_plan/executors/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use super::*;
pub struct StackExec {
pub(crate) input: Box<dyn Executor>,
pub(crate) has_windows: bool,
pub(crate) cse_exprs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) exprs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) input_schema: SchemaRef,
pub(crate) options: ProjectionOptions,
Expand All @@ -29,13 +28,31 @@ impl StackExec {
let iter = chunks.into_par_iter().map(|mut df| {
let res = evaluate_physical_expressions(
&mut df,
&self.cse_exprs,
&self.exprs,
state,
self.has_windows,
self.options.run_parallel,
)?;
df._add_columns(res, schema)?;
if !self.options.should_broadcast {
for column in res.iter() {
// SAFETY: this case only appears as a
// result of CSE optimization, and the
// usage there produces new, unique column
// names.
// It is immediately followed by a
// projection which pulls out the columns,
// whose lengths may not match.
debug_assert!(
column.name().starts_with("__POLARS_CSER_0x"),
"non-broadcasting hstack should only be used for CSE columns"
);
unsafe {
df.with_column_unchecked(column.clone());
}
}
} else {
df._add_columns(res, schema)?;
}
Ok(df)
});

Expand All @@ -46,13 +63,31 @@ impl StackExec {
else {
let res = evaluate_physical_expressions(
&mut df,
&self.cse_exprs,
&self.exprs,
state,
self.has_windows,
self.options.run_parallel,
)?;
df._add_columns(res, schema)?;
if !self.options.should_broadcast {
for column in res.iter() {
wence- marked this conversation as resolved.
Show resolved Hide resolved
// SAFETY: this case only appears as a
// result of CSE optimization, and the
// usage there produces new, unique column
// names.
// It is immediately followed by a
// projection which pulls out the columns,
// whose lengths may not match.
debug_assert!(
column.name().starts_with("__POLARS_CSER_0x"),
"non-broadcasting hstack should only be used for CSE columns"
);
unsafe {
df.with_column_unchecked(column.clone());
}
}
} else {
df._add_columns(res, schema)?;
}
df
};

Expand All @@ -68,11 +103,7 @@ impl Executor for StackExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
if self.cse_exprs.is_empty() {
eprintln!("run StackExec");
} else {
eprintln!("run StackExec with {} CSE", self.cse_exprs.len());
};
eprintln!("run StackExec");
}
}
let df = self.input.execute(state)?;
Expand All @@ -81,13 +112,7 @@ impl Executor for StackExec {
let by = self
.exprs
.iter()
.map(|s| {
profile_name(
s.as_ref(),
self.input_schema.as_ref(),
!self.cse_exprs.is_empty(),
)
})
.map(|s| profile_name(s.as_ref(), self.input_schema.as_ref()))
.collect::<PolarsResult<Vec<_>>>()?;
let name = comma_delimited("with_column".to_string(), &by);
Cow::Owned(name)
Expand Down
35 changes: 5 additions & 30 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,28 +315,16 @@ fn create_physical_plan_impl(
state.expr_depth,
);

let streamable = if expr.has_sub_exprs() {
false
} else {
all_streamable(&expr, expr_arena, Context::Default)
};
let streamable = all_streamable(&expr, expr_arena, Context::Default);
let phys_expr = create_physical_expressions_from_irs(
expr.default_exprs(),
Context::Default,
expr_arena,
Some(&input_schema),
&mut state,
)?;
let cse_expr = create_physical_expressions_from_irs(
expr.cse_exprs(),
&expr,
Context::Default,
expr_arena,
Some(&input_schema),
&mut state,
)?;
Ok(Box::new(executors::ProjectionExec {
input,
cse_exprs: cse_expr,
expr: phys_expr,
has_windows: state.has_windows,
input_schema,
Expand All @@ -353,7 +341,7 @@ fn create_physical_plan_impl(
} => {
let select = Select {
input,
expr: exprs.into(),
expr: exprs,
schema,
options: Default::default(),
};
Expand Down Expand Up @@ -581,27 +569,15 @@ fn create_physical_plan_impl(
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?;

let streamable = if exprs.has_sub_exprs() {
false
} else {
all_streamable(&exprs, expr_arena, Context::Default)
};
let streamable = all_streamable(&exprs, expr_arena, Context::Default);

let mut state = ExpressionConversionState::new(
POOL.current_num_threads() > exprs.len(),
state.expr_depth,
);

let cse_exprs = create_physical_expressions_from_irs(
exprs.cse_exprs(),
Context::Default,
expr_arena,
Some(&input_schema),
&mut state,
)?;

let phys_exprs = create_physical_expressions_from_irs(
exprs.default_exprs(),
&exprs,
Context::Default,
expr_arena,
Some(&input_schema),
Expand All @@ -610,7 +586,6 @@ fn create_physical_plan_impl(
Ok(Box::new(executors::StackExec {
input,
has_windows: state.has_windows,
cse_exprs,
exprs: phys_exprs,
input_schema,
options,
Expand Down
Loading