[SPARK-12813][SQL] Eliminate serialization for back to back operations #10747
/cc @cloud-fan
Test build #49353 has finished for PR 10747 at commit
test this please
Test build #49360 has finished for PR 10747 at commit
test("back to back MapPartitions") {
  val input = LocalRelation('_1.int, '_2.int)
  val plan =
    MapPartitions(func,
should have a test case that tests a plan that cannot be eliminated?
Oh yeah, I guess I forgot to push it.
There's a test here and an end-to-end one in DatasetSuite now.
encoderFor[U],
encoderFor[U].schema.toAttributes,
logicalPlan))
MapPartitions[T, U](func, logicalPlan))
This is different from the previous one, we only pass the type parameter T to MapPartitions and build a new encoder there which is unresolved, while before this PR we pass a resolvedTEncoder. Do we break the life cycle of encoder in this PR?
This is just pushing the lifecycle of the encoder into the analyzer / physical operators where it belongs.
Test build #49404 has finished for PR 10747 at commit
    input: Expression,
    serializer: Seq[NamedExpression],
    child: LogicalPlan) extends UnaryNode with ObjectOperator {
  override def output: Seq[Attribute] = serializer.map(_.toAttribute)
can we just use serializer.map(_.toAttribute.newInstance) here? then we don't need to add NamedExpression.newInstance
That would return different expression IDs every time the function was called, whereas we want to fix the expression IDs when the NamedExpression is created.
ah yes
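To illustrate the point about expression IDs, here is a minimal, self-contained sketch. It does not use Spark's classes: `ExprId`, `Attr`, `Named`, `Op`, and `unstableOutput` are hypothetical stand-ins invented for this example. It only shows why minting a new ID inside `output` would make two calls disagree, while fixing the ID at creation keeps them stable.

```scala
import java.util.concurrent.atomic.AtomicLong

object ExprIdSketch {
  // Hypothetical stand-ins for illustration; these are not Spark's classes.
  private val counter = new AtomicLong(0)

  final case class ExprId(id: Long)

  // Hands out a globally unique ID on every call.
  def freshId(): ExprId = ExprId(counter.getAndIncrement())

  final case class Attr(name: String, exprId: ExprId) {
    // Same name, brand-new ID (the idea behind a newInstance call).
    def newInstance: Attr = copy(exprId = freshId())
  }

  // The ID is chosen once, when the named expression is constructed.
  final case class Named(name: String, exprId: ExprId = freshId()) {
    def toAttribute: Attr = Attr(name, exprId)
  }

  final case class Op(serializer: Seq[Named]) {
    // Stable: reuses the IDs fixed when each Named was created.
    def output: Seq[Attr] = serializer.map(_.toAttribute)

    // Unstable: every call produces attributes with different IDs.
    def unstableOutput: Seq[Attr] = serializer.map(_.toAttribute.newInstance)
  }
}
```

With this sketch, two calls to `output` compare equal, while two calls to `unstableOutput` do not, so anything that resolved against the first result could no longer find its attributes in the second.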
Test build #49417 has finished for PR 10747 at commit
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
  extends LeafExpression with NamedExpression {
unrelated question: why does BoundReference extend NamedExpression?
It's kind of a hack, but sometimes after transforms we end up with BoundReferences in the place of fields that were AttributeReferences, and so there were class cast exceptions. We might be able to remove this some day, or now?
LGTM
Thanks for reviewing! Merging to master.
The goal of this PR is to eliminate unnecessary translations when there are back-to-back MapPartitions operations. In order to achieve this I also made the following simplifications:
Deferred to a follow up PR:
as
operation.EliminateSerialization
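The idea of dropping the translation between back-to-back operations can be sketched on a toy plan tree. This is an assumption-laden illustration, not the rule added in this PR: the `Plan` node types, the string type tags, and the `eliminate` and `mapPartitions` helpers are all hypothetical. It shows a serialize step that is immediately followed by a deserialize step of the same object type being removed, and a mismatched pair being left alone.

```scala
object EliminateRoundTripSketch {
  // Toy plan algebra; hypothetical names, not Spark's logical operators.
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  // Rows -> objects of type objType.
  final case class Deserialize(objType: String, child: Plan) extends Plan
  // Applies a function to objects, producing objects.
  final case class MapPartitions(func: String, child: Plan) extends Plan
  // Objects of type objType -> rows.
  final case class Serialize(objType: String, child: Plan) extends Plan

  // Builds one map step the naive way: deserialize, apply, serialize.
  def mapPartitions(func: String, in: String, out: String, child: Plan): Plan =
    Serialize(out, MapPartitions(func, Deserialize(in, child)))

  // Removes a Serialize that feeds straight into a Deserialize of the
  // same object type; any other shape is kept and its child is visited.
  def eliminate(plan: Plan): Plan = plan match {
    case Deserialize(in, Serialize(out, child)) if in == out => eliminate(child)
    case Deserialize(t, child)   => Deserialize(t, eliminate(child))
    case Serialize(t, child)     => Serialize(t, eliminate(child))
    case MapPartitions(f, child) => MapPartitions(f, eliminate(child))
    case r: Relation             => r
  }
}
```

For example, chaining `mapPartitions("f", "Int", "Int", ...)` and `mapPartitions("g", "Int", "String", ...)` leaves a single deserialize at the bottom and a single serialize at the top after `eliminate`, while a second step that expects `"Long"` input keeps its round trip, which is the kind of plan a "cannot be eliminated" test would cover.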