Commit aae525d: fix many errors in new_multifile

coastalwhite committed Jan 23, 2025
Parent: 4d0ee89
Showing 7 changed files with 119 additions and 97 deletions.
55 changes: 27 additions & 28 deletions crates/polars-mem-engine/src/executors/multi_file_scan.rs
@@ -98,6 +98,9 @@ fn source_to_exec(
if allow_missing_columns && !is_first_file {
options.schema.take();
}
if !is_first_file {
file_info.row_estimation.0.take();
}

Box::new(CsvExec {
sources: source,
@@ -240,23 +243,8 @@ impl MultiScanExec {
}
}

// Remove the hive columns for each file load.
let mut file_with_columns = self.file_options.with_columns.take();
if self.hive_parts.is_some() {
if let Some(with_columns) = &self.file_options.with_columns {
file_with_columns = Some(
with_columns
.iter()
.filter(|&c| !hive_column_set.contains(c))
.cloned()
.collect(),
);
}
}

let allow_missing_columns = self.file_options.allow_missing_columns;
self.file_options.allow_missing_columns = false;
let mut row_index = self.file_options.row_index.take();
let slice = self.file_options.slice.take();

let mut first_slice_file = None;
@@ -273,21 +261,32 @@
}),
};

let final_per_source_schema = &self.file_info.schema;
let file_output_schema = if let Some(file_with_columns) = file_with_columns.as_ref() {
let mut schema = final_per_source_schema.try_project(file_with_columns.as_ref())?;
let mut file_with_columns = self.file_options.with_columns.take();
let mut row_index = self.file_options.row_index.take();

let mut final_per_source_schema = Cow::Borrowed(self.file_info.schema.as_ref());
if let Some(with_columns) = file_with_columns.as_ref() {
final_per_source_schema
.to_mut()
.try_project(with_columns.as_ref())
.unwrap();
}

if let Some(v) = include_file_paths.clone() {
schema.extend([(v, DataType::String)]);
// Remove the hive columns for each file load.
if self.hive_parts.is_some() {
if let Some(with_columns) = &file_with_columns {
file_with_columns = Some(
with_columns
.iter()
.filter(|&c| !hive_column_set.contains(c))
.cloned()
.collect(),
);
}

Arc::new(schema)
} else {
final_per_source_schema.clone()
};
}

if slice.is_some_and(|x| x.1 == 0) {
return Ok(DataFrame::empty_with_schema(final_per_source_schema));
return Ok(DataFrame::empty_with_schema(&final_per_source_schema));
}

let mut missing_columns = Vec::new();
@@ -497,12 +496,12 @@ impl MultiScanExec {
}

// Project to ensure that all DataFrames have the proper order.
df = df.select(file_output_schema.iter_names().cloned())?;
df = df.select(final_per_source_schema.iter_names().cloned())?;
dfs.push(df);
}

if dfs.is_empty() {
Ok(DataFrame::empty_with_schema(final_per_source_schema))
Ok(DataFrame::empty_with_schema(&final_per_source_schema))
} else {
Ok(accumulate_dataframes_vertical_unchecked(dfs))
}
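
The changes above replace the separately built `file_output_schema` with a single `final_per_source_schema` held in a `Cow`, so the full per-source schema is borrowed unchanged and only cloned once a `with_columns` projection actually narrows it; the first hunk additionally clears `row_estimation.0` for every file after the first, so the first file's exact row count is not reused for the others. A minimal sketch of the copy-on-write pattern, with a plain `Vec<String>` standing in for the Polars `Schema`:

    use std::borrow::Cow;

    // `Vec<String>` stands in for the Polars Schema: field names, in order.
    type Schema = Vec<String>;

    // Borrow the full per-source schema; clone it only when a projection
    // actually narrows it, as the new `final_per_source_schema` does.
    fn per_source_schema<'a>(full: &'a Schema, with_columns: Option<&[String]>) -> Cow<'a, Schema> {
        let mut schema = Cow::Borrowed(full);
        if let Some(cols) = with_columns {
            // `to_mut` clones the borrowed value on first mutation only.
            schema.to_mut().retain(|name| cols.contains(name));
        }
        schema
    }

    fn main() {
        let full: Schema = vec!["a".into(), "b".into(), "c".into()];
        // No projection: remains a cheap borrow of the shared schema.
        assert!(matches!(per_source_schema(&full, None), Cow::Borrowed(_)));
        // Projection: cloned once, then narrowed in place.
        let projected = per_source_schema(&full, Some(&["b".into()]));
        assert_eq!(&*projected, &vec!["b".to_string()]);
    }
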
2 changes: 1 addition & 1 deletion crates/polars-mem-engine/src/executors/scan/csv.rs
@@ -275,7 +275,7 @@ impl ScanExec for CsvExec {

fn num_unfiltered_rows(&mut self) -> PolarsResult<IdxSize> {
let (lb, ub) = self.file_info.row_estimation;
if lb.is_none_or(|lb| lb != ub) {
if lb.is_some_and(|lb| lb == ub) {
return Ok(ub as IdxSize);
}

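
The one-line csv.rs fix inverts a row-count check: the old `lb.is_none_or(|lb| lb != ub)` short-circuited with the upper bound exactly when the estimate was unknown or inexact, i.e. in every case where the rows actually needed counting. A small sketch of the corrected predicate, assuming the `(Option<usize>, usize)` shape of `row_estimation` used above; clearing `row_estimation.0` for non-first files (first hunk of multi_file_scan.rs) now correctly forces a real count for them:

    // A row count is exact only when the lower bound exists and equals the
    // upper bound.
    fn exact_row_count(row_estimation: (Option<usize>, usize)) -> Option<usize> {
        let (lb, ub) = row_estimation;
        if lb.is_some_and(|lb| lb == ub) {
            return Some(ub);
        }
        None // caller falls through and counts the rows for real
    }

    fn main() {
        assert_eq!(exact_row_count((Some(10), 10)), Some(10)); // exact: short-circuit
        assert_eq!(exact_row_count((Some(8), 10)), None); // inexact: recount
        assert_eq!(exact_row_count((None, 10)), None); // unknown: recount
    }
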
3 changes: 2 additions & 1 deletion crates/polars-plan/src/plans/optimizer/cache_states.rs
@@ -123,6 +123,7 @@ pub(super) fn set_cache_states(
scratch: &mut Vec<Node>,
expr_eval: ExprEval<'_>,
verbose: bool,
new_streaming: bool,
) -> PolarsResult<()> {
let mut stack = Vec::with_capacity(4);
let mut names_scratch = vec![];
@@ -289,7 +290,7 @@
// and finally remove that last projection and stitch the subplan
// back to the cache node again
if !cache_schema_and_children.is_empty() {
let mut proj_pd = ProjectionPushDown::new();
let mut proj_pd = ProjectionPushDown::new(new_streaming);
let mut pred_pd = PredicatePushDown::new(expr_eval).block_at_cache(false);
for (_cache_id, v) in cache_schema_and_children {
// # CHECK IF WE NEED TO REMOVE CACHES
12 changes: 10 additions & 2 deletions crates/polars-plan/src/plans/optimizer/mod.rs
@@ -143,7 +143,7 @@ pub fn optimize(

// Should be run before predicate pushdown.
if opt_state.projection_pushdown() {
let mut projection_pushdown_opt = ProjectionPushDown::new();
let mut projection_pushdown_opt = ProjectionPushDown::new(opt_state.new_streaming());
let alp = lp_arena.take(lp_top);
let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
lp_arena.replace(lp_top, alp);
@@ -206,7 +206,15 @@

if members.has_joins_or_unions && members.has_cache && _cse_plan_changed {
// We only want to run this on cse inserted caches
cache_states::set_cache_states(lp_top, lp_arena, expr_arena, scratch, expr_eval, verbose)?;
cache_states::set_cache_states(
lp_top,
lp_arena,
expr_arena,
scratch,
expr_eval,
verbose,
opt_state.new_streaming(),
)?;
}

// This one should run (nearly) last as this modifies the projections
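
The optimizer changes are plumbing: `ProjectionPushDown::new` now takes the engine mode, and `set_cache_states` must forward it because it constructs its own `ProjectionPushDown` when re-optimizing cache subplans. A toy sketch, with pared-down hypothetical signatures, of why both call sites need the flag:

    struct ProjectionPushDown {
        in_new_streaming_engine: bool,
    }

    impl ProjectionPushDown {
        fn new(in_new_streaming_engine: bool) -> Self {
            Self { in_new_streaming_engine }
        }
    }

    // Re-creates the optimizer, so the flag has to be threaded through.
    fn set_cache_states(new_streaming: bool) -> ProjectionPushDown {
        ProjectionPushDown::new(new_streaming)
    }

    fn optimize(new_streaming: bool) {
        let main_pass = ProjectionPushDown::new(new_streaming);
        let cache_pass = set_cache_states(new_streaming);
        // If either site defaulted the flag, cache subplans could be
        // re-projected under a different engine mode than the main plan.
        assert_eq!(main_pass.in_new_streaming_engine, cache_pass.in_new_streaming_engine);
    }

    fn main() {
        optimize(true);
        optimize(false);
    }
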
137 changes: 73 additions & 64 deletions crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs
@@ -152,12 +152,15 @@ fn update_scan_schema(

pub struct ProjectionPushDown {
pub is_count_star: bool,
// @TODO: This is a hack to support both pre-NEW_MULTIFILE and post-NEW_MULTIFILE.
pub in_new_streaming_engine: bool,
}

impl ProjectionPushDown {
pub(super) fn new() -> Self {
pub(super) fn new(in_new_streaming_engine: bool) -> Self {
Self {
is_count_star: false,
in_new_streaming_engine,
}
}

@@ -488,74 +491,80 @@ impl ProjectionPushDown {
};

if let Some(ref hive_parts) = hive_parts {
// Skip reading hive columns from the file.
let partition_schema = hive_parts.first().unwrap().schema();

file_options.with_columns = file_options.with_columns.map(|x| {
x.iter()
.filter(|x| !partition_schema.contains(x))
.cloned()
.collect::<Arc<[_]>>()
});

let mut out = Schema::with_capacity(schema.len());

// Ensure the ordering of `schema` matches what the reader will give -
// namely, if a hive column also exists in the file it will be projected
// based on its position in the file. This is extremely important for the
// new-streaming engine.

// row_index is separate
let opt_row_index_col_name = file_options
.row_index
.as_ref()
.map(|v| &v.name)
.filter(|v| schema.contains(v))
.cloned();

if let Some(name) = &opt_row_index_col_name {
out.insert_at_index(
0,
name.clone(),
schema.get(name).unwrap().clone(),
)
.unwrap();
}

// @TODO:
// This is a hack to support both pre-NEW_MULTIFILE and
// post-NEW_MULTIFILE.
if !self.in_new_streaming_engine
&& std::env::var("POLARS_NEW_MULTIFILE").as_deref() != Ok("1")
{
let df_fields_iter = &mut schema
.iter()
.filter(|fld| {
!partition_schema.contains(fld.0)
&& Some(fld.0) != opt_row_index_col_name.as_ref()
})
.map(|(a, b)| (a.clone(), b.clone()));

let hive_fields_iter = &mut partition_schema
.iter()
.map(|(a, b)| (a.clone(), b.clone()));

// `schema` also contains the `row_index` column here, so we don't need to handle it
// separately.

macro_rules! do_merge {
($schema:expr) => {
hive::merge_sorted_to_schema_order_impl(
df_fields_iter,
hive_fields_iter,
&mut out,
&|v| $schema.index_of(&v.0),
)
};
// Skip reading hive columns from the file.
let partition_schema = hive_parts.first().unwrap().schema();
file_options.with_columns = file_options.with_columns.map(|x| {
x.iter()
.filter(|x| !partition_schema.contains(x))
.cloned()
.collect::<Arc<[_]>>()
});

let mut out = Schema::with_capacity(schema.len());

// Ensure the ordering of `schema` matches what the reader will give -
// namely, if a hive column also exists in the file it will be projected
// based on its position in the file. This is extremely important for the
// new-streaming engine.

// row_index is separate
let opt_row_index_col_name = file_options
.row_index
.as_ref()
.map(|v| &v.name)
.filter(|v| schema.contains(v))
.cloned();

if let Some(name) = &opt_row_index_col_name {
out.insert_at_index(
0,
name.clone(),
schema.get(name).unwrap().clone(),
)
.unwrap();
}

match file_info.reader_schema.as_ref().unwrap() {
Either::Left(reader_schema) => do_merge!(reader_schema),
Either::Right(reader_schema) => do_merge!(reader_schema),
{
let df_fields_iter = &mut schema
.iter()
.filter(|fld| {
!partition_schema.contains(fld.0)
&& Some(fld.0) != opt_row_index_col_name.as_ref()
})
.map(|(a, b)| (a.clone(), b.clone()));

let hive_fields_iter = &mut partition_schema
.iter()
.map(|(a, b)| (a.clone(), b.clone()));

// `schema` also contains the `row_index` column here, so we don't need to handle it
// separately.

macro_rules! do_merge {
($schema:expr) => {
hive::merge_sorted_to_schema_order_impl(
df_fields_iter,
hive_fields_iter,
&mut out,
&|v| $schema.index_of(&v.0),
)
};
}

match file_info.reader_schema.as_ref().unwrap() {
Either::Left(reader_schema) => do_merge!(reader_schema),
Either::Right(reader_schema) => do_merge!(reader_schema),
}
}
}

schema = out;
schema = out;
}
}

if let Some(ref file_path_col) = file_options.include_file_paths {
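
The substantive projection-pushdown change wraps the hive-column reordering in a guard so it only runs on the old in-memory path: it is skipped when the plan is optimized for the new streaming engine, or when POLARS_NEW_MULTIFILE=1 opts in to the new multi-file reader. A standalone sketch of that gate, reusing the exact expression from the diff:

    // Reorder hive columns only on the old in-memory path.
    fn should_reorder_hive_columns(in_new_streaming_engine: bool) -> bool {
        !in_new_streaming_engine
            && std::env::var("POLARS_NEW_MULTIFILE").as_deref() != Ok("1")
    }

    fn main() {
        // With POLARS_NEW_MULTIFILE unset, only the engine flag decides.
        println!("in-memory engine reorders: {}", should_reorder_hive_columns(false));
        println!("new streaming engine reorders: {}", should_reorder_hive_columns(true));
    }
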
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/schema.rs
@@ -60,7 +60,7 @@ impl FileInfo {
let schema = Arc::make_mut(&mut self.schema);

for field in hive_schema.iter_fields() {
if let Ok(existing) = schema.try_get_mut(&field.name) {
if let Some(existing) = schema.get_mut(&field.name) {
*existing = field.dtype().clone();
} else {
schema
5 changes: 5 additions & 0 deletions crates/polars-schema/src/schema.rs
@@ -100,6 +100,11 @@ impl<D> Schema<D> {
self.fields.get(name)
}

/// Get a mutable reference to the dtype of the field named `name`, or `None` if the field doesn't exist.
pub fn get_mut(&mut self, name: &str) -> Option<&mut D> {
self.fields.get_mut(name)
}

/// Get a reference to the dtype of the field named `name`, or `Err(PolarsErr)` if the field doesn't exist.
pub fn try_get(&self, name: &str) -> PolarsResult<&D> {
self.get(name)
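
The new `Schema::get_mut` returns `Option<&mut D>`, which lets the FileInfo hunk above do a plain update-or-insert without the error-wrapping `try_get_mut`. A minimal sketch of that upsert, with a `BTreeMap<String, String>` standing in for the schema's name-to-dtype map (the real Schema additionally preserves insertion order):

    use std::collections::BTreeMap;

    fn upsert_field(schema: &mut BTreeMap<String, String>, name: &str, dtype: &str) {
        if let Some(existing) = schema.get_mut(name) {
            // Field already exists in the file schema: the hive dtype wins.
            *existing = dtype.to_string();
        } else {
            // Otherwise the hive field is added to the schema.
            schema.insert(name.to_string(), dtype.to_string());
        }
    }

    fn main() {
        let mut schema = BTreeMap::from([("a".to_string(), "i64".to_string())]);
        upsert_field(&mut schema, "a", "str"); // existing field: dtype replaced
        upsert_field(&mut schema, "year", "i32"); // new hive field: inserted
        assert_eq!(schema["a"], "str");
        assert_eq!(schema["year"], "i32");
    }
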
