-
Notifications
You must be signed in to change notification settings - Fork 175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FEAT] [New Query Planner] Logical --> physical translation, physical plan execution. #1232
Conversation
…heduling translation, a Limit op, and other misc. tweaks.
fs: fsspec.AbstractFileSystem | None = None, | ||
) -> RayPartitionSet: | ||
partition_refs = ray.get( | ||
_glob_path_into_details_vpartitions.remote(source_paths, RayRunnerIO.FS_LISTING_SCHEMA, source_info, fs=fs) | ||
_glob_path_into_details_vpartitions.remote( | ||
source_paths, RayRunnerIO.FS_LISTING_SCHEMA, file_format_config, fs=fs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Heads up, this didn't work for me when I tried the Ray runner, got this error:
(_glob_path_into_details_vpartitions pid=97917) 2023-08-04 11:05:37,459 ERROR serialization.py:371 -- type object 'FileFormatConfig' has no attribute 'from_parquet_config'
(_glob_path_into_details_vpartitions pid=97917) Traceback (most recent call last):
(_glob_path_into_details_vpartitions pid=97917) File "/Users/charles/daft/venv/lib/python3.9/site-packages/ray/_private/serialization.py", line 369, in deserialize_objects
(_glob_path_into_details_vpartitions pid=97917) obj = self._deserialize_object(data, metadata, object_ref)
(_glob_path_into_details_vpartitions pid=97917) File "/Users/charles/daft/venv/lib/python3.9/site-packages/ray/_private/serialization.py", line 252, in _deserialize_object
(_glob_path_into_details_vpartitions pid=97917) return self._deserialize_msgpack_data(data, metadata_fields)
(_glob_path_into_details_vpartitions pid=97917) File "/Users/charles/daft/venv/lib/python3.9/site-packages/ray/_private/serialization.py", line 207, in _deserialize_msgpack_data
(_glob_path_into_details_vpartitions pid=97917) python_objects = self._deserialize_pickle5_data(pickle5_data)
(_glob_path_into_details_vpartitions pid=97917) File "/Users/charles/daft/venv/lib/python3.9/site-packages/ray/_private/serialization.py", line 197, in _deserialize_pickle5_data
(_glob_path_into_details_vpartitions pid=97917) obj = pickle.loads(in_band)
(_glob_path_into_details_vpartitions pid=97917) AttributeError: type object 'FileFormatConfig' has no attribute 'from_parquet_config'
Pyrunner is working though so we can probably fix it later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah I haven't added anything for the Ray runner yet, I believe that we agreed that was out of scope for the e2e MVP.
tests/dataframe/test_creation.py
Outdated
@@ -648,8 +648,11 @@ def test_create_dataframe_parquet(valid_data: list[dict[str, float]], use_native | |||
df = daft.read_parquet(f.name, use_native_downloader=use_native_downloader) | |||
assert df.column_names == COL_NAMES | |||
|
|||
# df = df.where(daft.col("sepal_length") > 4.8) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove before merging?
} | ||
|
||
#[cfg(feature = "python")] | ||
impl PhysicalPlan { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thoughts on implementing this part in python instead for brevity? we could write the methods in python and attach it to the rust class (or the python wrapper):
def to_partition_tasks(self) -> Iterator[PartitionTask]:
...
daft.daft.LogicalPlanBuilder.to_partition_tasks = to_partition_tasks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thoughts on implementing this part in python instead for brevity?
I briefly mentioned this in the PR description, but there are a few motivators for keeping at least the physical plan traversal in Rust, and maybe the calling of the physical_plan.py
functions in Rust as well:
- We can gradually port the scheduling and execution logic for individual operators over from Python to Rust; I'm loosely thinking that we could create all-Rust-backed
Instruction
implementations +PartitionTask
/Runner
logic to execute a pipeline of such instructions without the GIL. - By driving the Python-based physical plan scheduling from the Rust side, we avoid having to expose the Rust physical operators (or Python-oriented views of them) to Python, which would otherwise be a decent bit of pyo3 work.
(2) is the main driver since (1) is a bit hand-wavy at this point, but maybe there's a good middle ground here with a Python shim layer that's oriented to the Rust physical plan? We could then do away with a good bit of these pyo3 function calls to set up the arguments for the physical_plan.*()
calls. For example, if we had something like
def filter(input: InProgressPhysicalPlan[PartitionT], predicate: PyExpr) -> InProgressPhysicalPlan[PartitionT]:
filter_step = Filter(ExpressionsProjection([Expr._from_pyexpr(predicate)]))
return physical_plan.pipeline_instruction(input, filter_step, ResourceRequest())
then the Rust-side would be reduced to:
PhysicalPlan::Filter(Filter { input, predicate }) => {
let upstream_iter = input.to_partition_tasks(py)?;
let py_iter = py
.import(pyo3::intern!(py, "daft.execution.rust_physical_plan"))?
.getattr(pyo3::intern!(py, "filter"))?
.call1((upstream_iter, PyExpr::from(predicate.clone())))?;
Ok(py_iter.into())
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ahh I see. This looks fine as-is for now, and the shim you're suggesting also sounds nice, maybe we can try that next week
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can also shim as needed per op, so if a shim would be nicer for the Agg
op, definitely feel free to add it!
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## main #1232 +/- ##
==========================================
- Coverage 88.04% 87.67% -0.37%
==========================================
Files 56 56
Lines 5661 5698 +37
==========================================
+ Hits 4984 4996 +12
- Misses 677 702 +25
|
This PR adds the following for the new Rust-based query planner:
Source
andFilter
opsLimit
logical op, physical op, and physical op executionSourceInfo
into a Rust-side pyo3-exposedFileFormatConfig
struct + aFileFormat
enumPartitionSpec
andPartitionScheme
into Rust-side pyo3-exposed equivalents.By still controlling conversion of the physical plan to
Iterator[PartitionTask]
on the Rust side, and doing per-physical-op delegation to the Python physical plan scheduling functions, we can gradually port individual ops to do all Rust-based scheduling and underlying execution, with a very thin Python dispatch shim (required since we're still using Ray).TODOs