
[FEAT][ScanOperator 1/3] Add MVP e2e ScanOperator integration. #1559

Merged: 17 commits into main from clark/scan-operator-integration on Nov 7, 2023

Conversation

clarkzinzow (Contributor) commented Nov 1, 2023:

This PR adds an MVP end-to-end integration of the new `ScanOperator` for reading from external sources, covering logical plan building, logical-to-physical plan translation, physical plan scheduling, physical task execution, and the actual `MicroPartition`-based reads.
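For orientation, here is a minimal sketch of the shape such a scan operator interface could take; the trait and types below are illustrative stand-ins, not the actual definitions added in this PR:

```rust
use std::sync::Arc;

// Stand-in types; the real definitions live in daft-core / daft-scan.
pub struct Schema;
pub type SchemaRef = Arc<Schema>;
pub struct ScanTask;

/// Hypothetical minimal surface of a scan operator: expose the schema of the
/// external source, and enumerate the scan tasks needed to read it.
pub trait ScanOperator: Send + Sync {
    fn schema(&self) -> SchemaRef;
    fn to_scan_tasks(&self) -> Vec<ScanTask>;
}
```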

TODOs (possibly before merging)

samster25 (Member) left a comment:


Did a first-pass review! Great work so far :)

@@ -67,7 +67,7 @@ def read_csv(
     )
     file_format_config = FileFormatConfig.from_csv_config(csv_config)
     if use_native_downloader:
-        storage_config = StorageConfig.native(NativeStorageConfig(io_config))
+        storage_config = StorageConfig.native(NativeStorageConfig(True, io_config))
Member:

@jaychia Did we decide to only use the multithreading backend for the Python runner, or are we just gonna call yolo and use it for Ray too?

Contributor:

I do have a hardcoded default of false for Ray, but only for the Parquet read (https://github.com/Eventual-Inc/Daft/blob/main/daft/io/_parquet.py#L53-L55). We could do it for CSV as well?

Review threads resolved on: src/daft-micropartition/src/micropartition.rs (3), src/daft-plan/src/physical_ops/scan.rs, src/daft-scan/src/lib.rs (3)
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Pushdowns {
    /// Optional filters to apply to the source data.
    pub filters: Option<Arc<Vec<ExprRef>>>,
Member:

Should these be `HashSet`s instead, since ordering shouldn't matter for equality and hashing?
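For reference, a hedged sketch of the `HashSet` variant being floated here; `ExprRef` is a stand-in alias, and the real struct has more fields than shown:

```rust
use std::collections::HashSet;
use std::sync::Arc;

// Stand-in for the real ExprRef from daft-dsl.
type ExprRef = Arc<String>;

// A HashSet makes equality order-insensitive, but note the trade-off:
// HashSet does not implement Hash, so the struct can no longer derive
// Hash and would need a custom, order-independent Hash impl.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Pushdowns {
    /// Optional filters to apply to the source data.
    pub filters: Option<Arc<HashSet<ExprRef>>>,
}
```

That trade-off may be why a `Vec` plus derived `Hash` is used here.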

Review thread resolved on: src/daft-scan/src/python.rs
jaychia force-pushed the clark/scan-operator-integration branch from b091508 to 9768c21 on November 2, 2023 at 00:04
github-actions bot added the enhancement (New feature or request) label on Nov 2, 2023
Jay Chia and others added 7 commits on November 2, 2023 at 17:21:
…e instead of on `dyn ScanOperator` trait object (#1562)

Performs equality on the Arc pointer instead of relying on logic that checks the underlying `dyn ScanOperator` trait object for equality, which could be tricky because of custom vtable logic (see the sketch below).

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
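A minimal sketch of the pointer-equality approach this commit describes; the wrapper name and trait body are assumptions, not the actual daft-scan code:

```rust
use std::sync::Arc;

// Stand-in trait; the real `ScanOperator` lives in daft-scan.
pub trait ScanOperator {}

// Newtype wrapper so equality can be defined on the Arc itself.
#[derive(Clone)]
pub struct ScanOperatorRef(pub Arc<dyn ScanOperator>);

impl PartialEq for ScanOperatorRef {
    fn eq(&self, other: &Self) -> bool {
        // Compare allocations rather than trait-object contents,
        // sidestepping any reliance on vtable comparisons.
        Arc::ptr_eq(&self.0, &other.0)
    }
}

impl Eq for ScanOperatorRef {}
```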
…on-optional TableMetadata (#1563)

Also refactors `MicroPartition::new` into explicit `::new_loaded` and `::new_unloaded` variants (see the sketch after this commit message):

1. Helps us enforce that `TableStatistics` is `Some` when the state is `TableState::Unloaded` at construction time
2. Cleans up client code, because `TableState` no longer needs to be exposed externally and we can hide the `TableMetadata` calculations inside the `::new_loaded` constructor
3. `::from_scan_task_batch` is very explicit and clean: it tries `::new_unloaded` first, but falls back on `::new_loaded` if the ScanTaskBatch is not provided with both metadata and statistics

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
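A hedged sketch of the constructor split described above; signatures, field names, and the metadata calculation are assumptions, not the actual daft-micropartition API:

```rust
use std::sync::Arc;

// Stand-in types to keep the sketch self-contained.
pub struct Table { pub num_rows: usize }
pub struct TableMetadata { pub length: usize }
pub struct TableStatistics;

pub enum TableState {
    Unloaded,
    Loaded(Arc<Vec<Table>>),
}

pub struct MicroPartition {
    pub state: TableState,
    pub metadata: TableMetadata, // non-optional after this change
    pub statistics: Option<TableStatistics>,
}

impl MicroPartition {
    /// Unloaded construction must be handed statistics up front,
    /// enforcing invariant (1) at construction time.
    pub fn new_unloaded(metadata: TableMetadata, statistics: TableStatistics) -> Self {
        Self {
            state: TableState::Unloaded,
            metadata,
            statistics: Some(statistics),
        }
    }

    /// Loaded construction derives metadata from the materialized tables,
    /// hiding that calculation from client code (point 2).
    pub fn new_loaded(tables: Arc<Vec<Table>>, statistics: Option<TableStatistics>) -> Self {
        let length = tables.iter().map(|t| t.num_rows).sum();
        Self {
            state: TableState::Loaded(tables),
            metadata: TableMetadata { length },
            statistics,
        }
    }
}
```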
…ew scan node builder (#1564)

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
codecov bot commented Nov 3, 2023:

Codecov Report

Merging #1559 (c113d5a) into main (c8fe883) will decrease coverage by 0.36%.
The diff coverage is 50.00%.


@@            Coverage Diff             @@
##             main    #1559      +/-   ##
==========================================
- Coverage   85.21%   84.85%   -0.36%     
==========================================
  Files          54       54              
  Lines        5121     5165      +44     
==========================================
+ Hits         4364     4383      +19     
- Misses        757      782      +25     
Files Coverage Δ
daft/execution/execution_step.py 92.83% <ø> (ø)
daft/io/_csv.py 94.73% <100.00%> (ø)
daft/io/_parquet.py 100.00% <100.00%> (ø)
daft/table/table_io.py 95.83% <ø> (ø)
daft/table/table.py 81.97% <75.00%> (-0.10%) ⬇️
daft/table/micropartition.py 89.06% <60.00%> (-0.78%) ⬇️
daft/logical/builder.py 86.72% <50.00%> (-2.17%) ⬇️
daft/execution/rust_physical_plan_shim.py 87.50% <52.94%> (-10.69%) ⬇️
daft/io/common.py 65.00% <26.66%> (-23.89%) ⬇️

jaychia changed the title from "[FEAT] [Scan Operator] [New Query Planner] Add MVP e2e ScanOperator integration." to "[FEAT][ScanOperator 1/3] Add MVP e2e ScanOperator integration." on Nov 4, 2023
# This environment variable will make Daft use the new "v2 scans" and MicroPartitions when building Daft logical plans
if os.getenv("DAFT_V2_SCANS", "0") == "1":
    assert (
        os.getenv("DAFT_MICROPARTITIONS", "0") == "1"
    )
Member:

Let's just override this to default to 1 if DAFT_V2_SCANS is set.

Contributor:

Yeah, unfortunately we use `os.getenv("DAFT_MICROPARTITIONS", "0") == "1"` at import time to hot-swap our `Table` implementation, so by the time we hit this code it might be "too late" to override it.

Review threads resolved on: daft/io/common.py (2), src/daft-micropartition/src/micropartition.rs, src/daft-plan/src/physical_plan.rs, src/daft-scan/Cargo.toml, src/daft-scan/src/glob.rs (3)
@@ -14,6 +15,8 @@ pub struct Source {
     /// Information about the source data location.
     pub source_info: Arc<SourceInfo>,

+    // TODO(Clark): Replace these pushdown fields with the Pushdown struct, where the Pushdown struct
+    // would exist on the LegacyExternalInfo struct in SourceInfo.
clarkzinzow (Contributor, Author):

@jaychia Btw, the TODO in the PR description ("Consolidate filter/limit pushdowns to use the same Pushdown struct.") is referring to this. It still should be done, but it definitely doesn't need to block merging this PR, IMO.

    pub file_format_config: Arc<FileFormatConfig>,
    pub schema: SchemaRef,
    pub storage_config: Arc<StorageConfig>,
    // TODO(Clark): Directly use the Pushdowns struct as part of the ScanTask struct?
clarkzinzow (Contributor, Author):

@jaychia The TODO in the PR description ("Consolidate filter/limit pushdowns to use the same Pushdown struct.") was also referring to this: we could use the `Pushdowns` struct as a `pushdowns` field on the `ScanTask` (see the sketch below).
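For illustration, a hedged sketch of that consolidation; all types here are stand-ins rather than the actual daft-scan definitions:

```rust
use std::sync::Arc;

// Stand-ins to keep the sketch self-contained.
pub struct FileFormatConfig;
pub struct Schema;
pub type SchemaRef = Arc<Schema>;
pub struct StorageConfig;
pub type ExprRef = Arc<String>;

pub struct Pushdowns {
    pub filters: Option<Arc<Vec<ExprRef>>>,
    pub limit: Option<usize>,
}

pub struct ScanTask {
    pub file_format_config: Arc<FileFormatConfig>,
    pub schema: SchemaRef,
    pub storage_config: Arc<StorageConfig>,
    // One shared struct instead of separate ad hoc filter/limit fields.
    pub pushdowns: Pushdowns,
}
```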

Review threads resolved on: src/daft-scan/src/glob.rs (2)
jaychia and others added 4 commits on November 6, 2023 at 12:18:
…ves ScanTaskBatch (#1565)

1. Replaces `ScanTask` with `ScanTaskBatch`, which is mostly like `ScanTask` except that it is multi-file
2. This helps us deduplicate the information representation between `ScanTask` and `ScanTaskBatch`

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
Adds various fixes to Daft v2 scans:

1. Fixes how we handle schema hints in the Python logical scan node builder
2. Fixes our ScanGlobOperator not correctly prefixing local paths with `file://` schemes (see the sketch after this commit message)
3. Adds a branch in micropartitions for materializing from a ScanTask that is a CSV
4. Implements column-pruning pushdown rules and correct handling during ScanTask materialization

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
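As an illustration of fix (2), a hedged sketch of the kind of scheme normalization involved; this is not the actual glob.rs code:

```rust
/// Prefix a bare local path with the `file://` scheme, leaving paths that
/// already carry a scheme (s3://, gs://, file://, ...) untouched.
fn ensure_scheme(path: &str) -> String {
    if path.contains("://") {
        path.to_string()
    } else {
        format!("file://{path}")
    }
}

fn main() {
    assert_eq!(ensure_scheme("/tmp/data.csv"), "file:///tmp/data.csv");
    assert_eq!(ensure_scheme("s3://bucket/key.csv"), "s3://bucket/key.csv");
}
```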
from daft.table import Table

PartitionT = TypeVar("PartitionT")


def scan_with_tasks(
    scan_tasks: list[ScanTask],
Contributor:

NOTE: I changed this from a single `ScanTaskBatch` to avoid having to coalesce all the tasks into one fat task.

clarkzinzow (Contributor, Author) left a comment:

@jaychia LGTM overall, can't approve my own PR so leaving this as a comment!

Review thread resolved on: src/daft-micropartition/src/micropartition.rs
jaychia merged commit e176f2e into main on Nov 7, 2023; 36 of 37 checks passed.
jaychia deleted the clark/scan-operator-integration branch on November 7, 2023 at 03:28.
clarkzinzow added a commit that referenced this pull request Nov 8, 2023
…tion` reads (#1578)

This PR adds support for the Python I/O layer to `MicroPartition` reads, thereby adding support for reading `MicroPartition`s from JSON files via the scan operator path.

As a drive-by, this PR also fixes a column-ordering bug when out-of-order column projections are provided to our native CSV reader.

This PR is stacked on top of #1559.
Labels: enhancement (New feature or request)
Projects: none yet
Participants: 3