diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs
index 0a8f130dfe51..ab172e4ddc47 100644
--- a/crates/polars-core/src/frame/mod.rs
+++ b/crates/polars-core/src/frame/mod.rs
@@ -1121,7 +1121,7 @@ impl DataFrame {
         series: Series,
     ) -> PolarsResult<&mut Self> {
         polars_ensure!(
-            series.len() == self.height(),
+            self.width() == 0 || series.len() == self.height(),
             ShapeMismatch: "unable to add a column of length {} to a DataFrame of height {}",
             series.len(), self.height(),
         );
diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index 26442980b05f..cd869b96638c 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -158,12 +158,32 @@ pub(super) fn array_iter_to_series(
 /// num_rows equals the height of the df when the df height is non-zero.
 pub(crate) fn materialize_hive_partitions(
     df: &mut DataFrame,
+    reader_schema: &ArrowSchema,
     hive_partition_columns: Option<&[Series]>,
     num_rows: usize,
 ) {
     if let Some(hive_columns) = hive_partition_columns {
-        for s in hive_columns {
-            unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) };
+        let Some(first) = hive_columns.first() else {
+            return;
+        };
+
+        if reader_schema.index_of(first.name()).is_some() {
+            // Insert these hive columns in the order they are stored in the file.
+            for s in hive_columns {
+                let i = match df.get_columns().binary_search_by_key(
+                    &reader_schema.index_of(s.name()).unwrap_or(usize::MAX),
+                    |s| reader_schema.index_of(s.name()).unwrap_or(usize::MIN),
+                ) {
+                    Ok(i) => i,
+                    Err(i) => i,
+                };
+
+                df.insert_column(i, s.new_from_index(0, num_rows)).unwrap();
+            }
+        } else {
+            for s in hive_columns {
+                unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) };
+            }
         }
     }
 }
@@ -294,7 +314,12 @@ fn rg_to_dfs_optionally_par_over_columns(
             df.with_row_index_mut(&rc.name, Some(*previous_row_count + rc.offset));
         }
 
-        materialize_hive_partitions(&mut df, hive_partition_columns, projection_height);
+        materialize_hive_partitions(
+            &mut df,
+            schema.as_ref(),
+            hive_partition_columns,
+            projection_height,
+        );
         apply_predicate(&mut df, predicate, true)?;
 
         *previous_row_count += current_row_count;
@@ -382,7 +407,12 @@ fn rg_to_dfs_par_over_rg(
                 df.with_row_index_mut(&rc.name, Some(row_count_start as IdxSize + rc.offset));
             }
 
-            materialize_hive_partitions(&mut df, hive_partition_columns, projection_height);
+            materialize_hive_partitions(
+                &mut df,
+                schema.as_ref(),
+                hive_partition_columns,
+                projection_height,
+            );
             apply_predicate(&mut df, predicate, false)?;
 
             Ok(Some(df))
diff --git a/crates/polars-io/src/parquet/read/utils.rs b/crates/polars-io/src/parquet/read/utils.rs
index f3b79f3cd756..d7a2faa37a40 100644
--- a/crates/polars-io/src/parquet/read/utils.rs
+++ b/crates/polars-io/src/parquet/read/utils.rs
@@ -24,7 +24,7 @@ pub fn materialize_empty_df(
             .unwrap();
     }
 
-    materialize_hive_partitions(&mut df, hive_partition_columns, 0);
+    materialize_hive_partitions(&mut df, reader_schema, hive_partition_columns, 0);
 
     df
 }
diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
index 0d94c775c984..0836e0ca305f 100644
--- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
+++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -1,5 +1,7 @@
+use arrow::datatypes::ArrowSchemaRef;
+use either::Either;
 use expr_expansion::{is_regex_projection, rewrite_projections};
-use hive::hive_partitions_from_paths;
+use hive::{hive_partitions_from_paths, HivePartitions};
 
 use super::stack_opt::ConversionOptimizer;
 use super::*;
@@ -86,7 +88,7 @@ pub fn to_alp_impl(
             paths,
             predicate,
             mut scan_type,
-            file_options,
+            mut file_options,
         } => {
             let mut file_info = if let Some(file_info) = file_info {
                 file_info
@@ -136,20 +138,42 @@
 
             let hive_parts = if hive_parts.is_some() {
                 hive_parts
-            } else if file_options.hive_options.enabled.unwrap() {
+            } else if file_options.hive_options.enabled.unwrap()
+                && file_info.reader_schema.is_some()
+            {
+                #[allow(unused_assignments)]
+                let mut owned = None;
+
                 hive_partitions_from_paths(
                     paths.as_ref(),
                     file_options.hive_options.hive_start_idx,
                     file_options.hive_options.schema.clone(),
+                    match file_info.reader_schema.as_ref().unwrap() {
+                        Either::Left(v) => {
+                            owned = Some(Schema::from(v));
+                            owned.as_ref().unwrap()
+                        },
+                        Either::Right(v) => v.as_ref(),
+                    },
                 )?
             } else {
                 None
            };
 
             if let Some(ref hive_parts) = hive_parts {
-                file_info.update_schema_with_hive_schema(hive_parts[0].schema().clone())?;
+                let hive_schema = hive_parts[0].schema();
+                file_info.update_schema_with_hive_schema(hive_schema.clone());
             }
 
+            file_options.with_columns = if file_info.reader_schema.is_some() {
+                maybe_init_projection_excluding_hive(
+                    file_info.reader_schema.as_ref().unwrap(),
+                    hive_parts.as_ref().map(|x| &x[0]),
+                )
+            } else {
+                None
+            };
+
             if let Some(row_index) = &file_options.row_index {
                 let schema = Arc::make_mut(&mut file_info.schema);
                 *schema = schema
@@ -802,3 +826,34 @@ where
         })
         .collect()
 }
+
+pub(crate) fn maybe_init_projection_excluding_hive(
+    reader_schema: &Either<ArrowSchemaRef, SchemaRef>,
+    hive_parts: Option<&HivePartitions>,
+) -> Option<Arc<[String]>> {
+    // Update `with_columns` with a projection so that hive columns aren't loaded from the
+    // file
+    let hive_parts = hive_parts?;
+
+    let hive_schema = hive_parts.schema();
+
+    let (first_hive_name, _) = hive_schema.get_at_index(0)?;
+
+    let names = match reader_schema {
+        Either::Left(ref v) => {
+            let names = v.get_names();
+            names.contains(&first_hive_name.as_str()).then_some(names)
+        },
+        Either::Right(ref v) => v.contains(first_hive_name.as_str()).then(|| v.get_names()),
+    };
+
+    let names = names?;
+
+    Some(
+        names
+            .iter()
+            .filter(|x| !hive_schema.contains(x))
+            .map(ToString::to_string)
+            .collect::<Arc<[_]>>(),
+    )
+}
diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs
index e0c2b0282d71..44937dd3ecef 100644
--- a/crates/polars-plan/src/plans/hive.rs
+++ b/crates/polars-plan/src/plans/hive.rs
@@ -17,11 +17,10 @@ pub struct HivePartitions {
 }
 
 impl HivePartitions {
-    pub fn get_projection_schema_and_indices<T: AsRef<str>>(
+    pub fn get_projection_schema_and_indices(
         &self,
-        names: &[T],
+        names: &PlHashSet<String>,
     ) -> (SchemaRef, Vec<usize>) {
-        let names = names.iter().map(T::as_ref).collect::<PlHashSet<&str>>();
         let mut out_schema = Schema::with_capacity(self.stats.schema().len());
         let mut out_indices = Vec::with_capacity(self.stats.column_stats().len());
 
@@ -66,6 +65,7 @@ pub fn hive_partitions_from_paths(
     paths: &[PathBuf],
     hive_start_idx: usize,
     schema: Option<SchemaRef>,
+    reader_schema: &Schema,
 ) -> PolarsResult<Option<Arc<[HivePartitions]>>> {
     let Some(path) = paths.first() else {
         return Ok(None);
@@ -88,14 +88,30 @@
         }};
     }
 
-    let hive_schema = if let Some(v) = schema {
-        v
+    let hive_schema = if let Some(ref schema) = schema {
+        Arc::new(get_hive_parts_iter!(path_string).map(|(name, _)| {
+            let Some(dtype) = schema.get(name) else {
+                polars_bail!(
+                    SchemaFieldNotFound:
+                    "path contains column not present in the given Hive schema: {:?}, path = {:?}",
{:?}", + name, + path + ) + }; + Ok(Field::new(name, dtype.clone())) + }).collect::>()?) } else { let mut hive_schema = Schema::with_capacity(16); let mut schema_inference_map: PlHashMap<&str, PlHashSet> = PlHashMap::with_capacity(16); for (name, _) in get_hive_parts_iter!(path_string) { + // If the column is also in the file we can use the dtype stored there. + if let Some(dtype) = reader_schema.get(name) { + hive_schema.insert_at_index(hive_schema.len(), name.into(), dtype.clone())?; + continue; + } + hive_schema.insert_at_index(hive_schema.len(), name.into(), DataType::String)?; schema_inference_map.insert(name, PlHashSet::with_capacity(4)); } diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs index e7a65b50ec79..173498519f8f 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs @@ -35,7 +35,7 @@ fn init_set() -> PlHashSet> { /// utility function to get names of the columns needed in projection at scan level fn get_scan_columns( - acc_projections: &mut Vec, + acc_projections: &Vec, expr_arena: &Arena, row_index: Option<&RowIndex>, ) -> Option> { @@ -378,7 +378,7 @@ impl ProjectionPushDown { mut options, predicate, } => { - options.with_columns = get_scan_columns(&mut acc_projections, expr_arena, None); + options.with_columns = get_scan_columns(&acc_projections, expr_arena, None); options.output_schema = if options.with_columns.is_none() { None @@ -417,7 +417,7 @@ impl ProjectionPushDown { if do_optimization { file_options.with_columns = get_scan_columns( - &mut acc_projections, + &acc_projections, expr_arena, file_options.row_index.as_ref(), ); @@ -432,7 +432,9 @@ impl ProjectionPushDown { hive_parts = if let Some(hive_parts) = hive_parts { let (new_schema, projected_indices) = hive_parts[0] - .get_projection_schema_and_indices(with_columns.as_ref()); + .get_projection_schema_and_indices( + &with_columns.iter().cloned().collect::>(), + ); Some( hive_parts @@ -448,15 +450,22 @@ impl ProjectionPushDown { .collect::>(), ) } else { - hive_parts + None }; // Hive partitions are created AFTER the projection, so the output // schema is incorrect. Here we ensure the columns that are projected and hive // parts are added at the proper place in the schema, which is at the end. - if let Some(ref mut hive_parts) = hive_parts { + if let Some(ref hive_parts) = hive_parts { let partition_schema = hive_parts.first().unwrap().schema(); + file_options.with_columns = file_options.with_columns.map(|x| { + x.iter() + .filter(|x| !partition_schema.contains(x)) + .cloned() + .collect::>() + }); + for (name, _) in partition_schema.iter() { if let Some(dt) = schema.shift_remove(name) { schema.with_column(name.clone(), dt); @@ -465,6 +474,10 @@ impl ProjectionPushDown { } Some(Arc::new(schema)) } else { + file_options.with_columns = maybe_init_projection_excluding_hive( + file_info.reader_schema.as_ref().unwrap(), + hive_parts.as_ref().map(|x| &x[0]), + ); None }; } diff --git a/crates/polars-plan/src/plans/schema.rs b/crates/polars-plan/src/plans/schema.rs index 1f97e045eab4..82b3a17fd3f5 100644 --- a/crates/polars-plan/src/plans/schema.rs +++ b/crates/polars-plan/src/plans/schema.rs @@ -51,20 +51,18 @@ impl FileInfo { } /// Merge the [`Schema`] of a [`HivePartitions`] with the schema of this [`FileInfo`]. - /// - /// Returns an `Err` if any of the columns in either schema overlap. 
-    pub fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) -> PolarsResult<()> {
-        let expected_len = self.schema.len() + hive_schema.len();
+    pub fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) {
+        let schema = Arc::make_mut(&mut self.schema);
 
-        let file_schema = Arc::make_mut(&mut self.schema);
-        file_schema.merge(Arc::unwrap_or_clone(hive_schema));
-
-        polars_ensure!(
-            file_schema.len() == expected_len,
-            Duplicate: "invalid Hive partition schema\n\n\
-            Extending the schema with the Hive partition schema would create duplicate fields."
-        );
-        Ok(())
+        for field in hive_schema.iter_fields() {
+            if let Ok(existing) = schema.try_get_mut(&field.name) {
+                *existing = field.data_type().clone();
+            } else {
+                schema
+                    .insert_at_index(schema.len(), field.name, field.dtype.clone())
+                    .unwrap();
+            }
+        }
     }
 }
 
diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index 78ac01336f18..4563885c72ad 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -10,7 +10,7 @@
 import pytest
 
 import polars as pl
-from polars.exceptions import DuplicateError, SchemaFieldNotFoundError
+from polars.exceptions import SchemaFieldNotFoundError
 from polars.testing import assert_frame_equal, assert_series_equal
 
 
@@ -247,17 +247,6 @@ def test_hive_partitioned_projection_pushdown(
     assert_frame_equal(result, expected)
 
 
-@pytest.mark.write_disk()
-def test_hive_partitioned_err(io_files_path: Path, tmp_path: Path) -> None:
-    df = pl.read_ipc(io_files_path / "*.ipc")
-    root = tmp_path / "sugars_g=10"
-    root.mkdir()
-    df.write_parquet(root / "file.parquet")
-
-    with pytest.raises(DuplicateError, match="invalid Hive partition schema"):
-        pl.scan_parquet(tmp_path, hive_partitioning=True).collect()
-
-
 @pytest.mark.write_disk()
 def test_hive_partitioned_projection_skip_files(
     io_files_path: Path, tmp_path: Path
@@ -538,3 +527,53 @@ def test_hive_partition_force_async_17155(tmp_path: Path, monkeypatch: Any) -> N
     assert_frame_equal(
         lf.collect(), pl.DataFrame({k: [1, 2, 3] for k in ["x", "a", "b"]})
     )
+
+
+@pytest.mark.write_disk()
+@pytest.mark.parametrize("projection_pushdown", [True, False])
+def test_hive_partition_columns_contained_in_file(
+    tmp_path: Path, projection_pushdown: bool
+) -> None:
+    path = tmp_path / "a=1/b=2/data.bin"
+    path.parent.mkdir(exist_ok=True, parents=True)
+    df = pl.DataFrame(
+        {"x": 1, "a": 1, "b": 2, "y": 1},
+        schema={"x": pl.Int32, "a": pl.Int8, "b": pl.Int16, "y": pl.Int32},
+    )
+    df.write_parquet(path)
+
+    def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None:
+        for projection in [
+            ["a"],
+            ["b"],
+            ["x"],
+            ["y"],
+            ["a", "x"],
+            ["b", "x"],
+            ["a", "y"],
+            ["b", "y"],
+            ["x", "y"],
+            ["a", "b", "x"],
+            ["a", "b", "y"],
+        ]:
+            assert_frame_equal(
+                lf.select(projection).collect(projection_pushdown=projection_pushdown),
+                df.select(projection),
+            )
+
+    lf = pl.scan_parquet(path, hive_partitioning=True)
+    rhs = df
+    assert_frame_equal(lf.collect(projection_pushdown=projection_pushdown), rhs)
+    assert_with_projections(lf, rhs)
+
+    lf = pl.scan_parquet(
+        path,
+        hive_schema={"a": pl.String, "b": pl.String},
+        hive_partitioning=True,
+    )
+    rhs = df.with_columns(pl.col("a", "b").cast(pl.String))
+    assert_frame_equal(
+        lf.collect(projection_pushdown=projection_pushdown),
+        rhs,
+    )
+    assert_with_projections(lf, rhs)