[FEAT][ScanOperator 1/3] Add MVP e2e ScanOperator integration #1559
Conversation
Did a first pass review! Great work so far :)
@@ -67,7 +67,7 @@ def read_csv(
     )
     file_format_config = FileFormatConfig.from_csv_config(csv_config)
     if use_native_downloader:
-        storage_config = StorageConfig.native(NativeStorageConfig(io_config))
+        storage_config = StorageConfig.native(NativeStorageConfig(True, io_config))
@jaychia Did we decide to only use the multithreading backend for the Python runner, or are we just gonna YOLO it and use it for Ray too?
I do have a hardcoded default of false for Ray, but only for the Parquet read (https://github.com/Eventual-Inc/Daft/blob/main/daft/io/_parquet.py#L53-L55). We can do it for CSV as well?
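For context, a minimal sketch of what a runner-aware default could look like, modeled on the linked `_parquet.py` pattern (the `is_ray_runner` attribute and the wiring are assumptions, not necessarily the exact API):

```python
from daft import context

def default_multithreaded_io() -> bool:
    # Sketch: default native multithreaded I/O to off on the Ray runner
    # (each Ray worker already supplies its own parallelism) and to on
    # for the local Python runner. `is_ray_runner` is an assumed attribute.
    return not context.get_context().is_ray_runner

# read_csv could then compute the first argument of NativeStorageConfig
# instead of hardcoding True:
#   storage_config = StorageConfig.native(
#       NativeStorageConfig(default_multithreaded_io(), io_config)
#   )
```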
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Pushdowns {
    /// Optional filters to apply to the source data.
    pub filters: Option<Arc<Vec<ExprRef>>>,
Should these be HashSets instead, since ordering shouldn't matter for equality and hashing?
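For intuition, here's the order-insensitivity argument in a quick Python analogy (strings standing in for the filter expressions):

```python
# Two pushdown filter collections with the same filters, different order:
a = ["x > 1", "y < 2"]
b = ["y < 2", "x > 1"]

assert a != b  # lists compare element-by-element, so order matters
assert frozenset(a) == frozenset(b)  # sets compare order-insensitively
assert hash(frozenset(a)) == hash(frozenset(b))  # and hash consistently too
```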
Force-pushed from b091508 to 9768c21
…block merging into main
…e instead of on `dyn ScanOperator` trait object (#1562) Performs equality on the Arc pointer instead of relying on logic to check the underlying `dyn ScanOperator` trait object for equality, which could be tricky because of some custom vtable logic. Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
…on-optional TableMetadata (#1563)
Also refactors `MicroPartition::new` into explicit `::new_loaded` and `::new_unloaded` variants:
1. Helps us enforce that `TableStatistics` is `Some` when the state is `TableState::Unloaded` at construction time
2. Cleans up client code, because `TableState` no longer needs to be exposed externally and we can hide the calculations for TableMetadata inside of the `::new_loaded` constructor
3. `::from_scan_task_batch` is very explicit and clean. It tries `::new_unloaded` first, but falls back on `::new_loaded` if the ScanTaskBatch is not provided with both metadata/statistics
Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
…ew scan node builder (#1564)
Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
Codecov Report

Additional details and impacted files:

@@            Coverage Diff             @@
##             main    #1559      +/-   ##
==========================================
- Coverage   85.21%   84.85%   -0.36%
==========================================
  Files          54       54
  Lines        5121     5165      +44
==========================================
+ Hits         4364     4383      +19
- Misses        757      782      +25
# This environment variable will make Daft use the new "v2 scans" and MicroPartitions when building Daft logical plans
if os.getenv("DAFT_V2_SCANS", "0") == "1":
    assert (
        os.getenv("DAFT_MICROPARTITIONS", "0") == "1"
Let's just override this to be 1 by default if DAFT_V2_SCANS is set.
Yeah, unfortunately we use `os.getenv("DAFT_MICROPARTITIONS", "0") == "1"` at import time to hotswap our `Table` implementation, so by the time we hit this code it might be "too late" to override it.
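A minimal sketch of why the override would be "too late" (the module layout and import paths here are illustrative assumptions, not Daft's actual file structure):

```python
# Hypothetical daft/table/__init__.py: the env var is consulted exactly
# once, when this module is first imported.
import os

if os.getenv("DAFT_MICROPARTITIONS", "0") == "1":
    from daft.table.micropartition import MicroPartition as Table  # assumed path
else:
    from daft.table.table import Table  # assumed path

# Setting os.environ["DAFT_MICROPARTITIONS"] = "1" anywhere after this
# module is imported has no effect: `Table` is already bound.
```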
@@ -14,6 +15,8 @@ pub struct Source {
     /// Information about the source data location.
     pub source_info: Arc<SourceInfo>,

+    // TODO(Clark): Replace these pushdown fields with the Pushdown struct, where the Pushdown struct would exist
+    // on the LegacyExternalInfo struct in SourceInfo.
@jaychia Btw the TODO in the PR description "Consolidate filter/limit pushdowns to use the same Pushdown struct." is referring to this, which still should be done but definitely doesn't need to block the merging of the PR IMO.
src/daft-scan/src/lib.rs (outdated)
pub file_format_config: Arc<FileFormatConfig>,
pub schema: SchemaRef,
pub storage_config: Arc<StorageConfig>,
// TODO(Clark): Directly use the Pushdowns struct as part of the ScanTask struct?
@jaychia The TODO in the PR description "Consolidate filter/limit pushdowns to use the same Pushdown struct." was also referring to this, where we could use the `Pushdowns` struct as a `pushdowns` field on the `ScanTask`.
…ves ScanTaskBatch (#1565)
1. Replaces `ScanTask` with `ScanTaskBatch`, which is mostly like `ScanTask` except that it is multi-file
2. This helps us deduplicate information representation between `ScanTask` and `ScanTaskBatch`
Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
Adds various fixes to daft v2 scans:
1. Fix for how we handle schema hints in the Python logical scan node builder
2. Fix for our ScanGlobOperator not correctly prefixing local paths with `file://` schemes
3. Adds a branch in micropartitions for materializing from a ScanTask that is a CSV
4. Implements column pruning pushdown rules, and correct handling during ScanTask materialization
Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
Co-authored-by: Clark Zinzow <[email protected]>
from daft.table import Table

PartitionT = TypeVar("PartitionT")


def scan_with_tasks(
    scan_tasks: list[ScanTask],
NOTE: I changed this from a single `ScanTaskBatch` to avoid having to coalesce all the tasks into one fat task.
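For intuition, a rough sketch of the shape this gives the execution layer (`materialize` is a hypothetical stand-in for the actual per-task read logic):

```python
from typing import Iterator, List

def materialize(task: "ScanTask") -> "Table":
    """Hypothetical stand-in for reading a single ScanTask into a Table."""
    raise NotImplementedError

def scan_with_tasks(scan_tasks: List["ScanTask"]) -> Iterator["Table"]:
    # One unit of work per ScanTask: each task can be scheduled and
    # materialized independently, instead of first being coalesced into
    # one fat ScanTaskBatch.
    for task in scan_tasks:
        yield materialize(task)
```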
@jaychia LGTM overall, can't approve my own PR so leaving this as a comment!
…tion` reads (#1578) This PR adds support for the Python I/O layer to `MicroPartition` reads, which thereby adds support for reading `MicroPartition`s from JSON files with the scan operator path. As a driveby, this PR also fixes a column ordering bug when out-of-order column projections are provided to our native CSV reader. This PR is stacked on top of #1559
This PR adds an e2e integration for the new `ScanOperator` for reading from external sources, integrating with logical plan building, logical -> physical plan translation, physical plan scheduling, physical task execution, and the actual `MicroPartition`-based reading.

TODOs (possibly before merging):
- … at the `MicroPartition` level.
- … at the `MicroPartition` level.
- Consolidate filter/limit pushdowns to use the same `Pushdown` struct.
- … `TableMetadata` at the `MicroPartition` level. ([CHORE] [ScanOperator-Follow-Ons-2] Refactor MicroPartition to have non-optional TableMetadata #1563)
- … `TableStatistics` when data is unloaded at the `MicroPartition` level. ([CHORE] [ScanOperator-Follow-Ons-2] Refactor MicroPartition to have non-optional TableMetadata #1563)
- … `ScanOperator` implementation. ([CHORE] [ScanOperator-Follow-Ons-3] Integrate GlobScanOperator with new scan node builder #1564)
- … `ScanTask` configurations are compatible when merging into a `ScanTaskBatch` bundle.
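For anyone trying this end to end, a minimal usage sketch (the env vars are the ones introduced in this PR; per the discussion above they must be set before `daft` is imported, and `my_data.csv` is a hypothetical file):

```python
import os

# Opt in to the new "v2 scans" and MicroPartitions. Both are read at
# import time, so set them before importing daft.
os.environ["DAFT_V2_SCANS"] = "1"
os.environ["DAFT_MICROPARTITIONS"] = "1"

import daft

# Reads now flow through the new ScanOperator -> ScanTask ->
# MicroPartition path end to end.
df = daft.read_csv("my_data.csv")  # hypothetical local file
df.show()
```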