Skip to content

Commit

Permalink
[PERF] Fix excessive parquet metadata reading (#2694)
Browse files Browse the repository at this point in the history
When reading files from AWS, the logical to physical plan translator
previously fetched the metadata for the files sequentially, which could
be very slow if there are many files. (This bug was introduced in
#2358.)

This PR makes it so that we only cache the metadata upon splitting, and
when we do so we only cache the row groups that are actually relevant to
each scan task. This avoids serializing the entire metadata for each Ray
runner, which should improve performance.

Benchmark results:
<img width="516" alt="image"
src="https://github.com/user-attachments/assets/ba013482-89be-413f-89da-8f0e8fcf4cd7">
  • Loading branch information
Vince7778 authored Aug 22, 2024
1 parent fc167dd commit 4bbf9ec
Show file tree
Hide file tree
Showing 20 changed files with 109 additions and 82 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,6 @@ class ScanOperatorHandle:
storage_config: StorageConfig,
infer_schema: bool,
schema: PySchema | None = None,
is_ray_runner: bool = False,
) -> ScanOperatorHandle: ...
@staticmethod
def from_python_scan_operator(operator: ScanOperator) -> ScanOperatorHandle: ...
Expand Down
1 change: 0 additions & 1 deletion daft/io/_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,5 @@ def read_csv(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
is_ray_runner=context.get_context().is_ray_runner,
)
return DataFrame(builder)
1 change: 0 additions & 1 deletion daft/io/_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,5 @@ def read_json(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
is_ray_runner=context.get_context().is_ray_runner,
)
return DataFrame(builder)
1 change: 0 additions & 1 deletion daft/io/_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,5 @@ def read_parquet(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
is_ray_runner=is_ray_runner,
)
return DataFrame(builder)
2 changes: 0 additions & 2 deletions daft/io/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def get_tabular_files_scan(
schema: dict[str, DataType] | None,
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
is_ray_runner: bool,
) -> LogicalPlanBuilder:
"""Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
# Glob the path using the Runner
Expand All @@ -42,7 +41,6 @@ def get_tabular_files_scan(
storage_config,
infer_schema=infer_schema,
schema=_get_schema_from_dict(schema)._schema if schema is not None else None,
is_ray_runner=is_ray_runner,
)

builder = LogicalPlanBuilder.from_tabular_scan(
Expand Down
8 changes: 4 additions & 4 deletions src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1196,7 +1196,7 @@ pub(crate) fn read_parquet_into_micropartition(

let any_stats_avail = metadata
.iter()
.flat_map(|m| m.row_groups.iter())
.flat_map(|m| m.row_groups.values())
.flat_map(|rg| rg.columns().iter())
.any(|col| col.statistics().is_some());
let stats = if any_stats_avail {
Expand All @@ -1205,7 +1205,7 @@ pub(crate) fn read_parquet_into_micropartition(
.zip(schemas.iter())
.flat_map(|(fm, schema)| {
fm.row_groups
.iter()
.values()
.map(|rgm| daft_parquet::row_group_metadata_to_table_stats(rgm, schema))
})
.collect::<DaftResult<Vec<TableStatistics>>>()?;
Expand Down Expand Up @@ -1237,7 +1237,7 @@ pub(crate) fn read_parquet_into_micropartition(
.map(|(fm, rg)| match rg {
Some(rg) => rg
.iter()
.map(|rg_idx| fm.row_groups.get(*rg_idx as usize).unwrap().num_rows())
.map(|rg_idx| fm.row_groups.get(&(*rg_idx as usize)).unwrap().num_rows())
.sum::<usize>(),
None => fm.num_rows,
})
Expand All @@ -1251,7 +1251,7 @@ pub(crate) fn read_parquet_into_micropartition(
let size_bytes = metadata
.iter()
.map(|m| -> u64 {
std::iter::Sum::sum(m.row_groups.iter().map(|m| m.total_byte_size() as u64))
std::iter::Sum::sum(m.row_groups.values().map(|m| m.total_byte_size() as u64))
})
.sum();

Expand Down
3 changes: 3 additions & 0 deletions src/daft-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ tokio = {workspace = true}
tokio-stream = {workspace = true}
tokio-util = {workspace = true}

[dev-dependencies]
bincode = {workspace = true}

[features]
python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-stats/python", "daft-dsl/python"]

Expand Down
18 changes: 9 additions & 9 deletions src/daft-parquet/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub(crate) fn build_row_ranges(
if rows_to_add <= 0 {
break;
}
let rg = metadata.row_groups.get(i).unwrap();
let rg = metadata.row_groups.get(&i).unwrap();
if let Some(ref pred) = predicate {
let stats = statistics::row_group_metadata_to_table_stats(rg, schema)
.with_context(|_| UnableToConvertRowGroupMetadataToStatsSnafu {
Expand Down Expand Up @@ -158,7 +158,7 @@ pub(crate) fn build_row_ranges(
} else {
let mut rows_to_add = limit.unwrap_or(metadata.num_rows as i64);

for (i, rg) in metadata.row_groups.iter().enumerate() {
for (i, rg) in metadata.row_groups.iter() {
if (curr_row_index + rg.num_rows()) < row_start_offset {
curr_row_index += rg.num_rows();
continue;
Expand All @@ -179,7 +179,7 @@ pub(crate) fn build_row_ranges(
}
}
let range_to_add = RowGroupRange {
row_group_index: i,
row_group_index: *i,
start: row_start_offset.saturating_sub(curr_row_index),
num_rows: rg.num_rows().min(rows_to_add as usize),
};
Expand Down Expand Up @@ -349,7 +349,7 @@ impl ParquetFileReader {
let rg = self
.metadata
.row_groups
.get(row_group_range.row_group_index)
.get(&row_group_range.row_group_index)
.unwrap();

let columns = rg.columns();
Expand Down Expand Up @@ -441,7 +441,7 @@ impl ParquetFileReader {
tokio::task::spawn(async move {
let rg = metadata
.row_groups
.get(row_range.row_group_index)
.get(&row_range.row_group_index)
.expect("Row Group index should be in bounds");
let num_rows =
rg.num_rows().min(row_range.start + row_range.num_rows);
Expand Down Expand Up @@ -565,7 +565,7 @@ impl ParquetFileReader {
let owned_uri = self.uri.clone();
let rg = metadata
.row_groups
.get(row_range.row_group_index)
.get(&row_range.row_group_index)
.expect("Row Group index should be in bounds");
let num_rows = rg.num_rows().min(row_range.start + row_range.num_rows);
let chunk_size = self.chunk_size.unwrap_or(Self::DEFAULT_CHUNK_SIZE);
Expand Down Expand Up @@ -609,7 +609,7 @@ impl ParquetFileReader {
{
let col = metadata
.row_groups
.get(row_range.row_group_index)
.get(&row_range.row_group_index)
.expect("Row Group index should be in bounds")
.columns()
.get(col_idx)
Expand Down Expand Up @@ -751,7 +751,7 @@ impl ParquetFileReader {
let owned_uri = self.uri.clone();
let rg = metadata
.row_groups
.get(row_range.row_group_index)
.get(&row_range.row_group_index)
.expect("Row Group index should be in bounds");
let num_rows = rg.num_rows().min(row_range.start + row_range.num_rows);
let chunk_size = self.chunk_size.unwrap_or(128 * 1024);
Expand Down Expand Up @@ -793,7 +793,7 @@ impl ParquetFileReader {
{
let col = metadata
.row_groups
.get(row_range.row_group_index)
.get(&row_range.row_group_index)
.expect("Row Group index should be in bounds")
.columns()
.get(col_idx)
Expand Down
10 changes: 6 additions & 4 deletions src/daft-parquet/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use daft_dsl::common_treenode::{Transformed, TreeNode, TreeNodeRecursion};
use daft_io::{IOClient, IOStatsRef};

pub use parquet2::metadata::{FileMetaData, RowGroupMetaData};
use parquet2::read::deserialize_metadata;
use parquet2::schema::types::ParquetType;
use parquet2::{metadata::RowGroupList, read::deserialize_metadata};
use snafu::ResultExt;

use crate::{Error, JoinSnafu, UnableToParseMetadataSnafu};
Expand Down Expand Up @@ -184,9 +184,9 @@ fn apply_field_ids_to_parquet_file_metadata(
})
.collect::<BTreeMap<_, _>>();

let new_row_groups = file_metadata
let new_row_groups_list = file_metadata
.row_groups
.into_iter()
.into_values()
.map(|rg| {
let new_columns = rg
.columns()
Expand All @@ -213,7 +213,9 @@ fn apply_field_ids_to_parquet_file_metadata(
new_total_uncompressed_size,
)
})
.collect();
.collect::<Vec<RowGroupMetaData>>();

let new_row_groups = RowGroupList::from_iter(new_row_groups_list.into_iter().enumerate());

Ok(FileMetaData {
row_groups: new_row_groups,
Expand Down
32 changes: 30 additions & 2 deletions src/daft-parquet/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ async fn read_parquet_single(

let rows_per_row_groups = metadata
.row_groups
.iter()
.values()
.map(|m| m.num_rows())
.collect::<Vec<_>>();

Expand Down Expand Up @@ -540,7 +540,7 @@ async fn read_parquet_single_into_arrow(

let rows_per_row_groups = metadata
.row_groups
.iter()
.values()
.map(|m| m.num_rows())
.collect::<Vec<_>>();

Expand Down Expand Up @@ -1030,11 +1030,22 @@ mod tests {

use daft_io::{IOClient, IOConfig};
use futures::StreamExt;
use parquet2::metadata::FileMetaData;

use super::read_parquet;
use super::read_parquet_metadata;
use super::stream_parquet;

const PARQUET_FILE: &str = "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet";
const PARQUET_FILE_LOCAL: &str = "tests/assets/parquet-data/mvp.parquet";

fn get_local_parquet_path() -> String {
let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
d.push("../../"); // CARGO_MANIFEST_DIR is at src/daft-parquet
d.push(PARQUET_FILE_LOCAL);
d.to_str().unwrap().to_string()
}

#[test]
fn test_parquet_read_from_s3() -> DaftResult<()> {
let file = PARQUET_FILE;
Expand Down Expand Up @@ -1097,4 +1108,21 @@ mod tests {
Ok(())
})
}

#[test]
fn test_file_metadata_serialize_roundtrip() -> DaftResult<()> {
let file = get_local_parquet_path();

let io_config = IOConfig::default();
let io_client = Arc::new(IOClient::new(io_config.into())?);
let runtime_handle = daft_io::get_runtime(true)?;

runtime_handle.block_on(async move {
let metadata = read_parquet_metadata(&file, io_client, None, None).await?;
let serialized = bincode::serialize(&metadata).unwrap();
let deserialized = bincode::deserialize::<FileMetaData>(&serialized).unwrap();
assert_eq!(metadata, deserialized);
Ok(())
})
}
}
4 changes: 2 additions & 2 deletions src/daft-parquet/src/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub(crate) fn local_parquet_read_into_column_iters(

// Read all the required row groups into memory sequentially
let column_iters_per_rg = row_ranges.clone().into_iter().map(move |rg_range| {
let rg_metadata = all_row_groups.get(rg_range.row_group_index).unwrap();
let rg_metadata = all_row_groups.get(&rg_range.row_group_index).unwrap();

// This operation is IO-bounded O(C) where C is the number of columns in the row group.
// It reads all the columns to memory from the row group associated to the requested fields,
Expand Down Expand Up @@ -358,7 +358,7 @@ pub(crate) fn local_parquet_read_into_arrow(
.iter()
.enumerate()
.map(|(req_idx, rg_range)| {
let rg = metadata.row_groups.get(rg_range.row_group_index).unwrap();
let rg = metadata.row_groups.get(&rg_range.row_group_index).unwrap();
let single_rg_column_iter = read::read_columns_many(
&mut reader,
rg,
Expand Down
38 changes: 2 additions & 36 deletions src/daft-scan/src/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, vec};
use common_error::{DaftError, DaftResult};
use daft_core::schema::SchemaRef;
use daft_csv::CsvParseOptions;
use daft_io::{get_runtime, parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef};
use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef};
use daft_parquet::read::ParquetSchemaInferenceOptions;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::Snafu;
Expand All @@ -19,7 +19,6 @@ pub struct GlobScanOperator {
file_format_config: Arc<FileFormatConfig>,
schema: SchemaRef,
storage_config: Arc<StorageConfig>,
is_ray_runner: bool,
}

/// Wrapper struct that implements a sync Iterator for a BoxStream
Expand Down Expand Up @@ -130,7 +129,6 @@ impl GlobScanOperator {
storage_config: Arc<StorageConfig>,
infer_schema: bool,
schema: Option<SchemaRef>,
is_ray_runner: bool,
) -> DaftResult<Self> {
let first_glob_path = match glob_paths.first() {
None => Err(DaftError::ValueError(
Expand Down Expand Up @@ -244,7 +242,6 @@ impl GlobScanOperator {
file_format_config,
schema,
storage_config,
is_ray_runner,
})
}
}
Expand Down Expand Up @@ -299,7 +296,6 @@ impl ScanOperator for GlobScanOperator {
let file_format_config = self.file_format_config.clone();
let schema = self.schema.clone();
let storage_config = self.storage_config.clone();
let is_ray_runner = self.is_ray_runner;

let row_groups = if let FileFormatConfig::Parquet(ParquetSourceConfig {
row_groups: Some(row_groups),
Expand All @@ -319,36 +315,6 @@ impl ScanOperator for GlobScanOperator {
..
} = f?;

let path_clone = path.clone();
let io_client_clone = io_client.clone();
let field_id_mapping = match file_format_config.as_ref() {
FileFormatConfig::Parquet(ParquetSourceConfig {
field_id_mapping, ..
}) => Some(field_id_mapping.clone()),
_ => None,
};

// We skip reading parquet metadata if we are running in Ray
// because the metadata can be quite large
let parquet_metadata = if !is_ray_runner {
if let Some(field_id_mapping) = field_id_mapping {
get_runtime(true).unwrap().block_on(async {
daft_parquet::read::read_parquet_metadata(
&path_clone,
io_client_clone,
Some(io_stats.clone()),
field_id_mapping.clone(),
)
.await
.ok()
.map(Arc::new)
})
} else {
None
}
} else {
None
};
let row_group = row_groups
.as_ref()
.and_then(|rgs| rgs.get(idx).cloned())
Expand All @@ -363,7 +329,7 @@ impl ScanOperator for GlobScanOperator {
metadata: None,
partition_spec: None,
statistics: None,
parquet_metadata,
parquet_metadata: None,
}],
file_format_config.clone(),
schema.clone(),
Expand Down
Loading

0 comments on commit 4bbf9ec

Please sign in to comment.