[SPARK-12321][SQL] JSON format for TreeNode (use reflection) #10311

Closed

wants to merge 22 commits

Conversation

cloud-fan (Contributor)

An alternative solution to #10295: instead of implementing a JSON format for every logical/physical plan and expression, use reflection to implement it once in TreeNode.

Here I use a pre-order traversal to flatten the plan tree into a list of plan nodes, and add an extra field num-children to each node so that we can reconstruct the tree from the list.

Example JSON of a logical plan tree:

```
[ {
  "class" : "org.apache.spark.sql.catalyst.plans.logical.Sort",
  "num-children" : 1,
  "order" : [ [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.SortOrder",
    "num-children" : 1,
    "child" : 0,
    "direction" : "Ascending"
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    "num-children" : 0,
    "name" : "i",
    "dataType" : "integer",
    "nullable" : true,
    "metadata" : { },
    "exprId" : {
      "id" : 10,
      "jvmId" : "cd1313c7-3f66-4ed7-a320-7d91e4633ac6"
    },
    "qualifiers" : [ ]
  } ] ],
  "global" : false,
  "child" : 0
}, {
  "class" : "org.apache.spark.sql.catalyst.plans.logical.Project",
  "num-children" : 1,
  "projectList" : [ [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.Alias",
    "num-children" : 1,
    "child" : 0,
    "name" : "i",
    "exprId" : {
      "id" : 10,
      "jvmId" : "cd1313c7-3f66-4ed7-a320-7d91e4633ac6"
    },
    "qualifiers" : [ ]
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.Add",
    "num-children" : 2,
    "left" : 0,
    "right" : 1
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    "num-children" : 0,
    "name" : "a",
    "dataType" : "integer",
    "nullable" : true,
    "metadata" : { },
    "exprId" : {
      "id" : 0,
      "jvmId" : "cd1313c7-3f66-4ed7-a320-7d91e4633ac6"
    },
    "qualifiers" : [ ]
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.Literal",
    "num-children" : 0,
    "value" : "1",
    "dataType" : "integer"
  } ], [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.Alias",
    "num-children" : 1,
    "child" : 0,
    "name" : "j",
    "exprId" : {
      "id" : 11,
      "jvmId" : "cd1313c7-3f66-4ed7-a320-7d91e4633ac6"
    },
    "qualifiers" : [ ]
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.Multiply",
    "num-children" : 2,
    "left" : 0,
    "right" : 1
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    "num-children" : 0,
    "name" : "a",
    "dataType" : "integer",
    "nullable" : true,
    "metadata" : { },
    "exprId" : {
      "id" : 0,
      "jvmId" : "cd1313c7-3f66-4ed7-a320-7d91e4633ac6"
    },
    "qualifiers" : [ ]
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.Literal",
    "num-children" : 0,
    "value" : "2",
    "dataType" : "integer"
  } ] ],
  "child" : 0
}, {
  "class" : "org.apache.spark.sql.catalyst.plans.logical.LocalRelation",
  "num-children" : 0,
  "output" : [ [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    "num-children" : 0,
    "name" : "a",
    "dataType" : "integer",
    "nullable" : true,
    "metadata" : { },
    "exprId" : {
      "id" : 0,
      "jvmId" : "cd1313c7-3f66-4ed7-a320-7d91e4633ac6"
    },
    "qualifiers" : [ ]
  } ] ],
  "data" : [ ]
} ]
```
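
To make the num-children encoding concrete, here is a minimal, self-contained sketch (illustrative only, not the PR's actual code) of how a pre-order list with per-node child counts can round-trip a tree:

```
// Toy stand-in for TreeNode; the real code rebuilds Catalyst nodes
// reflectively from their constructor parameters.
case class Node(name: String, children: Seq[Node])

// Pre-order flatten: emit each node together with its child count.
def flatten(n: Node): Seq[(String, Int)] =
  (n.name, n.children.size) +: n.children.flatMap(flatten)

// Rebuild by consuming the list front to back; returns the subtree
// plus whatever part of the list is left over.
def rebuild(nodes: Seq[(String, Int)]): (Node, Seq[(String, Int)]) = {
  val (name, numChildren) = nodes.head
  var rest = nodes.tail
  val children = (1 to numChildren).map { _ =>
    val (child, remaining) = rebuild(rest)
    rest = remaining
    child
  }
  (Node(name, children), rest)
}

val tree = Node("Sort", Seq(Node("Project", Seq(Node("LocalRelation", Nil)))))
assert(rebuild(flatten(tree))._1 == tree)
```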

@cloud-fan (Contributor Author)

I also tried to follow ScalaReflection to implement this, but found it's really hard to obtain the TypeTag for a concrete TreeNode subclass without changing a lot of code.

cc @marmbrus @rxin @nongli

@SparkQA commented Dec 15, 2015

Test build #47742 has finished for PR 10311 at commit 138f136.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 15, 2015

Test build #47743 has finished for PR 10311 at commit 548d198.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus (Contributor)

Scala always puts the parameter names in the ScalaSig, and you can get them from the Type (which they describe how to get here):

```
def parameterNames: Seq[String] = {
  import scala.reflect.runtime.universe._
  val m = runtimeMirror(this.getClass.getClassLoader)
  val classSymbol = m.staticClass(this.getClass.getName)
  val t = classSymbol.selfType
  val formalTypeArgs = t.typeSymbol.asClass.typeParams
  val TypeRef(_, _, actualTypeArgs) = t
  val constructorSymbol = t.member(nme.CONSTRUCTOR)
  val params = if (constructorSymbol.isMethod) {
    constructorSymbol.asMethod.paramss
  } else {
    // Find the primary constructor, and use its parameter ordering.
    val primaryConstructorSymbol: Option[Symbol] = constructorSymbol.asTerm.alternatives.find(
      s => s.isMethod && s.asMethod.isPrimaryConstructor)
    if (primaryConstructorSymbol.isEmpty) {
      sys.error("Internal SQL error: Product object did not have a primary constructor.")
    } else {
      primaryConstructorSymbol.get.asMethod.paramss
    }
  }

  params.head.map(_.name.toString)
}
```

```
scala> sql("SELECT 1").queryExecution.logical.parameterNames
res1: Seq[String] = List(projectList, child)
```

Note that's just a quick draft and you could probably simplify it (or pull it out into a utility, since I think we do this in several places now).

@marmbrus (Contributor)

Other small comments:

  • Instead of node-name I would probably call it class and use the fully qualified class name.
  • I think the best test would be the ability to round trip plans (serialize them and then turn them back into objects) and then check equality with the original; a sketch of such a check follows this list. If you put this in checkAnswer, you'll trivially make it impossible for anyone to add nodes where this ability is broken (assuming they write any tests for it). You could also consider adding this as a default listener when we initialize a test SQL context. It will be important to print a good error message when this fails so that people understand why their test is failing.
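
A minimal sketch of that round-trip check, assuming the fromJSON/toJSON helpers that appear later in this PR's diff (the assertion message here is hypothetical):

```
def assertJsonRoundTrip(plan: LogicalPlan): Unit = {
  val json = toJSON(plan)
  val restored = fromJSON(json).asInstanceOf[LogicalPlan]
  assert(restored == plan,
    s"""Plan did not survive the JSON round trip.
       |Original: $plan
       |Restored: $restored
       |JSON: $json""".stripMargin)
}
```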

```
}
}

params.flatten
```

cloud-fan (Contributor Author)

This is for curried constructors: they have more than one parameter list, and we need to combine them.

```
}
}

params.flatten.map { p =>
```

cloud-fan (Contributor Author)

This is for curried constructors: they have more than one parameter list, and we need to flatten and combine them (see the example below).
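
For illustration, a tiny standalone example (not code from the PR) of what paramss returns for a curried case class, and why it needs flattening:

```
import scala.reflect.runtime.universe._

case class Curried(a: Int)(b: String)

val ctor = typeOf[Curried].member(nme.CONSTRUCTOR).asMethod
// One inner list per parameter list: List(List(a), List(b))
println(ctor.paramss.map(_.map(_.name.toString)))
// Flattened into a single field list: List(a, b)
println(ctor.paramss.flatten.map(_.name.toString))
```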

Contributor

Can you double check that this doesn't cause problems with inner classes (like in the REPL)?

Contributor Author

I checked; this doesn't work for inner classes. So if a user creates an inner Product class and uses it as part of a TreeNode, we can't deserialize it from JSON.

The problem is not this flatten, but that we are missing an extra parameter: the outer pointer (illustrated below). I think we can fix it later.
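
For illustration (not code from the PR), the hidden parameter in question:

```
// An inner case class's constructor carries a hidden reference to the
// enclosing instance, so its declared fields alone cannot rebuild it.
class Outer {
  case class Inner(x: Int)
}

val o = new Outer
val inner = o.Inner(1)  // constructing Inner requires the enclosing `o`
// A JSON payload carrying only {"x": 1} gives reflection no way to supply `o`.
```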

Contributor

Sure, I was only asking to make sure we weren't breaking our inner class handling that we do for datasets since I think we are now sharing this code.

@SparkQA commented Dec 16, 2015

Test build #47819 has finished for PR 10311 at commit 6b293ba.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 16, 2015

Test build #47837 has finished for PR 10311 at commit b1c6187.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 16, 2015

Test build #47826 has finished for PR 10311 at commit 1681a11.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
var logicalRelations = logicalPlan.collect { case l: LogicalRelation => l }
try {
  val jsonBackPlan = fromJSON(toJSON(logicalPlan)).asInstanceOf[LogicalPlan]
  val normalized = jsonBackPlan transformDown {
```

Contributor

Can you comment on why you need to do this? (i.e., that RDDs/data are not serializable to JSON)

@marmbrus (Contributor)

This is looking really good!

@SparkQA commented Dec 17, 2015

Test build #47927 has finished for PR 10311 at commit 3f87e65.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Statistics}

object TreeNodeJsonFormatter {
```

cloud-fan (Contributor Author)

I made this an object instead of inlining the code in TreeNode or making it a trait, because we may need to access classes in the core or hive modules to handle them specially, and then we may have to move this code to another module.

Contributor

I'd really like to avoid moving them out. Better would be to have a trait JsonSerializable that we can mix in, or something similar (rough sketch below). We should also figure out whether we really need to be able to serialize them. For leaf nodes, I think grabbing the schema and some details would be sufficient.
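
A rough sketch of the mix-in being suggested (hypothetical; the merged PR kept the logic in TreeNode itself):

```
import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{compact, render}

// Hypothetical trait: nodes that know how to render themselves as JSON,
// so module-specific nodes can opt in without moving shared code around.
trait JsonSerializable {
  def jsonValue: JValue
  def toJSON: String = compact(render(jsonValue))
}
```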

@cloud-fan (Contributor Author)

It now passes all tests in the sql module!

@SparkQA commented Dec 18, 2015

Test build #47980 has finished for PR 10311 at commit 5dad7ec.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author)

It's odd that I can't reproduce the test failures reported by Jenkins locally. Adding a better error message to see what's happening.

@SparkQA commented Dec 18, 2015

Test build #48012 has finished for PR 10311 at commit 297e890.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental): public class JavaTwitterHashTagJoinSentiments

@SparkQA commented Dec 18, 2015

Test build #48014 has finished for PR 10311 at commit ed44741.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 18, 2015

Test build #48026 has finished for PR 10311 at commit e9accee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
@@ -463,4 +479,244 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
    }
    s"$nodeName(${args.mkString(",")})"
  }

  def toJSON: String = {
    pretty(render(jsonValue))
```

Contributor

We probably don't want this to be pretty by default. That's going to make it harder to parse in Spark SQL.
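
For reference, json4s also provides a compact renderer; a single-line default could look like this (sketch, reusing the jsonValue from the diff above):

```
import org.json4s.jackson.JsonMethods.{compact, render}

// Compact, single-line output is cheaper to store and easier to parse back.
def toJSON: String = compact(render(jsonValue))
```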

@marmbrus (Contributor)

Only one minor comment, otherwise LGTM.

@SparkQA commented Dec 18, 2015

Test build #48027 has finished for PR 10311 at commit 2e19939.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 19, 2015

Test build #48051 has finished for PR 10311 at commit 6d82ede.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus (Contributor)

Thanks, merging to master.

asfgit closed this in 7634fe9 on Dec 21, 2015
cloud-fan deleted the toJson-reflection branch on December 22, 2015 at 00:39
@JoshRosen (Contributor)

It looks like this patch may have introduced some rare, non-deterministic flakiness into certain tests. See https://spark-tests.appspot.com/tests/org.apache.spark.sql.hive.execution.SQLQuerySuite/udf_java_method, for example, which has two cases where logical plans could not be serialized to JSON: https://spark-tests.appspot.com/test-logs/33413063

@cloud-fan (Contributor Author)

Hi @JoshRosen, I opened #10430 as a hot fix, in case you consider this urgent. I'll spend more time coming up with a proper fix.

@scwf (Contributor) commented Dec 22, 2015

Hi @cloud-fan, can you explain in which cases we can use this feature, or the motivation for it?

@marmbrus (Contributor)

@scwf the goal here is to be able to do later analysis of queries that users are running with Spark / Spark SQL.

@scwf (Contributor)

scwf commented Dec 23, 2015

Got it, thanks @marmbrus :)

asfgit pushed a commit that referenced this pull request Dec 28, 2015
#10311 introduces some rare, non-deterministic flakiness for hive udf tests, see #10311 (comment)

I can't reproduce it locally, and may need more time to investigate, a quick solution is: bypass hive tests for json serialization.

Author: Wenchen Fan <[email protected]>

Closes #10430 from cloud-fan/hot-fix.
marmbrus pushed a commit to marmbrus/spark that referenced this pull request Jan 7, 2016
(The commit message repeats the PR description and example JSON shown at the top of this page.)

Author: Wenchen Fan <[email protected]>

Closes apache#10311 from cloud-fan/toJson-reflection.
asfgit pushed a commit that referenced this pull request Jan 13, 2016
(Backport of the hot fix above; same commit message.)

Closes #10430 from cloud-fan/hot-fix.

(cherry picked from commit 8543997)
Signed-off-by: Michael Armbrust <[email protected]>