Skip to content

Commit

Permalink
feat: Support hive partitioning in scan_ipc (#17434)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Jul 5, 2024
1 parent f803053 commit c390fd7
Show file tree
Hide file tree
Showing 14 changed files with 315 additions and 273 deletions.
42 changes: 42 additions & 0 deletions crates/polars-io/src/hive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use polars_core::frame::DataFrame;
use polars_core::schema::IndexOfSchema;
use polars_core::series::Series;

/// Adds the hive partition columns to `df`, expanding each hive series to `num_rows`
/// rows by repeating its value at index 0.
///
/// The row count is passed in explicitly because `df` can be empty when a projection
/// contains only hive partition columns.
///
/// If the name of the first hive column is present in `reader_schema`, each hive column
/// is inserted at the position found by a binary search over the schema indices of the
/// columns currently in `df` (so the file's column order is followed). Otherwise the
/// hive columns are appended after the existing columns.
///
/// # Safety
///
/// num_rows equals the height of the df when the df height is non-zero.
pub(crate) fn materialize_hive_partitions<S: IndexOfSchema>(
    df: &mut DataFrame,
    reader_schema: &S,
    hive_partition_columns: Option<&[Series]>,
    num_rows: usize,
) {
    let Some(hive_columns) = hive_partition_columns else {
        return;
    };
    let Some(first) = hive_columns.first() else {
        return;
    };

    if reader_schema.index_of(first.name()).is_none() {
        // The hive columns are not part of the file schema: append them at the end.
        for hive_col in hive_columns {
            // SAFETY: relies on the documented contract that `num_rows` equals the frame
            // height whenever that height is non-zero.
            // NOTE(review): presumably also requires that the hive column names do not
            // collide with columns already in `df` — confirm against
            // `with_column_unchecked`.
            unsafe { df.with_column_unchecked(hive_col.new_from_index(0, num_rows)) };
        }
        return;
    }

    // Insert these hive columns in the order they are stored in the file.
    for hive_col in hive_columns {
        // Hive columns missing from the schema sort last; existing frame columns
        // missing from the schema sort first.
        let target = reader_schema
            .index_of(hive_col.name())
            .unwrap_or(usize::MAX);
        // Both an exact hit and an insertion point are valid insert positions.
        let (Ok(pos) | Err(pos)) = df.get_columns().binary_search_by_key(&target, |existing| {
            reader_schema
                .index_of(existing.name())
                .unwrap_or(usize::MIN)
        });

        df.insert_column(pos, hive_col.new_from_index(0, num_rows))
            .unwrap();
    }
}
90 changes: 70 additions & 20 deletions crates/polars-io/src/ipc/ipc_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ use std::io::{Read, Seek};
use std::path::PathBuf;

use arrow::datatypes::ArrowSchemaRef;
use arrow::io::ipc::read;
use arrow::io::ipc::read::{self, get_row_count};
use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use crate::hive::materialize_hive_partitions;
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
Expand Down Expand Up @@ -79,6 +80,7 @@ pub struct IpcReader<R: MmapBytesReader> {
pub(super) n_rows: Option<usize>,
pub(super) projection: Option<Vec<usize>>,
pub(crate) columns: Option<Vec<String>>,
hive_partition_columns: Option<Vec<Series>>,
pub(super) row_index: Option<RowIndex>,
// Stores the path used as the semaphore key, to make sure we don't write to the memory-mapped file.
pub(super) memory_map: Option<PathBuf>,
Expand Down Expand Up @@ -126,6 +128,11 @@ impl<R: MmapBytesReader> IpcReader<R> {
self
}

/// Set the hive partition columns for this reader; passing `None` clears them.
///
/// The stored columns are taken by `finish`, which hands them to
/// `materialize_hive_partitions` together with the row count of the data read.
pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
self.hive_partition_columns = columns;
self
}

/// Add a row index column.
pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
self.row_index = row_index;
Expand Down Expand Up @@ -200,6 +207,7 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
rechunk: true,
n_rows: None,
columns: None,
hive_partition_columns: None,
projection: None,
row_index: None,
memory_map: None,
Expand All @@ -214,29 +222,71 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
}

fn finish(mut self) -> PolarsResult<DataFrame> {
if self.memory_map.is_some() && self.reader.to_file().is_some() {
match self.finish_memmapped(None) {
Ok(df) => return Ok(df),
Err(err) => check_mmap_err(err)?,
let reader_schema = if let Some(ref schema) = self.schema {
schema.clone()
} else {
self.get_metadata()?.schema.clone()
};
let reader_schema = reader_schema.as_ref();

let hive_partition_columns = self.hive_partition_columns.take();

// In case only hive columns are projected, the df would be empty, but we need the row count
// of the file in order to project the correct number of rows for the hive columns.
let (mut df, row_count) = (|| {
if self
.projection
.as_ref()
.map(|x| x.is_empty())
.unwrap_or(false)
{
return PolarsResult::Ok((
Default::default(),
get_row_count(&mut self.reader)? as usize,
));
}
}
let rechunk = self.rechunk;
let metadata = read::read_file_metadata(&mut self.reader)?;
let schema = &metadata.schema;

if let Some(columns) = &self.columns {
let prj = columns_to_projection(columns, schema)?;
self.projection = Some(prj);
}
if self.memory_map.is_some() && self.reader.to_file().is_some() {
match self.finish_memmapped(None) {
Ok(df) => {
let n = df.height();
return Ok((df, n));
},
Err(err) => check_mmap_err(err)?,
}
}
let rechunk = self.rechunk;
let schema = self.get_metadata()?.schema.clone();

let schema = if let Some(projection) = &self.projection {
Arc::new(apply_projection(&metadata.schema, projection))
} else {
metadata.schema.clone()
if let Some(columns) = &self.columns {
let prj = columns_to_projection(columns, schema.as_ref())?;
self.projection = Some(prj);
}

let schema = if let Some(projection) = &self.projection {
Arc::new(apply_projection(schema.as_ref(), projection))
} else {
schema
};

let metadata = self.get_metadata()?.clone();

let ipc_reader =
read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
let df = finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)?;
let n = df.height();
Ok((df, n))
})()?;

if let Some(hive_cols) = hive_partition_columns {
materialize_hive_partitions(
&mut df,
reader_schema,
Some(hive_cols.as_slice()),
row_count,
);
};

let ipc_reader =
read::FileReader::new(self.reader, metadata.clone(), self.projection, self.n_rows);
finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)
Ok(df)
}
}
2 changes: 2 additions & 0 deletions crates/polars-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ pub mod utils;
pub use cloud::glob as async_glob;
pub use options::*;
pub use shared::*;

pub mod hive;
40 changes: 1 addition & 39 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use super::predicates::read_this_row_group;
use super::to_metadata::ToMetadata;
use super::utils::materialize_empty_df;
use super::{mmap, ParallelStrategy};
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetaDataRef;
use crate::predicates::{apply_predicate, PhysicalIoExpr};
Expand Down Expand Up @@ -149,45 +150,6 @@ pub(super) fn array_iter_to_series(
}
}

/// Adds the hive partition columns to `df`, expanding each hive series to `num_rows`
/// rows by repeating its value at index 0.
///
/// The row count is a separate argument because `df` can be empty when a projection
/// contains only hive partition columns.
///
/// If the first hive column's name exists in `reader_schema`, every hive column is
/// inserted at the slot found by binary-searching the schema indices of the columns
/// currently in `df`; otherwise the hive columns are appended at the end.
///
/// # Safety
///
/// num_rows equals the height of the df when the df height is non-zero.
pub(crate) fn materialize_hive_partitions(
    df: &mut DataFrame,
    reader_schema: &ArrowSchema,
    hive_partition_columns: Option<&[Series]>,
    num_rows: usize,
) {
    let Some(hive_columns) = hive_partition_columns else {
        return;
    };
    let Some(first) = hive_columns.first() else {
        return;
    };

    let hive_in_file_schema = reader_schema.index_of(first.name()).is_some();

    if hive_in_file_schema {
        // Insert these hive columns in the order they are stored in the file.
        for hive_col in hive_columns {
            // Hive columns missing from the schema sort last; frame columns missing
            // from the schema sort first.
            let wanted = reader_schema
                .index_of(hive_col.name())
                .unwrap_or(usize::MAX);
            // An exact hit and an insertion point are both usable as the insert slot.
            let slot = df
                .get_columns()
                .binary_search_by_key(&wanted, |col| {
                    reader_schema.index_of(col.name()).unwrap_or(usize::MIN)
                })
                .unwrap_or_else(|insert_at| insert_at);

            df.insert_column(slot, hive_col.new_from_index(0, num_rows))
                .unwrap();
        }
    } else {
        for hive_col in hive_columns {
            // SAFETY: relies on the documented contract that `num_rows` equals the frame
            // height whenever that height is non-zero.
            // NOTE(review): presumably also requires unique column names — confirm
            // against `with_column_unchecked`.
            unsafe { df.with_column_unchecked(hive_col.new_from_index(0, num_rows)) };
        }
    }
}

#[allow(clippy::too_many_arguments)]
fn rg_to_dfs(
store: &mmap::ColumnStore,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::borrow::Cow;

use polars_core::prelude::{ArrowSchema, DataFrame, Series, IDX_DTYPE};

use super::read_impl::materialize_hive_partitions;
use crate::hive::materialize_hive_partitions;
use crate::utils::apply_projection;
use crate::RowIndex;

Expand Down
23 changes: 20 additions & 3 deletions crates/polars-lazy/src/scan/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::path::{Path, PathBuf};

use file_list_reader::get_glob_start_idx;
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::ipc::IpcScanOptions;
use polars_io::RowIndex;
use polars_io::utils::is_cloud_url;
use polars_io::{HiveOptions, RowIndex};

use crate::prelude::*;

Expand All @@ -15,6 +17,7 @@ pub struct ScanArgsIpc {
pub row_index: Option<RowIndex>,
pub memory_map: bool,
pub cloud_options: Option<CloudOptions>,
pub hive_options: HiveOptions,
}

impl Default for ScanArgsIpc {
Expand All @@ -26,6 +29,7 @@ impl Default for ScanArgsIpc {
row_index: None,
memory_map: true,
cloud_options: Default::default(),
hive_options: Default::default(),
}
}
}
Expand All @@ -46,8 +50,20 @@ impl LazyIpcReader {
}

impl LazyFileListReader for LazyIpcReader {
fn finish(self) -> PolarsResult<LazyFrame> {
let paths = self.expand_paths(false)?.0;
fn finish(mut self) -> PolarsResult<LazyFrame> {
let (paths, hive_start_idx) =
self.expand_paths(self.args.hive_options.enabled.unwrap_or(false))?;
self.args.hive_options.enabled =
Some(self.args.hive_options.enabled.unwrap_or_else(|| {
self.paths.len() == 1
&& get_glob_start_idx(self.paths[0].to_str().unwrap().as_bytes()).is_none()
&& !paths.is_empty()
&& {
(!is_cloud_url(&paths[0]) && paths[0].is_dir())
|| (paths[0] != self.paths[0])
}
}));
self.args.hive_options.hive_start_idx = hive_start_idx;
let args = self.args;

let options = IpcScanOptions {
Expand All @@ -62,6 +78,7 @@ impl LazyFileListReader for LazyIpcReader {
args.row_index,
args.rechunk,
args.cloud_options,
args.hive_options,
)?
.build()
.into();
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ fn test_ipc_globbing() -> PolarsResult<()> {
row_index: None,
memory_map: true,
cloud_options: None,
hive_options: Default::default(),
},
)?
.collect()?;
Expand Down
Loading

0 comments on commit c390fd7

Please sign in to comment.