
[VL] isAdaptiveContext null value for ColumnarOverrideRules #3801

Closed
zhli1142015 opened this issue Nov 21, 2023 · 0 comments · Fixed by #3795
Labels
bug Something isn't working triage

Comments


zhli1142015 commented Nov 21, 2023

Backend

VL (Velox)

Bug description

This issue was found via the Delta test case below. The root cause is nested evaluation of ColumnarOverrideRules: the rule's instance-local data (including the isAdaptiveContext flag) is overwritten during the nested evaluation, so the outer invocation later reads the inner invocation's leftover value.

Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value").write.format("delta").mode("overwrite").save("/tmp/veloxtest5")
spark.conf.set("spark.databricks.delta.stats.skipping", "false")
spark.read.format("delta").load("/tmp/veloxtest5").filter(col("value") === 4 and col("key") === 3).collect.toSeq
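The failure mode described above can be reduced to a small, self-contained sketch (hypothetical names, not Gluten's actual code): a rule object keeps mutable state, a nested plan evaluation re-enters the same instance mid-rule, and the outer invocation ends up reading the inner invocation's value.

```scala
// Hypothetical minimal model of the re-entrancy bug (not the real Gluten code).
object RuleStateDemo {
  final class Rule {
    // Shared mutable flag, analogous to isAdaptiveContext stored on the rule instance.
    private var isAdaptiveContext: Option[Boolean] = None

    def preTransitions(adaptive: Boolean, nested: Option[() => Unit]): Unit = {
      isAdaptiveContext = Some(adaptive)
      // A nested plan evaluation triggered mid-rule (e.g. Delta materializing its
      // log state) re-enters the same rule instance and overwrites the flag.
      nested.foreach(run => run())
    }

    def postTransitions(): Boolean =
      // The outer invocation now observes whatever the nested run left behind
      // (or, as in the reported error, a "null" that fails String.toBoolean).
      isAdaptiveContext.getOrElse(sys.error("isAdaptiveContext is null"))
  }

  def main(args: Array[String]): Unit = {
    val rule = new Rule
    rule.preTransitions(adaptive = true,
      nested = Some(() => rule.preTransitions(adaptive = false, nested = None)))
    // The outer query set `true`, but the nested evaluation left `false` behind.
    println(rule.postTransitions())
  }
}
```

This mirrors the log sequence below: the outer Filter query starts preColumnarTransitions, the Delta log query runs a full pre/post cycle inside it, and the outer postColumnarTransitions then fails.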

Error:

java.lang.IllegalArgumentException: For input string: "null"
  at scala.collection.immutable.StringLike.parseBoolean(StringLike.scala:330)
  at scala.collection.immutable.StringLike.toBoolean(StringLike.scala:289)
  at scala.collection.immutable.StringLike.toBoolean$(StringLike.scala:289)
  at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:33)
  at io.glutenproject.extension.ColumnarOverrideRules.isAdaptiveContext(ColumnarOverrides.scala:799)
  at io.glutenproject.extension.ColumnarOverrideRules.$anonfun$preOverrides$4(ColumnarOverrides.scala:844)
  at io.glutenproject.extension.ColumnarOverrideRules.$anonfun$transformPlan$3(ColumnarOverrides.scala:904)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at io.glutenproject.extension.ColumnarOverrideRules.$anonfun$transformPlan$1(ColumnarOverrides.scala:903)
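A common guard against this class of bug is to snapshot the rule-local state before a nested evaluation and restore it afterwards. The sketch below illustrates that pattern with hypothetical names only; the actual fix for this issue is in PR #3795.

```scala
// Illustrative save/restore re-entrancy guard (hypothetical names, not PR #3795).
object SaveRestoreDemo {
  final class Rule {
    private var isAdaptiveContext: Option[Boolean] = None

    // Snapshot the local state, set it for this invocation, and restore the
    // snapshot on exit, so a nested evaluation cannot clobber the outer one.
    def withContext[T](adaptive: Boolean)(body: => T): T = {
      val saved = isAdaptiveContext
      isAdaptiveContext = Some(adaptive)
      try body
      finally isAdaptiveContext = saved
    }

    def current: Option[Boolean] = isAdaptiveContext
  }

  def main(args: Array[String]): Unit = {
    val rule = new Rule
    rule.withContext(adaptive = true) {
      rule.withContext(adaptive = false) {
        assert(rule.current.contains(false)) // inner invocation sees its own value
      }
      assert(rule.current.contains(true)) // outer value survives the nested run
    }
    assert(rule.current.isEmpty) // fully unwound: back to the initial state
  }
}
```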

Detail log:

23/11/21 13:28:07 WARN PhysicalPlanSelector: shouldUseGluten: true
23/11/21 13:28:07 WARN PhysicalPlanSelector: =========================
running shouldUseGluten from:
io.glutenproject.utils.QueryPlanSelector.maybe(QueryPlanSelector.scala:72)
io.glutenproject.extension.ColumnarOverrideRules.io$glutenproject$extension$ColumnarOverrideRules$$$anonfun$preColumnarTransitions$1(ColumnarOverrides.scala:839)
io.glutenproject.extension.ColumnarOverrideRules$$anonfun$preColumnarTransitions$4.apply(ColumnarOverrides.scala:838)
io.glutenproject.extension.ColumnarOverrideRules$$anonfun$preColumnarTransitions$4.apply(ColumnarOverrides.scala:838)
org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$1(Columnar.scala:553)
plan:
Filter (((isnotnull(value#1127) AND isnotnull(key#1126)) AND (value#1127 = 4)) AND (key#1126 = 3))
+- FileScan parquet [key#1126,value#1127] Batched: true, DataFilters: [isnotnull(value#1127), isnotnull(key#1126), (value#1127 = 4), (key#1126 = 3)], Format: Parquet, Location: TahoeLogFileIndex(1 paths)[file:/tmp/veloxtest5], PartitionFilters: [], PushedFilters: [IsNotNull(value), IsNotNull(key), EqualTo(value,4), EqualTo(key,3)], ReadSchema: structkey:int,value:int

=========================
23/11/21 13:28:07 WARN ColumnarOverrideRules: preColumnarTransitions ----------------------------->

=========================
23/11/21 13:28:08 WARN PhysicalPlanSelector: shouldUseGluten: true
23/11/21 13:28:08 WARN PhysicalPlanSelector: =========================
running shouldUseGluten from:
io.glutenproject.utils.QueryPlanSelector.maybe(QueryPlanSelector.scala:72)
io.glutenproject.extension.ColumnarOverrideRules.io$glutenproject$extension$ColumnarOverrideRules$$$anonfun$preColumnarTransitions$1(ColumnarOverrides.scala:839)
io.glutenproject.extension.ColumnarOverrideRules$$anonfun$preColumnarTransitions$4.apply(ColumnarOverrides.scala:838)
io.glutenproject.extension.ColumnarOverrideRules$$anonfun$preColumnarTransitions$4.apply(ColumnarOverrides.scala:838)
org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$1(Columnar.scala:553)
plan:
Project [path#1158, partitionValues#1159, size#1160L, modificationTime#1161L, dataChange#1162, null AS stats#1173, tags#1164]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).path, true, false, true) AS path#1158, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), true, false, true), knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).partitionValues) AS partitionValues#1159, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).size AS size#1160L, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).modificationTime AS modificationTime#1161L, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).dataChange AS dataChange#1162, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), true, false, true), knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, 
true])).tags) AS tags#1164]
+- MapElements org.apache.spark.sql.Dataset$$Lambda$6862/1496757472@3fb77b28, obj#1157: org.apache.spark.sql.delta.actions.AddFile
+- DeserializeToObject newInstance(class scala.Tuple1), obj#1156: scala.Tuple1
+- Project [add#800]
+- Filter isnotnull(add#800)
+- Scan ExistingRDD Delta Table State #5 - file:///tmp/veloxtest5/_delta_log[txn#799,add#800,remove#801,metaData#802,protocol#803,cdc#804,commitInfo#805]

=========================
23/11/21 13:28:08 WARN ColumnarOverrideRules: preColumnarTransitions -------------------------------------> Local data is overridden.

23/11/21 13:28:08 WARN PhysicalPlanSelector: shouldUseGluten: true
23/11/21 13:28:08 WARN PhysicalPlanSelector: =========================
running shouldUseGluten from:
io.glutenproject.utils.QueryPlanSelector.maybe(QueryPlanSelector.scala:72)
io.glutenproject.extension.ColumnarOverrideRules.io$glutenproject$extension$ColumnarOverrideRules$$$anonfun$postColumnarTransitions$1(ColumnarOverrides.scala:847)
io.glutenproject.extension.ColumnarOverrideRules$$anonfun$postColumnarTransitions$4.apply(ColumnarOverrides.scala:846)
io.glutenproject.extension.ColumnarOverrideRules$$anonfun$postColumnarTransitions$4.apply(ColumnarOverrides.scala:846)
org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$2(Columnar.scala:555)
plan:
ColumnarToRow
+- ProjectExecTransformer [path#1158, partitionValues#1159, size#1160L, modificationTime#1161L, dataChange#1162, null AS stats#1173, tags#1164]
+- RowToColumnar
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).path, true, false, true) AS path#1158, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), true, false, true), knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).partitionValues) AS partitionValues#1159, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).size AS size#1160L, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).modificationTime AS modificationTime#1161L, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).dataChange AS dataChange#1162, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), true, false, true), knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, 
true])).tags) AS tags#1164]
+- MapElements org.apache.spark.sql.Dataset$$Lambda$6862/1496757472@3fb77b28, obj#1157: org.apache.spark.sql.delta.actions.AddFile
+- DeserializeToObject newInstance(class scala.Tuple1), obj#1156: scala.Tuple1
+- ColumnarToRow
+- ProjectExecTransformer [add#800]
+- FilterExecTransformer isnotnull(add#800)
+- RowToColumnar
+- Scan ExistingRDD Delta Table State #5 - file:///tmp/veloxtest5/_delta_log[txn#799,add#800,remove#801,metaData#802,protocol#803,cdc#804,commitInfo#805]

=========================
23/11/21 13:28:08 WARN ColumnarOverrideRules: postColumnarTransitions -------------------->

23/11/21 13:28:08 WARN PhysicalPlanSelector: shouldUseGluten: true
23/11/21 13:28:08 WARN PhysicalPlanSelector: =========================
running shouldUseGluten from:
io.glutenproject.utils.QueryPlanSelector.maybe(QueryPlanSelector.scala:72)
io.glutenproject.extension.ColumnarOverrideRules.io$glutenproject$extension$ColumnarOverrideRules$$$anonfun$preColumnarTransitions$1(ColumnarOverrides.scala:839)
io.glutenproject.extension.ColumnarOverrideRules$$anonfun$preColumnarTransitions$4.apply(ColumnarOverrides.scala:838)
io.glutenproject.extension.ColumnarOverrideRules$$anonfun$preColumnarTransitions$4.apply(ColumnarOverrides.scala:838)
org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$1(Columnar.scala:553)
plan:
Project [path#1230, partitionValues#1231, size#1232L, modificationTime#1233L, dataChange#1234, null AS stats#1245, tags#1236]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).path, true, false, true) AS path#1230, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), true, false, true), knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).partitionValues) AS partitionValues#1231, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).size AS size#1232L, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).modificationTime AS modificationTime#1233L, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).dataChange AS dataChange#1234, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), true, false, true), knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, 
true])).tags) AS tags#1236]
+- MapElements org.apache.spark.sql.Dataset$$Lambda$6862/1496757472@3fb77b28, obj#1229: org.apache.spark.sql.delta.actions.AddFile
+- DeserializeToObject newInstance(class scala.Tuple1), obj#1228: scala.Tuple1
+- Project [add#800]
+- Filter isnotnull(add#800)
+- Scan ExistingRDD Delta Table State #5 - file:///tmp/veloxtest5/_delta_log[txn#799,add#800,remove#801,metaData#802,protocol#803,cdc#804,commitInfo#805]

=========================
23/11/21 13:28:08 WARN ColumnarOverrideRules: preColumnarTransitions ------------------------->

23/11/21 13:28:08 WARN PhysicalPlanSelector: shouldUseGluten: true
23/11/21 13:28:08 WARN PhysicalPlanSelector: =========================
running shouldUseGluten from:
io.glutenproject.utils.QueryPlanSelector.maybe(QueryPlanSelector.scala:72)
io.glutenproject.extension.ColumnarOverrideRules.io$glutenproject$extension$ColumnarOverrideRules$$$anonfun$postColumnarTransitions$1(ColumnarOverrides.scala:847)
io.glutenproject.extension.ColumnarOverrideRules$$anonfun$postColumnarTransitions$4.apply(ColumnarOverrides.scala:846)
io.glutenproject.extension.ColumnarOverrideRules$$anonfun$postColumnarTransitions$4.apply(ColumnarOverrides.scala:846)
org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$2(Columnar.scala:555)
plan:
ColumnarToRow
+- ProjectExecTransformer [path#1230, partitionValues#1231, size#1232L, modificationTime#1233L, dataChange#1234, null AS stats#1245, tags#1236]
+- RowToColumnar
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).path, true, false, true) AS path#1230, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), true, false, true), knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).partitionValues) AS partitionValues#1231, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).size AS size#1232L, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).modificationTime AS modificationTime#1233L, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).dataChange AS dataChange#1234, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), true, false, true), knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, 
true])).tags) AS tags#1236]
+- MapElements org.apache.spark.sql.Dataset$$Lambda$6862/1496757472@3fb77b28, obj#1229: org.apache.spark.sql.delta.actions.AddFile
+- DeserializeToObject newInstance(class scala.Tuple1), obj#1228: scala.Tuple1
+- ColumnarToRow
+- ProjectExecTransformer [add#800]
+- FilterExecTransformer isnotnull(add#800)
+- RowToColumnar
+- Scan ExistingRDD Delta Table State #5 - file:///tmp/veloxtest5/_delta_log[txn#799,add#800,remove#801,metaData#802,protocol#803,cdc#804,commitInfo#805]

=========================
23/11/21 13:28:08 WARN ColumnarOverrideRules: postColumnarTransitions -------------------------->
23/11/21 13:28:08 WARN PhysicalPlanSelector: shouldUseGluten: true
23/11/21 13:28:08 WARN PhysicalPlanSelector: =========================
running shouldUseGluten from:
io.glutenproject.utils.QueryPlanSelector.maybe(QueryPlanSelector.scala:72)
io.glutenproject.extension.ColumnarOverrideRules.io$glutenproject$extension$ColumnarOverrideRules$$$anonfun$postColumnarTransitions$1(ColumnarOverrides.scala:847)
io.glutenproject.extension.ColumnarOverrideRules$$anonfun$postColumnarTransitions$4.apply(ColumnarOverrides.scala:846)
io.glutenproject.extension.ColumnarOverrideRules$$anonfun$postColumnarTransitions$4.apply(ColumnarOverrides.scala:846)
org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$2(Columnar.scala:555)
plan:
ColumnarToRow
+- FilterExecTransformer (((isnotnull(value#1127) AND isnotnull(key#1126)) AND (value#1127 = 4)) AND (key#1126 = 3))
+- NativeFileNativeScan parquet [key#1126,value#1127] Batched: true, DataFilters: [isnotnull(value#1127), isnotnull(key#1126), (value#1127 = 4), (key#1126 = 3)], Format: Parquet, Location: TahoeLogFileIndex(1 paths)[file:/tmp/veloxtest5], PartitionFilters: [], PushedFilters: [IsNotNull(value), IsNotNull(key), EqualTo(value,4), EqualTo(key,3)], ReadSchema: structkey:int,value:int

=========================
23/11/21 13:28:08 WARN ColumnarOverrideRules: postColumnarTransitions ---------------------------> Error happens.

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response

@zhli1142015 zhli1142015 added bug Something isn't working triage labels Nov 21, 2023
@zhouyuan zhouyuan changed the title isAdaptiveContext null value for ColumnarOverrideRules [VL] isAdaptiveContext null value for ColumnarOverrideRules Nov 22, 2023
@github-project-automation github-project-automation bot moved this from Todo to Done in Gluten 1.1.0 Nov 22, 2023