feat: Add option to include file path for Parquet, IPC, CSV scans (#1…
nameexhaustion authored Jul 12, 2024
1 parent f492060 commit b82beb2
Showing 25 changed files with 418 additions and 78 deletions.
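
The new option adds a String column to the scan output recording which file each row was read from. A minimal sketch of how this might be used from the `polars` crate (assuming the `lazy` and `parquet` features; the column name "file_path" and the glob are illustrative, not taken from this commit):

```rust
use polars::prelude::*;

fn scan_with_paths() -> PolarsResult<DataFrame> {
    // Ask the scan to append a String column named "file_path" holding
    // the source file of every row.
    let args = ScanArgsParquet {
        include_file_paths: Some("file_path".into()),
        ..Default::default()
    };
    LazyFrame::scan_parquet("data/*.parquet", args)?.collect()
}
```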
17 changes: 17 additions & 0 deletions crates/polars-io/src/ipc/ipc_file.rs
@@ -81,6 +81,7 @@ pub struct IpcReader<R: MmapBytesReader> {
pub(super) projection: Option<Vec<usize>>,
pub(crate) columns: Option<Vec<String>>,
hive_partition_columns: Option<Vec<Series>>,
include_file_path: Option<(Arc<str>, Arc<str>)>,
pub(super) row_index: Option<RowIndex>,
// Stores the as key semaphore to make sure we don't write to the memory mapped file.
pub(super) memory_map: Option<PathBuf>,
@@ -133,6 +134,14 @@ impl<R: MmapBytesReader> IpcReader<R> {
self
}

pub fn with_include_file_path(
mut self,
include_file_path: Option<(Arc<str>, Arc<str>)>,
) -> Self {
self.include_file_path = include_file_path;
self
}

/// Add a row index column.
pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
self.row_index = row_index;
@@ -208,6 +217,7 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
n_rows: None,
columns: None,
hive_partition_columns: None,
include_file_path: None,
projection: None,
row_index: None,
memory_map: None,
@@ -230,6 +240,7 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
let reader_schema = reader_schema.as_ref();

let hive_partition_columns = self.hive_partition_columns.take();
let include_file_path = self.include_file_path.take();

// In case only hive columns are projected, the df would be empty, but we need the row count
// of the file in order to project the correct number of rows for the hive columns.
@@ -287,6 +298,12 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
);
};

if let Some((col, value)) = include_file_path {
unsafe {
df.with_column_unchecked(StringChunked::full(&col, &value, row_count).into_series())
};
}

Ok(df)
}
}
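
A sketch of calling the new `IpcReader::with_include_file_path` builder directly; the imports and the "file_path" column name are assumptions, not part of this commit. The option takes a `(column name, value)` pair, and `finish` appends a constant String column of that value sized to the frame's row count:

```rust
use std::fs::File;
use std::sync::Arc;

use polars_core::prelude::*;
use polars_io::prelude::*;

fn read_ipc_with_path(path: &str) -> PolarsResult<DataFrame> {
    let file = File::open(path)?;
    IpcReader::new(file)
        // (column name, value): every row of the output carries `path`.
        .with_include_file_path(Some((Arc::from("file_path"), Arc::from(path))))
        .finish()
}
```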
67 changes: 60 additions & 7 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -566,6 +567,7 @@ pub struct BatchedParquetReader {
chunk_size: usize,
use_statistics: bool,
hive_partition_columns: Option<Arc<[Series]>>,
include_file_path: Option<StringChunked>,
/// Has returned at least one materialized frame.
has_returned: bool,
}
@@ -583,6 +584,7 @@ impl BatchedParquetReader {
chunk_size: usize,
use_statistics: bool,
hive_partition_columns: Option<Vec<Series>>,
include_file_path: Option<(Arc<str>, Arc<str>)>,
mut parallel: ParallelStrategy,
) -> PolarsResult<Self> {
let n_row_groups = metadata.row_groups.len();
@@ -621,6 +623,8 @@ impl BatchedParquetReader {
chunk_size,
use_statistics,
hive_partition_columns: hive_partition_columns.map(Arc::from),
include_file_path: include_file_path
.map(|(col, path)| StringChunked::full(&col, &path, 1)),
has_returned: false,
})
}
@@ -669,7 +673,7 @@ impl BatchedParquetReader {
.fetch_row_groups(row_group_start..row_group_end)
.await?;

let dfs = match store {
let mut dfs = match store {
ColumnStore::Local(_) => rg_to_dfs(
&store,
&mut self.rows_read,
@@ -745,17 +749,58 @@ impl BatchedParquetReader {
},
}?;

if let Some(ca) = self.include_file_path.as_mut() {
let mut max_len = 0;

if self.projection.is_empty() {
max_len = self.metadata.num_rows;
} else {
for df in &dfs {
max_len = std::cmp::max(max_len, df.height());
}
}

// Re-use the same ChunkedArray
if ca.len() < max_len {
*ca = ca.new_from_index(max_len, 0);
}

for df in &mut dfs {
unsafe {
df.with_column_unchecked(
ca.slice(
0,
if !self.projection.is_empty() {
df.height()
} else {
self.metadata.num_rows
},
)
.into_series(),
)
};
}
}

self.row_group_offset += n;

// case where there is no data in the file
// the streaming engine needs at least a single chunk
if self.rows_read == 0 && dfs.is_empty() {
return Ok(Some(vec![materialize_empty_df(
Some(&self.projection),
self.schema.as_ref(),
let mut df = materialize_empty_df(
Some(self.projection.as_ref()),
&self.schema,
self.hive_partition_columns.as_deref(),
self.row_index.as_ref(),
)]));
);

if let Some(ca) = &self.include_file_path {
unsafe {
df.with_column_unchecked(ca.clear().into_series());
}
};

return Ok(Some(vec![df]));
}

// TODO! this is slower than it needs to be
@@ -780,12 +825,20 @@ impl BatchedParquetReader {
if self.chunks_fifo.is_empty() {
if skipped_all_rgs {
self.has_returned = true;
Ok(Some(vec![materialize_empty_df(
let mut df = materialize_empty_df(
Some(self.projection.as_ref()),
&self.schema,
self.hive_partition_columns.as_deref(),
self.row_index.as_ref(),
)]))
);

if let Some(ca) = &self.include_file_path {
unsafe {
df.with_column_unchecked(ca.clear().into_series());
}
};

Ok(Some(vec![df]))
} else {
Ok(None)
}
52 changes: 44 additions & 8 deletions crates/polars-io/src/parquet/read/reader.rs
@@ -38,6 +38,7 @@ pub struct ParquetReader<R: Read + Seek> {
metadata: Option<FileMetaDataRef>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
hive_partition_columns: Option<Vec<Series>>,
include_file_path: Option<(Arc<str>, Arc<str>)>,
use_statistics: bool,
}

@@ -132,6 +133,14 @@ impl<R: MmapBytesReader> ParquetReader<R> {
self
}

pub fn with_include_file_path(
mut self,
include_file_path: Option<(Arc<str>, Arc<str>)>,
) -> Self {
self.include_file_path = include_file_path;
self
}

pub fn get_metadata(&mut self) -> PolarsResult<&FileMetaDataRef> {
if self.metadata.is_none() {
self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
@@ -162,6 +171,7 @@ impl<R: MmapBytesReader + 'static> ParquetReader<R> {
chunk_size,
self.use_statistics,
self.hive_partition_columns,
self.include_file_path,
self.parallel,
)
}
@@ -184,6 +194,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
schema: None,
use_statistics: true,
hive_partition_columns: None,
include_file_path: None,
}
}

@@ -195,12 +206,13 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
fn finish(mut self) -> PolarsResult<DataFrame> {
let schema = self.schema()?;
let metadata = self.get_metadata()?.clone();
let n_rows = metadata.num_rows;

if let Some(cols) = &self.columns {
self.projection = Some(columns_to_projection(cols, schema.as_ref())?);
}

read_parquet(
let mut df = read_parquet(
self.reader,
self.n_rows.unwrap_or(usize::MAX),
self.projection.as_deref(),
@@ -211,13 +223,26 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
self.row_index,
self.use_statistics,
self.hive_partition_columns.as_deref(),
)
.map(|mut df| {
if self.rechunk {
df.as_single_chunk_par();
}
df
})
)?;

if self.rechunk {
df.as_single_chunk_par();
};

if let Some((col, value)) = &self.include_file_path {
unsafe {
df.with_column_unchecked(
StringChunked::full(
col,
value,
if df.width() > 0 { df.height() } else { n_rows },
)
.into_series(),
)
};
}

Ok(df)
}
}

@@ -233,6 +258,7 @@ pub struct ParquetAsyncReader {
row_index: Option<RowIndex>,
use_statistics: bool,
hive_partition_columns: Option<Vec<Series>>,
include_file_path: Option<(Arc<str>, Arc<str>)>,
schema: Option<ArrowSchemaRef>,
parallel: ParallelStrategy,
}
@@ -253,6 +279,7 @@ impl ParquetAsyncReader {
predicate: None,
use_statistics: true,
hive_partition_columns: None,
include_file_path: None,
schema: None,
parallel: Default::default(),
})
@@ -330,6 +357,14 @@ impl ParquetAsyncReader {
self
}

pub fn with_include_file_path(
mut self,
include_file_path: Option<(Arc<str>, Arc<str>)>,
) -> Self {
self.include_file_path = include_file_path;
self
}

pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
self.parallel = parallel;
self
@@ -362,6 +397,7 @@ impl ParquetAsyncReader {
chunk_size,
self.use_statistics,
self.hive_partition_columns,
self.include_file_path,
self.parallel,
)
}
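
Both the eager `ParquetReader` and `ParquetAsyncReader` gain the same builder and forward the option to `BatchedParquetReader`; in the eager `finish` the path column is sized to the frame height, falling back to the file's row count when no columns are materialized. A sketch of the eager path, under the same assumed imports as the IPC example above:

```rust
use std::fs::File;
use std::sync::Arc;

use polars_core::prelude::*;
use polars_io::prelude::*;

fn read_parquet_with_path(path: &str) -> PolarsResult<DataFrame> {
    let file = File::open(path)?;
    ParquetReader::new(file)
        // Same (column name, value) pair as the IPC reader.
        .with_include_file_path(Some((Arc::from("file_path"), Arc::from(path))))
        .finish()
}
```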
8 changes: 8 additions & 0 deletions crates/polars-lazy/src/scan/csv.rs
@@ -18,6 +18,7 @@ pub struct LazyCsvReader {
cache: bool,
read_options: CsvReadOptions,
cloud_options: Option<CloudOptions>,
include_file_paths: Option<Arc<str>>,
}

#[cfg(feature = "csv")]
@@ -39,6 +40,7 @@ impl LazyCsvReader {
cache: true,
read_options: Default::default(),
cloud_options: Default::default(),
include_file_paths: None,
}
}

@@ -258,6 +260,11 @@ impl LazyCsvReader {

Ok(self.with_schema(Some(Arc::new(schema))))
}

pub fn with_include_file_paths(mut self, include_file_paths: Option<Arc<str>>) -> Self {
self.include_file_paths = include_file_paths;
self
}
}

impl LazyFileListReader for LazyCsvReader {
Expand All @@ -269,6 +276,7 @@ impl LazyFileListReader for LazyCsvReader {
self.cache,
self.cloud_options,
self.glob,
self.include_file_paths,
)?
.build()
.into();
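
For CSV the lazy builder takes only the column name; each file in the expanded path list supplies its own value during the scan. A hedged sketch (glob path and column name are illustrative):

```rust
use polars::prelude::*;

fn scan_csv_with_paths() -> PolarsResult<LazyFrame> {
    LazyCsvReader::new("data/*.csv")
        // Column name only; the per-file value is filled in at scan time.
        .with_include_file_paths(Some("file_path".into()))
        .finish()
}
```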
3 changes: 3 additions & 0 deletions crates/polars-lazy/src/scan/ipc.rs
@@ -16,6 +16,7 @@ pub struct ScanArgsIpc {
pub memory_map: bool,
pub cloud_options: Option<CloudOptions>,
pub hive_options: HiveOptions,
pub include_file_paths: Option<Arc<str>>,
}

impl Default for ScanArgsIpc {
@@ -28,6 +29,7 @@ impl Default for ScanArgsIpc {
memory_map: true,
cloud_options: Default::default(),
hive_options: Default::default(),
include_file_paths: None,
}
}
}
@@ -65,6 +67,7 @@ impl LazyFileListReader for LazyIpcReader {
args.rechunk,
args.cloud_options,
args.hive_options,
args.include_file_paths,
)?
.build()
.into();
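
The IPC scan mirrors the Parquet one: `ScanArgsIpc` carries the optional column name. A sketch assuming the usual `LazyFrame::scan_ipc` entry point:

```rust
use polars::prelude::*;

fn scan_ipc_with_paths() -> PolarsResult<DataFrame> {
    let args = ScanArgsIpc {
        include_file_paths: Some("file_path".into()),
        ..Default::default()
    };
    LazyFrame::scan_ipc("data/*.ipc", args)?.collect()
}
```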
2 changes: 2 additions & 0 deletions crates/polars-lazy/src/scan/ndjson.rs
@@ -107,6 +107,7 @@ impl LazyFileListReader for LazyJsonLineReader {
file_counter: 0,
hive_options: Default::default(),
glob: true,
include_file_paths: None,
};

let options = NDJsonReadOptions {
@@ -140,6 +141,7 @@ impl LazyFileListReader for LazyJsonLineReader {
file_counter: 0,
hive_options: Default::default(),
glob: false,
include_file_paths: None,
};

let options = NDJsonReadOptions {
3 changes: 3 additions & 0 deletions crates/polars-lazy/src/scan/parquet.rs
@@ -20,6 +20,7 @@ pub struct ScanArgsParquet {
pub cache: bool,
/// Expand path given via globbing rules.
pub glob: bool,
pub include_file_paths: Option<Arc<str>>,
}

impl Default for ScanArgsParquet {
Expand All @@ -35,6 +36,7 @@ impl Default for ScanArgsParquet {
low_memory: false,
cache: true,
glob: true,
include_file_paths: None,
}
}
}
@@ -71,6 +73,7 @@ impl LazyFileListReader for LazyParquetReader {
self.args.use_statistics,
self.args.hive_options,
self.args.glob,
self.args.include_file_paths,
)?
.build()
.into();
