diff --git a/crates/polars-stream/src/nodes/parquet_source/init.rs b/crates/polars-stream/src/nodes/parquet_source/init.rs
index 3187bbe797e4..a722186ff497 100644
--- a/crates/polars-stream/src/nodes/parquet_source/init.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/init.rs
@@ -1,3 +1,4 @@
+use std::collections::VecDeque;
 use std::future::Future;
 use std::sync::Arc;
 
@@ -14,7 +15,6 @@ use super::{AsyncTaskData, ParquetSourceNode};
 use crate::async_executor;
 use crate::async_primitives::connector::connector;
 use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
-use crate::morsel::get_ideal_morsel_size;
 use crate::nodes::{MorselSeq, TaskPriority};
 use crate::utils::task_handles_ext;
 
@@ -118,6 +118,8 @@ impl ParquetSourceNode {
         let row_group_decoder = self.init_row_group_decoder();
         let row_group_decoder = Arc::new(row_group_decoder);
 
+        let ideal_morsel_size = self.config.ideal_morsel_size;
+
         // Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -
         // it is purely a dispatch loop.
         let raw_morsel_distributor_task_handle = io_runtime.spawn(async move {
@@ -191,25 +193,31 @@ impl ParquetSourceNode {
             );
 
             let morsel_seq_ref = &mut MorselSeq::default();
-            let mut dfs = vec![].into_iter();
+            let mut dfs = VecDeque::with_capacity(1);
 
             'main: loop {
                 let Some(mut indexed_wait_group) = wait_groups.next().await else {
                     break;
                 };
 
-                if dfs.len() == 0 {
+                while dfs.is_empty() {
                     let Some(v) = df_stream.next().await else {
-                        break;
+                        break 'main;
                     };
 
-                    let v = v?;
-                    assert!(!v.is_empty());
+                    let df = v?;
+
+                    if df.is_empty() {
+                        continue;
+                    }
 
-                    dfs = v.into_iter();
+                    let (iter, n) = split_to_morsels(&df, ideal_morsel_size);
+
+                    dfs.reserve(n);
+                    dfs.extend(iter);
                 }
 
-                let mut df = dfs.next().unwrap();
+                let mut df = dfs.pop_front().unwrap();
                 let morsel_seq = *morsel_seq_ref;
                 *morsel_seq_ref = morsel_seq.successor();
 
@@ -270,7 +278,6 @@ impl ParquetSourceNode {
         let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap();
         let row_index = self.file_options.row_index.clone();
         let physical_predicate = self.physical_predicate.clone();
-        let ideal_morsel_size = get_ideal_morsel_size();
         let min_values_per_thread = self.config.min_values_per_thread;
 
         let mut use_prefiltered = physical_predicate.is_some()
@@ -348,7 +355,6 @@ impl ParquetSourceNode {
             predicate_arrow_field_indices,
             non_predicate_arrow_field_indices,
             predicate_arrow_field_mask,
-            ideal_morsel_size,
             min_values_per_thread,
         }
     }
@@ -402,6 +408,28 @@ fn filtered_range(exclude: &[usize], len: usize) -> Vec<usize> {
         .collect()
}
 
+/// Note: The 2nd return is an upper bound on the number of morsels rather than an exact count.
+fn split_to_morsels(
+    df: &DataFrame,
+    ideal_morsel_size: usize,
+) -> (impl Iterator<Item = DataFrame> + '_, usize) {
+    let n_morsels = if df.height() > 3 * ideal_morsel_size / 2 {
+        // num_rows > (1.5 * ideal_morsel_size)
+        (df.height() / ideal_morsel_size).max(2)
+    } else {
+        1
+    };
+
+    let rows_per_morsel = 1 + df.height() / n_morsels;
+
+    (
+        (0..i64::try_from(df.height()).unwrap())
+            .step_by(rows_per_morsel)
+            .map(move |offset| df.slice(offset, rows_per_morsel)),
+        n_morsels,
+    )
+}
+
 mod tests {
 
     #[test]
diff --git a/crates/polars-stream/src/nodes/parquet_source/mod.rs b/crates/polars-stream/src/nodes/parquet_source/mod.rs
index 44fc4e1f1239..5da3a934dde3 100644
--- a/crates/polars-stream/src/nodes/parquet_source/mod.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/mod.rs
@@ -18,7 +18,7 @@ use polars_plan::prelude::FileScanOptions;
 use super::compute_node_prelude::*;
 use super::{MorselSeq, TaskPriority};
 use crate::async_primitives::wait_group::WaitToken;
-use crate::morsel::SourceToken;
+use crate::morsel::{get_ideal_morsel_size, SourceToken};
 use crate::utils::task_handles_ext;
 
 mod init;
@@ -70,6 +70,7 @@ struct Config {
     /// Minimum number of values for a parallel spawned task to process to amortize
     /// parallelism overhead.
     min_values_per_thread: usize,
+    ideal_morsel_size: usize,
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -110,6 +111,7 @@ impl ParquetSourceNode {
                 metadata_decode_ahead_size: 0,
                 row_group_prefetch_size: 0,
                 min_values_per_thread: 0,
+                ideal_morsel_size: 0,
             },
             verbose,
             physical_predicate: None,
@@ -142,6 +144,7 @@ impl ComputeNode for ParquetSourceNode {
             let min_values_per_thread = std::env::var("POLARS_MIN_VALUES_PER_THREAD")
                 .map(|x| x.parse::<usize>().expect("integer").max(1))
                 .unwrap_or(16_777_216);
+            let ideal_morsel_size = get_ideal_morsel_size();
 
             Config {
                 num_pipelines,
@@ -149,6 +152,7 @@ impl ComputeNode for ParquetSourceNode {
                 metadata_decode_ahead_size,
                 row_group_prefetch_size,
                 min_values_per_thread,
+                ideal_morsel_size,
             }
         };
 
diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
index 975ff6de22cb..d31f1e51f71e 100644
--- a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
@@ -38,7 +38,6 @@ pub(super) struct RowGroupDecoder {
     pub(super) non_predicate_arrow_field_indices: Vec<usize>,
     /// The nth bit is set to `true` if the field at that index is used in the predicate.
     pub(super) predicate_arrow_field_mask: Vec<bool>,
-    pub(super) ideal_morsel_size: usize,
     pub(super) min_values_per_thread: usize,
 }
 
@@ -46,7 +45,7 @@ impl RowGroupDecoder {
     pub(super) async fn row_group_data_to_df(
         &self,
         row_group_data: RowGroupData,
-    ) -> PolarsResult<Vec<DataFrame>> {
+    ) -> PolarsResult<DataFrame> {
         if self.use_prefiltered.is_some() {
             self.row_group_data_to_df_prefiltered(row_group_data).await
         } else {
@@ -57,7 +56,7 @@ impl RowGroupDecoder {
     async fn row_group_data_to_df_impl(
         &self,
         row_group_data: RowGroupData,
-    ) -> PolarsResult<Vec<DataFrame>> {
+    ) -> PolarsResult<DataFrame> {
         let row_group_data = Arc::new(row_group_data);
 
         let out_width = self.row_index.is_some() as usize
@@ -131,7 +130,7 @@ impl RowGroupDecoder {
 
         assert_eq!(df.width(), out_width); // `out_width` should have been calculated correctly
 
-        Ok(self.split_to_morsels(df))
+        Ok(df)
     }
 
     async fn shared_file_state_init_func(&self, row_group_data: &RowGroupData) -> SharedFileState {
@@ -308,26 +307,6 @@ impl RowGroupDecoder {
 
         Ok(())
     }
-
-    fn split_to_morsels(&self, df: DataFrame) -> Vec<DataFrame> {
-        let n_morsels = if df.height() > 3 * self.ideal_morsel_size / 2 {
-            // num_rows > (1.5 * ideal_morsel_size)
-            (df.height() / self.ideal_morsel_size).max(2)
-        } else {
-            1
-        } as u64;
-
-        if n_morsels == 1 {
-            return vec![df];
-        }
-
-        let rows_per_morsel = 1 + df.height() / n_morsels as usize;
-
-        (0..i64::try_from(df.height()).unwrap())
-            .step_by(rows_per_morsel)
-            .map(|offset| df.slice(offset, rows_per_morsel))
-            .collect::<Vec<_>>()
-    }
 }
 
 fn decode_column(
@@ -478,7 +457,7 @@ impl RowGroupDecoder {
     async fn row_group_data_to_df_prefiltered(
        &self,
         row_group_data: RowGroupData,
-    ) -> PolarsResult<Vec<DataFrame>> {
+    ) -> PolarsResult<DataFrame> {
         debug_assert!(row_group_data.slice.is_none()); // Invariant of the optimizer.
 
         assert!(self.predicate_arrow_field_indices.len() <= self.projected_arrow_schema.len());
@@ -614,7 +593,7 @@ impl RowGroupDecoder {
 
         assert_eq!(dead_rem.len(), 0);
 
         let df = unsafe { DataFrame::new_no_checks(expected_num_rows, out_columns) };
-        Ok(self.split_to_morsels(df))
+        Ok(df)
     }
 }
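
As a rough illustration of the splitting arithmetic used by the new `split_to_morsels` helper, and of why its second return value is only an upper bound, here is a minimal standalone sketch. It operates on a bare row count instead of a `DataFrame`, so it runs without Polars; `morsel_lengths` and the sample numbers are illustrative only and not part of the patch.

// Sketch: mirrors the sizing logic of `split_to_morsels`, but over a bare row
// count. `morsel_lengths` is a hypothetical helper for illustration only.
fn morsel_lengths(height: usize, ideal_morsel_size: usize) -> (Vec<usize>, usize) {
    // Only split once the frame exceeds 1.5x the ideal morsel size.
    let n_morsels = if height > 3 * ideal_morsel_size / 2 {
        (height / ideal_morsel_size).max(2)
    } else {
        1
    };

    // Round the slice length up (the `+ 1`) so the slices always cover the
    // whole frame; this rounding is what makes `n_morsels` an upper bound.
    let rows_per_morsel = 1 + height / n_morsels;

    // Same offsets the patch generates via `step_by`, with each slice clamped
    // to the remaining rows.
    let lengths: Vec<usize> = (0..height)
        .step_by(rows_per_morsel)
        .map(|offset| rows_per_morsel.min(height - offset))
        .collect();

    (lengths, n_morsels)
}

fn main() {
    // Typical case: the reported bound matches the number of slices produced.
    let (lengths, bound) = morsel_lengths(1_000_000, 100_000);
    assert_eq!(bound, 10);
    assert_eq!(lengths.len(), 10);
    assert_eq!(lengths.iter().sum::<usize>(), 1_000_000);

    // Contrived small case where the bound is not tight: 12 is reported, but
    // only 10 slices of 5 rows each are produced.
    let (lengths, bound) = morsel_lengths(50, 4);
    assert_eq!(bound, 12);
    assert_eq!(lengths.len(), 10);
    assert_eq!(lengths.iter().sum::<usize>(), 50);
}

With realistic morsel sizes the bound is typically exact, as in the first case above; the distributor only uses it as a `VecDeque::reserve` hint, so over-counting is harmless.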