feat: Support loading from datasets where the hive columns are also stored in the file (#17203)

Co-authored-by: Ritchie Vink <[email protected]>
nameexhaustion and ritchie46 authored Jun 26, 2024
1 parent 053765b commit 4d2f928
Showing 8 changed files with 197 additions and 46 deletions.
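Before the diffs, a rough sketch of the user-facing behavior this commit enables. It assumes the Rust `LazyFrame::scan_parquet` API of this release line (the exact `ScanArgsParquet` fields are era-dependent): a hive layout like `dataset/year=2024/data.parquet` whose files also physically store the `year` column can now be scanned without a duplicate-column error.

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Hive-style layout where the partition key `year` is ALSO a column
    // inside each parquet file (previously a duplicate-field error).
    let df = LazyFrame::scan_parquet(
        "dataset/year=*/*.parquet",
        ScanArgsParquet::default(),
    )?
    .collect()?;

    // `year` is read once: its dtype comes from the file schema and the
    // column keeps its position from the file.
    println!("{df}");
    Ok(())
}
```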
crates/polars-core/src/frame/mod.rs (2 changes: 1 addition & 1 deletion)
@@ -1140,7 +1140,7 @@ impl DataFrame {
         series: Series,
     ) -> PolarsResult<&mut Self> {
         polars_ensure!(
-            series.len() == self.height(),
+            self.width() == 0 || series.len() == self.height(),
             ShapeMismatch: "unable to add a column of length {} to a DataFrame of height {}",
             series.len(), self.height(),
         );
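The relaxed check lets a column of any length seed a zero-width DataFrame (which reports height 0), something the hive-materialization path below relies on when the projection selects no file columns. A minimal sketch, assuming the polars Rust crate of this era:

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // A DataFrame with no columns reports height 0.
    let mut df = DataFrame::empty();
    assert_eq!((df.width(), df.height()), (0, 0));

    // With `self.width() == 0 || ...`, a column of any length can now be
    // added; previously this tripped ShapeMismatch (3 != 0).
    df.with_column(Series::new("year", &[2024i32, 2024, 2024]))?;
    assert_eq!(df.shape(), (3, 1));
    Ok(())
}
```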
crates/polars-io/src/parquet/read/read_impl.rs (38 changes: 34 additions & 4 deletions)
@@ -158,12 +158,32 @@ pub(super) fn array_iter_to_series(
 /// num_rows equals the height of the df when the df height is non-zero.
 pub(crate) fn materialize_hive_partitions(
     df: &mut DataFrame,
+    reader_schema: &ArrowSchema,
     hive_partition_columns: Option<&[Series]>,
     num_rows: usize,
 ) {
     if let Some(hive_columns) = hive_partition_columns {
-        for s in hive_columns {
-            unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) };
+        let Some(first) = hive_columns.first() else {
+            return;
+        };
+
+        if reader_schema.index_of(first.name()).is_some() {
+            // Insert these hive columns in the order they are stored in the file.
+            for s in hive_columns {
+                let i = match df.get_columns().binary_search_by_key(
+                    &reader_schema.index_of(s.name()).unwrap_or(usize::MAX),
+                    |s| reader_schema.index_of(s.name()).unwrap_or(usize::MIN),
+                ) {
+                    Ok(i) => i,
+                    Err(i) => i,
+                };
+
+                df.insert_column(i, s.new_from_index(0, num_rows)).unwrap();
+            }
+        } else {
+            for s in hive_columns {
+                unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) };
+            }
         }
     }
 }
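The ordered branch finds each hive column's slot by binary-searching the columns already in the frame on their reader-schema index; both the `Ok` and `Err` arms of `binary_search_by_key` yield a valid insertion point. A self-contained sketch of that idiom (std only, toy data):

```rust
fn main() {
    // Columns already materialized, tagged with their file-schema index.
    let mut cols: Vec<(usize, &str)> = vec![(0, "a"), (2, "c"), (5, "f")];

    // A hive column that is also stored in the file at schema index 3.
    let (key, name) = (3usize, "d");

    // Ok(i): a column with this key already sits at i; Err(i): i is where
    // it would go. Either way, i keeps `cols` sorted by key.
    let i = match cols.binary_search_by_key(&key, |(idx, _)| *idx) {
        Ok(i) => i,
        Err(i) => i,
    };
    cols.insert(i, (key, name));

    assert_eq!(
        cols.iter().map(|(_, n)| *n).collect::<Vec<_>>(),
        ["a", "c", "d", "f"]
    );
}
```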
@@ -294,7 +314,12 @@ fn rg_to_dfs_optionally_par_over_columns(
             df.with_row_index_mut(&rc.name, Some(*previous_row_count + rc.offset));
         }
 
-        materialize_hive_partitions(&mut df, hive_partition_columns, projection_height);
+        materialize_hive_partitions(
+            &mut df,
+            schema.as_ref(),
+            hive_partition_columns,
+            projection_height,
+        );
         apply_predicate(&mut df, predicate, true)?;
 
         *previous_row_count += current_row_count;
@@ -382,7 +407,12 @@ fn rg_to_dfs_par_over_rg(
             df.with_row_index_mut(&rc.name, Some(row_count_start as IdxSize + rc.offset));
         }
 
-        materialize_hive_partitions(&mut df, hive_partition_columns, projection_height);
+        materialize_hive_partitions(
+            &mut df,
+            schema.as_ref(),
+            hive_partition_columns,
+            projection_height,
+        );
         apply_predicate(&mut df, predicate, false)?;
 
         Ok(Some(df))
crates/polars-io/src/parquet/read/utils.rs (2 changes: 1 addition & 1 deletion)
@@ -24,7 +24,7 @@ pub fn materialize_empty_df(
         .unwrap();
     }
 
-    materialize_hive_partitions(&mut df, hive_partition_columns, 0);
+    materialize_hive_partitions(&mut df, reader_schema, hive_partition_columns, 0);
 
     df
 }
crates/polars-plan/src/plans/conversion/dsl_to_ir.rs (63 changes: 59 additions & 4 deletions)
@@ -1,5 +1,7 @@
+use arrow::datatypes::ArrowSchemaRef;
+use either::Either;
 use expr_expansion::{is_regex_projection, rewrite_projections};
-use hive::hive_partitions_from_paths;
+use hive::{hive_partitions_from_paths, HivePartitions};
 
 use super::stack_opt::ConversionOptimizer;
 use super::*;
@@ -86,7 +88,7 @@ pub fn to_alp_impl(
             paths,
             predicate,
             mut scan_type,
-            file_options,
+            mut file_options,
         } => {
             let mut file_info = if let Some(file_info) = file_info {
                 file_info
@@ -136,20 +138,42 @@ pub fn to_alp_impl(
 
             let hive_parts = if hive_parts.is_some() {
                 hive_parts
-            } else if file_options.hive_options.enabled.unwrap() {
+            } else if file_options.hive_options.enabled.unwrap()
+                && file_info.reader_schema.is_some()
+            {
+                #[allow(unused_assignments)]
+                let mut owned = None;
+
                 hive_partitions_from_paths(
                     paths.as_ref(),
                     file_options.hive_options.hive_start_idx,
                     file_options.hive_options.schema.clone(),
+                    match file_info.reader_schema.as_ref().unwrap() {
+                        Either::Left(v) => {
+                            owned = Some(Schema::from(v));
+                            owned.as_ref().unwrap()
+                        },
+                        Either::Right(v) => v.as_ref(),
+                    },
                 )?
             } else {
                 None
             };
 
             if let Some(ref hive_parts) = hive_parts {
-                file_info.update_schema_with_hive_schema(hive_parts[0].schema().clone())?;
+                let hive_schema = hive_parts[0].schema();
+                file_info.update_schema_with_hive_schema(hive_schema.clone());
             }
 
+            file_options.with_columns = if file_info.reader_schema.is_some() {
+                maybe_init_projection_excluding_hive(
+                    file_info.reader_schema.as_ref().unwrap(),
+                    hive_parts.as_ref().map(|x| &x[0]),
+                )
+            } else {
+                None
+            };
+
             if let Some(row_index) = &file_options.row_index {
                 let schema = Arc::make_mut(&mut file_info.schema);
                 *schema = schema
@@ -802,3 +826,34 @@
     })
     .collect()
 }
+
+pub(crate) fn maybe_init_projection_excluding_hive(
+    reader_schema: &Either<ArrowSchemaRef, SchemaRef>,
+    hive_parts: Option<&HivePartitions>,
+) -> Option<Arc<[String]>> {
+    // Update `with_columns` with a projection so that hive columns aren't loaded from the
+    // file
+    let hive_parts = hive_parts?;
+
+    let hive_schema = hive_parts.schema();
+
+    let (first_hive_name, _) = hive_schema.get_at_index(0)?;
+
+    let names = match reader_schema {
+        Either::Left(ref v) => {
+            let names = v.get_names();
+            names.contains(&first_hive_name.as_str()).then_some(names)
+        },
+        Either::Right(ref v) => v.contains(first_hive_name.as_str()).then(|| v.get_names()),
+    };
+
+    let names = names?;
+
+    Some(
+        names
+            .iter()
+            .filter(|x| !hive_schema.contains(x))
+            .map(ToString::to_string)
+            .collect::<Arc<[_]>>(),
+    )
+}
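`maybe_init_projection_excluding_hive` only kicks in when the first hive column is physically present in the reader schema; it then projects every file column except the hive ones, so nothing is read twice. A standalone sketch of the rule with plain std types (function and names hypothetical):

```rust
use std::collections::HashSet;

/// If the hive columns are stored in the file, project every file column
/// except the hive ones; otherwise return None and let the scan read all.
fn projection_excluding_hive(
    reader_columns: &[&str],
    hive_columns: &[&str],
) -> Option<Vec<String>> {
    let first = *hive_columns.first()?;
    // Only applies when the hive column is physically stored in the file.
    reader_columns.contains(&first).then(|| {
        let hive: HashSet<&str> = hive_columns.iter().copied().collect();
        reader_columns
            .iter()
            .filter(|c| !hive.contains(*c))
            .map(ToString::to_string)
            .collect()
    })
}

fn main() {
    let proj = projection_excluding_hive(&["year", "a", "b"], &["year"]);
    assert_eq!(proj, Some(vec!["a".to_string(), "b".to_string()]));

    // Hive column not in the file: no projection is initialized.
    assert_eq!(projection_excluding_hive(&["a", "b"], &["year"]), None);
}
```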
crates/polars-plan/src/plans/hive.rs (26 changes: 21 additions & 5 deletions)
@@ -17,11 +17,10 @@ pub struct HivePartitions {
 }
 
 impl HivePartitions {
-    pub fn get_projection_schema_and_indices<T: AsRef<str>>(
+    pub fn get_projection_schema_and_indices(
         &self,
-        names: &[T],
+        names: &PlHashSet<String>,
     ) -> (SchemaRef, Vec<usize>) {
-        let names = names.iter().map(T::as_ref).collect::<PlHashSet<&str>>();
         let mut out_schema = Schema::with_capacity(self.stats.schema().len());
         let mut out_indices = Vec::with_capacity(self.stats.column_stats().len());
 
@@ -66,6 +65,7 @@ pub fn hive_partitions_from_paths(
     paths: &[PathBuf],
     hive_start_idx: usize,
     schema: Option<SchemaRef>,
+    reader_schema: &Schema,
 ) -> PolarsResult<Option<Arc<[HivePartitions]>>> {
     let Some(path) = paths.first() else {
         return Ok(None);
@@ -88,14 +88,30 @@ pub fn hive_partitions_from_paths(
         }};
     }
 
-    let hive_schema = if let Some(v) = schema {
-        v
+    let hive_schema = if let Some(ref schema) = schema {
+        Arc::new(get_hive_parts_iter!(path_string).map(|(name, _)| {
+            let Some(dtype) = schema.get(name) else {
+                polars_bail!(
+                    SchemaFieldNotFound:
+                    "path contains column not present in the given Hive schema: {:?}, path = {:?}",
+                    name,
+                    path
+                )
+            };
+            Ok(Field::new(name, dtype.clone()))
+        }).collect::<PolarsResult<Schema>>()?)
     } else {
         let mut hive_schema = Schema::with_capacity(16);
         let mut schema_inference_map: PlHashMap<&str, PlHashSet<DataType>> =
             PlHashMap::with_capacity(16);
 
         for (name, _) in get_hive_parts_iter!(path_string) {
+            // If the column is also in the file we can use the dtype stored there.
+            if let Some(dtype) = reader_schema.get(name) {
+                hive_schema.insert_at_index(hive_schema.len(), name.into(), dtype.clone())?;
+                continue;
+            }
+
             hive_schema.insert_at_index(hive_schema.len(), name.into(), DataType::String)?;
             schema_inference_map.insert(name, PlHashSet::with_capacity(4));
         }
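The dtype-resolution order here is: an explicit hive schema if given (erroring on unknown path columns), else the dtype stored in the file, else String pending inference. A toy sketch of the middle case with std types (`DType` stands in for polars' `DataType`):

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
enum DType { String, Int64 }

/// Build a hive schema from `key=value` path segments: if the file (reader
/// schema) also stores the column, reuse its dtype; otherwise fall back to
/// String pending inference.
fn hive_schema_from_path(
    path: &str,
    reader_schema: &HashMap<&str, DType>,
) -> Vec<(String, DType)> {
    path.split('/')
        .filter_map(|seg| seg.split_once('='))
        .map(|(name, _value)| {
            let dtype = reader_schema.get(name).cloned().unwrap_or(DType::String);
            (name.to_string(), dtype)
        })
        .collect()
}

fn main() {
    let reader: HashMap<&str, DType> = HashMap::from([("year", DType::Int64)]);
    let schema = hive_schema_from_path("year=2024/month=01", &reader);
    assert_eq!(
        schema,
        vec![("year".into(), DType::Int64), ("month".into(), DType::String)]
    );
}
```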
crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs (25 changes: 19 additions & 6 deletions)
@@ -35,7 +35,7 @@ fn init_set() -> PlHashSet<Arc<str>> {
 
 /// utility function to get names of the columns needed in projection at scan level
 fn get_scan_columns(
-    acc_projections: &mut Vec<ColumnNode>,
+    acc_projections: &Vec<ColumnNode>,
     expr_arena: &Arena<AExpr>,
     row_index: Option<&RowIndex>,
 ) -> Option<Arc<[String]>> {
@@ -378,7 +378,7 @@ impl ProjectionPushDown {
                 mut options,
                 predicate,
             } => {
-                options.with_columns = get_scan_columns(&mut acc_projections, expr_arena, None);
+                options.with_columns = get_scan_columns(&acc_projections, expr_arena, None);
 
                 options.output_schema = if options.with_columns.is_none() {
                     None
@@ -417,7 +417,7 @@ impl ProjectionPushDown {
 
                 if do_optimization {
                     file_options.with_columns = get_scan_columns(
-                        &mut acc_projections,
+                        &acc_projections,
                         expr_arena,
                         file_options.row_index.as_ref(),
                     );
@@ -432,7 +432,9 @@ impl ProjectionPushDown {
 
                     hive_parts = if let Some(hive_parts) = hive_parts {
                         let (new_schema, projected_indices) = hive_parts[0]
-                            .get_projection_schema_and_indices(with_columns.as_ref());
+                            .get_projection_schema_and_indices(
+                                &with_columns.iter().cloned().collect::<PlHashSet<_>>(),
+                            );
 
                         Some(
                             hive_parts
@@ -448,15 +450,22 @@ impl ProjectionPushDown {
                                 .collect::<Arc<[_]>>(),
                         )
                     } else {
-                        hive_parts
+                        None
                     };
 
                     // Hive partitions are created AFTER the projection, so the output
                     // schema is incorrect. Here we ensure the columns that are projected and hive
                     // parts are added at the proper place in the schema, which is at the end.
-                    if let Some(ref mut hive_parts) = hive_parts {
+                    if let Some(ref hive_parts) = hive_parts {
                         let partition_schema = hive_parts.first().unwrap().schema();
 
+                        file_options.with_columns = file_options.with_columns.map(|x| {
+                            x.iter()
+                                .filter(|x| !partition_schema.contains(x))
+                                .cloned()
+                                .collect::<Arc<[_]>>()
+                        });
+
                         for (name, _) in partition_schema.iter() {
                             if let Some(dt) = schema.shift_remove(name) {
                                 schema.with_column(name.clone(), dt);
@@ -465,6 +474,10 @@ impl ProjectionPushDown {
                     }
                     Some(Arc::new(schema))
                 } else {
+                    file_options.with_columns = maybe_init_projection_excluding_hive(
+                        file_info.reader_schema.as_ref().unwrap(),
+                        hive_parts.as_ref().map(|x| &x[0]),
+                    );
                     None
                 };
             }
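The `shift_remove`/`with_column` pair moves each hive column to the end of the output schema while preserving the relative order of everything else. A sketch of that movement over a plain `Vec` standing in for polars' ordered `Schema`:

```rust
// Move the named columns to the end, keeping everyone else's order.
fn move_to_end(schema: &mut Vec<(String, String)>, names: &[&str]) {
    for name in names {
        // shift_remove keeps the relative order of the remaining columns...
        if let Some(pos) = schema.iter().position(|(n, _)| n == name) {
            let entry = schema.remove(pos);
            // ...and re-inserting appends at the end, matching how hive
            // columns are materialized after the projected file columns.
            schema.push(entry);
        }
    }
}

fn main() {
    let mut schema = vec![
        ("year".to_string(), "i64".to_string()),
        ("a".to_string(), "f64".to_string()),
        ("b".to_string(), "str".to_string()),
    ];
    move_to_end(&mut schema, &["year"]);
    assert_eq!(schema[2].0, "year");
}
```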
crates/polars-plan/src/plans/schema.rs (24 changes: 11 additions & 13 deletions)
@@ -51,20 +51,18 @@ impl FileInfo {
     }
 
     /// Merge the [`Schema`] of a [`HivePartitions`] with the schema of this [`FileInfo`].
-    ///
-    /// Returns an `Err` if any of the columns in either schema overlap.
-    pub fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) -> PolarsResult<()> {
-        let expected_len = self.schema.len() + hive_schema.len();
+    pub fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) {
+        let schema = Arc::make_mut(&mut self.schema);
 
-        let file_schema = Arc::make_mut(&mut self.schema);
-        file_schema.merge(Arc::unwrap_or_clone(hive_schema));
-
-        polars_ensure!(
-            file_schema.len() == expected_len,
-            Duplicate: "invalid Hive partition schema\n\n\
-            Extending the schema with the Hive partition schema would create duplicate fields."
-        );
-        Ok(())
+        for field in hive_schema.iter_fields() {
+            if let Ok(existing) = schema.try_get_mut(&field.name) {
+                *existing = field.data_type().clone();
+            } else {
+                schema
+                    .insert_at_index(schema.len(), field.name, field.dtype.clone())
+                    .unwrap();
+            }
+        }
     }
 }
 
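`update_schema_with_hive_schema` is now an upsert: a hive dtype overwrites an existing column's dtype in place, and genuinely new columns are appended, replacing the old merge that errored on any overlap. A sketch with std types standing in for `Schema`:

```rust
// Upsert hive fields into an ordered (name, dtype) schema.
fn update_with_hive(schema: &mut Vec<(String, String)>, hive: &[(String, String)]) {
    for (name, dtype) in hive {
        if let Some(existing) = schema.iter_mut().find(|(n, _)| n == name) {
            existing.1 = dtype.clone(); // overwrite dtype, keep position
        } else {
            schema.push((name.clone(), dtype.clone())); // append new column
        }
    }
}

fn main() {
    let mut schema: Vec<(String, String)> =
        vec![("year".into(), "str".into()), ("a".into(), "f64".into())];
    update_with_hive(&mut schema, &[("year".into(), "i64".into())]);
    // Previously a duplicate-field error; now the dtype is refined in place.
    assert_eq!(schema[0], ("year".to_string(), "i64".to_string()));
}
```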