[FEAT] [New Query Planner] Logical --> physical translation, physical plan execution. #1232

Merged: clarkzinzow merged 6 commits into main from clark/physical-plan on Aug 7, 2023

@clarkzinzow clarkzinzow commented Aug 4, 2023

This PR adds the following for the new Rust-based query planner:

  • Physical plan implementations for existing Source and Filter ops
  • Logical plan --> physical plan translation
  • Physical plan execution (currently via a physical plan --> Python physical plan scheduling translation + Python-side driving of execution)
  • Limit logical op, physical op, and physical op execution
  • Consolidated Python-side SourceInfo into a Rust-side pyo3-exposed FileFormatConfig struct + a FileFormat enum
  • Consolidated Python-side PartitionSpec and PartitionScheme into Rust-side pyo3-exposed equivalents.

By still controlling conversion of the physical plan to Iterator[PartitionTask] on the Rust side, and doing per-physical-op delegation to the Python physical plan scheduling functions, we can gradually port individual ops to do all Rust-based scheduling and underlying execution, with a very thin Python dispatch shim (required since we're still using Ray).
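A toy sketch of the flow described above: translate a logical plan into a physical plan, then walk the physical plan and delegate each op to a per-op scheduling function. Every class and function name here is a hypothetical pure-Python stand-in; in the PR the plan types and the traversal live in Rust, and the per-op functions yield partition tasks rather than plain lists.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, List

Partition = List[int]  # toy partition: a list of integer rows


# --- Logical ops (hypothetical stand-ins, not Daft's classes) ---
@dataclass
class LogicalSource:
    partitions: List[Partition]


@dataclass
class LogicalFilter:
    input: object
    predicate: Callable[[int], bool]


@dataclass
class LogicalLimit:
    input: object
    limit: int


# --- Physical ops (also hypothetical) ---
@dataclass
class PhysicalScan:
    partitions: List[Partition]


@dataclass
class PhysicalFilter:
    input: object
    predicate: Callable[[int], bool]


@dataclass
class PhysicalLimit:
    input: object
    limit: int


def translate(node):
    """Recursively map each logical op to its physical counterpart."""
    if isinstance(node, LogicalSource):
        return PhysicalScan(node.partitions)
    if isinstance(node, LogicalFilter):
        return PhysicalFilter(translate(node.input), node.predicate)
    if isinstance(node, LogicalLimit):
        return PhysicalLimit(translate(node.input), node.limit)
    raise TypeError(f"unsupported logical op: {type(node).__name__}")


# Per-op scheduling functions, standing in for the Python scheduling layer.
def schedule_scan(partitions: List[Partition]) -> Iterator[Partition]:
    yield from partitions


def schedule_filter(upstream, predicate) -> Iterator[Partition]:
    for part in upstream:
        yield [row for row in part if predicate(row)]


def schedule_limit(upstream, limit: int) -> Iterator[Partition]:
    remaining = limit
    for part in upstream:
        if remaining <= 0:
            return
        taken = part[:remaining]
        remaining -= len(taken)
        yield taken


def to_partitions(node) -> Iterator[Partition]:
    """Walk the physical plan, delegating each op to its scheduling function."""
    if isinstance(node, PhysicalScan):
        return schedule_scan(node.partitions)
    if isinstance(node, PhysicalFilter):
        return schedule_filter(to_partitions(node.input), node.predicate)
    if isinstance(node, PhysicalLimit):
        return schedule_limit(to_partitions(node.input), node.limit)
    raise TypeError(f"unsupported physical op: {type(node).__name__}")


logical = LogicalLimit(
    LogicalFilter(LogicalSource([[1, 2, 3], [4, 5, 6]]), lambda x: x % 2 == 0),
    2,
)
physical = translate(logical)
result = list(to_partitions(physical))
```

Because the traversal owns the dispatch, a single op's scheduling function can be swapped for a different implementation without touching the others, which is the gradual-porting property the description is after.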

TODOs

  • Get lint passing
  • Fix failing tests
  • Pull repeated pickle pyo3 code into macro
  • Misc. minor in-code TODOs (see diff)
  • Add test coverage in CI for supported tests
  • Add dev-facing documentation to new components
  • Misc. code clean up
  • Add support for Ray runner

…heduling translation, a Limit op, and other misc. tweaks.
@clarkzinzow clarkzinzow changed the title [WIP] [FEATURE] [New Query Planner] Logical --> physical translation, physical plan execution. [WIP] [FEAT] [New Query Planner] Logical --> physical translation, physical plan execution. Aug 4, 2023
@github-actions github-actions bot added the enhancement New feature or request label Aug 4, 2023
    fs: fsspec.AbstractFileSystem | None = None,
) -> RayPartitionSet:
    partition_refs = ray.get(
-        _glob_path_into_details_vpartitions.remote(source_paths, RayRunnerIO.FS_LISTING_SCHEMA, source_info, fs=fs)
+        _glob_path_into_details_vpartitions.remote(
+            source_paths, RayRunnerIO.FS_LISTING_SCHEMA, file_format_config, fs=fs
Contributor

Heads up, this didn't work for me when I tried the Ray runner, got this error:

(_glob_path_into_details_vpartitions pid=97917) 2023-08-04 11:05:37,459	ERROR serialization.py:371 -- type object 'FileFormatConfig' has no attribute 'from_parquet_config'
(_glob_path_into_details_vpartitions pid=97917) Traceback (most recent call last):
(_glob_path_into_details_vpartitions pid=97917)   File "/Users/charles/daft/venv/lib/python3.9/site-packages/ray/_private/serialization.py", line 369, in deserialize_objects
(_glob_path_into_details_vpartitions pid=97917)     obj = self._deserialize_object(data, metadata, object_ref)
(_glob_path_into_details_vpartitions pid=97917)   File "/Users/charles/daft/venv/lib/python3.9/site-packages/ray/_private/serialization.py", line 252, in _deserialize_object
(_glob_path_into_details_vpartitions pid=97917)     return self._deserialize_msgpack_data(data, metadata_fields)
(_glob_path_into_details_vpartitions pid=97917)   File "/Users/charles/daft/venv/lib/python3.9/site-packages/ray/_private/serialization.py", line 207, in _deserialize_msgpack_data
(_glob_path_into_details_vpartitions pid=97917)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(_glob_path_into_details_vpartitions pid=97917)   File "/Users/charles/daft/venv/lib/python3.9/site-packages/ray/_private/serialization.py", line 197, in _deserialize_pickle5_data
(_glob_path_into_details_vpartitions pid=97917)     obj = pickle.loads(in_band)
(_glob_path_into_details_vpartitions pid=97917) AttributeError: type object 'FileFormatConfig' has no attribute 'from_parquet_config'

Pyrunner is working though so we can probably fix it later
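For context on the traceback, here is a minimal sketch of one way this kind of error can arise. It assumes the object is pickled through a `__reduce__` that names a constructor classmethod, which is an assumption about the mechanism and not something confirmed in this thread. The `FileFormatConfig` below is a hypothetical pure-Python stand-in, not the pyo3 class. Unpickling resolves the constructor with `getattr` on the class, so it fails if the class visible to the unpickling process lacks that attribute.

```python
import pickle


class FileFormatConfig:
    """Hypothetical pure-Python stand-in for the pyo3-exposed class."""

    def __init__(self, file_format: str, options: dict):
        self.file_format = file_format
        self.options = options

    @classmethod
    def from_parquet_config(cls, options: dict) -> "FileFormatConfig":
        return cls("parquet", options)

    def __reduce__(self):
        # The pickle payload records "call FileFormatConfig.from_parquet_config(options)".
        return (FileFormatConfig.from_parquet_config, (self.options,))


cfg = FileFormatConfig.from_parquet_config({"use_native_downloader": True})
payload = pickle.dumps(cfg)

# Round trip succeeds while the constructor is present on the class.
restored = pickle.loads(payload)

# Simulate a process whose class lacks the constructor.
saved = FileFormatConfig.__dict__["from_parquet_config"]
del FileFormatConfig.from_parquet_config
try:
    pickle.loads(payload)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)
finally:
    # Put the classmethod back so the class is usable again.
    FileFormatConfig.from_parquet_config = saved
```

In this sketch the payload itself is fine; the failure depends only on what the class looks like where `pickle.loads` runs.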

Contributor Author

Oh yeah, I haven't added anything for the Ray runner yet; I believe we agreed that was out of scope for the e2e MVP.

@@ -648,8 +648,11 @@ def test_create_dataframe_parquet(valid_data: list[dict[str, float]], use_native
df = daft.read_parquet(f.name, use_native_downloader=use_native_downloader)
assert df.column_names == COL_NAMES

# df = df.where(daft.col("sepal_length") > 4.8)
Contributor

remove before merging?

}

#[cfg(feature = "python")]
impl PhysicalPlan {
Contributor

thoughts on implementing this part in python instead for brevity? We could write the methods in Python and attach them to the Rust class (or the Python wrapper):

def to_partition_tasks(self) -> Iterator[PartitionTask]:
   ...

daft.daft.LogicalPlanBuilder.to_partition_tasks = to_partition_tasks

Contributor Author

@clarkzinzow clarkzinzow Aug 4, 2023

thoughts on implementing this part in python instead for brevity?

I briefly mentioned this in the PR description, but there are a few motivators for keeping at least the physical plan traversal in Rust, and maybe the calling of the physical_plan.py functions in Rust as well:

  1. We can gradually port the scheduling and execution logic for individual operators over from Python to Rust; I'm loosely thinking that we could create all-Rust-backed Instruction implementations + PartitionTask/Runner logic to execute a pipeline of such instructions without the GIL.
  2. By driving the Python-based physical plan scheduling from the Rust side, we avoid having to expose the Rust physical operators (or Python-oriented views of them) to Python, which would otherwise be a decent bit of pyo3 work.

(2) is the main driver since (1) is a bit hand-wavy at this point, but maybe there's a good middle ground here with a Python shim layer that's oriented to the Rust physical plan? We could then do away with a good bit of these pyo3 function calls to set up the arguments for the physical_plan.*() calls. For example, if we had something like

def filter(input: InProgressPhysicalPlan[PartitionT], predicate: PyExpr) -> InProgressPhysicalPlan[PartitionT]:
    filter_step = Filter(ExpressionsProjection([Expr._from_pyexpr(predicate)]))
    return physical_plan.pipeline_instruction(input, filter_step, ResourceRequest())

then the Rust-side would be reduced to:

            PhysicalPlan::Filter(Filter { input, predicate }) => {
                let upstream_iter = input.to_partition_tasks(py)?;
                let py_iter = py
                    .import(pyo3::intern!(py, "daft.execution.rust_physical_plan"))?
                    .getattr(pyo3::intern!(py, "filter"))?
                    .call1((upstream_iter, PyExpr::from(predicate.clone())))?;
                Ok(py_iter.into())
            }
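To make the shim idea concrete, here is a small self-contained Python sketch of the calling pattern: a per-op shim function is looked up by name and called with the upstream iterator plus the op's arguments, mirroring the import / getattr / call1 sequence in the Rust snippet. The module object, `call_shim`, and the toy list-of-ints partitions are all hypothetical; the real shim would build execution steps and return an in-progress physical plan.

```python
import types
from typing import Callable, Iterator, List

Partition = List[int]  # toy partition: a list of integer rows


def _filter(upstream: Iterator[Partition], predicate: Callable[[int], bool]) -> Iterator[Partition]:
    """Shim for the Filter op: apply the predicate to each upstream partition."""
    for partition in upstream:
        yield [row for row in partition if predicate(row)]


# Stand-in for a Python shim module oriented to the Rust physical plan.
shim = types.ModuleType("rust_physical_plan_shim")
shim.filter = _filter


def call_shim(op_name: str, *args):
    """Look up a per-op shim by name and call it (hypothetical helper)."""
    op = getattr(shim, op_name, None)
    if op is None:
        # Ops without a shim would need some other path on the calling side.
        raise NotImplementedError(f"no Python shim for op {op_name!r}")
    return op(*args)


upstream = iter([[1, 2, 3], [4, 5]])
filtered = list(call_shim("filter", upstream, lambda row: row > 1))
```

With this shape the caller only passes the upstream iterator and the op's own fields, so argument setup stays on the Python side and shims can be added one op at a time.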

Contributor

Ahh, I see. This looks fine as-is for now, and the shim you're suggesting also sounds nice; maybe we can try that next week.

Contributor Author

We can also shim as needed per op, so if a shim would be nicer for the Agg op, definitely feel free to add it!

codecov bot commented Aug 4, 2023

Codecov Report

Merging #1232 (697ab53) into main (839d18c) will decrease coverage by 0.37%.
Report is 1 commit behind head on main.
The diff coverage is 85.00%.

@@            Coverage Diff             @@
##             main    #1232      +/-   ##
==========================================
- Coverage   88.04%   87.67%   -0.37%     
==========================================
  Files          56       56              
  Lines        5661     5698      +37     
==========================================
+ Hits         4984     4996      +12     
- Misses        677      702      +25     
Files Changed                              Coverage Δ
daft/context.py                            79.54% <52.17%> (-1.02%) ⬇️
daft/logical/rust_logical_plan.py          84.61% <66.66%> (+20.97%) ⬆️
daft/logical/logical_plan.py               77.66% <78.57%> (-0.79%) ⬇️
daft/execution/execution_step.py           93.70% <86.66%> (-0.21%) ⬇️
daft/runners/runner_io.py                  92.10% <90.00%> (+0.43%) ⬆️
daft/dataframe/dataframe.py                89.00% <100.00%> (+0.46%) ⬆️
daft/execution/physical_plan.py            94.60% <100.00%> (ø)
daft/execution/physical_plan_factory.py    93.22% <100.00%> (+0.11%) ⬆️
daft/expressions/expressions.py            91.83% <100.00%> (+0.04%) ⬆️
daft/filesystem.py                         86.25% <100.00%> (+0.17%) ⬆️
... and 9 more

... and 2 files with indirect coverage changes
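The headline figures in the report can be re-derived from the Hits and Lines rows. The sketch below uses integer arithmetic and truncates to two decimal places; truncation is an assumption that happens to match the reported numbers (rounding 4996/5698 would give 87.68%), not a statement about how Codecov computes them.

```python
def coverage_bp(hits: int, lines: int) -> int:
    """Coverage in hundredths of a percent, truncated (integer math avoids float noise)."""
    return hits * 10_000 // lines


# Values copied from the coverage diff: (hits, lines) for main and for the PR.
base_hits, base_lines = 4984, 5661
head_hits, head_lines = 4996, 5698

base = coverage_bp(base_hits, base_lines)   # coverage on main
head = coverage_bp(head_hits, head_lines)   # coverage with the PR merged
delta = head - base                         # change in hundredths of a percent

# Misses are simply lines minus hits.
base_misses = base_lines - base_hits
head_misses = head_lines - head_hits

print(f"main: {base / 100:.2f}%  PR: {head / 100:.2f}%  delta: {delta / 100:+.2f}%")
```

The 37 added lines split into 12 new hits and 25 new misses, which is why overall coverage dips even though diff coverage is 85%.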

@clarkzinzow clarkzinzow changed the title [WIP] [FEAT] [New Query Planner] Logical --> physical translation, physical plan execution. [FEAT] [New Query Planner] Logical --> physical translation, physical plan execution. Aug 7, 2023
@clarkzinzow clarkzinzow merged commit f5837a2 into main Aug 7, 2023
@clarkzinzow clarkzinzow deleted the clark/physical-plan branch August 7, 2023 21:47