fix: Fix incorrect scan_parquet().with_row_index() with non-zero slice or with streaming collect (#19609)

nameexhaustion authored Nov 4, 2024
1 parent 5210697 · commit e98fb41

Showing 9 changed files with 163 additions and 37 deletions.
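The underlying bug (#19607): when a negative slice such as slice(-5, 5) spans a multi-file scan, the files before the slice start are skipped entirely, but the row index kept numbering from zero instead of from the first row actually read. A simplified sketch of the bookkeeping this commit introduces (illustrative Rust, not taken from the Polars sources; all names are made up):

```rust
/// Walk the per-file row counts from the back until the accumulated count
/// covers the slice start; every row in the files before that point must be
/// added to the row-index offset (this adjustment was previously omitted).
fn resolve_negative_slice(
    file_row_counts: &[u64],
    slice_start_as_n_from_end: u64,
) -> (usize, u64) {
    let mut first_source = 0;
    let mut cum_rows = 0;
    for (idx, n) in file_row_counts.iter().enumerate().rev() {
        cum_rows += n;
        if cum_rows >= slice_start_as_n_from_end {
            first_source = idx;
            break;
        }
    }
    // Rows in the skipped files shift every row-index value.
    let row_index_adjust: u64 = file_row_counts[..first_source].iter().sum();
    (first_source, row_index_adjust)
}

fn main() {
    // Three files of 10 rows; slice(-5, 5) starts in the last file, and the
    // 20 rows of the two skipped files must be added to the row-index offset.
    assert_eq!(resolve_negative_slice(&[10, 10, 10], 5), (2, 20));
}
```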
7 changes: 5 additions & 2 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -716,7 +716,10 @@ fn rg_to_dfs_optionally_par_over_columns(

     let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
     if let Some(rc) = &row_index {
-        df.with_row_index_mut(rc.name.clone(), Some(*previous_row_count + rc.offset));
+        df.with_row_index_mut(
+            rc.name.clone(),
+            Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
+        );
     }
 
     materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns, rg_slice.1);
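The two hunks in this file make the in-memory reader add the rows skipped at the front of a sliced row group to the row index as well. A minimal illustration of the arithmetic (hypothetical helper, not the Polars API):

```rust
/// First row-index value emitted for a sliced row group: the rows skipped at
/// the front of the group (`rg_slice_start`) shift the index together with
/// the rows of all preceding groups and the user-supplied base offset.
fn first_row_index(previous_row_count: u64, index_offset: u64, rg_slice_start: u64) -> u64 {
    previous_row_count + index_offset + rg_slice_start
}

fn main() {
    // Row group starting at global row 100, base offset 0, slice skipping the
    // group's first 7 rows: the first emitted index is 107 (previously 100).
    assert_eq!(first_row_index(100, 0, 7), 107);
}
```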
@@ -830,7 +833,7 @@ fn rg_to_dfs_par_over_rg(
     if let Some(rc) = &row_index {
         df.with_row_index_mut(
             rc.name.clone(),
-            Some(row_count_start as IdxSize + rc.offset),
+            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
         );
     }

48 changes: 34 additions & 14 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -72,6 +72,7 @@ impl ParquetExec {
         }
     };
     let predicate = self.predicate.clone().map(phys_expr_to_io_expr);
+    let mut base_row_index = self.file_options.row_index.take();
 
     // (offset, end)
     let (slice_offset, slice_end) = if let Some(slice) = self.file_options.slice {
@@ -82,6 +83,7 @@
             // slice into a positive-offset equivalent.
             let slice_start_as_n_from_end = -slice.0 as usize;
             let mut cum_rows = 0;
+            let mut first_source_row_offset = 0;
             let chunk_size = 8;
             POOL.install(|| {
                 for path_indexes in (0..self.sources.len())
@@ -107,16 +109,19 @@
                     .collect::<PolarsResult<Vec<_>>>()?;
 
                 for (path_idx, rc) in path_indexes.iter().zip(row_counts) {
-                    cum_rows += rc;
-
-                    if cum_rows >= slice_start_as_n_from_end {
-                        first_source = *path_idx;
-                        break;
-                    }
-                }
-
-                if first_source > 0 {
-                    break;
-                }
+                    if first_source == 0 {
+                        cum_rows += rc;
+
+                        if cum_rows >= slice_start_as_n_from_end {
+                            first_source = *path_idx;
+
+                            if base_row_index.is_none() {
+                                break;
+                            }
+                        }
+                    } else {
+                        first_source_row_offset += rc;
+                    }
+                }
             }
         }

@@ -134,14 +139,17 @@

             let end = start.saturating_add(len);
 
+            if let Some(ri) = base_row_index.as_mut() {
+                ri.offset += first_source_row_offset as IdxSize;
+            }
+
             (start, end)
         }
     } else {
         (0, usize::MAX)
     };
 
     let mut current_offset = 0;
-    let base_row_index = self.file_options.row_index.take();
     // Limit no. of files at a time to prevent open file limits.
 
     for i in (first_source..self.sources.len()).step_by(step) {
@@ -268,6 +276,7 @@ impl ParquetExec {
         }
     };
     let predicate = self.predicate.clone().map(phys_expr_to_io_expr);
+    let mut base_row_index = self.file_options.row_index.take();
 
     // Modified if we have a negative slice
     let mut first_file_idx = 0;
@@ -281,6 +290,7 @@
             // slice into a positive-offset equivalent.
             let slice_start_as_n_from_end = -slice.0 as usize;
             let mut cum_rows = 0;
+            let mut first_source_row_offset = 0;
 
             let paths = &paths;
             let cloud_options = Arc::new(self.cloud_options.clone());
@@ -312,11 +322,18 @@
                 while let Some(v) = iter.next().await {
                     let (path_idx, num_rows) = v.unwrap()?;
 
-                    cum_rows += num_rows;
-
-                    if cum_rows >= slice_start_as_n_from_end {
-                        first_file_idx = path_idx;
-                        break;
-                    }
+                    if first_file_idx == 0 {
+                        cum_rows += num_rows;
+
+                        if cum_rows >= slice_start_as_n_from_end {
+                            first_file_idx = path_idx;
+
+                            if base_row_index.is_none() {
+                                break;
+                            }
+                        }
+                    } else {
+                        first_source_row_offset += num_rows;
+                    }
                 }

@@ -331,14 +348,17 @@

             let end = start.saturating_add(len);
 
+            if let Some(ri) = base_row_index.as_mut() {
+                ri.offset += first_source_row_offset as IdxSize;
+            }
+
             (start, end)
         }
     } else {
         (0, usize::MAX)
     };
 
     let mut current_offset = 0;
-    let base_row_index = self.file_options.row_index.take();
     let mut processed = 0;
 
     for batch_start in (first_file_idx..paths.len()).step_by(batch_size) {
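Both ParquetExec code paths above normalize the negative slice into a positive (start, end) range and, with this commit, bump base_row_index.offset by the row count of the files skipped before first_source. A sketch of the normalization, including the trimming case described in the metadata_fetch.rs comments further down (simplified, assumed behavior):

```rust
/// Convert a slice given as `n_from_end` rows from the end into a positive
/// (start, end) range over `cum_rows` counted rows, trimming when the slice
/// starts before the data: slice(-100, 75) over 50 rows keeps rows 0..25.
fn normalize_negative_slice(cum_rows: u64, n_from_end: u64, len: u64) -> (u64, u64) {
    if n_from_end > cum_rows {
        // The slice begins before the first row; shorten it accordingly.
        let len = len.saturating_sub(n_from_end - cum_rows);
        (0, len)
    } else {
        let start = cum_rows - n_from_end;
        (start, start.saturating_add(len))
    }
}

fn main() {
    assert_eq!(normalize_negative_slice(50, 100, 75), (0, 25));
    assert_eq!(normalize_negative_slice(30, 5, 5), (25, 30));
}
```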
16 changes: 13 additions & 3 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -136,7 +136,10 @@ impl ParquetSource {
                 self.projected_arrow_schema.as_deref(),
                 self.file_options.allow_missing_columns,
             )?
-            .with_row_index(file_options.row_index)
+            .with_row_index(file_options.row_index.map(|mut ri| {
+                ri.offset += self.processed_rows.load(Ordering::Relaxed) as IdxSize;
+                ri
+            }))
             .with_predicate(predicate.clone())
             .use_statistics(options.use_statistics)
             .with_hive_partition_columns(hive_partitions)
@@ -196,7 +199,10 @@ impl ParquetSource {
         let mut async_reader =
             ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), metadata)
                 .await?
-                .with_row_index(file_options.row_index)
+                .with_row_index(file_options.row_index.map(|mut ri| {
+                    ri.offset += self.processed_rows.load(Ordering::Relaxed) as IdxSize;
+                    ri
+                }))
                 .with_arrow_schema_projection(
                     &self.first_schema,
                     self.projected_arrow_schema.as_deref(),
@@ -317,7 +323,11 @@ impl ParquetSource {
             .collect::<Vec<_>>();
         let init_iter = range.into_iter().map(|index| self.init_reader_async(index));
 
-        let batched_readers = if self.file_options.slice.is_some() {
+        let needs_exact_processed_rows_count =
+            self.file_options.slice.is_some() || self.file_options.row_index.is_some();
+
+        let batched_readers = if needs_exact_processed_rows_count {
+            // We run serially to ensure we have a correct processed_rows count.
             polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
                 futures::stream::iter(init_iter)
                     .then(|x| x)
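In the pipelined engine the readers above are created one per file, so the base row index must be shifted by the rows already emitted; as the new needs_exact_processed_rows_count flag shows, readers are now also initialized serially whenever a row index is requested so that processed_rows stays exact. A minimal sketch of the per-file adjustment (illustrative types; RowIndex here is a stand-in, not the Polars definition):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative stand-in for a row-index configuration (name + base offset).
struct RowIndex {
    name: String,
    offset: u64,
}

/// Each new file reader gets the base offset shifted by the rows already
/// processed; this only works if readers are initialized in file order,
/// hence the serial initialization when a row index is requested.
fn row_index_for_next_file(base: &RowIndex, processed_rows: &AtomicU64) -> RowIndex {
    RowIndex {
        name: base.name.clone(),
        offset: base.offset + processed_rows.load(Ordering::Relaxed),
    }
}

fn main() {
    let processed = AtomicU64::new(12);
    let base = RowIndex { name: "idx".into(), offset: 3 };
    // 12 rows already emitted by earlier files: the next file starts at 15.
    assert_eq!(row_index_for_next_file(&base, &processed).offset, 15);
}
```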
13 changes: 9 additions & 4 deletions crates/polars-stream/src/nodes/parquet_source/init.rs
@@ -15,6 +15,7 @@ use super::{AsyncTaskData, ParquetSourceNode};
 use crate::async_executor;
 use crate::async_primitives::connector::connector;
 use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
+use crate::morsel::get_ideal_morsel_size;
 use crate::nodes::{MorselSeq, TaskPriority};
 use crate::utils::task_handles_ext;

@@ -29,10 +30,10 @@ impl ParquetSourceNode {
eprintln!("[ParquetSource]: Shutting down");
}

let (mut raw_morsel_receivers, morsel_stream_task_handle) =
let (raw_morsel_receivers, morsel_stream_task_handle) =
async_task_data.try_lock().unwrap().take().unwrap();

raw_morsel_receivers.clear();
drop(raw_morsel_receivers);
// Join on the producer handle to catch errors/panics.
// Safety
// * We dropped the receivers on the line above
@@ -118,7 +119,11 @@ impl ParquetSourceNode {
         let row_group_decoder = self.init_row_group_decoder();
         let row_group_decoder = Arc::new(row_group_decoder);
 
-        let ideal_morsel_size = self.config.ideal_morsel_size;
+        let ideal_morsel_size = get_ideal_morsel_size();
+
+        if verbose {
+            eprintln!("[ParquetSource]: ideal_morsel_size: {}", ideal_morsel_size);
+        }
 
         // Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -
         // it is purely a dispatch loop.
@@ -276,7 +281,7 @@ impl ParquetSourceNode {
             .unwrap_or(0);
         let include_file_paths = self.file_options.include_file_paths.clone();
         let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap();
-        let row_index = self.file_options.row_index.clone();
+        let row_index = self.row_index.clone();
         let physical_predicate = self.physical_predicate.clone();
         let min_values_per_thread = self.config.min_values_per_thread;

30 changes: 25 additions & 5 deletions crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
@@ -7,6 +7,7 @@ use polars_io::prelude::_internal::ensure_matching_dtypes_if_found;
 use polars_io::utils::byte_source::{DynByteSource, MemSliceByteSource};
 use polars_io::utils::slice::SplitSlicePosition;
 use polars_utils::mmap::MemSlice;
+use polars_utils::IdxSize;
 
 use super::metadata_utils::{ensure_schema_has_projected_fields, read_parquet_metadata_bytes};
 use super::ParquetSourceNode;
@@ -289,23 +290,42 @@ impl ParquetSourceNode {
             .map(process_metadata_bytes)
             .buffered(metadata_decode_ahead_size);
 
+        let row_index = self.row_index.clone();
+
         // Note:
         // * We want to wait until the first morsel is requested before starting this
         let init_negative_slice_and_metadata = async move {
             let mut processed_metadata_rev = vec![];
             let mut cum_rows = 0;
+            let mut row_index_adjust = 0;
 
             while let Some(v) = metadata_stream.next().await {
                 let v = v?;
                 let (_, _, metadata) = &v;
-                cum_rows += metadata.num_rows;
-                processed_metadata_rev.push(v);
+                let n_rows = metadata.num_rows;
 
-                if cum_rows >= slice_start_as_n_from_end {
-                    break;
+                if cum_rows < slice_start_as_n_from_end {
+                    processed_metadata_rev.push(v);
+                    cum_rows += n_rows;
+
+                    if cum_rows >= slice_start_as_n_from_end && row_index.is_none() {
+                        break;
+                    }
+                } else {
+                    // If we didn't already break it means a row_index was requested, so we need
+                    // to count the number of rows in the skipped files and adjust the offset
+                    // accordingly.
+                    row_index_adjust += n_rows;
                 }
             }
 
+            row_index.as_deref().map(|(_, offset)| {
+                offset.fetch_add(
+                    row_index_adjust as IdxSize,
+                    std::sync::atomic::Ordering::Relaxed,
+                )
+            });
+
             let (start, len) = if slice_start_as_n_from_end > cum_rows {
                 // We need to trim the slice, e.g. SLICE[offset: -100, len: 75] on a file of 50
                 // rows should only give the first 25 rows.
@@ -316,7 +336,7 @@
             };
 
             if len == 0 {
-                processed_metadata_rev.clear();
+                processed_metadata_rev = vec![];
             }
 
             normalized_slice_oneshot_tx
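Because the new streaming node resolves negative slices asynchronously during metadata fetch, the row-index offset is shared as an atomic and bumped once the number of skipped rows is known (the fetch_add above); decoders later read the adjusted base. A reduced sketch of that handoff, modeling AtomicIdxSize as AtomicU64 (illustrative, not the Polars types):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

fn main() {
    // Shared (name, offset) pair, as in ParquetSourceNode::row_index.
    let row_index = Arc::new(("idx".to_string(), AtomicU64::new(0)));

    // Metadata fetcher: two files of 10 rows each were skipped by the slice,
    // so the base offset is bumped by 20 before any row group is decoded.
    row_index.1.fetch_add(20, Ordering::Relaxed);

    // Row-group decoders later read the adjusted base offset.
    assert_eq!(row_index.1.load(Ordering::Relaxed), 20);
}
```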
21 changes: 15 additions & 6 deletions crates/polars-stream/src/nodes/parquet_source/mod.rs
@@ -14,11 +14,13 @@ use polars_io::utils::byte_source::DynByteSourceBuilder;
 use polars_plan::plans::hive::HivePartitions;
 use polars_plan::plans::{FileInfo, ScanSources};
 use polars_plan::prelude::FileScanOptions;
+use polars_utils::index::AtomicIdxSize;
 use polars_utils::pl_str::PlSmallStr;
 
 use super::compute_node_prelude::*;
 use super::{MorselSeq, TaskPriority};
 use crate::async_primitives::wait_group::WaitToken;
-use crate::morsel::{get_ideal_morsel_size, SourceToken};
+use crate::morsel::SourceToken;
 use crate::utils::task_handles_ext;
 
 mod init;
@@ -51,6 +53,11 @@ pub struct ParquetSourceNode {
     projected_arrow_schema: Option<Arc<ArrowSchema>>,
     byte_source_builder: DynByteSourceBuilder,
     memory_prefetch_func: fn(&[u8]) -> (),
+    /// The offset is an AtomicIdxSize, as in the negative slice case, the row
+    /// offset becomes relative to the starting point in the list of files,
+    /// so the row index offset needs to be updated by the initializer to
+    /// reflect this (https://github.com/pola-rs/polars/issues/19607).
+    row_index: Option<Arc<(PlSmallStr, AtomicIdxSize)>>,
     // This permit blocks execution until the first morsel is requested.
     morsel_stream_starter: Option<tokio::sync::oneshot::Sender<()>>,
     // This is behind a Mutex so that we can call `shutdown()` asynchronously.
@@ -70,7 +77,6 @@ struct Config {
     /// Minimum number of values for a parallel spawned task to process to amortize
     /// parallelism overhead.
     min_values_per_thread: usize,
-    ideal_morsel_size: usize,
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -82,7 +88,7 @@ impl ParquetSourceNode {
         predicate: Option<Arc<dyn PhysicalExpr>>,
         options: ParquetOptions,
         cloud_options: Option<CloudOptions>,
-        file_options: FileScanOptions,
+        mut file_options: FileScanOptions,
         first_metadata: Option<Arc<FileMetadata>>,
     ) -> Self {
         let verbose = config::verbose();
@@ -94,6 +100,11 @@
         };
         let memory_prefetch_func = get_memory_prefetch_func(verbose);
 
+        let row_index = file_options
+            .row_index
+            .take()
+            .map(|ri| Arc::new((ri.name, AtomicIdxSize::new(ri.offset))));
+
         Self {
             scan_sources,
             file_info,
@@ -111,14 +122,14 @@
                 metadata_decode_ahead_size: 0,
                 row_group_prefetch_size: 0,
                 min_values_per_thread: 0,
-                ideal_morsel_size: 0,
             },
             verbose,
             physical_predicate: None,
             schema: None,
             projected_arrow_schema: None,
             byte_source_builder,
             memory_prefetch_func,
+            row_index,
 
             morsel_stream_starter: None,
             async_task_data: Arc::new(tokio::sync::Mutex::new(None)),
@@ -144,15 +155,13 @@ impl ComputeNode for ParquetSourceNode {
         let min_values_per_thread = std::env::var("POLARS_MIN_VALUES_PER_THREAD")
             .map(|x| x.parse::<usize>().expect("integer").max(1))
             .unwrap_or(16_777_216);
-        let ideal_morsel_size = get_ideal_morsel_size();
 
         Config {
             num_pipelines,
             metadata_prefetch_size,
             metadata_decode_ahead_size,
             row_group_prefetch_size,
             min_values_per_thread,
-            ideal_morsel_size,
         }
     };

Diffs for the remaining 3 of the 9 changed files are not shown here.