Add unnest_column to DataFrame #5106
Thanks @vincev -- I hope someone else will be able to review this PR, otherwise I hope to have time to do so this weekend or early next week.
I plan to review this PR tomorrow
Thank you @alamb
Thank you very much @vincev -- this was a pleasure to review. I think the code is well commented, tested and structured.
I went through this code carefully and I think we could improve the performance of the physical operator, but that can be done in a follow-on PR when better performance is desired.
// Update schema with unnested column type.
let input = Arc::new(inputs[0].clone());
let nested_field = input.schema().field_from_column(column)?;
let unnested_field = schema.field_from_column(column)?;
let fields = input
    .schema()
    .fields()
    .iter()
    .map(|f| {
        if f == nested_field {
            unnested_field.clone()
        } else {
            f.clone()
        }
    })
    .collect::<Vec<_>>();

let schema = Arc::new(DFSchema::new_with_metadata(
    fields,
    input.schema().metadata().clone(),
)?);

Ok(LogicalPlan::Unnest(Unnest {
    input,
    column: column.clone(),
    schema,
}))
}
I think the intent of from_plan is to be mechanical construction rather than potentially changing the schema, so I was surprised this code is necessary.
However, when I tried changing this code to be something more like
Ok(LogicalPlan::Unnest(Unnest {
    input: inputs[0].clone().into(),
    column: column.clone(),
    schema: schema.clone(),
}))
The test fails:
---- unnest_columns stdout ----
Error: Internal("PhysicalExpr Column references column 'tags' at index 2 (zero-based) but input schema only has 1 columns: [\"tags\"]")
failures:
unnest_columns
Ah yes, I had to add this code because tests were failing due to a projection push-down when calling an aggregate function like count. At this point the child schema has only the count field while the unnest schema has all the fields in the initial projection.
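To make the schema rebuild discussed above concrete, here is a minimal, std-only sketch of the same field-swapping idea. The `DataType` and `Field` types and the `field` and `rebuild_fields` helpers are hypothetical simplifications for illustration, not DataFusion's actual types or API. The sketch assumes the child plan has already been narrowed by projection push-down, so the output schema is derived from the child's current fields rather than from the schema captured when the plan was first built.

```rust
// Hypothetical, simplified stand-ins for DataFusion's schema types.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int64,
    Utf8,
    List(Box<DataType>),
}

#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
}

// Convenience constructor (hypothetical helper).
fn field(name: &str, data_type: DataType) -> Field {
    Field { name: name.to_string(), data_type }
}

/// Rebuild the field list from the child's *current* fields, replacing the
/// nested (list) field with its unnested counterpart and keeping every other
/// field unchanged. Fields that the child no longer has are not re-added.
fn rebuild_fields(child: &[Field], nested: &Field, unnested: &Field) -> Vec<Field> {
    child
        .iter()
        .map(|f| if f == nested { unnested.clone() } else { f.clone() })
        .collect()
}
```

If the child has been pruned down to a single `tags` list column, calling `rebuild_fields` with that nested field and its `Utf8` replacement yields a one-field schema, which is consistent with the one-column input reported in the error message above. Reusing the original, wider schema instead would leave column indices pointing past the end of the pruned input.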
I plan to merge this PR later today or tomorrow unless there are other comments or people need more time to review
Thank you @alamb
Benchmark runs are scheduled for baseline = 916ec5d and contender = 0dfc66d. 0dfc66d is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
This PR introduces changes to DataFrame to add support for unnest/explode functionality as requested in #212. I would like to get feedback on these changes and then create another PR to try to add support to the SQL parser.
Rationale for this change
This PR adds an unnest_column method to DataFrame to unnest list-type columns (see tests). Given the following data frame:
The call df.unnest_column("tags") produces:
Calling df.unnest_column("points") produces:
And calling df.unnest_column("points").unnest_column("tags") produces:
What changes are included in this PR?
This PR adds the following changes:
- unnest_column to DataFrame
- Unnest variant to LogicalPlan that produces a new schema for the unnested column
- UnnestExec to the execution plan
- tests on DataFrame for the unnest operation and aggregate count on the unnested column
- tests for LogicalPlan
Are these changes tested?
Added some initial tests for DataFrame here, and for LogicalPlan here. I am happy to add more tests following feedback.
I also ran some tests on Parquet files with 1.9M rows that unnest to 15M:
Are there any user-facing changes?
Adds an unnest_column method to DataFrame.
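For readers unfamiliar with unnest/explode, the row expansion that unnest_column is described as performing can be sketched with plain Rust collections. This is only an illustration of the semantics, not the PR's implementation: the `Row` and `UnnestedRow` types, the `id` column, and the `unnest_tags` function are hypothetical. The sketch also assumes that a row with an empty list produces no output rows, which the description above does not state either way.

```rust
/// Hypothetical input row: one scalar column and one list column.
#[derive(Clone, Debug, PartialEq)]
struct Row {
    id: u32,
    tags: Vec<String>,
}

/// Hypothetical output row: the list column replaced by a single element.
#[derive(Clone, Debug, PartialEq)]
struct UnnestedRow {
    id: u32,
    tag: String,
}

/// Emit one output row per list element, repeating the scalar column.
/// Assumption of this sketch: rows whose list is empty emit nothing.
fn unnest_tags(rows: &[Row]) -> Vec<UnnestedRow> {
    rows.iter()
        .flat_map(|row| {
            row.tags.iter().map(move |t| UnnestedRow {
                id: row.id,
                tag: t.clone(),
            })
        })
        .collect()
}
```

Because each input row fans out into as many output rows as its list has elements, the output can be much larger than the input, as with the 1.9M-row files that unnest to 15M rows mentioned above.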