fix: Fix incorrect scan_parquet().with_row_index() with non-zero slice or with streaming collect #19609

Merged (5 commits, Nov 4, 2024)
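The behaviour this PR fixes can be sketched without touching polars itself: when a row index is attached, a slice (including a negative one resolved against the total row count) must still yield global row numbers. The file sizes and slice values below are hypothetical, and the function is only a model of the expected semantics, not the polars implementation.

```rust
// Standalone sketch of the expected row-index semantics (hypothetical numbers):
// the index values must be global row numbers, even when a negative slice
// skips rows (or whole files) at the start of the scan.
fn expected_row_indices(file_row_counts: &[u64], offset: i64, len: u64) -> Vec<u64> {
    let total: u64 = file_row_counts.iter().sum();
    // Translate a negative offset into its positive-offset equivalent.
    let start = if offset < 0 {
        total.saturating_sub((-offset) as u64)
    } else {
        (offset as u64).min(total)
    };
    let end = (start + len).min(total);
    (start..end).collect()
}

fn main() {
    // Three files of 5 rows each; slice(-7, 7) keeps the last 7 rows,
    // so the row index must start at 8, not at 0.
    assert_eq!(
        expected_row_indices(&[5, 5, 5], -7, 7),
        vec![8, 9, 10, 11, 12, 13, 14]
    );
}
```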
7 changes: 5 additions & 2 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -716,7 +716,10 @@ fn rg_to_dfs_optionally_par_over_columns(

let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
if let Some(rc) = &row_index {
df.with_row_index_mut(rc.name.clone(), Some(*previous_row_count + rc.offset));
df.with_row_index_mut(
rc.name.clone(),
Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
);
}

materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns, rg_slice.1);
@@ -830,7 +833,7 @@ fn rg_to_dfs_par_over_rg(
if let Some(rc) = &row_index {
df.with_row_index_mut(
rc.name.clone(),
Some(row_count_start as IdxSize + rc.offset),
Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
Review comment from the PR author:
In fn rg_to_dfs*, we also add slice.0 (the slice offset), as it may be non-zero in the negative-slice case

);
}

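The arithmetic the author's comment refers to can be illustrated in isolation (the function below is a hypothetical helper, not polars code): the row index written for a row group starts at the rows already emitted, plus the user-supplied offset, plus the rows the slice skips at the top of that row group.

```rust
// Hypothetical helper mirroring the fixed expression in rg_to_dfs_*:
// previous_row_count + rc.offset + rg_slice.0.
type IdxSize = u32; // polars uses u32 indices by default (u64 with bigidx)

fn row_index_start(
    previous_row_count: IdxSize, // rows already produced by earlier row groups
    base_offset: IdxSize,        // user-requested row index offset (rc.offset)
    rg_slice_start: usize,       // rg_slice.0: rows the slice skips inside this row group
) -> IdxSize {
    previous_row_count + base_offset + rg_slice_start as IdxSize
}

fn main() {
    // Without the last term, a slice starting 10 rows into the row group
    // produced indices that were 10 too small.
    assert_eq!(row_index_start(100, 0, 10), 110);
}
```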
48 changes: 34 additions & 14 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -72,6 +72,7 @@ impl ParquetExec {
}
};
let predicate = self.predicate.clone().map(phys_expr_to_io_expr);
let mut base_row_index = self.file_options.row_index.take();

// (offset, end)
let (slice_offset, slice_end) = if let Some(slice) = self.file_options.slice {
@@ -82,6 +83,7 @@
// slice into a positive-offset equivalent.
let slice_start_as_n_from_end = -slice.0 as usize;
let mut cum_rows = 0;
let mut first_source_row_offset = 0;
let chunk_size = 8;
POOL.install(|| {
for path_indexes in (0..self.sources.len())
@@ -107,16 +109,19 @@
.collect::<PolarsResult<Vec<_>>>()?;

for (path_idx, rc) in path_indexes.iter().zip(row_counts) {
cum_rows += rc;
if first_source == 0 {
cum_rows += rc;

if cum_rows >= slice_start_as_n_from_end {
first_source = *path_idx;
break;
}
}
if cum_rows >= slice_start_as_n_from_end {
first_source = *path_idx;

if first_source > 0 {
break;
if base_row_index.is_none() {
break;
}
}
} else {
first_source_row_offset += rc;
}
}
}

@@ -134,14 +139,17 @@

let end = start.saturating_add(len);

if let Some(ri) = base_row_index.as_mut() {
ri.offset += first_source_row_offset as IdxSize;
}

(start, end)
}
} else {
(0, usize::MAX)
};

let mut current_offset = 0;
let base_row_index = self.file_options.row_index.take();
// Limit no. of files at a time to prevent open file limits.

for i in (first_source..self.sources.len()).step_by(step) {
@@ -268,6 +276,7 @@ impl ParquetExec {
}
};
let predicate = self.predicate.clone().map(phys_expr_to_io_expr);
let mut base_row_index = self.file_options.row_index.take();

// Modified if we have a negative slice
let mut first_file_idx = 0;
@@ -281,6 +290,7 @@
// slice into a positive-offset equivalent.
let slice_start_as_n_from_end = -slice.0 as usize;
let mut cum_rows = 0;
let mut first_source_row_offset = 0;

let paths = &paths;
let cloud_options = Arc::new(self.cloud_options.clone());
@@ -312,11 +322,18 @@
while let Some(v) = iter.next().await {
let (path_idx, num_rows) = v.unwrap()?;

cum_rows += num_rows;
if first_file_idx == 0 {
cum_rows += num_rows;

if cum_rows >= slice_start_as_n_from_end {
first_file_idx = path_idx;

if cum_rows >= slice_start_as_n_from_end {
first_file_idx = path_idx;
break;
if base_row_index.is_none() {
break;
}
}
} else {
first_source_row_offset += num_rows;
}
}

@@ -331,14 +348,17 @@

let end = start.saturating_add(len);

if let Some(ri) = base_row_index.as_mut() {
ri.offset += first_source_row_offset as IdxSize;
}

(start, end)
}
} else {
(0, usize::MAX)
};

let mut current_offset = 0;
let base_row_index = self.file_options.row_index.take();
let mut processed = 0;

for batch_start in (first_file_idx..paths.len()).step_by(batch_size) {
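A compressed sketch of the in-memory engine change above, with hypothetical row counts: once the negative slice resolves to a starting file, the rows of every file before it are accumulated into first_source_row_offset and folded into the row index offset, instead of breaking out of the loop early.

```rust
// Standalone model of the negative-slice resolution in ParquetExec
// (row counts and the "row_nr" name are made up).
struct RowIndex {
    name: String,
    offset: u32,
}

fn main() {
    // Hypothetical per-file row counts; slice(-60, 60) needs only the last 60 rows.
    let row_counts = [500_u32, 300, 80, 20];
    let slice_start_as_n_from_end = 60_u32;

    let mut base_row_index = Some(RowIndex { name: "row_nr".to_string(), offset: 0 });

    let mut cum_rows = 0_u32;
    let mut first_source_row_offset = 0_u32;
    let mut first_source_found = false;

    // Walk files from the last one backwards, as the executor does for negative slices.
    for rc in row_counts.iter().rev() {
        if !first_source_found {
            cum_rows += rc;
            if cum_rows >= slice_start_as_n_from_end {
                first_source_found = true;
                // Previously the loop broke here; with a row index we keep going
                // to count the rows of the files that are skipped entirely.
                if base_row_index.is_none() {
                    break;
                }
            }
        } else {
            first_source_row_offset += rc;
        }
    }

    if let Some(ri) = base_row_index.as_mut() {
        ri.offset += first_source_row_offset;
        println!("{} offset = {}", ri.name, ri.offset);
    }
    // The 20- and 80-row files cover the last 60 rows; the 500 + 300 skipped rows
    // shift the row index offset to 800.
    assert_eq!(base_row_index.unwrap().offset, 800);
}
```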
16 changes: 13 additions & 3 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -136,7 +136,10 @@ impl ParquetSource {
self.projected_arrow_schema.as_deref(),
self.file_options.allow_missing_columns,
)?
.with_row_index(file_options.row_index)
.with_row_index(file_options.row_index.map(|mut ri| {
ri.offset += self.processed_rows.load(Ordering::Relaxed) as IdxSize;
ri
}))
.with_predicate(predicate.clone())
.use_statistics(options.use_statistics)
.with_hive_partition_columns(hive_partitions)
@@ -196,7 +199,10 @@ impl ParquetSource {
let mut async_reader =
ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), metadata)
.await?
.with_row_index(file_options.row_index)
.with_row_index(file_options.row_index.map(|mut ri| {
ri.offset += self.processed_rows.load(Ordering::Relaxed) as IdxSize;
ri
}))
.with_arrow_schema_projection(
&self.first_schema,
self.projected_arrow_schema.as_deref(),
@@ -317,7 +323,11 @@ impl ParquetSource {
.collect::<Vec<_>>();
let init_iter = range.into_iter().map(|index| self.init_reader_async(index));

let batched_readers = if self.file_options.slice.is_some() {
let needs_exact_processed_rows_count =
self.file_options.slice.is_some() || self.file_options.row_index.is_some();

let batched_readers = if needs_exact_processed_rows_count {
// We run serially to ensure we have a correct processed_rows count.
polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
futures::stream::iter(init_iter)
.then(|x| x)
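The pipe (streaming) source change above can be summarized with a small loop over hypothetical files: every reader offsets its row index by the rows processed so far, which is only correct if readers are initialized one by one, in file order; that is why initialization now also runs serially when a row index is present, not just for a slice.

```rust
// Sketch of the per-file row-index offsetting in the streaming pipe source
// (file names and counts are made up). Each reader takes processed_rows as
// its offset, so processed_rows must be exact when the reader is created.
fn main() {
    let files = [("a.parquet", 100_u32), ("b.parquet", 250), ("c.parquet", 40)];
    let base_offset = 0_u32;

    let mut processed_rows = 0_u32;
    for (name, n_rows) in files {
        let row_index_offset = base_offset + processed_rows;
        println!("{name}: row index starts at {row_index_offset}");
        processed_rows += n_rows; // must be up to date before the next reader starts
    }
}
```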
13 changes: 9 additions & 4 deletions crates/polars-stream/src/nodes/parquet_source/init.rs
@@ -15,6 +15,7 @@ use super::{AsyncTaskData, ParquetSourceNode};
use crate::async_executor;
use crate::async_primitives::connector::connector;
use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
use crate::morsel::get_ideal_morsel_size;
use crate::nodes::{MorselSeq, TaskPriority};
use crate::utils::task_handles_ext;

@@ -29,10 +30,10 @@ impl ParquetSourceNode {
eprintln!("[ParquetSource]: Shutting down");
}

let (mut raw_morsel_receivers, morsel_stream_task_handle) =
let (raw_morsel_receivers, morsel_stream_task_handle) =
async_task_data.try_lock().unwrap().take().unwrap();

raw_morsel_receivers.clear();
drop(raw_morsel_receivers);
// Join on the producer handle to catch errors/panics.
// Safety
// * We dropped the receivers on the line above
@@ -118,7 +119,11 @@ impl ParquetSourceNode {
let row_group_decoder = self.init_row_group_decoder();
let row_group_decoder = Arc::new(row_group_decoder);

let ideal_morsel_size = self.config.ideal_morsel_size;
let ideal_morsel_size = get_ideal_morsel_size();

if verbose {
eprintln!("[ParquetSource]: ideal_morsel_size: {}", ideal_morsel_size);
}

// Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -
// it is purely a dispatch loop.
@@ -276,7 +281,7 @@
.unwrap_or(0);
let include_file_paths = self.file_options.include_file_paths.clone();
let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap();
let row_index = self.file_options.row_index.clone();
let row_index = self.row_index.clone();
let physical_predicate = self.physical_predicate.clone();
let min_values_per_thread = self.config.min_values_per_thread;

30 changes: 25 additions & 5 deletions crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
@@ -7,6 +7,7 @@ use polars_io::prelude::_internal::ensure_matching_dtypes_if_found;
use polars_io::utils::byte_source::{DynByteSource, MemSliceByteSource};
use polars_io::utils::slice::SplitSlicePosition;
use polars_utils::mmap::MemSlice;
use polars_utils::IdxSize;

use super::metadata_utils::{ensure_schema_has_projected_fields, read_parquet_metadata_bytes};
use super::ParquetSourceNode;
@@ -289,23 +290,42 @@ impl ParquetSourceNode {
.map(process_metadata_bytes)
.buffered(metadata_decode_ahead_size);

let row_index = self.row_index.clone();

// Note:
// * We want to wait until the first morsel is requested before starting this
let init_negative_slice_and_metadata = async move {
let mut processed_metadata_rev = vec![];
let mut cum_rows = 0;
let mut row_index_adjust = 0;

while let Some(v) = metadata_stream.next().await {
let v = v?;
let (_, _, metadata) = &v;
cum_rows += metadata.num_rows;
processed_metadata_rev.push(v);
let n_rows = metadata.num_rows;

if cum_rows >= slice_start_as_n_from_end {
break;
if cum_rows < slice_start_as_n_from_end {
processed_metadata_rev.push(v);
cum_rows += n_rows;

if cum_rows >= slice_start_as_n_from_end && row_index.is_none() {
break;
}
} else {
// If we didn't already break it means a row_index was requested, so we need
// to count the number of rows in the skipped files and adjust the offset
// accordingly.
row_index_adjust += n_rows;
}
}

row_index.as_deref().map(|(_, offset)| {
offset.fetch_add(
row_index_adjust as IdxSize,
std::sync::atomic::Ordering::Relaxed,
)
});

let (start, len) = if slice_start_as_n_from_end > cum_rows {
// We need to trim the slice, e.g. SLICE[offset: -100, len: 75] on a file of 50
// rows should only give the first 25 rows.
@@ -316,7 +336,7 @@
};

if len == 0 {
processed_metadata_rev.clear();
processed_metadata_rev = vec![];
}

normalized_slice_oneshot_tx
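A simplified model of the new-streaming metadata task above, using plain std types in place of PlSmallStr and AtomicIdxSize: when a negative slice is combined with a row index, the task keeps reading metadata for the files it would otherwise skip, sums their row counts, and bumps the shared offset atomically so the decoders later emit global row numbers. The row counts and index name are hypothetical.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

fn main() {
    // Shared (name, offset) pair, analogous to Arc<(PlSmallStr, AtomicIdxSize)>.
    let row_index = Arc::new(("row_nr".to_string(), AtomicU32::new(0)));

    // Metadata arrives in reverse file order (last file first); the slice only
    // needs the last 120 rows.
    let file_row_counts_rev = [100_u32, 50, 300, 200];
    let slice_start_as_n_from_end = 120_u32;

    let mut cum_rows = 0_u32;
    let mut row_index_adjust = 0_u32;
    for n_rows in file_row_counts_rev {
        if cum_rows < slice_start_as_n_from_end {
            // This file participates in the slice.
            cum_rows += n_rows;
        } else {
            // Fully skipped file: its rows still shift the global row index.
            row_index_adjust += n_rows;
        }
    }

    row_index.1.fetch_add(row_index_adjust, Ordering::Relaxed);
    println!("{} offset adjusted by {}", row_index.0, row_index_adjust);
    // The 100- and 50-row files cover the last 120 rows; the 300- and 200-row
    // files are skipped, so the offset grows by 500.
    assert_eq!(row_index.1.load(Ordering::Relaxed), 500);
}
```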
21 changes: 15 additions & 6 deletions crates/polars-stream/src/nodes/parquet_source/mod.rs
@@ -14,11 +14,13 @@ use polars_io::utils::byte_source::DynByteSourceBuilder;
use polars_plan::plans::hive::HivePartitions;
use polars_plan::plans::{FileInfo, ScanSources};
use polars_plan::prelude::FileScanOptions;
use polars_utils::index::AtomicIdxSize;
use polars_utils::pl_str::PlSmallStr;

use super::compute_node_prelude::*;
use super::{MorselSeq, TaskPriority};
use crate::async_primitives::wait_group::WaitToken;
use crate::morsel::{get_ideal_morsel_size, SourceToken};
use crate::morsel::SourceToken;
use crate::utils::task_handles_ext;

mod init;
@@ -51,6 +53,11 @@ pub struct ParquetSourceNode {
projected_arrow_schema: Option<Arc<ArrowSchema>>,
byte_source_builder: DynByteSourceBuilder,
memory_prefetch_func: fn(&[u8]) -> (),
/// The offset is an AtomicIdxSize, as in the negative slice case, the row
/// offset becomes relative to the starting point in the list of files,
/// so the row index offset needs to be updated by the initializer to
/// reflect this (https://github.com/pola-rs/polars/issues/19607).
row_index: Option<Arc<(PlSmallStr, AtomicIdxSize)>>,
// This permit blocks execution until the first morsel is requested.
morsel_stream_starter: Option<tokio::sync::oneshot::Sender<()>>,
// This is behind a Mutex so that we can call `shutdown()` asynchronously.
@@ -70,7 +77,6 @@ struct Config {
/// Minimum number of values for a parallel spawned task to process to amortize
/// parallelism overhead.
min_values_per_thread: usize,
ideal_morsel_size: usize,
}

#[allow(clippy::too_many_arguments)]
@@ -82,7 +88,7 @@ impl ParquetSourceNode {
predicate: Option<Arc<dyn PhysicalExpr>>,
options: ParquetOptions,
cloud_options: Option<CloudOptions>,
file_options: FileScanOptions,
mut file_options: FileScanOptions,
first_metadata: Option<Arc<FileMetadata>>,
) -> Self {
let verbose = config::verbose();
@@ -94,6 +100,11 @@
};
let memory_prefetch_func = get_memory_prefetch_func(verbose);

let row_index = file_options
.row_index
.take()
.map(|ri| Arc::new((ri.name, AtomicIdxSize::new(ri.offset))));

Self {
scan_sources,
file_info,
@@ -111,14 +122,14 @@
metadata_decode_ahead_size: 0,
row_group_prefetch_size: 0,
min_values_per_thread: 0,
ideal_morsel_size: 0,
},
verbose,
physical_predicate: None,
schema: None,
projected_arrow_schema: None,
byte_source_builder,
memory_prefetch_func,
row_index,

morsel_stream_starter: None,
async_task_data: Arc::new(tokio::sync::Mutex::new(None)),
@@ -144,15 +155,13 @@ impl ComputeNode for ParquetSourceNode {
let min_values_per_thread = std::env::var("POLARS_MIN_VALUES_PER_THREAD")
.map(|x| x.parse::<usize>().expect("integer").max(1))
.unwrap_or(16_777_216);
let ideal_morsel_size = get_ideal_morsel_size();

Config {
num_pipelines,
metadata_prefetch_size,
metadata_decode_ahead_size,
row_group_prefetch_size,
min_values_per_thread,
ideal_morsel_size,
}
};
