[PERF]: dont read parquet metadata multiple times #2358

Merged: 4 commits, Jul 16, 2024
Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions daft/daft.pyi
@@ -732,6 +732,7 @@ class ScanOperatorHandle:
storage_config: StorageConfig,
infer_schema: bool,
schema: PySchema | None = None,
is_ray_runner: bool = False,
) -> ScanOperatorHandle: ...
@staticmethod
def from_python_scan_operator(operator: ScanOperator) -> ScanOperatorHandle: ...
1 change: 1 addition & 0 deletions daft/io/_csv.py
@@ -103,5 +103,6 @@ def read_csv(
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
is_ray_runner=context.get_context().is_ray_runner,
)
return DataFrame(builder)
1 change: 1 addition & 0 deletions daft/io/_json.py
@@ -62,5 +62,6 @@ def read_json(
schema=schema_hints,
file_format_config=file_format_config,
storage_config=storage_config,
is_ray_runner=context.get_context().is_ray_runner,
)
return DataFrame(builder)
4 changes: 3 additions & 1 deletion daft/io/_parquet.py
@@ -53,9 +53,10 @@ def read_parquet(
if isinstance(path, list) and len(path) == 0:
raise ValueError("Cannot read DataFrame from from empty list of Parquet filepaths")

is_ray_runner = context.get_context().is_ray_runner
# If running on Ray, we want to limit the amount of concurrency and requests being made.
# This is because each Ray worker process receives its own pool of thread workers and connections
multithreaded_io = not context.get_context().is_ray_runner if _multithreaded_io is None else _multithreaded_io
multithreaded_io = not is_ray_runner if _multithreaded_io is None else _multithreaded_io

if isinstance(coerce_int96_timestamp_unit, str):
coerce_int96_timestamp_unit = TimeUnit.from_str(coerce_int96_timestamp_unit)
@@ -76,5 +77,6 @@ def read_parquet(
schema=schema_hints,
file_format_config=file_format_config,
storage_config=storage_config,
is_ray_runner=is_ray_runner,
)
return DataFrame(builder)
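
The hoisted `is_ray_runner` above also drives the default for `_multithreaded_io`: an explicit setting wins, otherwise multithreaded IO is disabled on the Ray runner because each Ray worker already has its own pool of thread workers and connections. A minimal sketch of that resolution rule, written in Rust with a hypothetical helper name (`resolve_multithreaded_io` is not a Daft function):

```rust
// Sketch only: mirrors `not is_ray_runner if _multithreaded_io is None else _multithreaded_io`.
fn resolve_multithreaded_io(explicit: Option<bool>, is_ray_runner: bool) -> bool {
    // An explicit setting always wins; otherwise derive the default from the runner.
    explicit.unwrap_or(!is_ray_runner)
}

fn main() {
    assert!(!resolve_multithreaded_io(None, true)); // Ray runner: default to single-threaded IO
    assert!(resolve_multithreaded_io(None, false)); // local runner: default to multithreaded IO
    assert!(resolve_multithreaded_io(Some(true), true)); // explicit override is respected
}
```
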
2 changes: 2 additions & 0 deletions daft/io/common.py
@@ -24,6 +24,7 @@ def get_tabular_files_scan(
schema: dict[str, DataType] | None,
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
is_ray_runner: bool,
) -> LogicalPlanBuilder:
"""Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
# Glob the path using the Runner
@@ -41,6 +42,7 @@
storage_config,
infer_schema=infer_schema,
schema=_get_schema_from_dict(schema)._schema if schema is not None else None,
is_ray_runner=is_ray_runner,
)

builder = LogicalPlanBuilder.from_tabular_scan(
72 changes: 58 additions & 14 deletions src/daft-micropartition/src/micropartition.rs
@@ -20,6 +20,7 @@ use daft_scan::storage_config::{NativeStorageConfig, StorageConfig};
use daft_scan::{ChunkSpec, DataFileSource, Pushdowns, ScanTask};
use daft_table::Table;

use parquet2::metadata::FileMetaData;
use snafu::ResultExt;

#[cfg(feature = "python")]
@@ -134,6 +135,11 @@ fn materialize_scan_task(
ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit));
let urls = urls.collect::<Vec<_>>();
let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice());
let metadatas = scan_task
.sources
.iter()
.map(|s| s.get_parquet_metadata().cloned())
.collect::<Option<Vec<_>>>();
daft_parquet::read::read_parquet_bulk(
urls.as_slice(),
file_column_names.as_deref(),
@@ -147,6 +153,7 @@
runtime_handle,
&inference_options,
field_id_mapping.clone(),
metadatas,
)
.context(DaftCoreComputeSnafu)?
}
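
The `collect::<Option<Vec<_>>>()` call above is what makes metadata reuse all-or-nothing: the bulk reader only receives cached metadata when every source in the scan task returns `Some` from `get_parquet_metadata()`. A self-contained sketch of that idiom, with plain `u64` standing in for `Arc<FileMetaData>`:

```rust
// Collecting an iterator of Option<T> into Option<Vec<T>> yields Some(vec)
// only when every element is Some, otherwise None.
fn main() {
    let all_cached: Vec<Option<u64>> = vec![Some(1), Some(2), Some(3)];
    let reused: Option<Vec<u64>> = all_cached.into_iter().collect();
    assert_eq!(reused, Some(vec![1, 2, 3]));

    // One source without cached metadata makes the whole batch fall back to
    // re-reading parquet footers.
    let partially_cached: Vec<Option<u64>> = vec![Some(1), None, Some(3)];
    let fallback: Option<Vec<u64>> = partially_cached.into_iter().collect();
    assert_eq!(fallback, None);
}
```
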
@@ -575,6 +582,11 @@ impl MicroPartition {
.columns
.as_ref()
.map(|cols| cols.iter().map(|s| s.as_str()).collect::<Vec<&str>>());
let parquet_metadata = scan_task
.sources
.iter()
.map(|s| s.get_parquet_metadata().cloned())
.collect::<Option<Vec<_>>>();

let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice());
read_parquet_into_micropartition(
@@ -597,6 +609,7 @@
},
Some(schema.clone()),
field_id_mapping.clone(),
parquet_metadata,
)
.context(DaftCoreComputeSnafu)
}
@@ -932,6 +945,7 @@ fn _read_parquet_into_loaded_micropartition(
runtime_handle,
schema_infer_options,
field_id_mapping,
None,
)?;

// Prefer using the `catalog_provided_schema` but fall back onto inferred schema from Parquet files
@@ -979,6 +993,7 @@ pub(crate) fn read_parquet_into_micropartition(
schema_infer_options: &ParquetSchemaInferenceOptions,
catalog_provided_schema: Option<SchemaRef>,
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
parquet_metadata: Option<Vec<Arc<FileMetaData>>>,
) -> DaftResult<MicroPartition> {
if let Some(so) = start_offset
&& so > 0
@@ -1017,17 +1032,42 @@
let meta_io_client = io_client.clone();
let meta_io_stats = io_stats.clone();
let meta_field_id_mapping = field_id_mapping.clone();
let metadata = runtime_handle.block_on(async move {
read_parquet_metadata_bulk(uris, meta_io_client, meta_io_stats, meta_field_id_mapping).await
})?;
let schemas = metadata
.iter()
.map(|m| {
let schema = infer_schema_with_options(m, &Some((*schema_infer_options).into()))?;
let daft_schema = daft_core::schema::Schema::try_from(&schema)?;
DaftResult::Ok(daft_schema)
})
.collect::<DaftResult<Vec<_>>>()?;
let (metadata, schemas) = if let Some(metadata) = parquet_metadata {
let schemas = metadata
.iter()
.map(|m| {
let schema = infer_schema_with_options(m, &Some((*schema_infer_options).into()))?;
let daft_schema = daft_core::schema::Schema::try_from(&schema)?;
DaftResult::Ok(Arc::new(daft_schema))
})
.collect::<DaftResult<Vec<_>>>()?;
(metadata, schemas)
} else {
let metadata = runtime_handle
.block_on(async move {
read_parquet_metadata_bulk(
uris,
meta_io_client,
meta_io_stats,
meta_field_id_mapping,
)
.await
})?
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>();

let schemas = metadata
.iter()
.map(|m| {
let schema = infer_schema_with_options(m, &Some((*schema_infer_options).into()))?;
let daft_schema = daft_core::schema::Schema::try_from(&schema)?;
DaftResult::Ok(Arc::new(daft_schema))
})
.collect::<DaftResult<Vec<_>>>()?;
(metadata, schemas)
};

let any_stats_avail = metadata
.iter()
.flat_map(|m| m.row_groups.iter())
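
This is the core of the change: `read_parquet_into_micropartition` now accepts metadata that was already read once and cached on the scan task's sources, and only calls `read_parquet_metadata_bulk` when nothing was supplied. A simplified, hypothetical sketch of that shape, with stub `FileMetaData` and `fetch_metadata` standing in for the parquet2 type and the bulk reader:

```rust
use std::sync::Arc;

// Stand-in for parquet2::metadata::FileMetaData.
#[derive(Debug)]
struct FileMetaData {
    num_rows: usize,
}

// Stand-in for read_parquet_metadata_bulk: one remote footer read per URI.
fn fetch_metadata(uris: &[&str]) -> Vec<Arc<FileMetaData>> {
    uris.iter().map(|_| Arc::new(FileMetaData { num_rows: 0 })).collect()
}

// Prefer metadata the caller already read; only hit storage when nothing was supplied.
fn resolve_metadata(
    uris: &[&str],
    provided: Option<Vec<Arc<FileMetaData>>>,
) -> Vec<Arc<FileMetaData>> {
    match provided {
        Some(metadata) => metadata,   // reuse: no second round of footer reads
        None => fetch_metadata(uris), // fall back: read the footers now
    }
}

fn main() {
    let cached = vec![Arc::new(FileMetaData { num_rows: 42 })];
    let metadata = resolve_metadata(&["s3://bucket/a.parquet"], Some(cached));
    assert_eq!(metadata[0].num_rows, 42);
}
```

Keeping both paths behind the same `(metadata, schemas)` tuple means the rest of the function is unchanged whichever branch runs.
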
@@ -1055,8 +1095,10 @@
let scan_task_daft_schema = match catalog_provided_schema {
Some(catalog_provided_schema) => catalog_provided_schema,
None => {
let unioned_schema = schemas.into_iter().try_reduce(|l, r| l.union(&r))?;
Arc::new(unioned_schema.expect("we need at least 1 schema"))
let unioned_schema = schemas
.into_iter()
.try_reduce(|l, r| l.union(&r).map(Arc::new))?;
unioned_schema.expect("we need at least 1 schema")
}
};
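
Because the inferred schemas are now produced as `Arc<Schema>` (presumably so they can be shared cheaply alongside the metadata), the union fold has to re-wrap each intermediate result, hence `l.union(&r).map(Arc::new)`. A standalone sketch of why the accumulator type forces that wrapping, using a toy `Schema` rather than Daft's, and stable `try_fold` in place of `try_reduce`:

```rust
use std::sync::Arc;

// Toy schema; not daft_core::schema::Schema.
struct Schema(Vec<String>);

impl Schema {
    // Union of field names, mirroring the fallible union in the diff.
    fn union(&self, other: &Schema) -> Result<Schema, String> {
        let mut fields = self.0.clone();
        for f in &other.0 {
            if !fields.contains(f) {
                fields.push(f.clone());
            }
        }
        Ok(Schema(fields))
    }
}

fn main() -> Result<(), String> {
    let schemas = vec![
        Arc::new(Schema(vec!["a".into()])),
        Arc::new(Schema(vec!["a".into(), "b".into()])),
    ];

    // The accumulator must stay an Arc<Schema>, so each union result is re-wrapped,
    // mirroring `l.union(&r).map(Arc::new)`.
    let mut iter = schemas.into_iter();
    let first = iter.next().expect("we need at least 1 schema");
    let unioned: Arc<Schema> = iter.try_fold(first, |l, r| l.union(&r).map(Arc::new))?;

    assert_eq!(unioned.0, vec!["a".to_string(), "b".to_string()]);
    Ok(())
}
```
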

@@ -1090,17 +1132,19 @@
let scan_task = ScanTask::new(
owned_urls
.into_iter()
.zip(metadata)
.zip(
row_groups
.unwrap_or_else(|| std::iter::repeat(None).take(uris.len()).collect()),
)
.map(|(url, rgs)| DataFileSource::AnonymousDataFile {
.map(|((url, metadata), rgs)| DataFileSource::AnonymousDataFile {
path: url,
chunk_spec: rgs.map(ChunkSpec::Parquet),
size_bytes: Some(size_bytes),
metadata: None,
partition_spec: partition_spec.cloned(),
statistics: None,
parquet_metadata: Some(metadata),
})
.collect::<Vec<_>>(),
FileFormatConfig::Parquet(ParquetSourceConfig {
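
One small consequence of threading the metadata through: the `ScanTask` sources are now built by zipping three sequences, and two chained `zip` calls nest their tuples, which is why the closure destructures `((url, metadata), rgs)`. A quick standalone illustration with simple types standing in for the real ones:

```rust
fn main() {
    // Stand-ins: &str for the URL, u64 for Arc<FileMetaData>,
    // Option<Vec<i64>> for the optional row-group selection.
    let urls = vec!["a.parquet", "b.parquet"];
    let metadata = vec![10_u64, 20];
    let row_groups: Vec<Option<Vec<i64>>> = vec![None, Some(vec![0, 1])];

    // Two chained zips nest the tuples, hence the ((url, md), rgs) pattern.
    let sources: Vec<(&str, u64, Option<Vec<i64>>)> = urls
        .into_iter()
        .zip(metadata)
        .zip(row_groups)
        .map(|((url, md), rgs)| (url, md, rgs))
        .collect();

    assert_eq!(sources[0], ("a.parquet", 10, None));
    assert_eq!(sources[1], ("b.parquet", 20, Some(vec![0, 1])));
}
```
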
2 changes: 2 additions & 0 deletions src/daft-micropartition/src/python.rs
@@ -603,6 +603,7 @@ impl PyMicroPartition {
&schema_infer_options,
None,
None,
None,
)
})?;
Ok(mp.into())
@@ -646,6 +647,7 @@ impl PyMicroPartition {
&schema_infer_options,
None,
None,
None,
)
})?;
Ok(mp.into())
19 changes: 12 additions & 7 deletions src/daft-parquet/src/python.rs
@@ -50,6 +50,7 @@ pub mod pylib {
Some(io_stats.clone()),
runtime_handle,
schema_infer_options,
None,
)?
.into();
Ok(result)
@@ -163,6 +164,7 @@ pub mod pylib {
runtime_handle,
&schema_infer_options,
None,
None,
)?
.into_iter()
.map(|v| v.into())
@@ -233,13 +235,16 @@ pub mod pylib {
multithreaded_io.unwrap_or(true),
io_config.unwrap_or_default().config.into(),
)?;
Ok(Arc::new(crate::read::read_parquet_schema(
uri,
io_client,
Some(io_stats),
schema_infer_options,
None, // TODO: allow passing in of field_id_mapping through Python API?
)?)
Ok(Arc::new(
crate::read::read_parquet_schema(
uri,
io_client,
Some(io_stats),
schema_infer_options,
None, // TODO: allow passing in of field_id_mapping through Python API?
)?
.0,
)
.into())
})
}
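
The `.0` above is there because `read_parquet_schema` now returns a pair rather than just the schema (presumably the schema plus the file metadata it had to read anyway); the Python binding keeps only the schema, while Rust callers can hold on to the second element and skip a second footer read. A simplified sketch with stub types, not the real signatures:

```rust
// Stub types; the real function returns Daft/parquet2 types.
struct Schema;
struct FileMetaData;

// Hypothetical stand-in for read_parquet_schema: hand back both pieces so callers
// that also need the metadata do not have to fetch the footer again.
fn read_parquet_schema_stub() -> (Schema, FileMetaData) {
    (Schema, FileMetaData)
}

fn main() {
    // Python-facing path: only the schema is exposed, so the metadata half is dropped.
    let (schema, _metadata) = read_parquet_schema_stub();
    let _ = schema;
}
```
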