From 503758b40d22d6cbf9ebcb0a0f09cc0d9c0cc31d Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Mon, 28 Oct 2024 22:59:01 +1100 Subject: [PATCH 1/6] c --- .../src/nodes/parquet_source/init.rs | 49 +++++++++++++++---- .../src/nodes/parquet_source/mod.rs | 6 ++- .../nodes/parquet_source/row_group_decode.rs | 31 ++---------- 3 files changed, 50 insertions(+), 36 deletions(-) diff --git a/crates/polars-stream/src/nodes/parquet_source/init.rs b/crates/polars-stream/src/nodes/parquet_source/init.rs index 3187bbe797e4..ddbed67962a1 100644 --- a/crates/polars-stream/src/nodes/parquet_source/init.rs +++ b/crates/polars-stream/src/nodes/parquet_source/init.rs @@ -1,3 +1,4 @@ +use std::collections::VecDeque; use std::future::Future; use std::sync::Arc; @@ -14,7 +15,6 @@ use super::{AsyncTaskData, ParquetSourceNode}; use crate::async_executor; use crate::async_primitives::connector::connector; use crate::async_primitives::wait_group::{WaitGroup, WaitToken}; -use crate::morsel::get_ideal_morsel_size; use crate::nodes::{MorselSeq, TaskPriority}; use crate::utils::task_handles_ext; @@ -118,6 +118,8 @@ impl ParquetSourceNode { let row_group_decoder = self.init_row_group_decoder(); let row_group_decoder = Arc::new(row_group_decoder); + let ideal_morsel_size = self.config.ideal_morsel_size; + // Distributes morsels across pipelines. This does not perform any CPU or I/O bound work - // it is purely a dispatch loop. let raw_morsel_distributor_task_handle = io_runtime.spawn(async move { @@ -191,25 +193,33 @@ impl ParquetSourceNode { ); let morsel_seq_ref = &mut MorselSeq::default(); - let mut dfs = vec![].into_iter(); + let mut dfs = VecDeque::with_capacity(1); 'main: loop { let Some(mut indexed_wait_group) = wait_groups.next().await else { break; }; - if dfs.len() == 0 { + if dfs.is_empty() { let Some(v) = df_stream.next().await else { break; }; - let v = v?; - assert!(!v.is_empty()); + let df = v?; + assert!(!df.is_empty()); + + let opt_splitted = split_to_morsels(&df, ideal_morsel_size); - dfs = v.into_iter(); + if let Some((iter, n)) = opt_splitted { + dfs.reserve(n); + dfs.extend(iter); + } else { + drop(opt_splitted); + dfs.push_back(df); + } } - let mut df = dfs.next().unwrap(); + let mut df = dfs.pop_front().unwrap(); let morsel_seq = *morsel_seq_ref; *morsel_seq_ref = morsel_seq.successor(); @@ -270,7 +280,6 @@ impl ParquetSourceNode { let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap(); let row_index = self.file_options.row_index.clone(); let physical_predicate = self.physical_predicate.clone(); - let ideal_morsel_size = get_ideal_morsel_size(); let min_values_per_thread = self.config.min_values_per_thread; let mut use_prefiltered = physical_predicate.is_some() @@ -348,7 +357,6 @@ impl ParquetSourceNode { predicate_arrow_field_indices, non_predicate_arrow_field_indices, predicate_arrow_field_mask, - ideal_morsel_size, min_values_per_thread, } } @@ -402,6 +410,29 @@ fn filtered_range(exclude: &[usize], len: usize) -> Vec { .collect() } +/// Note: The 2nd argument is an upper bound on the number of morsels rather than an exact count. +fn split_to_morsels( + df: &DataFrame, + ideal_morsel_size: usize, +) -> Option<(impl Iterator + '_, usize)> { + let n_morsels = if df.height() > 3 * ideal_morsel_size / 2 { + // num_rows > (1.5 * ideal_morsel_size) + (df.height() / ideal_morsel_size).max(2) + } else { + 1 + }; + + (n_morsels > 1).then(move || { + let rows_per_morsel = 1 + df.height() / n_morsels as usize; + ( + (0..i64::try_from(df.height()).unwrap()) + .step_by(rows_per_morsel) + .map(move |offset| df.slice(offset, rows_per_morsel)), + n_morsels, + ) + }) +} + mod tests { #[test] diff --git a/crates/polars-stream/src/nodes/parquet_source/mod.rs b/crates/polars-stream/src/nodes/parquet_source/mod.rs index 44fc4e1f1239..5da3a934dde3 100644 --- a/crates/polars-stream/src/nodes/parquet_source/mod.rs +++ b/crates/polars-stream/src/nodes/parquet_source/mod.rs @@ -18,7 +18,7 @@ use polars_plan::prelude::FileScanOptions; use super::compute_node_prelude::*; use super::{MorselSeq, TaskPriority}; use crate::async_primitives::wait_group::WaitToken; -use crate::morsel::SourceToken; +use crate::morsel::{get_ideal_morsel_size, SourceToken}; use crate::utils::task_handles_ext; mod init; @@ -70,6 +70,7 @@ struct Config { /// Minimum number of values for a parallel spawned task to process to amortize /// parallelism overhead. min_values_per_thread: usize, + ideal_morsel_size: usize, } #[allow(clippy::too_many_arguments)] @@ -110,6 +111,7 @@ impl ParquetSourceNode { metadata_decode_ahead_size: 0, row_group_prefetch_size: 0, min_values_per_thread: 0, + ideal_morsel_size: 0, }, verbose, physical_predicate: None, @@ -142,6 +144,7 @@ impl ComputeNode for ParquetSourceNode { let min_values_per_thread = std::env::var("POLARS_MIN_VALUES_PER_THREAD") .map(|x| x.parse::().expect("integer").max(1)) .unwrap_or(16_777_216); + let ideal_morsel_size = get_ideal_morsel_size(); Config { num_pipelines, @@ -149,6 +152,7 @@ impl ComputeNode for ParquetSourceNode { metadata_decode_ahead_size, row_group_prefetch_size, min_values_per_thread, + ideal_morsel_size, } }; diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs index 975ff6de22cb..d31f1e51f71e 100644 --- a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs +++ b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs @@ -38,7 +38,6 @@ pub(super) struct RowGroupDecoder { pub(super) non_predicate_arrow_field_indices: Vec, /// The nth bit is set to `true` if the field at that index is used in the predicate. pub(super) predicate_arrow_field_mask: Vec, - pub(super) ideal_morsel_size: usize, pub(super) min_values_per_thread: usize, } @@ -46,7 +45,7 @@ impl RowGroupDecoder { pub(super) async fn row_group_data_to_df( &self, row_group_data: RowGroupData, - ) -> PolarsResult> { + ) -> PolarsResult { if self.use_prefiltered.is_some() { self.row_group_data_to_df_prefiltered(row_group_data).await } else { @@ -57,7 +56,7 @@ impl RowGroupDecoder { async fn row_group_data_to_df_impl( &self, row_group_data: RowGroupData, - ) -> PolarsResult> { + ) -> PolarsResult { let row_group_data = Arc::new(row_group_data); let out_width = self.row_index.is_some() as usize @@ -131,7 +130,7 @@ impl RowGroupDecoder { assert_eq!(df.width(), out_width); // `out_width` should have been calculated correctly - Ok(self.split_to_morsels(df)) + Ok(df) } async fn shared_file_state_init_func(&self, row_group_data: &RowGroupData) -> SharedFileState { @@ -308,26 +307,6 @@ impl RowGroupDecoder { Ok(()) } - - fn split_to_morsels(&self, df: DataFrame) -> Vec { - let n_morsels = if df.height() > 3 * self.ideal_morsel_size / 2 { - // num_rows > (1.5 * ideal_morsel_size) - (df.height() / self.ideal_morsel_size).max(2) - } else { - 1 - } as u64; - - if n_morsels == 1 { - return vec![df]; - } - - let rows_per_morsel = 1 + df.height() / n_morsels as usize; - - (0..i64::try_from(df.height()).unwrap()) - .step_by(rows_per_morsel) - .map(|offset| df.slice(offset, rows_per_morsel)) - .collect::>() - } } fn decode_column( @@ -478,7 +457,7 @@ impl RowGroupDecoder { async fn row_group_data_to_df_prefiltered( &self, row_group_data: RowGroupData, - ) -> PolarsResult> { + ) -> PolarsResult { debug_assert!(row_group_data.slice.is_none()); // Invariant of the optimizer. assert!(self.predicate_arrow_field_indices.len() <= self.projected_arrow_schema.len()); @@ -614,7 +593,7 @@ impl RowGroupDecoder { assert_eq!(dead_rem.len(), 0); let df = unsafe { DataFrame::new_no_checks(expected_num_rows, out_columns) }; - Ok(self.split_to_morsels(df)) + Ok(df) } } From 3239d77f6b60b730d2f31fb99813d504772aa094 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Mon, 28 Oct 2024 23:24:55 +1100 Subject: [PATCH 2/6] c --- crates/polars-stream/src/nodes/parquet_source/init.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/polars-stream/src/nodes/parquet_source/init.rs b/crates/polars-stream/src/nodes/parquet_source/init.rs index ddbed67962a1..5354d3f36029 100644 --- a/crates/polars-stream/src/nodes/parquet_source/init.rs +++ b/crates/polars-stream/src/nodes/parquet_source/init.rs @@ -423,7 +423,7 @@ fn split_to_morsels( }; (n_morsels > 1).then(move || { - let rows_per_morsel = 1 + df.height() / n_morsels as usize; + let rows_per_morsel = 1 + df.height() / n_morsels; ( (0..i64::try_from(df.height()).unwrap()) .step_by(rows_per_morsel) From 9661f41dfb9940e4bc6b9872466b946eafd6e835 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Mon, 28 Oct 2024 23:46:42 +1100 Subject: [PATCH 3/6] c --- .../src/nodes/parquet_source/init.rs | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/crates/polars-stream/src/nodes/parquet_source/init.rs b/crates/polars-stream/src/nodes/parquet_source/init.rs index 5354d3f36029..508022ea9c6f 100644 --- a/crates/polars-stream/src/nodes/parquet_source/init.rs +++ b/crates/polars-stream/src/nodes/parquet_source/init.rs @@ -201,21 +201,26 @@ impl ParquetSourceNode { }; if dfs.is_empty() { - let Some(v) = df_stream.next().await else { - break; - }; - - let df = v?; - assert!(!df.is_empty()); - - let opt_splitted = split_to_morsels(&df, ideal_morsel_size); - - if let Some((iter, n)) = opt_splitted { - dfs.reserve(n); - dfs.extend(iter); - } else { - drop(opt_splitted); - dfs.push_back(df); + loop { + let Some(v) = df_stream.next().await else { + break 'main; + }; + + let df = v?; + + if df.is_empty() { + continue; + } + + let opt_splitted = split_to_morsels(&df, ideal_morsel_size); + + if let Some((iter, n)) = opt_splitted { + dfs.reserve(n); + dfs.extend(iter); + } else { + drop(opt_splitted); + dfs.push_back(df); + } } } From 37f84ec0fb806cd5aa0fd881633b451ab718d2d0 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Mon, 28 Oct 2024 23:47:25 +1100 Subject: [PATCH 4/6] c --- crates/polars-stream/src/nodes/parquet_source/init.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/polars-stream/src/nodes/parquet_source/init.rs b/crates/polars-stream/src/nodes/parquet_source/init.rs index 508022ea9c6f..f4d2b7cbf73c 100644 --- a/crates/polars-stream/src/nodes/parquet_source/init.rs +++ b/crates/polars-stream/src/nodes/parquet_source/init.rs @@ -209,6 +209,7 @@ impl ParquetSourceNode { let df = v?; if df.is_empty() { + // Can be empty due to filtering continue; } From 8de1c53b2decb3fdf928a61fc3fb7237c449f559 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 29 Oct 2024 00:14:39 +1100 Subject: [PATCH 5/6] c --- .../src/nodes/parquet_source/init.rs | 58 ++++++++----------- 1 file changed, 25 insertions(+), 33 deletions(-) diff --git a/crates/polars-stream/src/nodes/parquet_source/init.rs b/crates/polars-stream/src/nodes/parquet_source/init.rs index f4d2b7cbf73c..4c5a04067d5c 100644 --- a/crates/polars-stream/src/nodes/parquet_source/init.rs +++ b/crates/polars-stream/src/nodes/parquet_source/init.rs @@ -200,29 +200,22 @@ impl ParquetSourceNode { break; }; - if dfs.is_empty() { - loop { - let Some(v) = df_stream.next().await else { - break 'main; - }; - - let df = v?; - - if df.is_empty() { - // Can be empty due to filtering - continue; - } - - let opt_splitted = split_to_morsels(&df, ideal_morsel_size); - - if let Some((iter, n)) = opt_splitted { - dfs.reserve(n); - dfs.extend(iter); - } else { - drop(opt_splitted); - dfs.push_back(df); - } + while dfs.is_empty() { + let Some(v) = df_stream.next().await else { + break 'main; + }; + + let df = v?; + + if df.is_empty() { + continue; } + + let (iter, n) = split_to_morsels(&df, ideal_morsel_size); + + dfs.reserve(n); + dfs.extend(iter); + break; } let mut df = dfs.pop_front().unwrap(); @@ -416,11 +409,11 @@ fn filtered_range(exclude: &[usize], len: usize) -> Vec { .collect() } -/// Note: The 2nd argument is an upper bound on the number of morsels rather than an exact count. +/// Note: The 2nd return is an upper bound on the number of morsels rather than an exact count. fn split_to_morsels( df: &DataFrame, ideal_morsel_size: usize, -) -> Option<(impl Iterator + '_, usize)> { +) -> (impl Iterator + '_, usize) { let n_morsels = if df.height() > 3 * ideal_morsel_size / 2 { // num_rows > (1.5 * ideal_morsel_size) (df.height() / ideal_morsel_size).max(2) @@ -428,15 +421,14 @@ fn split_to_morsels( 1 }; - (n_morsels > 1).then(move || { - let rows_per_morsel = 1 + df.height() / n_morsels; - ( - (0..i64::try_from(df.height()).unwrap()) - .step_by(rows_per_morsel) - .map(move |offset| df.slice(offset, rows_per_morsel)), - n_morsels, - ) - }) + let rows_per_morsel = 1 + df.height() / n_morsels; + + ( + (0..i64::try_from(df.height()).unwrap()) + .step_by(rows_per_morsel) + .map(move |offset| df.slice(offset, rows_per_morsel)), + n_morsels, + ) } mod tests { From 727fdb1de5430082f5433c88e83086d83c539f77 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 29 Oct 2024 00:14:50 +1100 Subject: [PATCH 6/6] c --- crates/polars-stream/src/nodes/parquet_source/init.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/polars-stream/src/nodes/parquet_source/init.rs b/crates/polars-stream/src/nodes/parquet_source/init.rs index 4c5a04067d5c..a722186ff497 100644 --- a/crates/polars-stream/src/nodes/parquet_source/init.rs +++ b/crates/polars-stream/src/nodes/parquet_source/init.rs @@ -215,7 +215,6 @@ impl ParquetSourceNode { dfs.reserve(n); dfs.extend(iter); - break; } let mut df = dfs.pop_front().unwrap();