From c390fd7e1620dad93637a57c19ee82af9287eb17 Mon Sep 17 00:00:00 2001
From: nameexhaustion
Date: Fri, 5 Jul 2024 16:56:44 +1000
Subject: [PATCH] feat: Support hive partitioning in `scan_ipc` (#17434)

---
 crates/polars-io/src/hive.rs                  |  42 +++
 crates/polars-io/src/ipc/ipc_file.rs          |  90 ++++--
 crates/polars-io/src/lib.rs                   |   2 +
 .../polars-io/src/parquet/read/read_impl.rs   |  40 +--
 crates/polars-io/src/parquet/read/utils.rs    |   2 +-
 crates/polars-lazy/src/scan/ipc.rs            |  23 +-
 crates/polars-lazy/src/tests/io.rs            |   1 +
 .../src/executors/scan/ipc.rs                 | 290 ++++++------------
 .../src/executors/scan/mod.rs                 |   2 -
 crates/polars-mem-engine/src/planner/lp.rs    |   4 +-
 crates/polars-plan/src/plans/builder_dsl.rs   |   8 +-
 py-polars/polars/io/ipc/functions.py          |  21 ++
 py-polars/src/lazyframe/mod.rs                |  13 +-
 py-polars/tests/unit/io/test_hive.py          |  50 ++-
 14 files changed, 315 insertions(+), 273 deletions(-)
 create mode 100644 crates/polars-io/src/hive.rs

diff --git a/crates/polars-io/src/hive.rs b/crates/polars-io/src/hive.rs
new file mode 100644
index 000000000000..ddf1d8973b3e
--- /dev/null
+++ b/crates/polars-io/src/hive.rs
@@ -0,0 +1,42 @@
+use polars_core::frame::DataFrame;
+use polars_core::schema::IndexOfSchema;
+use polars_core::series::Series;
+
+/// Materializes hive partitions.
+/// We have a special num_rows arg, as df can be empty when a projection contains
+/// only hive partition columns.
+///
+/// # Safety
+///
+/// num_rows equals the height of the df when the df height is non-zero.
+pub(crate) fn materialize_hive_partitions<S: IndexOfSchema>(
+    df: &mut DataFrame,
+    reader_schema: &S,
+    hive_partition_columns: Option<&[Series]>,
+    num_rows: usize,
+) {
+    if let Some(hive_columns) = hive_partition_columns {
+        let Some(first) = hive_columns.first() else {
+            return;
+        };
+
+        if reader_schema.index_of(first.name()).is_some() {
+            // Insert these hive columns in the order they are stored in the file.
+            for s in hive_columns {
+                let i = match df.get_columns().binary_search_by_key(
+                    &reader_schema.index_of(s.name()).unwrap_or(usize::MAX),
+                    |s| reader_schema.index_of(s.name()).unwrap_or(usize::MIN),
+                ) {
+                    Ok(i) => i,
+                    Err(i) => i,
+                };
+
+                df.insert_column(i, s.new_from_index(0, num_rows)).unwrap();
+            }
+        } else {
+            for s in hive_columns {
+                unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) };
+            }
+        }
+    }
+}
diff --git a/crates/polars-io/src/ipc/ipc_file.rs b/crates/polars-io/src/ipc/ipc_file.rs
index 198e75ab3afe..12cbf9dddf73 100644
--- a/crates/polars-io/src/ipc/ipc_file.rs
+++ b/crates/polars-io/src/ipc/ipc_file.rs
@@ -36,12 +36,13 @@ use std::io::{Read, Seek};
 use std::path::PathBuf;
 
 use arrow::datatypes::ArrowSchemaRef;
-use arrow::io::ipc::read;
+use arrow::io::ipc::read::{self, get_row_count};
 use arrow::record_batch::RecordBatch;
 use polars_core::prelude::*;
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
 
+use crate::hive::materialize_hive_partitions;
 use crate::mmap::MmapBytesReader;
 use crate::predicates::PhysicalIoExpr;
 use crate::prelude::*;
@@ -79,6 +80,7 @@ pub struct IpcReader<R: MmapBytesReader> {
     pub(super) n_rows: Option<usize>,
     pub(super) projection: Option<Vec<usize>>,
     pub(crate) columns: Option<Vec<String>>,
+    hive_partition_columns: Option<Vec<Series>>,
     pub(super) row_index: Option<RowIndex>,
     // Stores the as key semaphore to make sure we don't write to the memory mapped file.
     pub(super) memory_map: Option<PathBuf>,
@@ -126,6 +128,11 @@ impl<R: MmapBytesReader> IpcReader<R> {
         self
     }
 
+    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
+        self.hive_partition_columns = columns;
+        self
+    }
+
     /// Add a row index column.
pub fn with_row_index(mut self, row_index: Option) -> Self { self.row_index = row_index; @@ -200,6 +207,7 @@ impl SerReader for IpcReader { rechunk: true, n_rows: None, columns: None, + hive_partition_columns: None, projection: None, row_index: None, memory_map: None, @@ -214,29 +222,71 @@ impl SerReader for IpcReader { } fn finish(mut self) -> PolarsResult { - if self.memory_map.is_some() && self.reader.to_file().is_some() { - match self.finish_memmapped(None) { - Ok(df) => return Ok(df), - Err(err) => check_mmap_err(err)?, + let reader_schema = if let Some(ref schema) = self.schema { + schema.clone() + } else { + self.get_metadata()?.schema.clone() + }; + let reader_schema = reader_schema.as_ref(); + + let hive_partition_columns = self.hive_partition_columns.take(); + + // In case only hive columns are projected, the df would be empty, but we need the row count + // of the file in order to project the correct number of rows for the hive columns. + let (mut df, row_count) = (|| { + if self + .projection + .as_ref() + .map(|x| x.is_empty()) + .unwrap_or(false) + { + return PolarsResult::Ok(( + Default::default(), + get_row_count(&mut self.reader)? as usize, + )); } - } - let rechunk = self.rechunk; - let metadata = read::read_file_metadata(&mut self.reader)?; - let schema = &metadata.schema; - if let Some(columns) = &self.columns { - let prj = columns_to_projection(columns, schema)?; - self.projection = Some(prj); - } + if self.memory_map.is_some() && self.reader.to_file().is_some() { + match self.finish_memmapped(None) { + Ok(df) => { + let n = df.height(); + return Ok((df, n)); + }, + Err(err) => check_mmap_err(err)?, + } + } + let rechunk = self.rechunk; + let schema = self.get_metadata()?.schema.clone(); - let schema = if let Some(projection) = &self.projection { - Arc::new(apply_projection(&metadata.schema, projection)) - } else { - metadata.schema.clone() + if let Some(columns) = &self.columns { + let prj = columns_to_projection(columns, schema.as_ref())?; + self.projection = Some(prj); + } + + let schema = if let Some(projection) = &self.projection { + Arc::new(apply_projection(schema.as_ref(), projection)) + } else { + schema + }; + + let metadata = self.get_metadata()?.clone(); + + let ipc_reader = + read::FileReader::new(self.reader, metadata, self.projection, self.n_rows); + let df = finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)?; + let n = df.height(); + Ok((df, n)) + })()?; + + if let Some(hive_cols) = hive_partition_columns { + materialize_hive_partitions( + &mut df, + reader_schema, + Some(hive_cols.as_slice()), + row_count, + ); }; - let ipc_reader = - read::FileReader::new(self.reader, metadata.clone(), self.projection, self.n_rows); - finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index) + Ok(df) } } diff --git a/crates/polars-io/src/lib.rs b/crates/polars-io/src/lib.rs index d5fc527b822b..514f47b158d6 100644 --- a/crates/polars-io/src/lib.rs +++ b/crates/polars-io/src/lib.rs @@ -32,3 +32,5 @@ pub mod utils; pub use cloud::glob as async_glob; pub use options::*; pub use shared::*; + +pub mod hive; diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index cd869b96638c..f272daaedc63 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -17,6 +17,7 @@ use super::predicates::read_this_row_group; use super::to_metadata::ToMetadata; use super::utils::materialize_empty_df; use super::{mmap, ParallelStrategy}; +use 
crate::hive::materialize_hive_partitions; use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::parquet::metadata::FileMetaDataRef; use crate::predicates::{apply_predicate, PhysicalIoExpr}; @@ -149,45 +150,6 @@ pub(super) fn array_iter_to_series( } } -/// Materializes hive partitions. -/// We have a special num_rows arg, as df can be empty when a projection contains -/// only hive partition columns. -/// -/// # Safety -/// -/// num_rows equals the height of the df when the df height is non-zero. -pub(crate) fn materialize_hive_partitions( - df: &mut DataFrame, - reader_schema: &ArrowSchema, - hive_partition_columns: Option<&[Series]>, - num_rows: usize, -) { - if let Some(hive_columns) = hive_partition_columns { - let Some(first) = hive_columns.first() else { - return; - }; - - if reader_schema.index_of(first.name()).is_some() { - // Insert these hive columns in the order they are stored in the file. - for s in hive_columns { - let i = match df.get_columns().binary_search_by_key( - &reader_schema.index_of(s.name()).unwrap_or(usize::MAX), - |s| reader_schema.index_of(s.name()).unwrap_or(usize::MIN), - ) { - Ok(i) => i, - Err(i) => i, - }; - - df.insert_column(i, s.new_from_index(0, num_rows)).unwrap(); - } - } else { - for s in hive_columns { - unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) }; - } - } - } -} - #[allow(clippy::too_many_arguments)] fn rg_to_dfs( store: &mmap::ColumnStore, diff --git a/crates/polars-io/src/parquet/read/utils.rs b/crates/polars-io/src/parquet/read/utils.rs index 78a7c4f5b50e..bb476a5fad08 100644 --- a/crates/polars-io/src/parquet/read/utils.rs +++ b/crates/polars-io/src/parquet/read/utils.rs @@ -2,7 +2,7 @@ use std::borrow::Cow; use polars_core::prelude::{ArrowSchema, DataFrame, Series, IDX_DTYPE}; -use super::read_impl::materialize_hive_partitions; +use crate::hive::materialize_hive_partitions; use crate::utils::apply_projection; use crate::RowIndex; diff --git a/crates/polars-lazy/src/scan/ipc.rs b/crates/polars-lazy/src/scan/ipc.rs index 41a5b7b066de..5510f0d419d1 100644 --- a/crates/polars-lazy/src/scan/ipc.rs +++ b/crates/polars-lazy/src/scan/ipc.rs @@ -1,9 +1,11 @@ use std::path::{Path, PathBuf}; +use file_list_reader::get_glob_start_idx; use polars_core::prelude::*; use polars_io::cloud::CloudOptions; use polars_io::ipc::IpcScanOptions; -use polars_io::RowIndex; +use polars_io::utils::is_cloud_url; +use polars_io::{HiveOptions, RowIndex}; use crate::prelude::*; @@ -15,6 +17,7 @@ pub struct ScanArgsIpc { pub row_index: Option, pub memory_map: bool, pub cloud_options: Option, + pub hive_options: HiveOptions, } impl Default for ScanArgsIpc { @@ -26,6 +29,7 @@ impl Default for ScanArgsIpc { row_index: None, memory_map: true, cloud_options: Default::default(), + hive_options: Default::default(), } } } @@ -46,8 +50,20 @@ impl LazyIpcReader { } impl LazyFileListReader for LazyIpcReader { - fn finish(self) -> PolarsResult { - let paths = self.expand_paths(false)?.0; + fn finish(mut self) -> PolarsResult { + let (paths, hive_start_idx) = + self.expand_paths(self.args.hive_options.enabled.unwrap_or(false))?; + self.args.hive_options.enabled = + Some(self.args.hive_options.enabled.unwrap_or_else(|| { + self.paths.len() == 1 + && get_glob_start_idx(self.paths[0].to_str().unwrap().as_bytes()).is_none() + && !paths.is_empty() + && { + (!is_cloud_url(&paths[0]) && paths[0].is_dir()) + || (paths[0] != self.paths[0]) + } + })); + self.args.hive_options.hive_start_idx = hive_start_idx; let args = self.args; let options = IpcScanOptions { @@ -62,6 
+78,7 @@ impl LazyFileListReader for LazyIpcReader { args.row_index, args.rechunk, args.cloud_options, + args.hive_options, )? .build() .into(); diff --git a/crates/polars-lazy/src/tests/io.rs b/crates/polars-lazy/src/tests/io.rs index a81b59dcef61..29dd7c02695a 100644 --- a/crates/polars-lazy/src/tests/io.rs +++ b/crates/polars-lazy/src/tests/io.rs @@ -419,6 +419,7 @@ fn test_ipc_globbing() -> PolarsResult<()> { row_index: None, memory_map: true, cloud_options: None, + hive_options: Default::default(), }, )? .collect()?; diff --git a/crates/polars-mem-engine/src/executors/scan/ipc.rs b/crates/polars-mem-engine/src/executors/scan/ipc.rs index a1a449afdefc..1ff50b037218 100644 --- a/crates/polars-mem-engine/src/executors/scan/ipc.rs +++ b/crates/polars-mem-engine/src/executors/scan/ipc.rs @@ -1,29 +1,27 @@ use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::RwLock; +use hive::HivePartitions; use polars_core::config; use polars_core::utils::accumulate_dataframes_vertical; use polars_io::cloud::CloudOptions; use polars_io::predicates::apply_predicate; use polars_io::utils::is_cloud_url; -use polars_io::RowIndex; use rayon::prelude::*; use super::*; pub struct IpcExec { pub(crate) paths: Arc<[PathBuf]>, - pub(crate) schema: SchemaRef, + pub(crate) file_info: FileInfo, pub(crate) predicate: Option>, pub(crate) options: IpcScanOptions, pub(crate) file_options: FileScanOptions, + pub(crate) hive_parts: Option>, pub(crate) cloud_options: Option, - pub(crate) metadata: Option, } impl IpcExec { - fn read(&mut self, verbose: bool) -> PolarsResult { + fn read(&mut self) -> PolarsResult { let is_cloud = self.paths.iter().any(is_cloud_url); let force_async = config::force_async(); @@ -35,12 +33,11 @@ impl IpcExec { #[cfg(feature = "cloud")] { - if force_async && verbose { + if force_async && config::verbose() { eprintln!("ASYNC READING FORCED"); } - polars_io::pl_async::get_runtime() - .block_on_potential_spawn(self.read_async(verbose))? + polars_io::pl_async::get_runtime().block_on_potential_spawn(self.read_async())? } } else { self.read_sync()? @@ -53,7 +50,10 @@ impl IpcExec { Ok(out) } - fn read_sync(&mut self) -> PolarsResult { + fn read_impl PolarsResult + Send + Sync>( + &mut self, + path_idx_to_file: F, + ) -> PolarsResult { if config::verbose() { eprintln!("executing ipc read sync with row_index = {:?}, n_rows = {:?}, predicate = {:?} for paths {:?}", self.file_options.row_index.as_ref(), @@ -65,211 +65,111 @@ impl IpcExec { let projection = materialize_projection( self.file_options.with_columns.as_deref(), - &self.schema, + &self.file_info.schema, None, self.file_options.row_index.is_some(), ); - let n_rows = self - .file_options - .n_rows - .map(|n| IdxSize::try_from(n).unwrap()); - - let row_limit = n_rows.unwrap_or(IdxSize::MAX); + let read_path = |path_index: usize, n_rows: Option| { + IpcReader::new(path_idx_to_file(path_index)?) + .with_n_rows(n_rows) + .with_row_index(self.file_options.row_index.clone()) + .with_projection(projection.clone()) + .with_hive_partition_columns( + self.hive_parts + .as_ref() + .map(|x| x[path_index].materialize_partition_columns()), + ) + .memory_mapped( + self.options + .memory_map + .then(|| self.paths[path_index].clone()), + ) + .finish() + }; - // Used to determine the next file to open. This guarantees the order. 
- let path_index = AtomicUsize::new(0); - let row_counter = RwLock::new(ConsecutiveCountState::new(self.paths.len())); + let mut dfs = if let Some(mut n_rows) = self.file_options.n_rows { + let mut out = Vec::with_capacity(self.paths.len()); - let index_and_dfs = (0..self.paths.len()) - .into_par_iter() - .map(|_| -> PolarsResult<(usize, DataFrame)> { - let index = path_index.fetch_add(1, Ordering::Relaxed); - let path = &self.paths[index]; + for i in 0..self.paths.len() { + let df = read_path(i, Some(n_rows))?; + let df_height = df.height(); + out.push(df); - let already_read_in_sequence = row_counter.read().unwrap().sum(); - if already_read_in_sequence >= row_limit { - return Ok((index, Default::default())); + assert!( + df_height <= n_rows, + "impl error: got more rows than expected" + ); + if df_height == n_rows { + break; } + n_rows -= df_height; + } - let file = std::fs::File::open(path)?; + out + } else { + POOL.install(|| { + (0..self.paths.len()) + .into_par_iter() + .map(|i| read_path(i, None)) + .collect::>>() + })? + }; - let memory_mapped = if self.options.memory_map { - Some(path.clone()) - } else { - None - }; + if let Some(ref row_index) = self.file_options.row_index { + let mut offset = 0; + for df in &mut dfs { + df.apply(&row_index.name, |series| series.idx().unwrap() + offset) + .unwrap(); + offset += df.height(); + } + }; - let df = IpcReader::new(file) - .with_n_rows( - // NOTE: If there is any file that by itself exceeds the - // row limit, passing the total row limit to each - // individual reader helps. - n_rows.map(|n| { - n.saturating_sub(already_read_in_sequence) - .try_into() - .unwrap() - }), - ) - .with_row_index(self.file_options.row_index.clone()) - .with_projection(projection.clone()) - .memory_mapped(memory_mapped) - .finish()?; + let dfs = if let Some(predicate) = self.predicate.clone() { + let predicate = phys_expr_to_io_expr(predicate); + let predicate = Some(predicate.as_ref()); - row_counter - .write() - .unwrap() - .write(index, df.height().try_into().unwrap()); + POOL.install(|| { + dfs.into_par_iter() + .map(|mut df| { + apply_predicate(&mut df, predicate, true)?; + Ok(df) + }) + .collect::>>() + })? + } else { + dfs + }; - Ok((index, df)) - }) - .collect::>>()?; + accumulate_dataframes_vertical(dfs) + } - finish_index_and_dfs( - index_and_dfs, - row_counter.into_inner().unwrap(), - self.file_options.row_index.as_ref(), - row_limit, - self.predicate.as_ref(), - ) + fn read_sync(&mut self) -> PolarsResult { + let paths = self.paths.clone(); + self.read_impl(move |i| std::fs::File::open(&paths[i]).map_err(Into::into)) } #[cfg(feature = "cloud")] - async fn read_async(&mut self, verbose: bool) -> PolarsResult { - use futures::stream::{self, StreamExt}; - use futures::TryStreamExt; - - /// See https://users.rust-lang.org/t/implementation-of-fnonce-is-not-general-enough-with-async-block/83427/3. - trait AssertSend { - fn assert_send(self) -> impl Send + stream::Stream - where - Self: Send + stream::Stream + Sized, - { - self - } - } - - impl AssertSend for T {} - - let n_rows = self - .file_options - .n_rows - .map(|limit| limit.try_into().unwrap()); - - let row_limit = n_rows.unwrap_or(IdxSize::MAX); - - let row_counter = RwLock::new(ConsecutiveCountState::new(self.paths.len())); + async fn read_async(&mut self) -> PolarsResult { + // TODO: Better async impl that can download only the parts of the file it needs, and do it + // concurrently. 
+ use polars_io::file_cache::init_entries_from_uri_list; - let index_and_dfs = stream::iter(&*self.paths) - .enumerate() - .map(|(index, path)| { - let this = &*self; - let row_counter = &row_counter; - async move { - let already_read_in_sequence = row_counter.read().unwrap().sum(); - if already_read_in_sequence >= row_limit { - return Ok((index, Default::default())); - } - - let reader = IpcReaderAsync::from_uri( - path.to_str().unwrap(), - this.cloud_options.as_ref(), - ) - .await?; - let df = reader - .data( - this.metadata.as_ref(), - IpcReadOptions::default() - .with_row_limit( - // NOTE: If there is any file that by itself - // exceeds the row limit, passing the total - // row limit to each individual reader - // helps. - n_rows.map(|n| { - n.saturating_sub(already_read_in_sequence) - .try_into() - .unwrap() - }), - ) - .with_row_index(this.file_options.row_index.clone()) - .with_projection(this.file_options.with_columns.as_ref().cloned()), - verbose, - ) - .await?; - - row_counter - .write() - .unwrap() - .write(index, df.height().try_into().unwrap()); - - PolarsResult::Ok((index, df)) - } - }) - .assert_send() - .buffer_unordered(config::get_file_prefetch_size()) - .try_collect::>() - .await?; - - finish_index_and_dfs( - index_and_dfs, - row_counter.into_inner().unwrap(), - self.file_options.row_index.as_ref(), - row_limit, - self.predicate.as_ref(), - ) + tokio::task::block_in_place(|| { + let cache_entries = init_entries_from_uri_list( + self.paths + .iter() + .map(|x| Arc::from(x.to_str().unwrap())) + .collect::>() + .as_slice(), + self.cloud_options.as_ref(), + )?; + + self.read_impl(move |i| cache_entries[i].try_open_check_latest()) + }) } } -fn finish_index_and_dfs( - mut index_and_dfs: Vec<(usize, DataFrame)>, - row_counter: ConsecutiveCountState, - row_index: Option<&RowIndex>, - row_limit: IdxSize, - predicate: Option<&Arc>, -) -> PolarsResult { - index_and_dfs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); - - debug_assert!( - index_and_dfs.iter().enumerate().all(|(a, &(b, _))| a == b), - "expected dataframe indices in order from 0 to len" - ); - - debug_assert_eq!(index_and_dfs.len(), row_counter.len()); - let mut offset = 0; - let mut df = accumulate_dataframes_vertical( - index_and_dfs - .into_iter() - .zip(row_counter.counts()) - .filter_map(|((_, mut df), count)| { - let count = count?; - - let remaining = row_limit.checked_sub(offset)?; - - // If necessary, correct having read too much from a single file. - if remaining < count { - df = df.slice(0, remaining.try_into().unwrap()); - } - - // If necessary, correct row indices now that we know the offset. 
- if let Some(row_index) = row_index { - df.apply(&row_index.name, |series| { - series.idx().expect("index column should be of index type") + offset - }) - .expect("index column should exist"); - } - - offset += count; - - Some(df) - }), - )?; - - let predicate = predicate.cloned().map(phys_expr_to_io_expr); - apply_predicate(&mut df, predicate.as_deref(), true)?; - - Ok(df) -} - impl Executor for IpcExec { fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult { let profile_name = if state.has_node_timer() { @@ -283,6 +183,6 @@ impl Executor for IpcExec { Cow::Borrowed("") }; - state.record(|| self.read(state.verbose()), profile_name) + state.record(|| self.read(), profile_name) } } diff --git a/crates/polars-mem-engine/src/executors/scan/mod.rs b/crates/polars-mem-engine/src/executors/scan/mod.rs index cd7131f27632..1f50268db23f 100644 --- a/crates/polars-mem-engine/src/executors/scan/mod.rs +++ b/crates/polars-mem-engine/src/executors/scan/mod.rs @@ -24,8 +24,6 @@ use polars_io::predicates::PhysicalIoExpr; #[cfg(any(feature = "parquet", feature = "csv", feature = "ipc", feature = "cse"))] use polars_io::prelude::*; use polars_plan::global::_set_n_rows_for_scan; -#[cfg(feature = "ipc")] -pub(crate) use support::ConsecutiveCountState; use super::*; use crate::prelude::*; diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index f7f7bd46463a..b88ecaa1d7fe 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -269,12 +269,12 @@ fn create_physical_plan_impl( metadata, } => Ok(Box::new(executors::IpcExec { paths, - schema: file_info.schema, + file_info, predicate, options, file_options, + hive_parts, cloud_options, - metadata, })), #[cfg(feature = "parquet")] FileScan::Parquet { diff --git a/crates/polars-plan/src/plans/builder_dsl.rs b/crates/polars-plan/src/plans/builder_dsl.rs index 297c5cc554ee..fd8a58d79e53 100644 --- a/crates/polars-plan/src/plans/builder_dsl.rs +++ b/crates/polars-plan/src/plans/builder_dsl.rs @@ -115,6 +115,7 @@ impl DslBuilder { } #[cfg(feature = "ipc")] + #[allow(clippy::too_many_arguments)] pub fn scan_ipc>>( paths: P, options: IpcScanOptions, @@ -123,6 +124,7 @@ impl DslBuilder { row_index: Option, rechunk: bool, cloud_options: Option, + hive_options: HiveOptions, ) -> PolarsResult { let paths = paths.into(); @@ -137,11 +139,7 @@ impl DslBuilder { rechunk, row_index, file_counter: Default::default(), - // TODO: Support Hive partitioning. - hive_options: HiveOptions { - enabled: Some(false), - ..Default::default() - }, + hive_options, }, predicate: None, scan_type: FileScan::Ipc { diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py index deb9b7a8956b..106a025bf679 100644 --- a/py-polars/polars/io/ipc/functions.py +++ b/py-polars/polars/io/ipc/functions.py @@ -27,6 +27,7 @@ if TYPE_CHECKING: from polars import DataFrame, DataType, LazyFrame + from polars._typing import SchemaDict @deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4") @@ -313,6 +314,9 @@ def scan_ipc( memory_map: bool = True, retries: int = 0, file_cache_ttl: int | None = None, + hive_partitioning: bool | None = None, + hive_schema: SchemaDict | None = None, + try_parse_hive_dates: bool = True, ) -> LazyFrame: """ Lazily read from an Arrow IPC (Feather v2) file or multiple files via glob patterns. @@ -349,6 +353,20 @@ def scan_ipc( Amount of time to keep downloaded cloud files since their last access time, in seconds. 
Uses the `POLARS_FILE_CACHE_TTL` environment variable (which defaults to 1 hour) if not given. + hive_partitioning + Infer statistics and schema from Hive partitioned URL and use them + to prune reads. This is unset by default (i.e. `None`), meaning it is + automatically enabled when a single directory is passed, and otherwise + disabled. + hive_schema + The column names and data types of the columns by which the data is partitioned. + If set to `None` (default), the schema of the Hive partitions is inferred. + + .. warning:: + This functionality is considered **unstable**. It may be changed + at any point without it being considered a breaking change. + try_parse_hive_dates + Whether to try parsing hive values as date/datetime types. """ if isinstance(source, (str, Path)): @@ -382,5 +400,8 @@ def scan_ipc( cloud_options=storage_options, retries=retries, file_cache_ttl=file_cache_ttl, + hive_partitioning=hive_partitioning, + hive_schema=hive_schema, + try_parse_hive_dates=try_parse_hive_dates, ) return wrap_ldf(pylf) diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index 7345d54f0399..0b83303c7ee0 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -310,7 +310,7 @@ impl PyLazyFrame { #[cfg(feature = "ipc")] #[staticmethod] - #[pyo3(signature = (path, paths, n_rows, cache, rechunk, row_index, memory_map, cloud_options, retries, file_cache_ttl))] + #[pyo3(signature = (path, paths, n_rows, cache, rechunk, row_index, memory_map, cloud_options, hive_partitioning, hive_schema, try_parse_hive_dates, retries, file_cache_ttl))] fn new_from_ipc( path: Option, paths: Vec, @@ -320,6 +320,9 @@ impl PyLazyFrame { row_index: Option<(String, IdxSize)>, memory_map: bool, cloud_options: Option>, + hive_partitioning: Option, + hive_schema: Option>, + try_parse_hive_dates: bool, retries: usize, file_cache_ttl: Option, ) -> PyResult { @@ -357,6 +360,13 @@ impl PyLazyFrame { Some(cloud_options) }; + let hive_options = HiveOptions { + enabled: hive_partitioning, + hive_start_idx: 0, + schema: hive_schema.map(|x| Arc::new(x.0)), + try_parse_dates: try_parse_hive_dates, + }; + let args = ScanArgsIpc { n_rows, cache, @@ -365,6 +375,7 @@ impl PyLazyFrame { memory_map, #[cfg(feature = "cloud")] cloud_options, + hive_options, }; let lf = if let Some(path) = &path { diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 98ff82408eb6..7ce5a1497c8f 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -336,6 +336,7 @@ def test_read_parquet_hive_schema_with_pyarrow() -> None: ("scan_func", "write_func"), [ (pl.scan_parquet, pl.DataFrame.write_parquet), + (pl.scan_ipc, pl.DataFrame.write_ipc), ], ) @pytest.mark.parametrize( @@ -368,7 +369,10 @@ def test_hive_partition_directory_scan( hive_schema = df.lazy().select("a", "b").collect_schema() scan = scan_func - scan = partial(scan_func, hive_schema=hive_schema, glob=glob) + scan = partial(scan_func, hive_schema=hive_schema) + + if scan_func is pl.scan_parquet: + scan = partial(scan, glob=glob) out = scan( tmp_path, @@ -529,10 +533,20 @@ def test_hive_partition_force_async_17155(tmp_path: Path, monkeypatch: Any) -> N ) +@pytest.mark.parametrize( + ("scan_func", "write_func"), + [ + (pl.scan_parquet, pl.DataFrame.write_parquet), + (pl.scan_ipc, pl.DataFrame.write_ipc), + ], +) @pytest.mark.write_disk() @pytest.mark.parametrize("projection_pushdown", [True, False]) def test_hive_partition_columns_contained_in_file( - tmp_path: Path, 
projection_pushdown: bool + tmp_path: Path, + scan_func: Callable[[Any], pl.LazyFrame], + write_func: Callable[[pl.DataFrame, Path], None], + projection_pushdown: bool, ) -> None: path = tmp_path / "a=1/b=2/data.bin" path.parent.mkdir(exist_ok=True, parents=True) @@ -540,7 +554,7 @@ def test_hive_partition_columns_contained_in_file( {"x": 1, "a": 1, "b": 2, "y": 1}, schema={"x": pl.Int32, "a": pl.Int8, "b": pl.Int16, "y": pl.Int32}, ) - df.write_parquet(path) + write_func(df, path) def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None: for projection in [ @@ -561,12 +575,12 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None: df.select(projection), ) - lf = pl.scan_parquet(path, hive_partitioning=True) + lf = scan_func(path, hive_partitioning=True) # type: ignore[call-arg] rhs = df assert_frame_equal(lf.collect(projection_pushdown=projection_pushdown), rhs) assert_with_projections(lf, rhs) - lf = pl.scan_parquet( + lf = scan_func( # type: ignore[call-arg] path, hive_schema={"a": pl.String, "b": pl.String}, hive_partitioning=True, @@ -646,3 +660,29 @@ def test_hive_partition_dates(tmp_path: Path, monkeypatch: Any) -> None: lf.collect(), df.with_columns(pl.col("date1", "date2").cast(pl.String)), ) + + +@pytest.mark.parametrize( + ("scan_func", "write_func"), + [ + (pl.scan_parquet, pl.DataFrame.write_parquet), + (pl.scan_ipc, pl.DataFrame.write_ipc), + ], +) +@pytest.mark.write_disk() +def test_projection_only_hive_parts_gives_correct_number_of_rows( + tmp_path: Path, + scan_func: Callable[[Any], pl.LazyFrame], + write_func: Callable[[pl.DataFrame, Path], None], +) -> None: + # Check the number of rows projected when projecting only hive parts, which + # should be the same as the number of rows in the file. + path = tmp_path / "a=3/data.bin" + path.parent.mkdir(exist_ok=True, parents=True) + + write_func(pl.DataFrame({"x": [1, 1, 1]}), path) + + assert_frame_equal( + scan_func(path, hive_partitioning=True).select("a").collect(), # type: ignore[call-arg] + pl.DataFrame({"a": [3, 3, 3]}), + )
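
The user-facing surface of this patch is easiest to see from Python. Below is a minimal usage sketch, assuming a local hive-style directory layout like the one used in the test suite (`a=1/b=2/data.bin`); the paths, column names, and values are illustrative only and not part of the patch itself.

from pathlib import Path

import polars as pl

# Hypothetical dataset root; one IPC file per hive partition.
root = Path("hive_ipc_demo")
path = root / "a=1" / "b=2" / "data.bin"
path.parent.mkdir(parents=True, exist_ok=True)
pl.DataFrame({"x": [1, 2, 3]}).write_ipc(path)

# scan_ipc now accepts the same hive options as scan_parquet:
lf = pl.scan_ipc(
    root,
    hive_partitioning=True,
    hive_schema={"a": pl.Int8, "b": pl.Int16},
    try_parse_hive_dates=True,
)
# The partition columns `a` and `b` are materialized alongside the file column `x`.
print(lf.collect())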
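
When `hive_partitioning` is left unset (`None`), the new logic in `LazyIpcReader::finish` enables it only for a single, non-glob input path that expands to something other than itself (e.g. a directory). A hedged sketch of the resulting behavior, continuing the hypothetical layout above:

import polars as pl

# Directory input: hive partition parsing is enabled automatically,
# so the schema includes "a" and "b" in addition to "x".
print(pl.scan_ipc("hive_ipc_demo").collect_schema())

# Explicit file path (or glob): hive parsing stays off unless requested.
print(pl.scan_ipc("hive_ipc_demo/a=1/b=2/data.bin").collect_schema())  # only "x"
print(
    pl.scan_ipc(
        "hive_ipc_demo/a=1/b=2/data.bin", hive_partitioning=True
    ).collect_schema()
)  # includes "a" and "b" as well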
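
The rewritten `IpcExec::read_impl` computes the row index per file and then shifts it by a running offset, so the index stays continuous over the concatenated result. A small, hedged illustration at the Python level (again using the hypothetical directory above, assuming it contains more than one partition file):

import polars as pl

lf = pl.scan_ipc("hive_ipc_demo", hive_partitioning=True, row_index_name="idx")
out = lf.collect()

# "idx" counts 0..height-1 across all partition files rather than restarting
# at 0 for each file.
assert out.get_column("idx").to_list() == list(range(out.height))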