[FEAT] [Scan Operator] Add ChunkSpec for specifying format-specific per-file row subset selection for `ScanTask`s. (#1590)

This PR adds a `ChunkSpec` abstraction and integration that allows for
format-specific per-file row subset selection for `ScanTask`s. The first
concrete implementation is for Parquet, via index-based row group
selection. Pushing this down to be a `ScanTask` concept makes it easy to
specify such selections at a per-file level without polluting user-facing
abstractions like the file format config, and should make merging and
splitting `ScanTask`s along the row dimension easier.

In the future, other formats such as CSV and NDJSON could support
selection by byte ranges that end at row boundaries.
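
To make the shape of the change concrete, here is a minimal sketch of building a scan source that reads only row groups 0 and 2 of a single Parquet file. The variant and field names come from the diff below; the construction itself (and the path) is illustrative, not necessarily how Daft assembles these internally:

use daft_scan::{ChunkSpec, DataFileSource};

// Illustrative only: a source limited to Parquet row groups 0 and 2.
// Field names match the DataFileSource::AnonymousDataFile variant in the
// diff below; the path is a made-up placeholder.
fn example_source() -> DataFileSource {
    DataFileSource::AnonymousDataFile {
        path: "s3://bucket/part-00000.parquet".to_string(),
        // The new per-file, format-specific row subset selection.
        chunk_spec: Some(ChunkSpec::Parquet(vec![0, 2])),
        size_bytes: None,
        metadata: None,
        partition_spec: None,
        statistics: None,
    }
}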
clarkzinzow authored Nov 11, 2023
1 parent cdc1b94 commit ef4d2fd
Showing 11 changed files with 78 additions and 55 deletions.
8 changes: 4 additions & 4 deletions daft/daft.pyi
@@ -178,7 +178,7 @@ class ParquetSourceConfig:
Configuration of a Parquet data source.
"""

def __init__(self, coerce_int96_timestamp_unit: PyTimeUnit | None = None, row_groups: list[int] | None = None): ...
def __init__(self, coerce_int96_timestamp_unit: PyTimeUnit | None = None): ...

class CsvSourceConfig:
"""
@@ -424,7 +424,7 @@ def read_parquet_bulk(
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[list[int]] | None = None,
row_groups: list[list[int] | None] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
@@ -450,7 +450,7 @@ def read_parquet_into_pyarrow_bulk(
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[list[int]] | None = None,
row_groups: list[list[int] | None] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
@@ -804,7 +804,7 @@ class PyMicroPartition:
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups: list[list[int]] | None = None,
row_groups: list[list[int] | None] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = None,
multithreaded_io: bool | None = None,
2 changes: 1 addition & 1 deletion daft/table/micropartition.py
@@ -328,7 +328,7 @@ def read_parquet_bulk(
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups_per_path: list[list[int]] | None = None,
row_groups_per_path: list[list[int] | None] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
4 changes: 2 additions & 2 deletions daft/table/table.py
@@ -407,7 +407,7 @@ def read_parquet_bulk(
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups_per_path: list[list[int]] | None = None,
row_groups_per_path: list[list[int] | None] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
@@ -532,7 +532,7 @@ def read_parquet_into_pyarrow_bulk(
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
row_groups_per_path: list[list[int]] | None = None,
row_groups_per_path: list[list[int] | None] | None = None,
io_config: IOConfig | None = None,
num_parallel_tasks: int | None = 128,
multithreaded_io: bool | None = None,
67 changes: 42 additions & 25 deletions src/daft-micropartition/src/micropartition.rs
@@ -13,7 +13,7 @@ use daft_parquet::read::{
};
use daft_scan::file_format::{CsvSourceConfig, FileFormatConfig, ParquetSourceConfig};
use daft_scan::storage_config::{NativeStorageConfig, StorageConfig};
use daft_scan::{DataFileSource, ScanTask};
use daft_scan::{ChunkSpec, DataFileSource, ScanTask};
use daft_table::Table;

use snafu::ResultExt;
@@ -107,20 +107,17 @@ fn materialize_scan_task(
// ********************
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
// TODO(Clark): Support different row group specification per file.
row_groups,
}) => {
let inference_options =
ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit));
let urls = urls.collect::<Vec<_>>();
let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice());
daft_parquet::read::read_parquet_bulk(
urls.as_slice(),
column_names.as_deref(),
None,
scan_task.limit,
row_groups
.as_ref()
.map(|row_groups| vec![row_groups.clone(); urls.len()]),
row_groups,
io_client.clone(),
io_stats,
8,
@@ -209,7 +206,7 @@ fn materialize_scan_task(
py,
url,
*has_headers,
delimiter,
delimiter.as_str(),
*double_quote,
cast_to_schema.clone().into(),
scan_task.storage_config.clone().into(),
@@ -320,30 +317,26 @@ impl MicroPartition {
_,
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
row_groups,
}),
StorageConfig::Native(cfg),
) => {
let uris = scan_task
.sources
.iter()
.map(|s| s.get_path())
.collect::<Vec<_>>();
let columns = scan_task
.columns
.as_ref()
.map(|cols| cols.iter().map(|s| s.as_str()).collect::<Vec<&str>>());

let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice());
read_parquet_into_micropartition(
scan_task
.sources
.iter()
.map(|s| s.get_path())
.collect::<Vec<_>>()
.as_slice(),
uris.as_slice(),
columns.as_deref(),
None,
scan_task.limit,
row_groups.clone().map(|rg| {
std::iter::repeat(rg)
.take(scan_task.sources.len())
.collect::<Vec<_>>()
}), // HACK: Properly propagate multi-file row_groups
row_groups,
cfg.io_config
.clone()
.map(|c| Arc::new(c.clone()))
@@ -510,6 +503,24 @@ fn prune_fields_from_schema_ref(
}
}

fn parquet_sources_to_row_groups(sources: &[DataFileSource]) -> Option<Vec<Option<Vec<i64>>>> {
let row_groups = sources
.iter()
.map(|s| {
if let Some(ChunkSpec::Parquet(row_group)) = s.get_chunk_spec() {
Some(row_group.clone())
} else {
None
}
})
.collect::<Vec<_>>();
if row_groups.iter().any(|rgs| rgs.is_some()) {
Some(row_groups)
} else {
None
}
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn read_csv_into_micropartition(
uris: &[&str],
@@ -586,7 +597,7 @@ pub(crate) fn read_parquet_into_micropartition(
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
io_config: Arc<IOConfig>,
io_stats: Option<IOStatsRef>,
num_parallel_tasks: usize,
@@ -646,10 +657,12 @@
Some(row_groups) => metadata
.iter()
.zip(row_groups.iter())
.map(|(fm, rg)| {
rg.iter()
.map(|(fm, rg)| match rg {
Some(rg) => rg
.iter()
.map(|rg_idx| fm.row_groups.get(*rg_idx as usize).unwrap().num_rows())
.sum::<usize>()
.sum::<usize>(),
None => fm.num_rows,
})
.sum(),
};
@@ -670,8 +683,13 @@
let scan_task = ScanTask::new(
owned_urls
.into_iter()
.map(|url| DataFileSource::AnonymousDataFile {
.zip(
row_groups
.unwrap_or_else(|| std::iter::repeat(None).take(uris.len()).collect()),
)
.map(|(url, rgs)| DataFileSource::AnonymousDataFile {
path: url,
chunk_spec: rgs.map(ChunkSpec::Parquet),
size_bytes: Some(size_bytes),
metadata: None,
partition_spec: None,
@@ -680,7 +698,6 @@
.collect::<Vec<_>>(),
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit: schema_infer_options.coerce_int96_timestamp_unit,
row_groups: None,
})
.into(),
daft_schema.clone(),
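
A note on the shape that threads through this file: `parquet_sources_to_row_groups` collapses per-source chunk specs into an `Option<Vec<Option<Vec<i64>>>>`, where the outer `Option` is `None` when no source carries a selection and each inner `Option` is `None` for a file that should be read in full. A self-contained sketch (not Daft code) of that collapsing rule:

fn main() {
    // Three files: only the second restricts which row groups to read.
    let per_file: Vec<Option<Vec<i64>>> = vec![None, Some(vec![1, 3]), None];

    // Mirrors parquet_sources_to_row_groups: collapse to None when no file
    // has a selection, so readers can skip the row-group path entirely.
    let row_groups = if per_file.iter().any(|rg| rg.is_some()) {
        Some(per_file)
    } else {
        None
    };

    assert_eq!(row_groups, Some(vec![None, Some(vec![1, 3]), None]));
}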
4 changes: 2 additions & 2 deletions src/daft-micropartition/src/python.rs
@@ -438,7 +438,7 @@ impl PyMicroPartition {
columns.as_deref(),
start_offset,
num_rows,
row_groups.map(|rg| vec![rg]),
row_groups.map(|rg| vec![Some(rg)]),
io_config,
Some(io_stats),
1,
@@ -457,7 +457,7 @@
columns: Option<Vec<&str>>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
io_config: Option<IOConfig>,
num_parallel_tasks: Option<i64>,
multithreaded_io: Option<bool>,
4 changes: 2 additions & 2 deletions src/daft-parquet/src/python.rs
@@ -126,7 +126,7 @@ pub mod pylib {
columns: Option<Vec<&str>>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
io_config: Option<IOConfig>,
num_parallel_tasks: Option<i64>,
multithreaded_io: Option<bool>,
@@ -170,7 +170,7 @@
columns: Option<Vec<&str>>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
io_config: Option<IOConfig>,
num_parallel_tasks: Option<i64>,
multithreaded_io: Option<bool>,
14 changes: 4 additions & 10 deletions src/daft-parquet/src/read.rs
@@ -372,7 +372,7 @@ pub fn read_parquet_bulk(
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
num_parallel_tasks: usize,
@@ -395,10 +395,7 @@
let task_stream = futures::stream::iter(uris.iter().enumerate().map(|(i, uri)| {
let uri = uri.to_string();
let owned_columns = owned_columns.clone();
let owned_row_group = match &row_groups {
None => None,
Some(v) => v.get(i).cloned(),
};
let owned_row_group = row_groups.as_ref().and_then(|rgs| rgs[i].clone());

let io_client = io_client.clone();
let io_stats = io_stats.clone();
@@ -441,7 +438,7 @@ pub fn read_parquet_into_pyarrow_bulk(
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
row_groups: Option<Vec<Vec<i64>>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
num_parallel_tasks: usize,
@@ -466,10 +463,7 @@
let uri = uri.to_string();
let owned_columns = owned_columns.clone();
let schema_infer_options = schema_infer_options;
let owned_row_group = match &row_groups {
None => None,
Some(v) => v.get(i).cloned(),
};
let owned_row_group = row_groups.as_ref().and_then(|rgs| rgs[i].clone());

let io_client = io_client.clone();
let io_stats = io_stats.clone();
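
The replaced `match` and the new `and_then` lookup agree under the nested-`Option` element type; a standalone illustration (assumed values only, not Daft's actual code path):

fn main() {
    let row_groups: Option<Vec<Option<Vec<i64>>>> = Some(vec![Some(vec![0]), None]);
    let i = 0;

    // Match form: peel the outer Option, index, then flatten the copy.
    let via_match = match &row_groups {
        None => None,
        Some(v) => v.get(i).cloned().flatten(),
    };

    // Combinator form used in the diff above.
    let via_and_then = row_groups.as_ref().and_then(|rgs| rgs[i].clone());

    assert_eq!(via_match, via_and_then); // both Some(vec![0]) here
}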
1 change: 1 addition & 0 deletions src/daft-scan/src/anonymous.rs
@@ -70,6 +70,7 @@ impl ScanOperator for AnonymousScanOperator {
Ok(ScanTask::new(
vec![DataFileSource::AnonymousDataFile {
path: f.to_string(),
chunk_spec: None,
size_bytes: None,
metadata: None,
partition_spec: None,
9 changes: 1 addition & 8 deletions src/daft-scan/src/file_format.rs
@@ -79,28 +79,21 @@ impl FileFormatConfig {
#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))]
pub struct ParquetSourceConfig {
pub coerce_int96_timestamp_unit: TimeUnit,
pub row_groups: Option<Vec<i64>>,
}

#[cfg(feature = "python")]
#[pymethods]
impl ParquetSourceConfig {
/// Create a config for a Parquet data source.
#[new]
fn new(coerce_int96_timestamp_unit: Option<PyTimeUnit>, row_groups: Option<Vec<i64>>) -> Self {
fn new(coerce_int96_timestamp_unit: Option<PyTimeUnit>) -> Self {
Self {
coerce_int96_timestamp_unit: coerce_int96_timestamp_unit
.unwrap_or(TimeUnit::Nanoseconds.into())
.into(),
row_groups,
}
}

#[getter]
fn row_groups(&self) -> PyResult<Option<Vec<i64>>> {
Ok(self.row_groups.clone())
}

#[getter]
fn coerce_int96_timestamp_unit(&self) -> PyResult<PyTimeUnit> {
Ok(self.coerce_int96_timestamp_unit.into())
2 changes: 1 addition & 1 deletion src/daft-scan/src/glob.rs
@@ -156,7 +156,6 @@ impl GlobScanOperator {
let inferred_schema = match file_format_config.as_ref() {
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
..
}) => {
let io_stats = IOStatsContext::new(format!(
"GlobScanOperator constructor read_parquet_schema: for uri {first_filepath}"
@@ -289,6 +288,7 @@ impl ScanOperator for GlobScanOperator {
Ok(ScanTask::new(
vec![DataFileSource::AnonymousDataFile {
path: path.to_string(),
chunk_spec: None,
size_bytes,
metadata: None,
partition_spec: None,
18 changes: 18 additions & 0 deletions src/daft-scan/src/lib.rs
@@ -47,17 +47,26 @@ impl From<Error> for pyo3::PyErr {
}
}

/// Specification of a subset of a file to be read.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChunkSpec {
/// Selection of Parquet row groups.
Parquet(Vec<i64>),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataFileSource {
AnonymousDataFile {
path: String,
chunk_spec: Option<ChunkSpec>,
size_bytes: Option<u64>,
metadata: Option<TableMetadata>,
partition_spec: Option<PartitionSpec>,
statistics: Option<TableStatistics>,
},
CatalogDataFile {
path: String,
chunk_spec: Option<ChunkSpec>,
size_bytes: Option<u64>,
metadata: TableMetadata,
partition_spec: PartitionSpec,
@@ -71,12 +80,21 @@ impl DataFileSource {
Self::AnonymousDataFile { path, .. } | Self::CatalogDataFile { path, .. } => path,
}
}

pub fn get_chunk_spec(&self) -> Option<&ChunkSpec> {
match self {
Self::AnonymousDataFile { chunk_spec, .. }
| Self::CatalogDataFile { chunk_spec, .. } => chunk_spec.as_ref(),
}
}

pub fn get_size_bytes(&self) -> Option<u64> {
match self {
Self::AnonymousDataFile { size_bytes, .. }
| Self::CatalogDataFile { size_bytes, .. } => *size_bytes,
}
}

pub fn get_metadata(&self) -> Option<&TableMetadata> {
match self {
Self::AnonymousDataFile { metadata, .. } => metadata.as_ref(),
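
Since `ChunkSpec` currently has a single variant, callers unwrap it with a small match, as `parquet_sources_to_row_groups` does in micropartition.rs above. A minimal sketch of such a consumer (the helper function itself is hypothetical):

use daft_scan::{ChunkSpec, DataFileSource};

// Hypothetical helper: pull the Parquet row-group selection, if any, off a
// source. Returns None when no chunk spec is set; the wildcard arm keeps the
// match total if non-Parquet variants are added later.
fn parquet_row_groups(source: &DataFileSource) -> Option<Vec<i64>> {
    match source.get_chunk_spec() {
        Some(ChunkSpec::Parquet(rgs)) => Some(rgs.clone()),
        _ => None,
    }
}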
