-
Notifications
You must be signed in to change notification settings - Fork 175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEAT][ScanOperator 1/3] Add MVP e2e ScanOperator integration.
#1559
Changes from all commits
422e7b2
9768c21
636faa6
619b911
39492ad
2740a54
acec8ec
b8b6fff
63e7146
59263e9
7aae2fe
effaa97
39619ec
bbddbd0
24d6a46
2d3579b
c113d5a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,7 +70,7 @@ def read_csv( | |
) | ||
file_format_config = FileFormatConfig.from_csv_config(csv_config) | ||
if use_native_downloader: | ||
storage_config = StorageConfig.native(NativeStorageConfig(io_config)) | ||
storage_config = StorageConfig.native(NativeStorageConfig(True, io_config)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @jaychia Did we decide to only use the multithreading backend for the Python runner, or are we just gonna call yolo and use it for Ray too? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I do have a hardcoded default for Ray to |
||
else: | ||
storage_config = StorageConfig.python(PythonStorageConfig(None, io_config=io_config)) | ||
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,14 @@ | ||
from __future__ import annotations | ||
|
||
import os | ||
from typing import TYPE_CHECKING | ||
|
||
from daft.context import get_context | ||
from daft.daft import ( | ||
FileFormatConfig, | ||
NativeStorageConfig, | ||
PythonStorageConfig, | ||
ScanOperatorHandle, | ||
StorageConfig, | ||
) | ||
from daft.datatype import DataType | ||
|
@@ -31,9 +33,6 @@ | |
storage_config: StorageConfig, | ||
) -> LogicalPlanBuilder: | ||
"""Returns a TabularFilesScan LogicalPlan for a given glob filepath.""" | ||
paths = path if isinstance(path, list) else [str(path)] | ||
schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None | ||
|
||
# Glob the path using the Runner | ||
# NOTE: Globbing will always need the IOConfig, regardless of whether "native reads" are used | ||
io_config = None | ||
|
@@ -44,6 +43,54 @@ | |
else: | ||
raise NotImplementedError(f"Tabular scan with config not implemented: {storage_config.config}") | ||
|
||
schema_hint = _get_schema_from_hints(schema_hints) if schema_hints is not None else None | ||
|
||
### FEATURE_FLAG: $DAFT_V2_SCANS | ||
# | ||
# This environment variable will make Daft use the new "v2 scans" and MicroPartitions when building Daft logical plans | ||
if os.getenv("DAFT_V2_SCANS", "0") == "1": | ||
assert ( | ||
os.getenv("DAFT_MICROPARTITIONS", "0") == "1" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's just override this to be 1 by default if There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, unfortunately we use |
||
), "DAFT_V2_SCANS=1 requires DAFT_MICROPARTITIONS=1 to be set as well" | ||
|
||
scan_op: ScanOperatorHandle | ||
if isinstance(path, list): | ||
# Eagerly globs each path and falls back to AnonymousScanOperator. | ||
# NOTE: We could instead have GlobScanOperator take a list of paths and mux the glob output streams | ||
jaychia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
runner_io = get_context().runner().runner_io() | ||
file_infos = runner_io.glob_paths_details(path, file_format_config=file_format_config, io_config=io_config) | ||
|
||
# TODO: Should we move this into the AnonymousScanOperator itself? | ||
# Infer schema if no hints provided | ||
inferred_or_provided_schema = ( | ||
schema_hint | ||
if schema_hint is not None | ||
else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config) | ||
) | ||
|
||
scan_op = ScanOperatorHandle.anonymous_scan( | ||
file_infos.file_paths, | ||
inferred_or_provided_schema._schema, | ||
file_format_config, | ||
storage_config, | ||
) | ||
elif isinstance(path, str): | ||
scan_op = ScanOperatorHandle.glob_scan( | ||
jaychia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
path, | ||
file_format_config, | ||
storage_config, | ||
schema=schema_hint._schema if schema_hint is not None else None, | ||
) | ||
else: | ||
raise NotImplementedError(f"_get_tabular_files_scan cannot construct ScanOperatorHandle for input: {path}") | ||
|
||
builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator( | ||
scan_operator=scan_op, | ||
schema_hint=schema_hint, | ||
) | ||
return builder | ||
|
||
paths = path if isinstance(path, list) else [str(path)] | ||
runner_io = get_context().runner().runner_io() | ||
file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NOTE: I changed this from a single `ScanTaskBatch` to avoid having to coalesce all the tasks into one fat task.