refactor: Fix parquet file metadata being dropped after first DSL->IR conversion #18789

Merged · 1 commit · Sep 17, 2024
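Summary (inferred from the diff below): `DslPlan::Scan` now stores `ScanSources` and an `Option<FileInfo>` directly instead of behind `Arc<Mutex<DslScanSources>>` / `Arc<RwLock<Option<FileInfo>>>`, gains a `cached_ir` slot so the DSL->IR conversion runs once, and the parquet footer (`FileMetadataRef`) parsed at plan time is threaded through to the reader for the first source via a new `ParquetReader::set_metadata`, rather than being dropped and re-read.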
4 changes: 4 additions & 0 deletions crates/polars-io/src/parquet/read/reader.rs
@@ -138,6 +138,10 @@ impl<R: MmapBytesReader> ParquetReader<R> {
         self
     }
 
+    pub fn set_metadata(&mut self, metadata: FileMetadataRef) {
+        self.metadata = Some(metadata);
+    }
+
     pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
         if self.metadata.is_none() {
             self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
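The new setter pairs with the lazy `get_metadata` above: if a footer was already parsed during planning, it can be injected up front so later calls skip the footer read. A minimal sketch of the intended reuse; the helper function and import paths are assumptions, only `ParquetReader::{set_metadata, num_rows}` come from this diff:

```rust
use std::io::Cursor;

use polars_core::prelude::PolarsResult;
use polars_io::parquet::read::ParquetReader;
use polars_io::prelude::FileMetadataRef; // assumed re-export path

// Hypothetical helper: count rows, reusing an already-parsed footer.
fn num_rows_with_cache(
    bytes: Vec<u8>,
    cached: Option<FileMetadataRef>,
) -> PolarsResult<usize> {
    let mut reader = ParquetReader::new(Cursor::new(bytes));
    if let Some(md) = cached {
        // New in this PR: pre-seed the metadata slot so the footer
        // is not parsed a second time.
        reader.set_metadata(md);
    }
    reader.num_rows()
}
```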
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/csv.rs
@@ -311,7 +311,7 @@ impl LazyFileListReader for LazyCsvReader {
     /// Get the final [LazyFrame].
     fn finish(self) -> PolarsResult<LazyFrame> {
         let mut lf: LazyFrame = DslBuilder::scan_csv(
-            self.sources.to_dsl(false),
+            self.sources,
             self.read_options,
             self.cache,
             self.cloud_options,
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/ipc.rs
@@ -54,7 +54,7 @@ impl LazyFileListReader for LazyIpcReader {
         let options = IpcScanOptions {};
 
         let mut lf: LazyFrame = DslBuilder::scan_ipc(
-            self.sources.to_dsl(false),
+            self.sources,
             options,
             args.n_rows,
             args.cache,
7 changes: 4 additions & 3 deletions crates/polars-lazy/src/scan/ndjson.rs
@@ -1,6 +1,6 @@
 use std::num::NonZeroUsize;
 use std::path::{Path, PathBuf};
-use std::sync::{Arc, Mutex, RwLock};
+use std::sync::Arc;
 
 use polars_core::prelude::*;
 use polars_io::cloud::CloudOptions;
@@ -155,10 +155,11 @@ impl LazyFileListReader for LazyJsonLineReader {
         };
 
         Ok(LazyFrame::from(DslPlan::Scan {
-            sources: Arc::new(Mutex::new(self.sources.to_dsl(false))),
-            file_info: Arc::new(RwLock::new(None)),
+            sources: self.sources,
+            file_info: None,
             file_options,
             scan_type,
+            cached_ir: Default::default(),
         }))
     }
 
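`cached_ir: Default::default()` introduces a shared slot on `DslPlan::Scan` that memoizes the DSL->IR conversion. Its concrete type is not visible in this diff; the toy sketch below shows the caching pattern it enables, with all names invented for illustration:

```rust
use std::sync::{Arc, Mutex};

// Toy stand-in for the lowered IR node (illustrative only).
#[derive(Clone, Debug)]
struct Ir(String);

struct ScanPlan {
    source: String,
    // Filled on the first DSL->IR conversion and reused afterwards, so
    // expensive work (like parsing a parquet footer) happens once.
    cached_ir: Arc<Mutex<Option<Ir>>>,
}

impl ScanPlan {
    fn to_ir(&self) -> Ir {
        let mut slot = self.cached_ir.lock().unwrap();
        slot.get_or_insert_with(|| Ir(format!("scan({})", self.source)))
            .clone()
    }
}

fn main() {
    let plan = ScanPlan {
        source: "data.parquet".into(),
        cached_ir: Default::default(), // same initializer as in the diff
    };
    let first = plan.to_ir(); // computes and caches
    let second = plan.to_ir(); // hits the cache
    println!("{first:?} {second:?}");
}
```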
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/parquet.rs
@@ -62,7 +62,7 @@ impl LazyFileListReader for LazyParquetReader {
         let row_index = self.args.row_index;
 
         let mut lf: LazyFrame = DslBuilder::scan_parquet(
-            self.sources.to_dsl(false),
+            self.sources,
             self.args.n_rows,
             self.args.cache,
             self.args.parallel,
24 changes: 21 additions & 3 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -82,7 +82,16 @@ impl ParquetExec {
             .into_par_iter()
             .map(|&i| {
                 let memslice = self.sources.at(i).to_memslice()?;
-                ParquetReader::new(std::io::Cursor::new(memslice)).num_rows()
+
+                let mut reader = ParquetReader::new(std::io::Cursor::new(memslice));
+
+                if i == 0 {
+                    if let Some(md) = self.metadata.clone() {
+                        reader.set_metadata(md)
+                    }
+                }
+
+                reader.num_rows()
             })
             .collect::<PolarsResult<Vec<_>>>()?;
 
@@ -151,7 +160,15 @@ impl ParquetExec {
 
             let memslice = source.to_memslice()?;
 
-            let mut reader = ParquetReader::new(std::io::Cursor::new(memslice))
+            let mut reader = ParquetReader::new(std::io::Cursor::new(memslice));
+
+            if i == 0 {
+                if let Some(md) = self.metadata.clone() {
+                    reader.set_metadata(md)
+                }
+            }
+
+            let mut reader = reader
                 .read_parallel(parallel)
                 .set_low_memory(self.options.low_memory)
                 .use_statistics(self.options.use_statistics)
@@ -266,14 +283,15 @@ impl ParquetExec {
         let mut iter = stream::iter((0..paths.len()).rev().map(|i| {
             let paths = paths.clone();
             let cloud_options = cloud_options.clone();
+            let first_metadata = first_metadata.clone();
 
             pl_async::get_runtime().spawn(async move {
                 PolarsResult::Ok((
                     i,
                     ParquetAsyncReader::from_uri(
                         paths[i].to_str().unwrap(),
                         cloud_options.as_ref().as_ref(),
-                        None,
+                        first_metadata.filter(|_| i == 0),
                     )
                     .await?
                     .num_rows()
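`first_metadata.filter(|_| i == 0)` keeps the cached footer only for the first path: it was parsed from that file, so every other source must read its own. A self-contained illustration of the `Option::filter` idiom used here (names invented):

```rust
fn main() {
    let first_metadata: Option<&str> = Some("footer parsed at plan time");
    for i in 0..3 {
        // Some(..) only when i == 0; all later sources get None and
        // therefore parse their own footer.
        let md = first_metadata.filter(|_| i == 0);
        println!("source {i}: cached = {md:?}");
    }
}
```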
21 changes: 14 additions & 7 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -41,7 +41,7 @@ pub struct ParquetSource {
     file_options: FileScanOptions,
     #[allow(dead_code)]
     cloud_options: Option<CloudOptions>,
-    metadata: Option<FileMetadataRef>,
+    first_metadata: Option<FileMetadataRef>,
     file_info: FileInfo,
     hive_parts: Option<Arc<Vec<HivePartitions>>>,
     verbose: bool,
@@ -61,7 +61,6 @@ impl ParquetSource {
     }
 
     fn init_next_reader_sync(&mut self) -> PolarsResult<()> {
-        self.metadata = None;
         self.init_reader_sync()
     }
 
@@ -133,7 +132,16 @@ impl ParquetSource {
 
         let batched_reader = {
             let file = std::fs::File::open(path).unwrap();
-            let mut reader = ParquetReader::new(file)
+
+            let mut reader = ParquetReader::new(file);
+
+            if index == 0 {
+                if let Some(md) = self.first_metadata.clone() {
+                    reader.set_metadata(md);
+                }
+            }
+
+            let mut reader = reader
                 .with_projection(projection)
                 .check_schema(
                     self.file_info
@@ -191,7 +199,7 @@ impl ParquetSource {
     async fn init_reader_async(&self, index: usize) -> PolarsResult<BatchedParquetReader> {
         use std::sync::atomic::Ordering;
 
-        let metadata = self.metadata.clone();
+        let metadata = self.first_metadata.clone().filter(|_| index == 0);
         let predicate = self.predicate.clone();
         let cloud_options = self.cloud_options.clone();
         let (path, options, file_options, projection, chunk_size, hive_partitions) =
@@ -252,7 +260,7 @@ impl ParquetSource {
         sources: ScanSources,
         options: ParquetOptions,
         cloud_options: Option<CloudOptions>,
-        metadata: Option<FileMetadataRef>,
+        first_metadata: Option<FileMetadataRef>,
         file_options: FileScanOptions,
         file_info: FileInfo,
         hive_parts: Option<Arc<Vec<HivePartitions>>>,
@@ -282,7 +290,7 @@ impl ParquetSource {
             iter,
             sources,
             cloud_options,
-            metadata,
+            first_metadata,
             file_info,
             hive_parts,
             verbose,
@@ -293,7 +301,6 @@ impl ParquetSource {
         // Already start downloading when we deal with cloud urls.
         if run_async {
             source.init_next_reader()?;
-            source.metadata = None;
         }
         Ok(source)
     }
3 changes: 1 addition & 2 deletions crates/polars-plan/src/client/check.rs
@@ -13,8 +13,7 @@ pub(super) fn assert_cloud_eligible(dsl: &DslPlan) -> PolarsResult<()> {
         DslPlan::Scan {
             sources, scan_type, ..
         } => {
-            let sources_lock = sources.lock().unwrap();
-            match &sources_lock.sources {
+            match sources {
                 ScanSources::Paths(paths) => {
                     if paths.iter().any(|p| !is_cloud_url(p)) {
                         return ineligible_error("contains scan of local file system");
31 changes: 16 additions & 15 deletions crates/polars-plan/src/plans/builder_dsl.rs
@@ -1,4 +1,4 @@
-use std::sync::{Arc, Mutex, RwLock};
+use std::sync::Arc;
 
 use polars_core::prelude::*;
 #[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
@@ -57,11 +57,8 @@ impl DslBuilder {
         };
 
         Ok(DslPlan::Scan {
-            sources: Arc::new(Mutex::new(DslScanSources {
-                sources: ScanSources::Buffers(Arc::default()),
-                is_expanded: true,
-            })),
-            file_info: Arc::new(RwLock::new(Some(file_info))),
+            sources: ScanSources::Buffers(Arc::default()),
+            file_info: Some(file_info),
             file_options,
             scan_type: FileScan::Anonymous {
                 function,
@@ -70,14 +67,15 @@ impl DslBuilder {
                     skip_rows,
                 }),
             },
+            cached_ir: Default::default(),
         }
         .into())
     }
 
     #[cfg(feature = "parquet")]
     #[allow(clippy::too_many_arguments)]
     pub fn scan_parquet(
-        sources: DslScanSources,
+        sources: ScanSources,
         n_rows: Option<usize>,
         cache: bool,
         parallel: polars_io::parquet::read::ParallelStrategy,
@@ -102,8 +100,8 @@ impl DslBuilder {
             include_file_paths,
         };
         Ok(DslPlan::Scan {
-            sources: Arc::new(Mutex::new(sources)),
-            file_info: Arc::new(RwLock::new(None)),
+            sources,
+            file_info: None,
             file_options: options,
             scan_type: FileScan::Parquet {
                 options: ParquetOptions {
@@ -114,14 +112,15 @@ impl DslBuilder {
                 cloud_options,
                 metadata: None,
             },
+            cached_ir: Default::default(),
         }
         .into())
     }
 
     #[cfg(feature = "ipc")]
     #[allow(clippy::too_many_arguments)]
     pub fn scan_ipc(
-        sources: DslScanSources,
+        sources: ScanSources,
         options: IpcScanOptions,
         n_rows: Option<usize>,
         cache: bool,
@@ -132,8 +131,8 @@ impl DslBuilder {
         include_file_paths: Option<PlSmallStr>,
     ) -> PolarsResult<Self> {
         Ok(DslPlan::Scan {
-            sources: Arc::new(Mutex::new(sources)),
-            file_info: Arc::new(RwLock::new(None)),
+            sources,
+            file_info: None,
             file_options: FileScanOptions {
                 with_columns: None,
                 cache,
@@ -150,14 +149,15 @@ impl DslBuilder {
                 cloud_options,
                 metadata: None,
             },
+            cached_ir: Default::default(),
         }
         .into())
     }
 
     #[allow(clippy::too_many_arguments)]
     #[cfg(feature = "csv")]
     pub fn scan_csv(
-        sources: DslScanSources,
+        sources: ScanSources,
         read_options: CsvReadOptions,
         cache: bool,
         cloud_options: Option<CloudOptions>,
@@ -183,13 +183,14 @@ impl DslBuilder {
             include_file_paths,
         };
         Ok(DslPlan::Scan {
-            sources: Arc::new(Mutex::new(sources)),
-            file_info: Arc::new(RwLock::new(None)),
+            sources,
+            file_info: None,
             file_options: options,
             scan_type: FileScan::Csv {
                 options: read_options,
                 cloud_options,
             },
+            cached_ir: Default::default(),
         }
         .into())
     }