Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-12813][SQL] Eliminate serialization for back to back operations #10747

Closed
wants to merge 5 commits into from

Conversation

marmbrus
Copy link
Contributor

The goal of this PR is to eliminate unnecessary translations when there are back-to-back MapPartitions operations. In order to achieve this I also made the following simplifications:

  • Operators no longer have hold encoders, instead they have only the expressions that they need. The benefits here are twofold: the expressions are visible to transformations so go through the normal resolution/binding process. now that they are visible we can change them on a case by case basis.
  • Operators no longer have type parameters. Since the engine is responsible for its own type checking, having the types visible to the complier was an unnecessary complication. We still leverage the scala compiler in the companion factory when constructing a new operator, but after this the types are discarded.

Deferred to a follow up PR:

  • Remove as much of the resolution/binding from Dataset/GroupedDataset as possible. We should still eagerly check resolution and throw an error though in the case of mismatches for an as operation.
  • Eliminate serializations in more cases by adding more cases to EliminateSerialization

@marmbrus
Copy link
Contributor Author

/cc @cloud-fan

@SparkQA
Copy link

SparkQA commented Jan 14, 2016

Test build #49353 has finished for PR 10747 at commit 4615c96.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ObjectOperator extends LogicalPlan
    • case class MapPartitions(
    • case class AppendColumns(
    • case class MapGroups(
    • case class CoGroup(
    • trait ObjectOperator extends SparkPlan
    • case class MapPartitions(
    • case class AppendColumns(
    • case class MapGroups(
    • case class CoGroup(

@marmbrus
Copy link
Contributor Author

test this please

@SparkQA
Copy link

SparkQA commented Jan 14, 2016

Test build #49360 has finished for PR 10747 at commit ee7f3c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

test("back to back MapPartitions") {
val input = LocalRelation('_1.int, '_2.int)
val plan =
MapPartitions(func,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should have a test case that tests a plan that cannot be eliminated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, I guess I forgot to push it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a test here and and in end-to-end one in DatasetSuite now.

encoderFor[U],
encoderFor[U].schema.toAttributes,
logicalPlan))
MapPartitions[T, U](func, logicalPlan))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is different from the previous one, we only pass the type parameter T to MapPartitions and build a new encoder there which is unresolved, while before this PR we pass a resolvedTEncoder. Do we break the life cycle of encoder in this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just pushing the lifecycle of the encoder into the analyzer / physical operators where it belongs.

@SparkQA
Copy link

SparkQA commented Jan 14, 2016

Test build #49404 has finished for PR 10747 at commit ecde6e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

input: Expression,
serializer: Seq[NamedExpression],
child: LogicalPlan) extends UnaryNode with ObjectOperator {
override def output: Seq[Attribute] = serializer.map(_.toAttribute)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just use serializer.map(_.toAttribute.newInstance) here? then we don't need to add NamedExpression.newInstance

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would return different expressionIds anytime the function was called. Where as we want to fix the expression IDs when the NamedExpression is created.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yes

@SparkQA
Copy link

SparkQA commented Jan 15, 2016

Test build #49417 has finished for PR 10747 at commit c34aacf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
extends LeafExpression with NamedExpression {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated question: why BoundReference extends NamedExpression?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its kinda of a hack, but sometimes after transforms we end up with BoundReferences in the place of fields that were AttributeReference and so there were class cast exceptions. We might be able to remove this some day or now?

@cloud-fan
Copy link
Contributor

LGTM

@marmbrus
Copy link
Contributor Author

Thanks for reviewing! Merging to master.

@asfgit asfgit closed this in cc7af86 Jan 15, 2016
@marmbrus marmbrus deleted the encoderExpressions branch March 8, 2016 00:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants