-
Notifications
You must be signed in to change notification settings - Fork 175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CHORE] Begin integrating Rust Logical Plan with Dataframe API #1207
Conversation
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## main #1207 +/- ##
==========================================
- Coverage 88.41% 88.10% -0.32%
==========================================
Files 55 56 +1
Lines 5620 5656 +36
==========================================
+ Hits 4969 4983 +14
- Misses 651 673 +22
|
@clarkzinzow I ended up moving the file type (parquet) directly into the SourceInfo enum. This avoids creating a new enum and should be cleaner, I hope |
@@ -65,7 +65,7 @@ def __init__(self, plan: logical_plan.LogicalPlan) -> None: | |||
Args: | |||
plan: LogicalPlan describing the steps required to arrive at this DataFrame | |||
""" | |||
if not isinstance(plan, logical_plan.LogicalPlan): | |||
if not isinstance(plan, (logical_plan.LogicalPlan, rust_logical_plan.RustLogicalPlanBuilder)): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we change the type annotation of the plan
arg and the _plan
property accessor to be a union of LogicalPlan
and RustLogicalPlanBuilder
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline; LogicalPlan
has too many methods so Union
will not work out of the box
context = get_context() | ||
|
||
if context.use_rust_planner: | ||
plan = cast( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of this type casting, we could make the DataFrame
constructor take a Union[LogicalPlan, RustLogicalPlanBuilder]
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Discussed offline; same as above)
) | ||
context = get_context() | ||
|
||
if context.use_rust_planner: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some quick questions about the current "switching between all-Python logical plan vs. Rust-based logical plan builder" setup.
We previously talked about exposing a LogicalPlanBuilder
interface that would have two implementations, the all-Python query planner (PyLogicalPlanBuilder
) and the new Rust query planner (RustLogicalPlanBuilder
), where the DataFrame
API layer would be rewritten to use that LogicalPlanBuilder
interface and we could limit the number of places we'd need to switch on context.use_rust_planner
. E.g. the DataFrame
constructor would take a LogicalPlanBuilder
instance instead of a union of the all-Python LogicalPlan
and the RustLogicalPlanBuilder
:
def __init__(self, builder: LogicalPlanBuilder) -> None:
if not isinstance(builder, LogicalPlanBuilder):
if isinstance(builder, dict):
raise ValueError(
f"DataFrames should be constructed with a dictionary of columns using `daft.from_pydict`"
)
if isinstance(builder, list):
raise ValueError(
f"DataFrames should be constructed with a list of dictionaries using `daft.from_pylist`"
)
raise ValueError(f"Expected DataFrame to be constructed with a LogicalPlanBuilder, received: {builder}")
self._builder = builder
self._result_cache: Optional[PartitionCacheEntry] = None
self._preview = DataFramePreview(preview_partition=None, dataframe_num_rows=None)
And read_parquet()
would look something like this:
def read_parquet(
path: Union[str, List[str]],
schema_hints: Optional[Dict[str, DataType]] = None,
fs: Optional[fsspec.AbstractFileSystem] = None,
io_config: Optional["IOConfig"] = None,
use_native_downloader: bool = False,
) -> DataFrame:
if isinstance(path, list) and len(path) == 0:
raise ValueError(f"Cannot read DataFrame from from empty list of Parquet filepaths")
context = get_context()
# This could eventually be reduced to context.new_logical_plan_builder().
builder = RustLogicalPlanBuilder() if context.use_rust_planner else PyLogicalPlanBuilder()
new_builder = builder.scan(
path,
schema_hints,
ParquetSourceInfo(
io_config=io_config,
use_native_downloader=use_native_downloader,
),
fs,
)
return DataFrame(new_builder)
This should end up being a good bit cleaner than imperatively switching between implementations within each DataFrame
API function/method, and we should be able to line up the builder interfaces (such as builder.scan()
or builder.filter()
) since they're adding the same logical op to each underlying logical plan implementation. This does, however, require more upfront changes to the DataFrame
implementation, since each method would need to be ported to the PyLogicalPlanBuilder
interface (although this should be straightforward).
Do you agree that this is a better long-term approach, and if so, are you thinking that deferring this refactor is the best choice for now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline --
Do you agree that this is a better long-term approach, and if so, are you thinking that deferring this refactor is the best choice for now?
yes and yes, to get rust plan creation from dataframes in our hands ASAP
filepaths, | ||
schema.schema.clone(), | ||
)); | ||
pub fn read_parquet(filepaths: Vec<String>, schema: &PySchema) -> PyResult<LogicalPlanBuilder> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Instead of having a LogicalPlanBuilder
method per read API (e.g. for reading Parquets, CSVs, JSONs, etc.), we could still have a source
method that also takes a FileFormat
enum variant, which would then be incorporated into the SourceInfo
.
On second thought, making the LogicalPlanBuilder
methods 1:1 with the DataFrame
APIs makes more sense to me than 1:1 with the logical operators, since the former leaks less representational details to the Python side and makes the builder abstraction more useful so keeping format-specific read_*
methods seems like the best choice to me! E.g. it would also allow us to hide a FileFormat
enum from the Python side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since the former leaks less representational details to the Python side and makes the builder abstraction more useful
yeah, this was what I was hoping for as well!
src/daft-plan/src/source_info.rs
Outdated
@@ -1,24 +1,24 @@ | |||
use daft_core::schema::SchemaRef; | |||
|
|||
pub enum SourceInfo { | |||
FilesInfo(FilesInfo), | |||
ParquetFilesInfo(ParquetFilesInfo), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Until there's deviation in what data we're holding in the SourceInfo
structs across Parquet, CSV, JSON, etc. file types, I think that we should keep it as a FilesInfo
enum variant containing a FilesInfo
struct that contains an additional FileFormat
enum:
pub enum SourceInfo {
FilesInfo(FilesInfo),
}
impl SourceInfo {
pub fn schema(&self) -> SchemaRef {
use SourceInfo::*;
match self {
FilesInfo(files_info) => files_info.schema.clone(),
}
}
}
pub enum FileFormat {
Parquet,
Csv,
Json,
}
pub struct FilesInfo {
pub filepaths: Vec<String>, // TODO: pull in some sort of URL crate for this
pub file_format: FileFormat,
pub schema: SchemaRef,
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
daft.read_parquet
now creates a Rust logical plan (whenDAFT_DEVELOPER_RUST_QUERY_PLANNER=1
is set).This involves: