
[FEAT][ScanOperator 1/3] Add MVP e2e ScanOperator integration. #1559

Merged: 17 commits, Nov 7, 2023
5 changes: 5 additions & 0 deletions Cargo.lock


51 changes: 46 additions & 5 deletions daft/daft.pyi
@@ -178,10 +178,7 @@ class ParquetSourceConfig:
    Configuration of a Parquet data source.
    """

-    # Whether or not to use a multithreaded tokio runtime for processing I/O
-    multithreaded_io: bool
-
-    def __init__(self, multithreaded_io: bool): ...
    def __init__(self, coerce_int96_timestamp_unit: PyTimeUnit | None = None, row_groups: list[int] | None = None): ...

class CsvSourceConfig:
"""
@@ -339,9 +336,11 @@ class NativeStorageConfig:
    Storage configuration for the Rust-native I/O layer.
    """

    # Whether or not to use a multithreaded tokio runtime for processing I/O
    multithreaded_io: bool
    io_config: IOConfig

-    def __init__(self, io_config: IOConfig | None = None): ...
    def __init__(self, multithreaded_io: bool, io_config: IOConfig | None = None): ...

class PythonStorageConfig:
"""
@@ -374,6 +373,42 @@ class StorageConfig:
    @property
    def config(self) -> NativeStorageConfig | PythonStorageConfig: ...

class ScanTask:
    """
    A batch of scan tasks for reading data from an external source.
    """

    def num_rows(self) -> int:
        """
        Get number of rows that will be scanned by this ScanTask.
        """
        ...
    def size_bytes(self) -> int:
        """
        Get number of bytes that will be scanned by this ScanTask.
        """
        ...

class ScanOperatorHandle:
    """
    A handle to a scan operator.
    """

    @staticmethod
    def anonymous_scan(
        files: list[str],
        schema: PySchema,
        file_format_config: FileFormatConfig,
        storage_config: StorageConfig,
    ) -> ScanOperatorHandle: ...
    @staticmethod
    def glob_scan(
        glob_path: str,
        file_format_config: FileFormatConfig,
        storage_config: StorageConfig,
        schema: PySchema | None = None,
    ) -> ScanOperatorHandle: ...

def read_parquet(
uri: str,
columns: list[str] | None = None,
@@ -722,6 +757,8 @@ class PyMicroPartition:
    @staticmethod
    def empty(schema: PySchema | None = None) -> PyMicroPartition: ...
    @staticmethod
    def from_scan_task(scan_task: ScanTask) -> PyMicroPartition: ...
    @staticmethod
    def from_tables(tables: list[PyTable]) -> PyMicroPartition: ...
    @staticmethod
    def from_arrow_record_batches(record_batches: list[pyarrow.RecordBatch], schema: PySchema) -> PyMicroPartition: ...
@@ -814,6 +851,10 @@ class LogicalPlanBuilder:
        partition_key: str, cache_entry: PartitionCacheEntry, schema: PySchema, num_partitions: int
    ) -> LogicalPlanBuilder: ...
    @staticmethod
    def table_scan_with_scan_operator(
        scan_operator: ScanOperatorHandle, schema_hint: PySchema | None
    ) -> LogicalPlanBuilder: ...
    @staticmethod
    def table_scan(
        file_infos: FileInfos, schema: PySchema, file_format_config: FileFormatConfig, storage_config: StorageConfig
    ) -> LogicalPlanBuilder: ...
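
Taken together, the new stubs describe the intended flow. A minimal sketch, assuming a hypothetical glob path and configs built the way other hunks in this PR build them (these are internal bindings, not public API):

```python
# Hedged sketch of the new bindings, per the stubs above.
# `file_format_config` and `storage_config` are assumed to be built elsewhere,
# e.g. FileFormatConfig.from_parquet_config(ParquetSourceConfig()) and
# StorageConfig.native(NativeStorageConfig(True, io_config)).
scan_op = ScanOperatorHandle.glob_scan(
    "s3://bucket/data/*.parquet",  # hypothetical glob path
    file_format_config,
    storage_config,
    schema=None,  # None: infer the schema from the files
)
builder = LogicalPlanBuilder.table_scan_with_scan_operator(scan_op, None)
```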
1 change: 0 additions & 1 deletion daft/execution/execution_step.py
@@ -404,7 +404,6 @@ def _handle_tabular_files_scan(
                schema=self.schema,
                storage_config=self.storage_config,
                read_options=read_options,
-               multithreaded_io=format_config.multithreaded_io,
            )
            for fp in filepaths
        ]
44 changes: 44 additions & 0 deletions daft/execution/rust_physical_plan_shim.py
@@ -1,5 +1,6 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterator, TypeVar, cast

from daft.daft import (
@@ -10,17 +11,60 @@
    PySchema,
    PyTable,
    ResourceRequest,
    ScanTask,
    StorageConfig,
)
from daft.execution import execution_step, physical_plan
from daft.expressions import Expression, ExpressionsProjection
from daft.logical.map_partition_ops import MapPartitionOp
from daft.logical.schema import Schema
from daft.runners.partitioning import PartialPartitionMetadata
from daft.table import Table

PartitionT = TypeVar("PartitionT")


Review comment (Contributor): NOTE: I changed this from a single ScanTaskBatch to avoid having to coalesce all the tasks into one fat task

def scan_with_tasks(
    scan_tasks: list[ScanTask],
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
"""child_plan represents partitions with filenames.

Yield a plan to read those filenames.
"""
# TODO(Clark): Currently hardcoded to have 1 file per instruction
# We can instead right-size and bundle the ScanTask into single-instruction bulk reads.
for scan_task in scan_tasks:
        scan_step = execution_step.PartitionTaskBuilder[PartitionT](inputs=[], partial_metadatas=None,).add_instruction(
            instruction=ScanWithTask(scan_task),
            # Set the filesize as the memory request.
            # (Note: this is very conservative; file readers empirically use much more peak memory than 1x file size.)
            resource_request=ResourceRequest(memory_bytes=scan_task.size_bytes()),
        )
        yield scan_step


@dataclass(frozen=True)
class ScanWithTask(execution_step.SingleOutputInstruction):
    scan_task: ScanTask

    def run(self, inputs: list[Table]) -> list[Table]:
        return self._scan(inputs)

    def _scan(self, inputs: list[Table]) -> list[Table]:
        assert len(inputs) == 0
        return [Table._from_scan_task(self.scan_task)]

    def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
        assert len(input_metadatas) == 0
        return [
            PartialPartitionMetadata(
                num_rows=self.scan_task.num_rows(),
                size_bytes=None,
            )
        ]
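
As a rough sketch of the contract (the `scan_task` below is a stand-in; real ones come from a ScanOperatorHandle), executing one yielded step reduces to:

```python
# Hedged sketch: the instruction takes no input partitions and produces exactly
# one Table (a MicroPartition when DAFT_MICROPARTITIONS=1); the row count is
# known before reading, the byte size is not.
instruction = ScanWithTask(scan_task)          # scan_task: stand-in ScanTask
[table] = instruction.run([])                  # scans the file(s) into one Table
[meta] = instruction.run_partial_metadata([])  # metadata without reading data
assert meta.num_rows == scan_task.num_rows()
assert meta.size_bytes is None
```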


def tabular_scan(
    schema: PySchema,
    columns_to_read: list[str] | None,
2 changes: 1 addition & 1 deletion daft/io/_csv.py
@@ -70,7 +70,7 @@ def read_csv(
    )
    file_format_config = FileFormatConfig.from_csv_config(csv_config)
    if use_native_downloader:
-        storage_config = StorageConfig.native(NativeStorageConfig(io_config))
        storage_config = StorageConfig.native(NativeStorageConfig(True, io_config))
Review comment (Member): @jaychia Did we decide to only use the multithreading backend for the python runner or are we just gonna call yolo and use it for ray too?

Reply (Contributor): I do have a hardcoded default for Ray to false, but only for the Parquet read (https://github.com/Eventual-Inc/Daft/blob/main/daft/io/_parquet.py#L53-L55). We can do it for CSV as well?
    else:
        storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config))
    builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
8 changes: 2 additions & 6 deletions daft/io/_parquet.py
@@ -54,13 +54,9 @@ def read_parquet(
    # This is because each Ray worker process receives its own pool of thread workers and connections
    multithreaded_io = not context.get_context().is_ray_runner if _multithreaded_io is None else _multithreaded_io

-    file_format_config = FileFormatConfig.from_parquet_config(
-        ParquetSourceConfig(
-            multithreaded_io=multithreaded_io,
-        )
-    )
    file_format_config = FileFormatConfig.from_parquet_config(ParquetSourceConfig())
    if use_native_downloader:
-        storage_config = StorageConfig.native(NativeStorageConfig(io_config))
        storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))
    else:
        storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config))
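
The net effect, as a hedged sketch (`_multithreaded_io` is the private parameter referenced in the hunk above): multithreaded I/O stays on by default everywhere except the Ray runner, where each worker process already has its own pool of threads and connections.

```python
# Hedged sketch of the resulting defaults (not an exhaustive spec):
#   PyRunner (local): NativeStorageConfig(True, io_config)
#   Ray runner:       NativeStorageConfig(False, io_config)
df = daft.read_parquet("s3://bucket/file.parquet")  # runner-dependent default
df = daft.read_parquet("s3://bucket/file.parquet", _multithreaded_io=False)  # explicit override
```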

53 changes: 50 additions & 3 deletions daft/io/common.py
@@ -1,12 +1,14 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING

from daft.context import get_context
from daft.daft import (
    FileFormatConfig,
    NativeStorageConfig,
    PythonStorageConfig,
    ScanOperatorHandle,
    StorageConfig,
)
from daft.datatype import DataType
@@ -31,9 +33,6 @@
    storage_config: StorageConfig,
) -> LogicalPlanBuilder:
    """Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
-    paths = path if isinstance(path, list) else [str(path)]
-    schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None

    # Glob the path using the Runner
    # NOTE: Globbing will always need the IOConfig, regardless of whether "native reads" are used
    io_config = None
@@ -44,6 +43,54 @@
    else:
        raise NotImplementedError(f"Tabular scan with config not implemented: {storage_config.config}")

    schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None

    ### FEATURE_FLAG: $DAFT_V2_SCANS
    #
    # This environment variable will make Daft use the new "v2 scans" and MicroPartitions when building Daft logical plans
    if os.getenv("DAFT_V2_SCANS", "0") == "1":
        assert (
            os.getenv("DAFT_MICROPARTITIONS", "0") == "1"
        ), "DAFT_V2_SCANS=1 requires DAFT_MICROPARTITIONS=1 to be set as well"

Review comment (Member): Let's just override this to be 1 on default if DAFT_V2_SCANS is set

Reply (Contributor): Yeah unfortunately we use os.getenv("DAFT_MICROPARTITIONS", "0") == "1" at import time to hotswap our Table implementation, so by the time we hit this code it might be "too late" to override it

        scan_op: ScanOperatorHandle
        if isinstance(path, list):
            # Eagerly globs each path and fallback to AnonymousScanOperator.
            # NOTE: We could instead have GlobScanOperator take a list of paths and mux the glob output streams
            runner_io = get_context().runner().runner_io()
            file_infos = runner_io.glob_paths_details(path, file_format_config=file_format_config, io_config=io_config)

            # TODO: Should we move this into the AnonymousScanOperator itself?
            # Infer schema if no hints provided
            inferred_or_provided_schema = (
                schema_hint
                if schema_hint is not None
                else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
            )

            scan_op = ScanOperatorHandle.anonymous_scan(
                file_infos.file_paths,
                inferred_or_provided_schema._schema,
                file_format_config,
                storage_config,
            )
        elif isinstance(path, str):
            scan_op = ScanOperatorHandle.glob_scan(
                path,
                file_format_config,
                storage_config,
                schema=schema_hint._schema if schema_hint is not None else None,
            )
        else:
            raise NotImplementedError(f"_get_tabular_files_scan cannot construct ScanOperatorHandle for input: {path}")

        builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator(
            scan_operator=scan_op,
            schema_hint=schema_hint,
        )
        return builder


    paths = path if isinstance(path, list) else [str(path)]
    runner_io = get_context().runner().runner_io()
    file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config)
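
Per the review reply above, DAFT_MICROPARTITIONS is consumed at import time, so exercising the new path end to end requires both flags before daft is imported. A hedged sketch (flag names taken from the diff; the read path is hypothetical):

```python
# Hedged sketch: opt in to the v2 scan path. Both flags must be set before
# daft is imported, because the Table implementation is hotswapped at import time.
import os

os.environ["DAFT_MICROPARTITIONS"] = "1"
os.environ["DAFT_V2_SCANS"] = "1"

import daft  # deliberately imported after the flags are set

df = daft.read_parquet("s3://bucket/data/*.parquet")  # hypothetical path
```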

18 changes: 17 additions & 1 deletion daft/logical/builder.py
@@ -5,7 +5,12 @@

from daft.daft import CountMode, FileFormat, FileFormatConfig, FileInfos, JoinType
from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder
-from daft.daft import PartitionScheme, ResourceRequest, StorageConfig
from daft.daft import (
    PartitionScheme,
    ResourceRequest,
    ScanOperatorHandle,
    StorageConfig,
)
from daft.expressions import Expression, col
from daft.logical.schema import Schema
from daft.runners.partitioning import PartitionCacheEntry
@@ -66,6 +71,17 @@
        builder = _LogicalPlanBuilder.in_memory_scan(partition.key, partition, schema._schema, num_partitions)
        return cls(builder)

    @classmethod
    def from_tabular_scan_with_scan_operator(
        cls,
        *,
        scan_operator: ScanOperatorHandle,
        schema_hint: Schema | None,
    ) -> LogicalPlanBuilder:
        pyschema = schema_hint._schema if schema_hint is not None else None
        builder = _LogicalPlanBuilder.table_scan_with_scan_operator(scan_operator, pyschema)
        return cls(builder)


    @classmethod
    def from_tabular_scan(
        cls,
6 changes: 6 additions & 0 deletions daft/table/micropartition.py
@@ -8,6 +8,7 @@
from daft.daft import IOConfig, JoinType
from daft.daft import PyMicroPartition as _PyMicroPartition
from daft.daft import PyTable as _PyTable
from daft.daft import ScanTask as _ScanTask
from daft.datatype import DataType, TimeUnit
from daft.expressions import Expression, ExpressionsProjection
from daft.logical.schema import Schema
@@ -64,6 +65,11 @@
        pyt = _PyMicroPartition.empty(None) if schema is None else _PyMicroPartition.empty(schema._schema)
        return MicroPartition._from_pymicropartition(pyt)

    @staticmethod
    def _from_scan_task(scan_task: _ScanTask) -> MicroPartition:
        assert isinstance(scan_task, _ScanTask)
        return MicroPartition._from_pymicropartition(_PyMicroPartition.from_scan_task(scan_task))
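
This is the hook that ScanWithTask._scan lands on once the hotswap described in the review thread is active. A sketch of that mechanism, assuming the module layout shown in this diff (the exact swap site is not shown here):

```python
# Hedged sketch of the import-time hotswap (details assumed): when
# DAFT_MICROPARTITIONS=1, daft.table.Table resolves to MicroPartition, so the
# Table._from_scan_task call in ScanWithTask._scan dispatches to this method.
import os

if os.getenv("DAFT_MICROPARTITIONS", "0") == "1":
    from daft.table.micropartition import MicroPartition as Table
else:
    from daft.table.table import Table  # legacy Table: _from_scan_task raises
```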


    @staticmethod
    def _from_pytable(pyt: _PyTable) -> MicroPartition:
        assert isinstance(pyt, _PyTable)
5 changes: 5 additions & 0 deletions daft/table/table.py
@@ -8,6 +8,7 @@
from daft.arrow_utils import ensure_table
from daft.daft import JoinType
from daft.daft import PyTable as _PyTable
from daft.daft import ScanTask as _ScanTask
from daft.daft import read_csv as _read_csv
from daft.daft import read_parquet as _read_parquet
from daft.daft import read_parquet_bulk as _read_parquet_bulk
@@ -78,6 +79,10 @@
        pyt = _PyTable.empty(None) if schema is None else _PyTable.empty(schema._schema)
        return Table._from_pytable(pyt)

    @staticmethod
    def _from_scan_task(_: _ScanTask) -> Table:
        raise NotImplementedError("_from_scan_task is not implemented for legacy Python Table.")


    @staticmethod
    def _from_pytable(pyt: _PyTable) -> Table:
        assert isinstance(pyt, _PyTable)
3 changes: 1 addition & 2 deletions daft/table/table_io.py
@@ -106,7 +106,6 @@ def read_parquet(
    storage_config: StorageConfig | None = None,
    read_options: TableReadOptions = TableReadOptions(),
    parquet_options: TableParseParquetOptions = TableParseParquetOptions(),
-    multithreaded_io: bool | None = None,
) -> Table:
    """Reads a Table from a Parquet file

@@ -131,7 +130,7 @@
        num_rows=read_options.num_rows,
        io_config=config.io_config,
        coerce_int96_timestamp_unit=parquet_options.coerce_int96_timestamp_unit,
-        multithreaded_io=multithreaded_io,
        multithreaded_io=config.multithreaded_io,
    )
    return _cast_table_to_schema(tbl, read_options=read_options, schema=schema)
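
A short usage sketch of the new plumbing (call shape assumed; only the parameters visible in this hunk are certain): the multithreading flag now rides on NativeStorageConfig instead of being a separate read_parquet argument.

```python
# Hedged sketch: multithreaded_io travels inside the storage config.
from daft.daft import IOConfig, NativeStorageConfig, StorageConfig

native = NativeStorageConfig(True, IOConfig())  # multithreaded_io=True
storage_config = StorageConfig.native(native)
# table_io.read_parquet(file, schema, storage_config=storage_config, ...)
# then forwards config.multithreaded_io to the native reader.
```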

1 change: 1 addition & 0 deletions src/daft-micropartition/Cargo.toml
@@ -11,6 +11,7 @@ daft-scan = {path = "../daft-scan", default-features = false}
daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
indexmap = {workspace = true, features = ["serde"]}
log = {workspace = true}
parquet2 = {workspace = true}
pyo3 = {workspace = true, optional = true}
pyo3-log = {workspace = true}