diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index e0a096de498e..07bb47bb88a7 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -716,7 +716,10 @@ fn rg_to_dfs_optionally_par_over_columns(
         let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
 
         if let Some(rc) = &row_index {
-            df.with_row_index_mut(rc.name.clone(), Some(*previous_row_count + rc.offset));
+            df.with_row_index_mut(
+                rc.name.clone(),
+                Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
+            );
         }
 
         materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns, rg_slice.1);
@@ -830,7 +833,7 @@ fn rg_to_dfs_par_over_rg(
             if let Some(rc) = &row_index {
                 df.with_row_index_mut(
                     rc.name.clone(),
-                    Some(row_count_start as IdxSize + rc.offset),
+                    Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
                 );
             }
 
diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs
index 49b01c471610..61a89693cfe0 100644
--- a/crates/polars-mem-engine/src/executors/scan/parquet.rs
+++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -72,6 +72,7 @@ impl ParquetExec {
             }
         };
         let predicate = self.predicate.clone().map(phys_expr_to_io_expr);
+        let mut base_row_index = self.file_options.row_index.take();
 
         // (offset, end)
         let (slice_offset, slice_end) = if let Some(slice) = self.file_options.slice {
@@ -82,6 +83,7 @@ impl ParquetExec {
             // slice into a positive-offset equivalent.
             let slice_start_as_n_from_end = -slice.0 as usize;
             let mut cum_rows = 0;
+            let mut first_source_row_offset = 0;
             let chunk_size = 8;
             POOL.install(|| {
                 for path_indexes in (0..self.sources.len())
@@ -107,16 +109,19 @@ impl ParquetExec {
                        .collect::<PolarsResult<Vec<_>>>()?;
 
                    for (path_idx, rc) in path_indexes.iter().zip(row_counts) {
-                        cum_rows += rc;
+                        if first_source == 0 {
+                            cum_rows += rc;
 
-                        if cum_rows >= slice_start_as_n_from_end {
-                            first_source = *path_idx;
-                            break;
-                        }
-                    }
+                            if cum_rows >= slice_start_as_n_from_end {
+                                first_source = *path_idx;
 
-                    if first_source > 0 {
-                        break;
+                                if base_row_index.is_none() {
+                                    break;
+                                }
+                            }
+                        } else {
+                            first_source_row_offset += rc;
+                        }
                    }
                }
 
@@ -134,6 +139,10 @@ impl ParquetExec {
 
                let end = start.saturating_add(len);
 
+                if let Some(ri) = base_row_index.as_mut() {
+                    ri.offset += first_source_row_offset as IdxSize;
+                }
+
                (start, end)
            }
        } else {
@@ -141,7 +150,6 @@ impl ParquetExec {
        };
 
        let mut current_offset = 0;
-        let base_row_index = self.file_options.row_index.take();
 
        // Limit no. of files at a time to prevent open file limits.
        for i in (first_source..self.sources.len()).step_by(step) {
@@ -268,6 +276,7 @@ impl ParquetExec {
            }
        };
        let predicate = self.predicate.clone().map(phys_expr_to_io_expr);
+        let mut base_row_index = self.file_options.row_index.take();
 
        // Modified if we have a negative slice
        let mut first_file_idx = 0;
@@ -281,6 +290,7 @@ impl ParquetExec {
            // slice into a positive-offset equivalent.
            let slice_start_as_n_from_end = -slice.0 as usize;
            let mut cum_rows = 0;
+            let mut first_source_row_offset = 0;
            let paths = &paths;
            let cloud_options = Arc::new(self.cloud_options.clone());
 
@@ -312,11 +322,18 @@ impl ParquetExec {
                while let Some(v) = iter.next().await {
                    let (path_idx, num_rows) = v.unwrap()?;
 
-                    cum_rows += num_rows;
+                    if first_file_idx == 0 {
+                        cum_rows += num_rows;
+
+                        if cum_rows >= slice_start_as_n_from_end {
+                            first_file_idx = path_idx;
 
-                    if cum_rows >= slice_start_as_n_from_end {
-                        first_file_idx = path_idx;
-                        break;
+                            if base_row_index.is_none() {
+                                break;
+                            }
+                        }
+                    } else {
+                        first_source_row_offset += num_rows;
                    }
                }
 
@@ -331,6 +348,10 @@ impl ParquetExec {
 
                let end = start.saturating_add(len);
 
+                if let Some(ri) = base_row_index.as_mut() {
+                    ri.offset += first_source_row_offset as IdxSize;
+                }
+
                (start, end)
            }
        } else {
@@ -338,7 +359,6 @@ impl ParquetExec {
        };
 
        let mut current_offset = 0;
-        let base_row_index = self.file_options.row_index.take();
        let mut processed = 0;
 
        for batch_start in (first_file_idx..paths.len()).step_by(batch_size) {
diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs
index f09fbc0c8762..058563b91b61 100644
--- a/crates/polars-pipe/src/executors/sources/parquet.rs
+++ b/crates/polars-pipe/src/executors/sources/parquet.rs
@@ -136,7 +136,10 @@ impl ParquetSource {
                    self.projected_arrow_schema.as_deref(),
                    self.file_options.allow_missing_columns,
                )?
-                .with_row_index(file_options.row_index)
+                .with_row_index(file_options.row_index.map(|mut ri| {
+                    ri.offset += self.processed_rows.load(Ordering::Relaxed) as IdxSize;
+                    ri
+                }))
                .with_predicate(predicate.clone())
                .use_statistics(options.use_statistics)
                .with_hive_partition_columns(hive_partitions)
@@ -196,7 +199,10 @@ impl ParquetSource {
            let mut async_reader =
                ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), metadata)
                    .await?
-                    .with_row_index(file_options.row_index)
+                    .with_row_index(file_options.row_index.map(|mut ri| {
+                        ri.offset += self.processed_rows.load(Ordering::Relaxed) as IdxSize;
+                        ri
+                    }))
                    .with_arrow_schema_projection(
                        &self.first_schema,
                        self.projected_arrow_schema.as_deref(),
@@ -317,7 +323,11 @@ impl ParquetSource {
            .collect::<Vec<_>>();
        let init_iter = range.into_iter().map(|index| self.init_reader_async(index));
 
-        let batched_readers = if self.file_options.slice.is_some() {
+        let needs_exact_processed_rows_count =
+            self.file_options.slice.is_some() || self.file_options.row_index.is_some();
+
+        let batched_readers = if needs_exact_processed_rows_count {
+            // We run serially to ensure we have a correct processed_rows count.
            polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
                futures::stream::iter(init_iter)
                    .then(|x| x)
diff --git a/crates/polars-stream/src/nodes/parquet_source/init.rs b/crates/polars-stream/src/nodes/parquet_source/init.rs
index a722186ff497..7aad10ddde6c 100644
--- a/crates/polars-stream/src/nodes/parquet_source/init.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/init.rs
@@ -15,6 +15,7 @@ use super::{AsyncTaskData, ParquetSourceNode};
 use crate::async_executor;
 use crate::async_primitives::connector::connector;
 use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
+use crate::morsel::get_ideal_morsel_size;
 use crate::nodes::{MorselSeq, TaskPriority};
 use crate::utils::task_handles_ext;
 
@@ -29,10 +30,10 @@ impl ParquetSourceNode {
            eprintln!("[ParquetSource]: Shutting down");
        }
 
-        let (mut raw_morsel_receivers, morsel_stream_task_handle) =
+        let (raw_morsel_receivers, morsel_stream_task_handle) =
            async_task_data.try_lock().unwrap().take().unwrap();
 
-        raw_morsel_receivers.clear();
+        drop(raw_morsel_receivers);
        // Join on the producer handle to catch errors/panics.
        // Safety
        // * We dropped the receivers on the line above
@@ -118,7 +119,11 @@ impl ParquetSourceNode {
        let row_group_decoder = self.init_row_group_decoder();
        let row_group_decoder = Arc::new(row_group_decoder);
 
-        let ideal_morsel_size = self.config.ideal_morsel_size;
+        let ideal_morsel_size = get_ideal_morsel_size();
+
+        if verbose {
+            eprintln!("[ParquetSource]: ideal_morsel_size: {}", ideal_morsel_size);
+        }
 
        // Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -
        // it is purely a dispatch loop.
@@ -276,7 +281,7 @@ impl ParquetSourceNode {
            .unwrap_or(0);
        let include_file_paths = self.file_options.include_file_paths.clone();
        let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap();
-        let row_index = self.file_options.row_index.clone();
+        let row_index = self.row_index.clone();
        let physical_predicate = self.physical_predicate.clone();
 
        let min_values_per_thread = self.config.min_values_per_thread;
diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
index e3377036b908..e5237cbccc8f 100644
--- a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
@@ -7,6 +7,7 @@ use polars_io::prelude::_internal::ensure_matching_dtypes_if_found;
 use polars_io::utils::byte_source::{DynByteSource, MemSliceByteSource};
 use polars_io::utils::slice::SplitSlicePosition;
 use polars_utils::mmap::MemSlice;
+use polars_utils::IdxSize;
 
 use super::metadata_utils::{ensure_schema_has_projected_fields, read_parquet_metadata_bytes};
 use super::ParquetSourceNode;
@@ -289,23 +290,42 @@ impl ParquetSourceNode {
            .map(process_metadata_bytes)
            .buffered(metadata_decode_ahead_size);
 
+        let row_index = self.row_index.clone();
+
        // Note:
        // * We want to wait until the first morsel is requested before starting this
        let init_negative_slice_and_metadata = async move {
            let mut processed_metadata_rev = vec![];
            let mut cum_rows = 0;
+            let mut row_index_adjust = 0;
 
            while let Some(v) = metadata_stream.next().await {
                let v = v?;
                let (_, _, metadata) = &v;
-                cum_rows += metadata.num_rows;
-                processed_metadata_rev.push(v);
+                let n_rows = metadata.num_rows;
 
-                if cum_rows >= slice_start_as_n_from_end {
-                    break;
+                if cum_rows < slice_start_as_n_from_end {
+                    processed_metadata_rev.push(v);
+                    cum_rows += n_rows;
+
+                    if cum_rows >= slice_start_as_n_from_end && row_index.is_none() {
+                        break;
+                    }
+                } else {
+                    // If we didn't already break it means a row_index was requested, so we need
+                    // to count the number of rows in the skipped files and adjust the offset
+                    // accordingly.
+                    row_index_adjust += n_rows;
                }
            }
 
+            row_index.as_deref().map(|(_, offset)| {
+                offset.fetch_add(
+                    row_index_adjust as IdxSize,
+                    std::sync::atomic::Ordering::Relaxed,
+                )
+            });
+
            let (start, len) = if slice_start_as_n_from_end > cum_rows {
                // We need to trim the slice, e.g. SLICE[offset: -100, len: 75] on a file of 50
                // rows should only give the first 25 rows.
@@ -316,7 +336,7 @@
            };
 
            if len == 0 {
-                processed_metadata_rev.clear();
+                processed_metadata_rev = vec![];
            }
 
            normalized_slice_oneshot_tx
diff --git a/crates/polars-stream/src/nodes/parquet_source/mod.rs b/crates/polars-stream/src/nodes/parquet_source/mod.rs
index f9661cb863af..6427d08e2696 100644
--- a/crates/polars-stream/src/nodes/parquet_source/mod.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/mod.rs
@@ -14,11 +14,13 @@ use polars_io::utils::byte_source::DynByteSourceBuilder;
 use polars_plan::plans::hive::HivePartitions;
 use polars_plan::plans::{FileInfo, ScanSources};
 use polars_plan::prelude::FileScanOptions;
+use polars_utils::index::AtomicIdxSize;
+use polars_utils::pl_str::PlSmallStr;
 
 use super::compute_node_prelude::*;
 use super::{MorselSeq, TaskPriority};
 use crate::async_primitives::wait_group::WaitToken;
-use crate::morsel::{get_ideal_morsel_size, SourceToken};
+use crate::morsel::SourceToken;
 use crate::utils::task_handles_ext;
 
 mod init;
@@ -51,6 +53,11 @@ pub struct ParquetSourceNode {
     projected_arrow_schema: Option<Arc<ArrowSchema>>,
     byte_source_builder: DynByteSourceBuilder,
     memory_prefetch_func: fn(&[u8]) -> (),
+    /// The offset is an AtomicIdxSize, as in the negative slice case, the row
+    /// offset becomes relative to the starting point in the list of files,
+    /// so the row index offset needs to be updated by the initializer to
+    /// reflect this (https://github.com/pola-rs/polars/issues/19607).
+    row_index: Option<Arc<(PlSmallStr, AtomicIdxSize)>>,
     // This permit blocks execution until the first morsel is requested.
     morsel_stream_starter: Option>,
     // This is behind a Mutex so that we can call `shutdown()` asynchronously.
@@ -70,7 +77,6 @@ struct Config {
     /// Minimum number of values for a parallel spawned task to process to amortize
     /// parallelism overhead.
     min_values_per_thread: usize,
-    ideal_morsel_size: usize,
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -82,7 +88,7 @@ impl ParquetSourceNode {
         predicate: Option>,
         options: ParquetOptions,
         cloud_options: Option<CloudOptions>,
-        file_options: FileScanOptions,
+        mut file_options: FileScanOptions,
         first_metadata: Option<Arc<FileMetadata>>,
     ) -> Self {
         let verbose = config::verbose();
@@ -94,6 +100,11 @@ impl ParquetSourceNode {
        };
        let memory_prefetch_func = get_memory_prefetch_func(verbose);
 
+        let row_index = file_options
+            .row_index
+            .take()
+            .map(|ri| Arc::new((ri.name, AtomicIdxSize::new(ri.offset))));
+
        Self {
            scan_sources,
            file_info,
@@ -111,7 +122,6 @@ impl ParquetSourceNode {
                metadata_decode_ahead_size: 0,
                row_group_prefetch_size: 0,
                min_values_per_thread: 0,
-                ideal_morsel_size: 0,
            },
            verbose,
            physical_predicate: None,
@@ -119,6 +129,7 @@ impl ParquetSourceNode {
            projected_arrow_schema: None,
            byte_source_builder,
            memory_prefetch_func,
+            row_index,
 
            morsel_stream_starter: None,
            async_task_data: Arc::new(tokio::sync::Mutex::new(None)),
@@ -144,7 +155,6 @@ impl ComputeNode for ParquetSourceNode {
            let min_values_per_thread = std::env::var("POLARS_MIN_VALUES_PER_THREAD")
                .map(|x| x.parse::<usize>().expect("integer").max(1))
                .unwrap_or(16_777_216);
-            let ideal_morsel_size = get_ideal_morsel_size();
 
            Config {
                num_pipelines,
@@ -152,7 +162,6 @@ impl ComputeNode for ParquetSourceNode {
                metadata_decode_ahead_size,
                row_group_prefetch_size,
                min_values_per_thread,
-                ideal_morsel_size,
            }
        };
 
diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
index d31f1e51f71e..1c4c4f70f25b 100644
--- a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
@@ -12,9 +12,9 @@ use polars_io::predicates::PhysicalIoExpr;
 use polars_io::prelude::_internal::calc_prefilter_cost;
 pub use polars_io::prelude::_internal::PrefilterMaskSetting;
 use polars_io::prelude::try_set_sorted_flag;
-use polars_io::RowIndex;
 use polars_plan::plans::hive::HivePartitions;
 use polars_plan::plans::ScanSources;
+use polars_utils::index::AtomicIdxSize;
 use polars_utils::pl_str::PlSmallStr;
 use polars_utils::IdxSize;
 
@@ -29,7 +29,7 @@ pub(super) struct RowGroupDecoder {
     pub(super) hive_partitions_width: usize,
     pub(super) include_file_paths: Option<PlSmallStr>,
     pub(super) projected_arrow_schema: Arc<ArrowSchema>,
-    pub(super) row_index: Option<RowIndex>,
+    pub(super) row_index: Option<Arc<(PlSmallStr, AtomicIdxSize)>>,
     pub(super) physical_predicate: Option<Arc<dyn PhysicalIoExpr>>,
     pub(super) use_prefiltered: Option<PrefilterMaskSetting>,
     /// Indices into `projected_arrow_schema. This must be sorted.
@@ -178,7 +178,8 @@ impl RowGroupDecoder {
        row_group_data: &RowGroupData,
        slice_range: core::ops::Range<usize>,
    ) -> PolarsResult> {
-        if let Some(RowIndex { name, offset }) = self.row_index.as_ref() {
+        if let Some((name, offset)) = self.row_index.as_deref() {
+            let offset = offset.load(std::sync::atomic::Ordering::Relaxed);
            let projection_height = slice_range.len();
 
            let Some(offset) = (|| {
diff --git a/crates/polars-utils/src/index.rs b/crates/polars-utils/src/index.rs
index 9069c2e42fcf..9d8cf67f8a58 100644
--- a/crates/polars-utils/src/index.rs
+++ b/crates/polars-utils/src/index.rs
@@ -14,6 +14,11 @@ pub type NonZeroIdxSize = std::num::NonZeroU32;
 #[cfg(feature = "bigidx")]
 pub type NonZeroIdxSize = std::num::NonZeroU64;
 
+#[cfg(not(feature = "bigidx"))]
+pub type AtomicIdxSize = std::sync::atomic::AtomicU32;
+#[cfg(feature = "bigidx")]
+pub type AtomicIdxSize = std::sync::atomic::AtomicU64;
+
 #[derive(Clone, Copy)]
 #[repr(transparent)]
 pub struct NullableIdxSize {
diff --git a/py-polars/tests/unit/io/test_lazy_parquet.py b/py-polars/tests/unit/io/test_lazy_parquet.py
index 49a842386ea2..8235e7b31bba 100644
--- a/py-polars/tests/unit/io/test_lazy_parquet.py
+++ b/py-polars/tests/unit/io/test_lazy_parquet.py
@@ -514,6 +514,27 @@ def trim_to_metadata(path: str | Path) -> None:
     assert_frame_equal(
         pl.scan_parquet(paths[1:]).head(1).collect(streaming=streaming), df
     )
+    assert_frame_equal(
+        (
+            pl.scan_parquet([paths[1], paths[1], paths[1]])
+            .with_row_index()
+            .slice(1, 1)
+            .collect(streaming=streaming)
+        ),
+        df.with_row_index(offset=1),
+    )
+    assert_frame_equal(
+        (
+            pl.scan_parquet([paths[1], paths[1], paths[1]])
+            .with_row_index(offset=1)
+            .slice(1, 1)
+            .collect(streaming=streaming)
+        ),
+        df.with_row_index(offset=2),
+    )
+    assert_frame_equal(
+        pl.scan_parquet(paths[1:]).head(1).collect(streaming=streaming), df
+    )
 
     # Negative slice unsupported in streaming
     if not streaming:
@@ -527,6 +548,17 @@ def trim_to_metadata(path: str | Path) -> None:
         df = pl.select(x=pl.int_range(0, 50))
         df.write_parquet(path)
         assert_frame_equal(pl.scan_parquet(path).slice(-100, 75).collect(), df.head(25))
+        assert_frame_equal(
+            pl.scan_parquet([path, path]).with_row_index().slice(-25, 100).collect(),
+            pl.concat([df, df]).with_row_index().slice(75),
+        )
+        assert_frame_equal(
+            pl.scan_parquet([path, path])
+            .with_row_index(offset=10)
+            .slice(-25, 100)
+            .collect(),
+            pl.concat([df, df]).with_row_index(offset=10).slice(75),
+        )
         assert_frame_equal(
             pl.scan_parquet(path).slice(-1, (1 << 32) - 1).collect(), df.tail(1)
         )
@@ -765,3 +797,24 @@ def test_scan_parquet_ignores_dtype_mismatch_for_non_projected_columns_19249(
         .collect(streaming=streaming),
         pl.DataFrame({"a": [1, 1]}, schema={"a": pl.Int32}),
     )
+
+
+@pytest.mark.parametrize("streaming", [True, False])
+@pytest.mark.write_disk
+def test_scan_parquet_streaming_row_index_19606(
+    tmp_path: Path, streaming: bool
+) -> None:
+    tmp_path.mkdir(exist_ok=True)
+    paths = [tmp_path / "1", tmp_path / "2"]
+
+    dfs = [pl.DataFrame({"x": i}) for i in range(len(paths))]
+
+    for df, p in zip(dfs, paths):
+        df.write_parquet(p)
+
+    assert_frame_equal(
+        pl.scan_parquet(tmp_path).with_row_index().collect(streaming=streaming),
+        pl.DataFrame(
+            {"index": [0, 1], "x": [0, 1]}, schema={"index": pl.UInt32, "x": pl.Int64}
+        ),
+    )
diff --git a/py-polars/tests/unit/operations/namespaces/test_meta.py b/py-polars/tests/unit/operations/namespaces/test_meta.py
index f038dd27fdbf..38835244557e 100644
--- a/py-polars/tests/unit/operations/namespaces/test_meta.py
+++ b/py-polars/tests/unit/operations/namespaces/test_meta.py
@@ -149,6 +149,7 @@ def test_meta_tree_format(namespace_files_path: Path) -> None:
 def test_meta_show_graph(namespace_files_path: Path) -> None:
     e = (pl.col("foo") * pl.col("bar")).sum().over(pl.col("ham")) / 2
     dot = e.meta.show_graph(show=False, raw_output=True)
+    assert dot is not None
     assert len(dot) > 0
     # Don't check output contents since this creates a maintenance burden
     # Assume output check in test_meta_tree_format is enough
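
Aside (not part of the patch): the common thread across the mem-engine, pipe, and streaming changes above is that, when a negative slice lets the scan skip whole leading files, the rows of those skipped files must still be added to the row-index offset before any remaining file is decoded. The standalone sketch below illustrates that bookkeeping with std-only types; the function name resolve_negative_slice and the plain AtomicU32 offset are hypothetical stand-ins for the AtomicIdxSize-based plumbing in the diff, not Polars APIs.

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// Given per-file row counts and the n-from-the-end start of a negative slice,
/// return the first file that must be read and bump the shared row-index offset
/// by the rows of every file that is skipped entirely.
fn resolve_negative_slice(
    row_counts: &[u32],
    slice_start_as_n_from_end: usize,
    row_index_offset: Option<&Arc<AtomicU32>>,
) -> usize {
    let total: u32 = row_counts.iter().sum();
    let mut cum_rows: u32 = 0;
    let mut first_file = 0;

    // Walk files from the back until the negative offset is covered.
    for (i, n) in row_counts.iter().enumerate().rev() {
        cum_rows += n;
        if cum_rows as usize >= slice_start_as_n_from_end {
            first_file = i;
            break;
        }
    }

    if let Some(offset) = row_index_offset {
        // Rows in the skipped leading files still count towards the row index,
        // even though they are never decoded.
        offset.fetch_add(total - cum_rows.min(total), Ordering::Relaxed);
    }

    first_file
}

fn main() {
    let offset = Arc::new(AtomicU32::new(0));
    // Three files of 50 rows each; slice(-60, 60) starts inside the middle file.
    let first = resolve_negative_slice(&[50, 50, 50], 60, Some(&offset));
    assert_eq!(first, 1);
    assert_eq!(offset.load(Ordering::Relaxed), 50); // file 0 is skipped entirely
}

With the offset pre-bumped this way, a decoder that later emits rows from the first read file can keep computing indices as offset + rows_seen_so_far, which is the invariant the row_index tests above check for both positive and negative slices.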