From b0d50c0a61ac373d56fba3b1e0242f6048ce935e Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 2 Feb 2024 09:20:15 +0800
Subject: [PATCH] [CORE] Add schema validation for broadcast exchange (#4608)

---
Notes (this section is ignored by `git am`):
    The line structure of this patch was restored from a whitespace-collapsed
    copy. Blank context lines were placed so that every hunk matches the line
    counts in its `@@` header; in the second RowToVeloxColumnarExec.scala hunk
    the blank context line could equally sit before the `case class` line.
    Indentation of context lines follows a 2-space Scala layout and should be
    confirmed against the target tree. If context does not match, apply with
    `git apply --ignore-whitespace`.

 .../backendsapi/velox/ValidatorApiImpl.scala  |  2 +-
 .../execution/RowToVeloxColumnarExec.scala    |  4 ++--
 .../ColumnarCachedBatchSerializer.scala       |  3 +--
 .../ColumnarBroadcastExchangeExec.scala       | 23 ++++++++++++++-----
 .../utils/velox/VeloxTestSettings.scala       |  1 -
 .../utils/velox/VeloxTestSettings.scala       |  1 -
 .../utils/velox/VeloxTestSettings.scala       |  1 -
 7 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/ValidatorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/ValidatorApiImpl.scala
index c4107c16ea57..2534232fee36 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/ValidatorApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/ValidatorApiImpl.scala
@@ -81,7 +81,7 @@ class ValidatorApiImpl extends ValidatorApi {
       case array: ArrayType =>
         doSchemaValidate(array.elementType)
       case _ =>
-        Some(s"do not support data type: $schema")
+        Some(s"Schema / data type not supported: $schema")
     }
   }
 
diff --git a/backends-velox/src/main/scala/io/glutenproject/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/io/glutenproject/execution/RowToVeloxColumnarExec.scala
index 74e8dc783551..832fbb18ac55 100644
--- a/backends-velox/src/main/scala/io/glutenproject/execution/RowToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/execution/RowToVeloxColumnarExec.scala
@@ -16,7 +16,7 @@
  */
 package io.glutenproject.execution
 
-import io.glutenproject.backendsapi.velox.ValidatorApiImpl
+import io.glutenproject.backendsapi.BackendsApiManager
 import io.glutenproject.columnarbatch.ColumnarBatches
 import io.glutenproject.exec.Runtimes
 import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
@@ -45,7 +45,7 @@ import scala.collection.mutable.ListBuffer
 case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBase(child = child) {
 
   override def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
-    new ValidatorApiImpl().doSchemaValidate(schema).foreach {
+    BackendsApiManager.getValidatorApiInstance.doSchemaValidate(schema).foreach {
       reason =>
         throw new UnsupportedOperationException(
           s"Input schema contains unsupported type when convert row to columnar for $schema " +
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index d2c0329c87ca..66acfc5d15df 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.execution
 
 import io.glutenproject.GlutenConfig
 import io.glutenproject.backendsapi.BackendsApiManager
-import io.glutenproject.backendsapi.velox.ValidatorApiImpl
 import io.glutenproject.columnarbatch.ColumnarBatches
 import io.glutenproject.exec.Runtimes
 import io.glutenproject.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
@@ -91,7 +90,7 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
   }
 
   private def validateSchema(schema: StructType): Boolean = {
-    val reason = new ValidatorApiImpl().doSchemaValidate(schema)
+    val reason = BackendsApiManager.getValidatorApiInstance.doSchemaValidate(schema)
     if (reason.isDefined) {
       logInfo(s"Columnar cache does not support schema $schema, due to ${reason.get}")
       false
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index ef86f8ab85ed..6373c8de9a45 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -131,12 +131,23 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
     ColumnarBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
   }
 
-  override protected def doValidateInternal(): ValidationResult = mode match {
-    case _: HashedRelationBroadcastMode =>
-      ValidationResult.ok
-    case _ =>
-      // TODO IdentityBroadcastMode not supported. Need to support BroadcastNestedLoopJoin first.
-      ValidationResult.notOk("Only support HashedRelationBroadcastMode for now.")
+  override protected def doValidateInternal(): ValidationResult = {
+    mode match {
+      case _: HashedRelationBroadcastMode =>
+      case _ =>
+        // TODO IdentityBroadcastMode not supported. Need to support BroadcastNestedLoopJoin first.
+        return ValidationResult.notOk("Only support HashedRelationBroadcastMode for now.")
+    }
+    BackendsApiManager.getValidatorApiInstance
+      .doSchemaValidate(schema)
+      .map {
+        reason =>
+          {
+            ValidationResult.notOk(
+              s"Unsupported schema in broadcast exchange: $schema, reason: $reason")
+          }
+      }
+      .getOrElse(ValidationResult.ok)
   }
 
   override def doPrepare(): Unit = {
diff --git a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
index 6236eb3b3626..18829f0224b0 100644
--- a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
@@ -293,7 +293,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenDataFrameImplicitsSuite]
   enableSuite[GlutenGeneratorFunctionSuite]
   enableSuite[GlutenDataFrameTimeWindowingSuite]
-    .exclude("time window joins") // FIXME hongze
   enableSuite[GlutenDataFrameSessionWindowingSuite]
   enableSuite[GlutenBroadcastExchangeSuite]
   enableSuite[GlutenDataFramePivotSuite]
diff --git a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
index be165053a9a4..b6713caa8725 100644
--- a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
@@ -998,7 +998,6 @@ class VeloxTestSettings extends BackendTestSettings {
       "SPARK-9083: sort with non-deterministic expressions"
     )
   enableSuite[GlutenDataFrameTimeWindowingSuite]
-    .exclude("time window joins") // FIXME hongze
   enableSuite[GlutenDataFrameTungstenSuite]
   enableSuite[GlutenDataFrameWindowFramesSuite]
     // Local window fixes are not added.
diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
index 8ed7bba86d1b..b7e85ae485f0 100644
--- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
@@ -1001,7 +1001,6 @@ class VeloxTestSettings extends BackendTestSettings {
     // test for sort node not present but gluten uses shuffle hash join
     .exclude("SPARK-41048: Improve output partitioning and ordering with AQE cache")
   enableSuite[GlutenDataFrameTimeWindowingSuite]
-    .exclude("time window joins") // FIXME hongze
   enableSuite[GlutenDataFrameTungstenSuite]
   enableSuite[GlutenDataFrameWindowFramesSuite]
     // Local window fixes are not added.