[WIP][SPARK-29221][SQL] LocalTableScanExec: handle the case where executors are accessing "null" rows #25913

Closed
wants to merge 2 commits

Conversation

HeartSaVioR
Contributor

What changes were proposed in this pull request?

This patch proposes to fix the flaky test SQLQueryTestSuite.sql (subquery/scalar-subquery/scalar-subquery-select.sql), which appears to throw an NPE on the executor side.

Both rows and unsafeRows in LocalTableScanExec are defined as @transient on the assumption that they are not needed on executors, but according to the error message and stack trace, at least rows has a code path through which executors access it. unsafeRows would also throw an NPE when accessed on an executor, since it reads rows, which would be null there - this patch fixes both.

22:58:15.942 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 20635.0 (TID 36041)
java.lang.NullPointerException
	at org.apache.spark.sql.execution.LocalTableScanExec.stringArgs(LocalTableScanExec.scala:62)
	at org.apache.spark.sql.catalyst.trees.TreeNode.argString(TreeNode.scala:506)
	at org.apache.spark.sql.catalyst.trees.TreeNode.simpleString(TreeNode.scala:534)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.simpleString(QueryPlan.scala:179)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:181)
	at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:647)
	at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:675)
	at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:675)
	at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:569)
	at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:559)
	at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:551)
	at org.apache.spark.sql.catalyst.trees.TreeNode.toString(TreeNode.scala:548)
	at org.apache.spark.sql.catalyst.errors.package$TreeNodeException.<init>(package.scala:36)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:436)
	at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:425)
	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:102)
	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:63)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:132)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:261)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:765)
	at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:735)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:292)
	at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:74)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
	at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:137)
	at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
	at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
	at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
	at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
	at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
	at scala.collection.mutable.HashTable.findEntry(HashTable.scala:136)
	at scala.collection.mutable.HashTable.findEntry$(HashTable.scala:135)
	at scala.collection.mutable.HashMap.findEntry(HashMap.scala:44)
	at scala.collection.mutable.HashMap.get(HashMap.scala:74)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:54)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:95)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$subexpressionElimination$1(CodeGenerator.scala:1113)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$subexpressionElimination$1$adapted(CodeGenerator.scala:1113)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1113)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1165)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:289)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:337)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:326)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.createCodeGeneratedObject(Projection.scala:123)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.createCodeGeneratedObject(Projection.scala:119)
	at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:47)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:156)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:166)
	at org.apache.spark.sql.execution.ProjectExec.$anonfun$doExecute$1(basicPhysicalOperators.scala:75)
	at org.apache.spark.sql.execution.ProjectExec.$anonfun$doExecute$1$adapted(basicPhysicalOperators.scala:74)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:455)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
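
For concreteness, the gist of the fix is to make both definitions tolerate a null rows. This is a condensed sketch based on this PR's diff; the bodies of the pre-existing branches are approximate and may differ slightly from the actual LocalTableScanExec source:

  @transient private lazy val unsafeRows: Array[InternalRow] = {
    if (rows == null || rows.isEmpty) {
      // rows is @transient, so it deserializes to null on executors; treat that like the empty case.
      Array.empty
    } else {
      val proj = UnsafeProjection.create(output, output)
      rows.map(r => proj(r).copy()).toArray
    }
  }

  override protected def stringArgs: Iterator[Any] = {
    if (rows == null) {
      // Executor side: the data is not available, so only the output attributes are reported.
      Iterator("<unknown>", output)
    } else if (rows.isEmpty) {
      Iterator("<empty>", output)  // pre-existing empty-relation case (approximate)
    } else {
      Iterator(output)
    }
  }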

Why are the changes needed?

This patch tries to fix a flaky test.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Will trigger the CI build multiple times, since build/sbt "~sql/test-only *SQLQueryTestSuite -- -z subquery/scalar-subquery/scalar-subquery-select.sql" doesn't fail in my local dev environment.

@HeartSaVioR
Contributor Author

I'll remove [WIP] once 5 consecutive test runs of SQLQueryTestSuite.sql (subquery/scalar-subquery/scalar-subquery-select.sql) pass without failure.

@HeartSaVioR
Contributor Author

retest this, please

3 similar comments
@HeartSaVioR
Contributor Author

retest this, please

@HeartSaVioR
Contributor Author

retest this, please

@HeartSaVioR
Contributor Author

retest this, please

@HeartSaVioR HeartSaVioR changed the title [WIP][SPARK-29221][SQL] LocalTableScanExec: handle the case rows is "null" due to @transient [WIP][SPARK-29221][SQL] LocalTableScanExec: handle the case where executors are accessing "null" rows Sep 24, 2019
@SparkQA

SparkQA commented Sep 24, 2019

Test build #111273 has finished for PR 25913 at commit c3b59c6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Sep 24, 2019

The NPE in LocalTableScanExec is resolved, but a later step triggers another exception:

sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: subquery/scalar-subquery/scalar-subquery-select.sql
Expected "struct<[min_t3d:bigint,max_t2h:timestamp]>", but got "struct<[]>" Schema did not match for query #3
SELECT (SELECT min(t3d) FROM t3) min_t3d,
       (SELECT max(t2h) FROM t2) max_t2h
FROM   t1
WHERE  t1a = 'val1c': QueryOutput(SELECT (SELECT min(t3d) FROM t3) min_t3d,
       (SELECT max(t2h) FROM t2) max_t2h
FROM   t1
WHERE  t1a = 'val1c',struct<>,org.apache.spark.sql.catalyst.errors.package$TreeNodeException
makeCopy, tree:
null)
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
	at org.scalatest.Assertions.assertResult(Assertions.scala:1003)
	at org.scalatest.Assertions.assertResult$(Assertions.scala:998)
	at org.scalatest.FunSuite.assertResult(FunSuite.scala:1560)
	at org.apache.spark.sql.SQLQueryTestSuite.$anonfun$runQueries$11(SQLQueryTestSuite.scala:382)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.SQLQueryTestSuite.$anonfun$runQueries$9(SQLQueryTestSuite.scala:377)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.Assertions.withClue(Assertions.scala:1221)
	at org.scalatest.Assertions.withClue$(Assertions.scala:1208)
	at org.scalatest.FunSuite.withClue(FunSuite.scala:1560)
	at org.apache.spark.sql.SQLQueryTestSuite.runQueries(SQLQueryTestSuite.scala:353)
	at org.apache.spark.sql.SQLQueryTestSuite.$anonfun$runTest$15(SQLQueryTestSuite.scala:276)
	at org.apache.spark.sql.SQLQueryTestSuite.$anonfun$runTest$15$adapted(SQLQueryTestSuite.scala:274)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.sql.SQLQueryTestSuite.runTest(SQLQueryTestSuite.scala:274)
	at org.apache.spark.sql.SQLQueryTestSuite.$anonfun$createScalaTestCase$5(SQLQueryTestSuite.scala:223)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
	at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
	at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
	at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56)
	at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
	at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite.run(Suite.scala:1147)
	at org.scalatest.Suite.run$(Suite.scala:1129)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
	at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:507)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

and the actual executor-side stack trace is:

03:55:28.169 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 20635.0 (TID 36041)
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
HashAggregate(keys=[], functions=[partial_min(t3d#143422L)], output=[min#143439L])
+- Project [t3d#143422L]
   +- LocalTableScan <unknown>, [t3a#143419, t3b#143420, t3c#143421, t3d#143422L, t3e#143423, t3f#143424, t3g#143425, t3h#143426, t3i#143427]

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:436)
	at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:425)
	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:102)
	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:63)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:132)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:261)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:259)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:259)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:765)
	at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:735)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:245)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:244)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:292)
	at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:74)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
	at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:137)
	at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
	at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
	at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
	at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
	at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
	at scala.collection.mutable.HashTable.findEntry(HashTable.scala:136)
	at scala.collection.mutable.HashTable.findEntry$(HashTable.scala:135)
	at scala.collection.mutable.HashMap.findEntry(HashMap.scala:44)
	at scala.collection.mutable.HashMap.get(HashMap.scala:74)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:54)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:95)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$subexpressionElimination$1(CodeGenerator.scala:1113)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$subexpressionElimination$1$adapted(CodeGenerator.scala:1113)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1113)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1165)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:289)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:337)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:326)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.createCodeGeneratedObject(Projection.scala:123)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.createCodeGeneratedObject(Projection.scala:119)
	at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:47)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:156)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:166)
	at org.apache.spark.sql.execution.ProjectExec.$anonfun$doExecute$1(basicPhysicalOperators.scala:75)
	at org.apache.spark.sql.execution.ProjectExec.$anonfun$doExecute$1$adapted(basicPhysicalOperators.scala:74)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:327)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:291)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:455)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.GeneratedConstructorAccessor31.newInstance(Unknown Source)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:469)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:468)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 76 more
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.<init>(HashAggregateExec.scala:96)
	... 83 more
03:55:31.480 ERROR org.apache.spark.sql.SQLQueryTestSuite: Error using configs: spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY

@HeartSaVioR
Contributor Author

HeartSaVioR commented Sep 24, 2019

I guess the approach should be changed, as there seem to be plenty of pieces in physical plans that are not safe to evaluate on executors. Something may have changed recently. For reference, the relevant code (testFallbackStartsAt in HashAggregateExec, sqlContext in SparkPlan, and SparkSession.getActiveSession):

  // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
  // map and/or the sort-based aggregation once it has processed a given number of input rows.
  private val testFallbackStartsAt: Option[(Int, Int)] = {
    sqlContext.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match {
      case null | "" => None
      case fallbackStartsAt =>
        val splits = fallbackStartsAt.split(",").map(_.trim)
        Some((splits.head.toInt, splits.last.toInt))
    }
  }

abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
  /**
   * A handle to the SQL Context that was used to create this plan. Since many operators need
   * access to the sqlContext for RDD operations or configuration this field is automatically
   * populated by the query planning infrastructure.
   */
  @transient final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull

  protected def sparkContext = sqlContext.sparkContext

  def getActiveSession: Option[SparkSession] = {
    if (TaskContext.get != null) {
      // Return None when running on executors.
      None
    } else {
      Option(activeThreadSession.get)
    }
  }

When values/methods in SparkPlan are referenced on the executor side, sqlContext in SparkPlan is null (getActiveSession returns None on executors, regardless of @transient), so calling sparkContext throws an NPE. Thus HashAggregateExec cannot be initialized on the executor side, and the same applies to any physical node that uses sqlContext or sparkContext during initialization. I'm not sure we want to fix this by changing them to return Option. (I agree that would just be a band-aid.)
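
Roughly, that band-aid would look like the sketch below (hypothetical sqlContextOption / sparkContextOption names; not part of this PR or of Spark):

abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
  // Recomputed on access, so on executors (where getActiveSession returns None) this yields
  // None instead of a field that deserialized to null.
  protected def sqlContextOption: Option[SQLContext] =
    SparkSession.getActiveSession.map(_.sqlContext)

  // Callers would branch on None instead of hitting an NPE through a null sqlContext.
  protected def sparkContextOption: Option[SparkContext] =
    sqlContextOption.map(_.sparkContext)
}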

@SparkQA

SparkQA commented Sep 24, 2019

Test build #111274 has finished for PR 25913 at commit c3b59c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 24, 2019

Test build #111275 has finished for PR 25913 at commit c3b59c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 24, 2019

Test build #111276 has finished for PR 25913 at commit c3b59c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -37,7 +37,7 @@ case class LocalTableScanExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   @transient private lazy val unsafeRows: Array[InternalRow] = {
-    if (rows.isEmpty) {
+    if (rows == null || rows.isEmpty) {
Member

Can't we remove @transient instead of doing this?

Contributor Author

@transient was added to avoid serializing and copying the whole set of rows, so I guess keeping it is preferable.

Contributor Author

Related commits are here:
256358f
f70f46d

Member

Ah, I see. I'm a bit worried that this value could differ between the driver and executors. Which code touches this variable on executors? What's the stack trace for unsafeRows?

Contributor Author

Uh, that's actually a kind of defensive programming, since it would throw an NPE if there were no null check. I haven't seen any actual stack trace that refers to unsafeRows.

Member

@maropu maropu Sep 25, 2019

I personally think unsafeRows is not expected to be called on the executor side, so we should use assert(rows != null) instead of the current approach. If that assertion ever fails, we need to fix the calling code so it doesn't touch this variable on executors.
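
For concreteness, the assert-based variant being suggested would look roughly like this (a sketch only; the body after the assert is the existing unsafeRows logic, reproduced approximately):

  @transient private lazy val unsafeRows: Array[InternalRow] = {
    // Fail fast if this is ever evaluated on an executor, where the @transient rows is null,
    // so the offending call site shows up instead of being papered over.
    assert(rows != null, "unsafeRows must only be evaluated on the driver")
    if (rows.isEmpty) {
      Array.empty
    } else {
      val proj = UnsafeProjection.create(output, output)
      rows.map(r => proj(r).copy()).toArray
    }
  }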

@SparkQA

SparkQA commented Sep 24, 2019

Test build #111277 has finished for PR 25913 at commit c3b59c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

It would be nice if someone with deep Spark SQL expertise could spend some time investigating the original issue. I could keep applying band-aids until the next error message goes away, but I feel there is another root issue behind this.

@HeartSaVioR
Contributor Author

Just updated the observation with the second kind of error pattern.
#25913 (comment)

@HeartSaVioR
Contributor Author

HeartSaVioR commented Sep 24, 2019

Triggering 3 new builds to check the chance of hitting a new occurrence.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Sep 24, 2019

Test build #111289 has started for PR 25913 at commit c3b59c6.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Sep 24, 2019

Test build #111291 has started for PR 25913 at commit c3b59c6.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Sep 24, 2019

Test build #111292 has started for PR 25913 at commit c3b59c6.

@HeartSaVioR
Contributor Author

OK, all three builds were cancelled. I'll just retrigger them.

@HeartSaVioR
Contributor Author

retest this, please

@HeartSaVioR
Contributor Author

Just reported the build failure to the dev mailing list as it seems to be an environment issue. Not all PR builds have failed, so it's worth retriggering.

@HeartSaVioR
Contributor Author

retest this, please

@HeartSaVioR
Contributor Author

HeartSaVioR commented Sep 24, 2019

cc @gatorsmile @cloud-fan @viirya - kindly asking for help investigating the actual cause.

-    if (rows.isEmpty) {
+    if (rows == null) {
+      Iterator("<unknown>", output)
+    } else if (rows.isEmpty) {
Member

Is it OK to return different values on the driver and on executors? How about computing this value on the driver and then passing it to executors?

Contributor Author

@HeartSaVioR HeartSaVioR Sep 24, 2019

Any value other than output should be meaningless; otherwise the assumption in the related comments is wrong and we would have to serialize the rows. I guess it's only used for display.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Sep 24, 2019

I'm wondering why the test fails only intermittently, since that code path should always end in an NPE. My bet is that it only sometimes reaches the conditional case that triggers this.

I'll try to add some more band-aids, but honestly experts are needed to dig into the root cause.

EDIT: sqlContext and sparkContext are used by so many plans without any null checks, which will just throw an NPE if those expressions are evaluated on the executor side. It's not feasible to enumerate all the cases and classify whether they run on the driver and/or executors - even if we classified them once now, it could easily be broken again. Finding the root cause seems to be required.

@SparkQA

SparkQA commented Sep 25, 2019

Test build #111309 has finished for PR 25913 at commit c3b59c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 25, 2019

Test build #111312 has finished for PR 25913 at commit c3b59c6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 25, 2019

Test build #111311 has finished for PR 25913 at commit 7079481.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 25, 2019

Test build #111315 has finished for PR 25913 at commit 0d9eab9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Sep 25, 2019

Submitted #25925 to fix it.

@HeartSaVioR
Contributor Author

@viirya
Great! Thanks for submitting a patch. I'll close the PR.

I assume #25925 also resolves the newly discovered issue, the NPE in HashAggregateExec.<init>, as the original cause looks to be the same.

@maropu
Member

maropu commented Sep 26, 2019

Anyway, nice catch, @HeartSaVioR !

@HeartSaVioR
Contributor Author

Happy to help! Thanks again @viirya for dealing with the root cause. :)

@HeartSaVioR HeartSaVioR deleted the SPARK-29221 branch September 26, 2019 06:23
@viirya
Member

viirya commented Sep 26, 2019

Thanks @HeartSaVioR for investigating this issue!
