diff --git a/Cargo.lock b/Cargo.lock
index d7033dc1b4..ad454d9fdc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1887,6 +1887,7 @@ dependencies = [
  "daft-table",
  "futures",
  "itertools 0.11.0",
+ "parquet2",
  "pyo3",
  "pyo3-log",
  "serde",
diff --git a/daft/daft.pyi b/daft/daft.pyi
index 835cf86695..5f7355f1ac 100644
--- a/daft/daft.pyi
+++ b/daft/daft.pyi
@@ -732,6 +732,7 @@ class ScanOperatorHandle:
         storage_config: StorageConfig,
         infer_schema: bool,
         schema: PySchema | None = None,
+        is_ray_runner: bool = False,
     ) -> ScanOperatorHandle: ...
     @staticmethod
     def from_python_scan_operator(operator: ScanOperator) -> ScanOperatorHandle: ...
diff --git a/daft/io/_csv.py b/daft/io/_csv.py
index 22a802a0a8..2d489bd615 100644
--- a/daft/io/_csv.py
+++ b/daft/io/_csv.py
@@ -103,5 +103,6 @@ def read_csv(
         schema=schema,
         file_format_config=file_format_config,
         storage_config=storage_config,
+        is_ray_runner=context.get_context().is_ray_runner,
     )
     return DataFrame(builder)
diff --git a/daft/io/_json.py b/daft/io/_json.py
index 51f73998c6..1fbe499a6f 100644
--- a/daft/io/_json.py
+++ b/daft/io/_json.py
@@ -62,5 +62,6 @@ def read_json(
         schema=schema_hints,
         file_format_config=file_format_config,
         storage_config=storage_config,
+        is_ray_runner=context.get_context().is_ray_runner,
     )
     return DataFrame(builder)
diff --git a/daft/io/_parquet.py b/daft/io/_parquet.py
index b9d9ddc502..11dd6c5a0b 100644
--- a/daft/io/_parquet.py
+++ b/daft/io/_parquet.py
@@ -53,9 +53,10 @@ def read_parquet(
     if isinstance(path, list) and len(path) == 0:
         raise ValueError("Cannot read DataFrame from from empty list of Parquet filepaths")
 
+    is_ray_runner = context.get_context().is_ray_runner
     # If running on Ray, we want to limit the amount of concurrency and requests being made.
     # This is because each Ray worker process receives its own pool of thread workers and connections
-    multithreaded_io = not context.get_context().is_ray_runner if _multithreaded_io is None else _multithreaded_io
+    multithreaded_io = not is_ray_runner if _multithreaded_io is None else _multithreaded_io
 
     if isinstance(coerce_int96_timestamp_unit, str):
         coerce_int96_timestamp_unit = TimeUnit.from_str(coerce_int96_timestamp_unit)
@@ -76,5 +77,6 @@ def read_parquet(
         schema=schema_hints,
         file_format_config=file_format_config,
         storage_config=storage_config,
+        is_ray_runner=is_ray_runner,
     )
     return DataFrame(builder)
diff --git a/daft/io/common.py b/daft/io/common.py
index 363f4812ec..9b41c236e7 100644
--- a/daft/io/common.py
+++ b/daft/io/common.py
@@ -24,6 +24,7 @@ def get_tabular_files_scan(
     schema: dict[str, DataType] | None,
     file_format_config: FileFormatConfig,
     storage_config: StorageConfig,
+    is_ray_runner: bool,
 ) -> LogicalPlanBuilder:
     """Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
     # Glob the path using the Runner
@@ -41,6 +42,7 @@ def get_tabular_files_scan(
         storage_config,
         infer_schema=infer_schema,
         schema=_get_schema_from_dict(schema)._schema if schema is not None else None,
+        is_ray_runner=is_ray_runner,
     )
 
     builder = LogicalPlanBuilder.from_tabular_scan(
diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs
index 330d2b809d..cab4017571 100644
--- a/src/daft-micropartition/src/micropartition.rs
+++ b/src/daft-micropartition/src/micropartition.rs
@@ -20,6 +20,7 @@ use daft_scan::storage_config::{NativeStorageConfig, StorageConfig};
 use daft_scan::{ChunkSpec, DataFileSource, Pushdowns, ScanTask};
 use daft_table::Table;
 
+use parquet2::metadata::FileMetaData;
 use snafu::ResultExt;
 
 #[cfg(feature = "python")]
"python")] @@ -134,6 +135,11 @@ fn materialize_scan_task( ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit)); let urls = urls.collect::>(); let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice()); + let metadatas = scan_task + .sources + .iter() + .map(|s| s.get_parquet_metadata().cloned()) + .collect::>>(); daft_parquet::read::read_parquet_bulk( urls.as_slice(), file_column_names.as_deref(), @@ -147,6 +153,7 @@ fn materialize_scan_task( runtime_handle, &inference_options, field_id_mapping.clone(), + metadatas, ) .context(DaftCoreComputeSnafu)? } @@ -575,6 +582,11 @@ impl MicroPartition { .columns .as_ref() .map(|cols| cols.iter().map(|s| s.as_str()).collect::>()); + let parquet_metadata = scan_task + .sources + .iter() + .map(|s| s.get_parquet_metadata().cloned()) + .collect::>>(); let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice()); read_parquet_into_micropartition( @@ -597,6 +609,7 @@ impl MicroPartition { }, Some(schema.clone()), field_id_mapping.clone(), + parquet_metadata, ) .context(DaftCoreComputeSnafu) } @@ -932,6 +945,7 @@ fn _read_parquet_into_loaded_micropartition( runtime_handle, schema_infer_options, field_id_mapping, + None, )?; // Prefer using the `catalog_provided_schema` but fall back onto inferred schema from Parquet files @@ -979,6 +993,7 @@ pub(crate) fn read_parquet_into_micropartition( schema_infer_options: &ParquetSchemaInferenceOptions, catalog_provided_schema: Option, field_id_mapping: Option>>, + parquet_metadata: Option>>, ) -> DaftResult { if let Some(so) = start_offset && so > 0 @@ -1017,17 +1032,42 @@ pub(crate) fn read_parquet_into_micropartition( let meta_io_client = io_client.clone(); let meta_io_stats = io_stats.clone(); let meta_field_id_mapping = field_id_mapping.clone(); - let metadata = runtime_handle.block_on(async move { - read_parquet_metadata_bulk(uris, meta_io_client, meta_io_stats, meta_field_id_mapping).await - })?; - let schemas = metadata - .iter() - .map(|m| { - let schema = infer_schema_with_options(m, &Some((*schema_infer_options).into()))?; - let daft_schema = daft_core::schema::Schema::try_from(&schema)?; - DaftResult::Ok(daft_schema) - }) - .collect::>>()?; + let (metadata, schemas) = if let Some(metadata) = parquet_metadata { + let schemas = metadata + .iter() + .map(|m| { + let schema = infer_schema_with_options(m, &Some((*schema_infer_options).into()))?; + let daft_schema = daft_core::schema::Schema::try_from(&schema)?; + DaftResult::Ok(Arc::new(daft_schema)) + }) + .collect::>>()?; + (metadata, schemas) + } else { + let metadata = runtime_handle + .block_on(async move { + read_parquet_metadata_bulk( + uris, + meta_io_client, + meta_io_stats, + meta_field_id_mapping, + ) + .await + })? 
+            .into_iter()
+            .map(Arc::new)
+            .collect::<Vec<_>>();
+
+        let schemas = metadata
+            .iter()
+            .map(|m| {
+                let schema = infer_schema_with_options(m, &Some((*schema_infer_options).into()))?;
+                let daft_schema = daft_core::schema::Schema::try_from(&schema)?;
+                DaftResult::Ok(Arc::new(daft_schema))
+            })
+            .collect::<DaftResult<Vec<_>>>()?;
+        (metadata, schemas)
+    };
+
     let any_stats_avail = metadata
         .iter()
         .flat_map(|m| m.row_groups.iter())
@@ -1055,8 +1095,10 @@ pub(crate) fn read_parquet_into_micropartition(
     let scan_task_daft_schema = match catalog_provided_schema {
         Some(catalog_provided_schema) => catalog_provided_schema,
         None => {
-            let unioned_schema = schemas.into_iter().try_reduce(|l, r| l.union(&r))?;
-            Arc::new(unioned_schema.expect("we need at least 1 schema"))
+            let unioned_schema = schemas
+                .into_iter()
+                .try_reduce(|l, r| l.union(&r).map(Arc::new))?;
+            unioned_schema.expect("we need at least 1 schema")
         }
     };
@@ -1090,17 +1132,19 @@ pub(crate) fn read_parquet_into_micropartition(
         let scan_task = ScanTask::new(
             owned_urls
                 .into_iter()
+                .zip(metadata)
                 .zip(
                     row_groups
                         .unwrap_or_else(|| std::iter::repeat(None).take(uris.len()).collect()),
                 )
-                .map(|(url, rgs)| DataFileSource::AnonymousDataFile {
+                .map(|((url, metadata), rgs)| DataFileSource::AnonymousDataFile {
                     path: url,
                     chunk_spec: rgs.map(ChunkSpec::Parquet),
                     size_bytes: Some(size_bytes),
                     metadata: None,
                     partition_spec: partition_spec.cloned(),
                     statistics: None,
+                    parquet_metadata: Some(metadata),
                 })
                 .collect::<Vec<_>>(),
             FileFormatConfig::Parquet(ParquetSourceConfig {
diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs
index 418b56adc1..c42b5b1540 100644
--- a/src/daft-micropartition/src/python.rs
+++ b/src/daft-micropartition/src/python.rs
@@ -603,6 +603,7 @@ impl PyMicroPartition {
                 &schema_infer_options,
                 None,
                 None,
+                None,
             )
         })?;
         Ok(mp.into())
@@ -646,6 +647,7 @@ impl PyMicroPartition {
                 &schema_infer_options,
                 None,
                 None,
+                None,
             )
         })?;
         Ok(mp.into())
diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs
index 8c80f74cad..6ba511741f 100644
--- a/src/daft-parquet/src/python.rs
+++ b/src/daft-parquet/src/python.rs
@@ -50,6 +50,7 @@ pub mod pylib {
                 Some(io_stats.clone()),
                 runtime_handle,
                 schema_infer_options,
+                None,
             )?
             .into();
             Ok(result)
@@ -163,6 +164,7 @@ pub mod pylib {
                 runtime_handle,
                 &schema_infer_options,
                 None,
+                None,
             )?
             .into_iter()
             .map(|v| v.into())
@@ -233,13 +235,16 @@ pub mod pylib {
             multithreaded_io.unwrap_or(true),
             io_config.unwrap_or_default().config.into(),
         )?;
-        Ok(Arc::new(crate::read::read_parquet_schema(
-            uri,
-            io_client,
-            Some(io_stats),
-            schema_infer_options,
-            None, // TODO: allow passing in of field_id_mapping through Python API?
-        )?)
+        Ok(Arc::new(
+            crate::read::read_parquet_schema(
+                uri,
+                io_client,
+                Some(io_stats),
+                schema_infer_options,
+                None, // TODO: allow passing in of field_id_mapping through Python API?
+            )?
+            .0,
+        )
         .into())
     })
 }
diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs
index 0007529917..da77a4506d 100644
--- a/src/daft-parquet/src/read.rs
+++ b/src/daft-parquet/src/read.rs
@@ -1,5 +1,6 @@
 use std::{collections::BTreeMap, sync::Arc, time::Duration};
 
+use arrow2::io::parquet::read::schema::infer_schema_with_options;
 use common_error::DaftResult;
 
 use daft_core::{
@@ -15,6 +16,7 @@ use futures::{
     StreamExt, TryStreamExt,
 };
 use itertools::Itertools;
+use parquet2::metadata::FileMetaData;
 use snafu::ResultExt;
 use tokio::runtime::Runtime;
 
@@ -67,6 +69,7 @@ async fn read_parquet_single(
     io_stats: Option<IOStatsRef>,
     schema_infer_options: ParquetSchemaInferenceOptions,
     field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
+    metadata: Option<Arc<FileMetaData>>,
 ) -> DaftResult<Table> {
     let field_id_mapping_provided = field_id_mapping.is_some();
     let original_columns = columns;
@@ -98,6 +101,7 @@ async fn read_parquet_single(
             row_groups.clone(),
             predicate.clone(),
             schema_infer_options,
+            metadata,
         )
         .await
     } else {
@@ -137,7 +141,7 @@ async fn read_parquet_single(
         let parquet_reader = builder.build()?;
         let ranges = parquet_reader.prebuffer_ranges(io_client, io_stats)?;
         Ok((
-            metadata,
+            Arc::new(metadata),
             parquet_reader.read_from_ranges_into_table(ranges).await?,
         ))
     }?;
@@ -229,6 +233,7 @@ async fn read_parquet_single_into_arrow(
     io_stats: Option<IOStatsRef>,
     schema_infer_options: ParquetSchemaInferenceOptions,
     field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
+    metadata: Option<Arc<FileMetaData>>,
 ) -> DaftResult<(arrow2::datatypes::SchemaRef, Vec<ArrowChunk>)> {
     let field_id_mapping_provided = field_id_mapping.is_some();
     let (source_type, fixed_uri) = parse_url(uri)?;
@@ -242,6 +247,7 @@ async fn read_parquet_single_into_arrow(
             row_groups.clone(),
             None,
             schema_infer_options,
+            metadata,
         )
         .await?;
         (metadata, Arc::new(schema), all_arrays)
@@ -280,7 +286,7 @@ async fn read_parquet_single_into_arrow(
         let all_arrays = parquet_reader
             .read_from_ranges_into_arrow_arrays(ranges)
             .await?;
-        (metadata, schema, all_arrays)
+        (Arc::new(metadata), schema, all_arrays)
     };
 
     let rows_per_row_groups = metadata
@@ -374,6 +380,7 @@ pub fn read_parquet(
     io_stats: Option<IOStatsRef>,
     runtime_handle: Arc<Runtime>,
     schema_infer_options: ParquetSchemaInferenceOptions,
+    metadata: Option<Arc<FileMetaData>>,
 ) -> DaftResult<Table> {
     let _rt_guard = runtime_handle.enter();
     runtime_handle.block_on(async {
@@ -388,6 +395,7 @@ pub fn read_parquet(
             io_stats,
             schema_infer_options,
             None,
+            metadata,
         )
         .await
     })
@@ -419,6 +427,7 @@ pub fn read_parquet_into_pyarrow(
         io_stats,
         schema_infer_options,
         None,
+        None,
     );
     if let Some(timeout) = file_timeout_ms {
         match tokio::time::timeout(Duration::from_millis(timeout as u64), fut).await {
@@ -449,6 +458,7 @@ pub fn read_parquet_bulk(
     runtime_handle: Arc<Runtime>,
     schema_infer_options: &ParquetSchemaInferenceOptions,
     field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
+    metadata: Option<Vec<Arc<FileMetaData>>>,
 ) -> DaftResult<Vec<Table>> {
     let _rt_guard = runtime_handle.enter();
     let owned_columns = columns.map(|s| s.iter().map(|v| String::from(*v)).collect::<Vec<_>>());
@@ -468,6 +478,7 @@ pub fn read_parquet_bulk(
                 let owned_columns = owned_columns.clone();
                 let owned_row_group = row_groups.as_ref().and_then(|rgs| rgs[i].clone());
                 let owned_predicate = predicate.clone();
+                let metadata = metadata.as_ref().map(|mds| mds[i].clone());
 
                 let io_client = io_client.clone();
                 let io_stats = io_stats.clone();
@@ -490,6 +501,7 @@ pub fn read_parquet_bulk(
                         io_stats,
                         schema_infer_options,
                         owned_field_id_mapping,
+                        metadata,
                     )
                     .await?,
                 ))
@@ -558,6 +570,7 @@ pub fn read_parquet_into_pyarrow_bulk(
                         io_stats,
                         schema_infer_options,
                         None,
+                        None,
                     )
                     .await?,
                 ))
@@ -579,7 +592,7 @@ pub fn read_parquet_schema(
     io_stats: Option<IOStatsRef>,
     schema_inference_options: ParquetSchemaInferenceOptions,
     field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
-) -> DaftResult<Schema> {
+) -> DaftResult<(Schema, FileMetaData)> {
     let runtime_handle = get_runtime(true)?;
     let _rt_guard = runtime_handle.enter();
     let builder = runtime_handle.block_on(async {
@@ -587,7 +600,11 @@ pub fn read_parquet_schema(
     })?;
     let builder = builder.set_infer_schema_options(schema_inference_options);
 
-    Schema::try_from(builder.build()?.arrow_schema().as_ref())
+    let metadata = builder.metadata;
+    let arrow_schema =
+        infer_schema_with_options(&metadata, &Some(schema_inference_options.into()))?;
+    let schema = Schema::try_from(&arrow_schema)?;
+    Ok((schema, metadata))
 }
 
 pub async fn read_parquet_metadata(
@@ -742,6 +759,7 @@ mod tests {
             None,
             runtime_handle,
             Default::default(),
+            None,
         )?;
 
         assert_eq!(table.len(), 100);
diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs
index 982be9c2da..63e8e45012 100644
--- a/src/daft-parquet/src/stream_reader.rs
+++ b/src/daft-parquet/src/stream_reader.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashSet, fs::File};
+use std::{collections::HashSet, fs::File, sync::Arc};
 
 use arrow2::io::parquet::read;
 use common_error::DaftResult;
@@ -47,6 +47,7 @@ fn prune_fields_from_schema(
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 pub(crate) fn local_parquet_read_into_arrow(
     uri: &str,
     columns: Option<&[String]>,
@@ -55,8 +56,9 @@ pub(crate) fn local_parquet_read_into_arrow(
     row_groups: Option<&[i64]>,
     predicate: Option<ExprRef>,
     schema_infer_options: ParquetSchemaInferenceOptions,
+    metadata: Option<Arc<parquet2::metadata::FileMetaData>>,
 ) -> super::Result<(
-    parquet2::metadata::FileMetaData,
+    Arc<parquet2::metadata::FileMetaData>,
     arrow2::datatypes::Schema,
     Vec<ArrowChunk>,
 )> {
@@ -80,12 +82,14 @@ pub(crate) fn local_parquet_read_into_arrow(
             file_size: size as usize,
         });
     }
-
-    let metadata = read::read_metadata(&mut reader).with_context(|_| {
-        super::UnableToParseMetadataFromLocalFileSnafu {
-            path: uri.to_string(),
-        }
-    })?;
+    let metadata = match metadata {
+        Some(m) => m,
+        None => read::read_metadata(&mut reader)
+            .with_context(|_| super::UnableToParseMetadataFromLocalFileSnafu {
+                path: uri.to_string(),
+            })
+            .map(Arc::new)?,
+    };
 
     // and infer a [`Schema`] from the `metadata`.
     let schema = infer_schema_with_options(&metadata, &Some(schema_infer_options.into()))
@@ -176,6 +180,7 @@ pub(crate) fn local_parquet_read_into_arrow(
     Ok((metadata, schema, all_columns))
 }
 
+#[allow(clippy::too_many_arguments)]
 pub(crate) async fn local_parquet_read_async(
     uri: &str,
     columns: Option<Vec<String>>,
@@ -184,7 +189,8 @@ pub(crate) async fn local_parquet_read_async(
     row_groups: Option<Vec<i64>>,
     predicate: Option<ExprRef>,
     schema_infer_options: ParquetSchemaInferenceOptions,
-) -> DaftResult<(parquet2::metadata::FileMetaData, Table)> {
+    metadata: Option<Arc<parquet2::metadata::FileMetaData>>,
+) -> DaftResult<(Arc<parquet2::metadata::FileMetaData>, Table)> {
     let (send, recv) = tokio::sync::oneshot::channel();
     let uri = uri.to_string();
     rayon::spawn(move || {
@@ -197,6 +203,7 @@ pub(crate) async fn local_parquet_read_async(
             row_groups.as_deref(),
             predicate,
             schema_infer_options,
+            metadata,
         );
         let (metadata, schema, arrays) = v?;
 
@@ -226,6 +233,7 @@ pub(crate) async fn local_parquet_read_async(
     recv.await.context(super::OneShotRecvSnafu {})?
 }
 
+#[allow(clippy::too_many_arguments)]
 pub(crate) async fn local_parquet_read_into_arrow_async(
     uri: &str,
     columns: Option<Vec<String>>,
@@ -234,8 +242,9 @@ pub(crate) async fn local_parquet_read_into_arrow_async(
     row_groups: Option<Vec<i64>>,
     predicate: Option<ExprRef>,
     schema_infer_options: ParquetSchemaInferenceOptions,
+    metadata: Option<Arc<parquet2::metadata::FileMetaData>>,
 ) -> super::Result<(
-    parquet2::metadata::FileMetaData,
+    Arc<parquet2::metadata::FileMetaData>,
     arrow2::datatypes::Schema,
     Vec<ArrowChunk>,
 )> {
@@ -250,6 +259,7 @@ pub(crate) async fn local_parquet_read_into_arrow_async(
             row_groups.as_deref(),
             predicate,
             schema_infer_options,
+            metadata,
         );
         let _ = send.send(v);
     });
diff --git a/src/daft-plan/src/physical_planner/planner.rs b/src/daft-plan/src/physical_planner/planner.rs
index 8fe0beb6f0..b2f9ddfdd7 100644
--- a/src/daft-plan/src/physical_planner/planner.rs
+++ b/src/daft-plan/src/physical_planner/planner.rs
@@ -310,7 +310,6 @@ impl AdaptivePlanner {
             source_id: None,
         };
         let output = self.logical_plan.clone().rewrite(&mut rewriter)?;
-
         let physical_plan = rewriter
             .physical_children
             .pop()
diff --git a/src/daft-scan/Cargo.toml b/src/daft-scan/Cargo.toml
index bb7b0adea5..8702cbb635 100644
--- a/src/daft-scan/Cargo.toml
+++ b/src/daft-scan/Cargo.toml
@@ -14,6 +14,7 @@ daft-stats = {path = "../daft-stats", default-features = false}
 daft-table = {path = "../daft-table", default-features = false}
 futures = {workspace = true}
 itertools = {workspace = true}
+parquet2 = {workspace = true}
 pyo3 = {workspace = true, optional = true}
 pyo3-log = {workspace = true}
 serde = {workspace = true}
diff --git a/src/daft-scan/src/anonymous.rs b/src/daft-scan/src/anonymous.rs
index a3db188b22..4786608a39 100644
--- a/src/daft-scan/src/anonymous.rs
+++ b/src/daft-scan/src/anonymous.rs
@@ -80,6 +80,7 @@ impl ScanOperator for AnonymousScanOperator {
                     metadata: None,
                     partition_spec: None,
                     statistics: None,
+                    parquet_metadata: None,
                 }],
                 file_format_config.clone(),
                 schema.clone(),
diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs
index edf63084db..4ec85b4040 100644
--- a/src/daft-scan/src/glob.rs
+++ b/src/daft-scan/src/glob.rs
@@ -3,7 +3,7 @@ use std::{sync::Arc, vec};
 use common_error::{DaftError, DaftResult};
 use daft_core::schema::SchemaRef;
 use daft_csv::CsvParseOptions;
-use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef};
+use daft_io::{get_runtime, parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef};
 use daft_parquet::read::ParquetSchemaInferenceOptions;
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use snafu::Snafu;
@@ -19,6 +19,7 @@ pub struct GlobScanOperator {
     file_format_config: Arc<FileFormatConfig>,
     schema: SchemaRef,
     storage_config: Arc<StorageConfig>,
+    is_ray_runner: bool,
 }
 
 /// Wrapper struct that implements a sync Iterator for a BoxStream
@@ -129,6 +130,7 @@ impl GlobScanOperator {
         storage_config: Arc<StorageConfig>,
         infer_schema: bool,
         schema: Option<SchemaRef>,
+        is_ray_runner: bool,
     ) -> DaftResult<Self> {
         let first_glob_path = match glob_paths.first() {
             None => Err(DaftError::ValueError(
@@ -169,7 +171,8 @@ impl GlobScanOperator {
                 let io_stats = IOStatsContext::new(format!(
                     "GlobScanOperator constructor read_parquet_schema: for uri {first_filepath}"
                 ));
-                daft_parquet::read::read_parquet_schema(
+
+                let (schema, _metadata) = daft_parquet::read::read_parquet_schema(
                     first_filepath.as_str(),
                     io_client.clone(),
                     Some(io_stats),
@@ -177,7 +180,9 @@ impl GlobScanOperator {
                         coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit,
                     },
                     field_id_mapping.clone(),
-                )?
+                )?;
+
+                schema
             }
             FileFormatConfig::Csv(CsvSourceConfig {
                 delimiter,
@@ -233,12 +238,12 @@ impl GlobScanOperator {
             }
             false => schema.expect("Schema must be provided if infer_schema is false"),
         };
-
         Ok(Self {
             glob_paths: glob_paths.iter().map(|s| s.to_string()).collect(),
             file_format_config,
             schema,
             storage_config,
+            is_ray_runner,
         })
     }
 }
@@ -293,7 +298,7 @@ impl ScanOperator for GlobScanOperator {
         let file_format_config = self.file_format_config.clone();
         let schema = self.schema.clone();
         let storage_config = self.storage_config.clone();
-
+        let is_ray_runner = self.is_ray_runner;
         // Create one ScanTask per file
         Ok(Box::new(files.map(move |f| {
             let FileMetadata {
@@ -301,6 +306,38 @@ impl ScanOperator for GlobScanOperator {
                 size: size_bytes,
                 ..
             } = f?;
+
+            let path_clone = path.clone();
+            let io_client_clone = io_client.clone();
+            let field_id_mapping = match file_format_config.as_ref() {
+                FileFormatConfig::Parquet(ParquetSourceConfig {
+                    field_id_mapping, ..
+                }) => Some(field_id_mapping.clone()),
+                _ => None,
+            };
+
+            // We skip reading parquet metadata if we are running in Ray
+            // because the metadata can be quite large
+            let parquet_metadata = if !is_ray_runner {
+                if let Some(field_id_mapping) = field_id_mapping {
+                    get_runtime(true).unwrap().block_on(async {
+                        daft_parquet::read::read_parquet_metadata(
+                            &path_clone,
+                            io_client_clone,
+                            Some(io_stats.clone()),
+                            field_id_mapping.clone(),
+                        )
+                        .await
+                        .ok()
+                        .map(Arc::new)
+                    })
+                } else {
+                    None
+                }
+            } else {
+                None
+            };
+
             Ok(ScanTask::new(
                 vec![DataFileSource::AnonymousDataFile {
                     path: path.to_string(),
@@ -309,6 +346,7 @@ impl ScanOperator for GlobScanOperator {
                     metadata: None,
                     partition_spec: None,
                     statistics: None,
+                    parquet_metadata,
                 }],
                 file_format_config.clone(),
                 schema.clone(),
diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs
index 7f8f33a4bc..84bf003437 100644
--- a/src/daft-scan/src/lib.rs
+++ b/src/daft-scan/src/lib.rs
@@ -16,6 +16,7 @@ use daft_dsl::ExprRef;
 use daft_stats::{PartitionSpec, TableMetadata, TableStatistics};
 use file_format::FileFormatConfig;
 use itertools::Itertools;
+use parquet2::metadata::FileMetaData;
 use serde::{Deserialize, Serialize};
 
 mod anonymous;
@@ -124,6 +125,7 @@ pub enum DataFileSource {
         metadata: Option<TableMetadata>,
         partition_spec: Option<PartitionSpec>,
         statistics: Option<TableStatistics>,
+        parquet_metadata: Option<Arc<FileMetaData>>,
     },
     CatalogDataFile {
         path: String,
@@ -164,6 +166,15 @@ impl DataFileSource {
         }
     }
 
+    pub fn get_parquet_metadata(&self) -> Option<&Arc<FileMetaData>> {
+        match self {
+            Self::AnonymousDataFile {
+                parquet_metadata, ..
+            } => parquet_metadata.as_ref(),
+            _ => None,
+        }
+    }
+
     pub fn get_chunk_spec(&self) -> Option<&ChunkSpec> {
         match self {
             Self::AnonymousDataFile { chunk_spec, .. }
@@ -224,6 +235,7 @@ impl DataFileSource {
                 metadata,
                 partition_spec,
                 statistics,
+                parquet_metadata: _,
             }
             | Self::DatabaseDataSource {
                 path,
diff --git a/src/daft-scan/src/python.rs b/src/daft-scan/src/python.rs
index 3c188f35a6..ce0fbf0583 100644
--- a/src/daft-scan/src/python.rs
+++ b/src/daft-scan/src/python.rs
@@ -121,6 +121,7 @@ pub mod pylib {
         storage_config: PyStorageConfig,
         infer_schema: bool,
         schema: Option<PySchema>,
+        is_ray_runner: Option<bool>,
     ) -> PyResult<Self> {
         py.allow_threads(|| {
             let operator = Arc::new(GlobScanOperator::try_new(
@@ -129,6 +130,7 @@ pub mod pylib {
                 storage_config.into(),
                 infer_schema,
                 schema.map(|s| s.schema),
+                is_ray_runner.unwrap_or(false),
             )?);
             Ok(ScanOperatorHandle {
                 scan_op: ScanOperatorRef(operator),