[FEAT] [Scan Operator] Add ChunkSpec for specifying format-specific per-file row subset selection for ScanTasks. #1590

Merged 3 commits on Nov 11, 2023
8 changes: 4 additions & 4 deletions daft/daft.pyi
@@ -178,7 +178,7 @@ class ParquetSourceConfig:
Configuration of a Parquet data source.
"""

def __init__(self, coerce_int96_timestamp_unit: PyTimeUnit | None = None, row_groups: list[int] | None = None): ...
def __init__(self, coerce_int96_timestamp_unit: PyTimeUnit | None = None): ...

class CsvSourceConfig:
"""
@@ -424,7 +424,7 @@ def read_parquet_bulk(
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[list[int]] | None = None,
row_groups: list[list[int] | None] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
@@ -450,7 +450,7 @@ def read_parquet_into_pyarrow_bulk(
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[list[int]] | None = None,
row_groups: list[list[int] | None] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
@@ -804,7 +804,7 @@ class PyMicroPartition:
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[list[int]] | None = None,
row_groups: list[list[int] | None] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = None,
multithreaded_io: bool | None = None,
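The signature change above moves row_groups from list[list[int]] | None (one required list per file) to list[list[int] | None] | None: one optional entry per file, where a None entry means "read every row group in that file". The standalone Rust sketch below shows what such a value encodes; the paths and row-group indices are made up for illustration, and this is not Daft's API, only the nested shape (Rust Option<Vec<Option<Vec<i64>>>>).

fn main() {
    let uris = [
        "s3://bucket/a.parquet",
        "s3://bucket/b.parquet",
        "s3://bucket/c.parquet",
    ];

    // One entry per file: a.parquet -> row groups 0 and 2,
    // b.parquet -> all row groups, c.parquet -> row group 5 only.
    let row_groups: Option<Vec<Option<Vec<i64>>>> =
        Some(vec![Some(vec![0, 2]), None, Some(vec![5])]);

    if let Some(per_file) = &row_groups {
        assert_eq!(per_file.len(), uris.len(), "one selection entry per file");
        for (uri, selection) in uris.iter().zip(per_file) {
            match selection {
                Some(rgs) => println!("{uri}: read row groups {rgs:?}"),
                None => println!("{uri}: read all row groups"),
            }
        }
    }
}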
2 changes: 1 addition & 1 deletion daft/table/micropartition.py
@@ -328,7 +328,7 @@ def read_parquet_bulk(
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups_per_path: list[list[int]] | None = None,
row_groups_per_path: list[list[int] | None] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
4 changes: 2 additions & 2 deletions daft/table/table.py
@@ -407,7 +407,7 @@ def read_parquet_bulk(
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups_per_path: list[list[int]] | None = None,
row_groups_per_path: list[list[int] | None] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
@@ -532,7 +532,7 @@ def read_parquet_into_pyarrow_bulk(
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups_per_path: list[list[int]] | None = None,
row_groups_per_path: list[list[int] | None] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
67 changes: 42 additions & 25 deletions src/daft-micropartition/src/micropartition.rs
@@ -13,7 +13,7 @@ use daft_parquet::read::{
};
use daft_scan::file_format::{CsvSourceConfig, FileFormatConfig, ParquetSourceConfig};
use daft_scan::storage_config::{NativeStorageConfig, StorageConfig};
use daft_scan::{DataFileSource, ScanTask};
use daft_scan::{ChunkSpec, DataFileSource, ScanTask};
use daft_table::Table;

use snafu::ResultExt;
@@ -107,20 +107,17 @@ fn materialize_scan_task(
// ********************
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
// TODO(Clark): Support different row group specification per file.
row_groups,
}) => {
let inference_options =
ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit));
let urls = urls.collect::<Vec<_>>();
let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice());
daft_parquet::read::read_parquet_bulk(
urls.as_slice(),
column_names.as_deref(),
None,
scan_task.limit,
row_groups
.as_ref()
.map(|row_groups| vec![row_groups.clone(); urls.len()]),
row_groups,
io_client.clone(),
io_stats,
8,
@@ -209,7 +206,7 @@ fn materialize_scan_task(
py,
url,
*has_headers,
delimiter,
delimiter.as_str(),
*double_quote,
cast_to_schema.clone().into(),
scan_task.storage_config.clone().into(),
@@ -320,30 +317,26 @@ impl MicroPartition {
_,
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
row_groups,
}),
StorageConfig::Native(cfg),
) => {
let uris = scan_task
.sources
.iter()
.map(|s| s.get_path())
.collect::<Vec<_>>();
let columns = scan_task
.columns
.as_ref()
.map(|cols| cols.iter().map(|s| s.as_str()).collect::<Vec<&str>>());

let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice());
read_parquet_into_micropartition(
scan_task
.sources
.iter()
.map(|s| s.get_path())
.collect::<Vec<_>>()
.as_slice(),
uris.as_slice(),
columns.as_deref(),
None,
scan_task.limit,
row_groups.clone().map(|rg| {
std::iter::repeat(rg)
.take(scan_task.sources.len())
.collect::<Vec<_>>()
}), // HACK: Properly propagate multi-file row_groups
row_groups,
cfg.io_config
.clone()
.map(|c| Arc::new(c.clone()))
@@ -510,6 +503,24 @@ fn prune_fields_from_schema_ref(
}
}

fn parquet_sources_to_row_groups(sources: &[DataFileSource]) -> Option<Vec<Option<Vec<i64>>>> {
let row_groups = sources
.iter()
.map(|s| {
if let Some(ChunkSpec::Parquet(row_group)) = s.get_chunk_spec() {
Some(row_group.clone())
} else {
None
}
})
.collect::<Vec<_>>();
if row_groups.iter().any(|rgs| rgs.is_some()) {
Some(row_groups)
} else {
None
}
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn read_csv_into_micropartition(
uris: &[&str],
@@ -586,7 +597,7 @@ pub(crate) fn read_parquet_into_micropartition(
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
io_config: Arc<IOConfig>,
io_stats: Option<IOStatsRef>,
num_parallel_tasks: usize,
@@ -646,10 +657,12 @@ pub(crate) fn read_parquet_into_micropartition(
Some(row_groups) => metadata
.iter()
.zip(row_groups.iter())
.map(|(fm, rg)| {
rg.iter()
.map(|(fm, rg)| match rg {
Some(rg) => rg
.iter()
.map(|rg_idx| fm.row_groups.get(*rg_idx as usize).unwrap().num_rows())
.sum::<usize>()
.sum::<usize>(),
None => fm.num_rows,
})
.sum(),
};
let scan_task = ScanTask::new(
owned_urls
.into_iter()
.map(|url| DataFileSource::AnonymousDataFile {
.zip(
row_groups
.unwrap_or_else(|| std::iter::repeat(None).take(uris.len()).collect()),
)
.map(|(url, rgs)| DataFileSource::AnonymousDataFile {
path: url,
chunk_spec: rgs.map(ChunkSpec::Parquet),
size_bytes: Some(size_bytes),
metadata: None,
partition_spec: None,
Expand All @@ -680,7 +698,6 @@ pub(crate) fn read_parquet_into_micropartition(
.collect::<Vec<_>>(),
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit: schema_infer_options.coerce_int96_timestamp_unit,
row_groups: None,
})
.into(),
daft_schema.clone(),
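Two pieces of new logic in this file are worth seeing in isolation: parquet_sources_to_row_groups collapses per-source chunk specs into one optional row-group list per file (returning None overall when no source carries a selection), and the row-count computation sums the selected row groups for files with a selection while falling back to the file's total otherwise. The standalone sketch below mirrors both; ChunkSpec, DataFileSource, and the Parquet metadata structs here are simplified local stand-ins for illustration, not the crate's real definitions.

#[derive(Clone, Debug)]
enum ChunkSpec {
    Parquet(Vec<i64>),
}

struct DataFileSource {
    path: String,
    chunk_spec: Option<ChunkSpec>,
}

#[derive(Clone)]
struct RowGroupMeta {
    num_rows: usize,
}

struct FileMetadata {
    num_rows: usize,
    row_groups: Vec<RowGroupMeta>,
}

// None overall means no source carried a selection, so callers take their
// default "read everything" path.
fn parquet_sources_to_row_groups(sources: &[DataFileSource]) -> Option<Vec<Option<Vec<i64>>>> {
    let row_groups = sources
        .iter()
        .map(|s| match &s.chunk_spec {
            Some(ChunkSpec::Parquet(rgs)) => Some(rgs.clone()),
            None => None,
        })
        .collect::<Vec<_>>();
    if row_groups.iter().any(|rgs| rgs.is_some()) {
        Some(row_groups)
    } else {
        None
    }
}

// Per-file row count: sum the selected row groups if a selection exists,
// otherwise use the file's total row count.
fn total_rows(metadata: &[FileMetadata], row_groups: Option<&[Option<Vec<i64>>]>) -> usize {
    match row_groups {
        None => metadata.iter().map(|fm| fm.num_rows).sum(),
        Some(row_groups) => metadata
            .iter()
            .zip(row_groups.iter())
            .map(|(fm, rg)| match rg {
                Some(rg) => rg
                    .iter()
                    .map(|rg_idx| fm.row_groups[*rg_idx as usize].num_rows)
                    .sum::<usize>(),
                None => fm.num_rows,
            })
            .sum(),
    }
}

fn main() {
    let sources = vec![
        DataFileSource {
            path: "a.parquet".to_string(),
            chunk_spec: Some(ChunkSpec::Parquet(vec![0, 2])),
        },
        DataFileSource {
            path: "b.parquet".to_string(),
            chunk_spec: None,
        },
    ];
    let metadata = vec![
        FileMetadata { num_rows: 300, row_groups: vec![RowGroupMeta { num_rows: 100 }; 3] },
        FileMetadata { num_rows: 50, row_groups: vec![RowGroupMeta { num_rows: 50 }] },
    ];

    let row_groups = parquet_sources_to_row_groups(&sources);
    // a.parquet contributes row groups 0 and 2 (200 rows); b.parquet has no
    // selection and contributes all 50 rows.
    assert_eq!(total_rows(&metadata, row_groups.as_deref()), 250);
    println!(
        "files: {:?}, selections: {:?}",
        sources.iter().map(|s| s.path.as_str()).collect::<Vec<_>>(),
        row_groups
    );
}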
4 changes: 2 additions & 2 deletions src/daft-micropartition/src/python.rs
@@ -438,7 +438,7 @@ impl PyMicroPartition {
columns.as_deref(),
start_offset,
num_rows,
row_groups.map(|rg| vec![rg]),
row_groups.map(|rg| vec![Some(rg)]),
io_config,
Some(io_stats),
1,
columns: Option<Vec<&str>>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
io_config: Option<IOConfig>,
num_parallel_tasks: Option<i64>,
multithreaded_io: Option<bool>,
4 changes: 2 additions & 2 deletions src/daft-parquet/src/python.rs
@@ -126,7 +126,7 @@ pub mod pylib {
columns: Option<Vec<&str>>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
io_config: Option<IOConfig>,
num_parallel_tasks: Option<i64>,
multithreaded_io: Option<bool>,
@@ -170,7 +170,7 @@ pub mod pylib {
columns: Option<Vec<&str>>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
io_config: Option<IOConfig>,
num_parallel_tasks: Option<i64>,
multithreaded_io: Option<bool>,
14 changes: 4 additions & 10 deletions src/daft-parquet/src/read.rs
@@ -372,7 +372,7 @@ pub fn read_parquet_bulk(
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
num_parallel_tasks: usize,
let task_stream = futures::stream::iter(uris.iter().enumerate().map(|(i, uri)| {
let uri = uri.to_string();
let owned_columns = owned_columns.clone();
let owned_row_group = match &row_groups {
None => None,
Some(v) => v.get(i).cloned(),
};
let owned_row_group = row_groups.as_ref().and_then(|rgs| rgs[i].clone());

let io_client = io_client.clone();
let io_stats = io_stats.clone();
@@ -441,7 +438,7 @@ pub fn read_parquet_into_pyarrow_bulk(
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
num_parallel_tasks: usize,
let uri = uri.to_string();
let owned_columns = owned_columns.clone();
let schema_infer_options = schema_infer_options;
let owned_row_group = match &row_groups {
None => None,
Some(v) => v.get(i).cloned(),
};
let owned_row_group = row_groups.as_ref().and_then(|rgs| rgs[i].clone());

let io_client = io_client.clone();
let io_stats = io_stats.clone();
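The change in this file replaces the explicit match on row_groups with as_ref().and_then(...): each per-URI read task clones only its own file's selection, and None (no selection at all, or no selection for that file) leaves the reader on its default all-row-groups path. Below is a small standalone sketch of that lookup, assuming (as the callers above guarantee) that the outer vector has exactly one entry per URI.

fn row_groups_for_task(
    row_groups: &Option<Vec<Option<Vec<i64>>>>,
    i: usize,
) -> Option<Vec<i64>> {
    // Replaces the old two-arm match; indexing assumes one entry per URI.
    row_groups.as_ref().and_then(|rgs| rgs[i].clone())
}

fn main() {
    let row_groups = Some(vec![Some(vec![1, 3]), None]);
    assert_eq!(row_groups_for_task(&row_groups, 0), Some(vec![1, 3]));
    assert_eq!(row_groups_for_task(&row_groups, 1), None);
    assert_eq!(row_groups_for_task(&None, 0), None);
}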
1 change: 1 addition & 0 deletions src/daft-scan/src/anonymous.rs
@@ -70,6 +70,7 @@ impl ScanOperator for AnonymousScanOperator {
Ok(ScanTask::new(
vec![DataFileSource::AnonymousDataFile {
path: f.to_string(),
chunk_spec: None,
size_bytes: None,
metadata: None,
partition_spec: None,
9 changes: 1 addition & 8 deletions src/daft-scan/src/file_format.rs
@@ -79,28 +79,21 @@ impl FileFormatConfig {
#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))]
pub struct ParquetSourceConfig {
pub coerce_int96_timestamp_unit: TimeUnit,
pub row_groups: Option<Vec<i64>>,
}

#[cfg(feature = "python")]
#[pymethods]
impl ParquetSourceConfig {
/// Create a config for a Parquet data source.
#[new]
fn new(coerce_int96_timestamp_unit: Option<PyTimeUnit>, row_groups: Option<Vec<i64>>) -> Self {
fn new(coerce_int96_timestamp_unit: Option<PyTimeUnit>) -> Self {
Self {
coerce_int96_timestamp_unit: coerce_int96_timestamp_unit
.unwrap_or(TimeUnit::Nanoseconds.into())
.into(),
row_groups,
}
}

#[getter]
fn row_groups(&self) -> PyResult<Option<Vec<i64>>> {
Ok(self.row_groups.clone())
}

#[getter]
fn coerce_int96_timestamp_unit(&self) -> PyResult<PyTimeUnit> {
Ok(self.coerce_int96_timestamp_unit.into())
2 changes: 1 addition & 1 deletion src/daft-scan/src/glob.rs
@@ -156,7 +156,6 @@ impl GlobScanOperator {
let inferred_schema = match file_format_config.as_ref() {
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
..
}) => {
let io_stats = IOStatsContext::new(format!(
"GlobScanOperator constructor read_parquet_schema: for uri {first_filepath}"
@@ -289,6 +288,7 @@ impl ScanOperator for GlobScanOperator {
Ok(ScanTask::new(
vec![DataFileSource::AnonymousDataFile {
path: path.to_string(),
chunk_spec: None,
size_bytes,
metadata: None,
partition_spec: None,
18 changes: 18 additions & 0 deletions src/daft-scan/src/lib.rs
@@ -47,17 +47,26 @@ impl From<Error> for pyo3::PyErr {
}
}

/// Specification of a subset of a file to be read.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChunkSpec {
/// Selection of Parquet row groups.
Parquet(Vec<i64>),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataFileSource {
AnonymousDataFile {
path: String,
chunk_spec: Option<ChunkSpec>,
size_bytes: Option<u64>,
metadata: Option<TableMetadata>,
partition_spec: Option<PartitionSpec>,
statistics: Option<TableStatistics>,
},
CatalogDataFile {
path: String,
chunk_spec: Option<ChunkSpec>,
size_bytes: Option<u64>,
metadata: TableMetadata,
partition_spec: PartitionSpec,
Self::AnonymousDataFile { path, .. } | Self::CatalogDataFile { path, .. } => path,
}
}

pub fn get_chunk_spec(&self) -> Option<&ChunkSpec> {
match self {
Self::AnonymousDataFile { chunk_spec, .. }
| Self::CatalogDataFile { chunk_spec, .. } => chunk_spec.as_ref(),
}
}

pub fn get_size_bytes(&self) -> Option<u64> {
match self {
Self::AnonymousDataFile { size_bytes, .. }
| Self::CatalogDataFile { size_bytes, .. } => *size_bytes,
}
}

pub fn get_metadata(&self) -> Option<&TableMetadata> {
match self {
Self::AnonymousDataFile { metadata, .. } => metadata.as_ref(),
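With chunk_spec now carried on both DataFileSource variants, the construction side (see read_parquet_into_micropartition above) pairs N URLs with N optional selections, defaulting every file to no selection when none were provided, and wraps each present selection in ChunkSpec::Parquet. Below is a standalone sketch of that zip-with-default pattern; the local ChunkSpec mirrors the enum added here, and a (path, chunk_spec) tuple stands in for building a full DataFileSource.

#[derive(Debug, Clone, PartialEq)]
enum ChunkSpec {
    Parquet(Vec<i64>),
}

fn pair_urls_with_specs(
    urls: Vec<String>,
    row_groups: Option<Vec<Option<Vec<i64>>>>,
) -> Vec<(String, Option<ChunkSpec>)> {
    let n = urls.len();
    urls.into_iter()
        // If no selections were provided, act as if every file had None.
        .zip(row_groups.unwrap_or_else(|| std::iter::repeat(None).take(n).collect()))
        .map(|(url, rgs)| (url, rgs.map(ChunkSpec::Parquet)))
        .collect()
}

fn main() {
    let urls = vec!["a.parquet".to_string(), "b.parquet".to_string()];
    let paired = pair_urls_with_specs(urls, Some(vec![Some(vec![0]), None]));
    assert_eq!(paired[0].1, Some(ChunkSpec::Parquet(vec![0])));
    assert_eq!(paired[1].1, None);
}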