diff --git a/.github/workflows/velox_be.yml b/.github/workflows/velox_be.yml index fc21673eb45bc..a65d8dfc97e61 100644 --- a/.github/workflows/velox_be.yml +++ b/.github/workflows/velox_be.yml @@ -112,7 +112,7 @@ jobs: run: | docker exec ubuntu2004-test-slow-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -Piceberg -DargLine="-Dspark.test.home=/opt/spark322" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest' + mvn clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Prss -DargLine="-Dspark.test.home=/opt/spark322" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest' - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 run: | docker exec ubuntu2004-test-slow-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \ @@ -148,7 +148,7 @@ jobs: - name: Build and Run unit test for Spark 3.3.1(slow tests) run: | docker exec ubuntu2004-test-spark33-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten && \ - mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest' + mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest' - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.3 run: | docker exec ubuntu2004-test-spark33-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten/tools/gluten-it && \ @@ -184,7 +184,7 @@ jobs: - name: Build and Run unit test for Spark 3.3.1(other tests) run: | docker exec ubuntu2004-test-spark33-$GITHUB_RUN_ID bash -c 'cd /opt/gluten && \ - mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \ + mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \ mvn test -Pspark-3.3 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=io.glutenproject.tags.UDFTest' - name: Exit docker container if: ${{ always() }} @@ -214,7 +214,7 @@ jobs: - name: Build and Run unit test for Spark 3.4.1(slow tests) run: | docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten && \ - mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest' + mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest' - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.4 run: | docker exec ubuntu2004-test-spark34-slow-$GITHUB_RUN_ID bash -l -c 'cd /opt/gluten/tools/gluten-it && \ @@ -250,7 +250,7 @@ jobs: - name: Build and Run unit test for Spark 3.4.1(other tests) run: | docker exec ubuntu2004-test-spark34-$GITHUB_RUN_ID bash -c 'cd /opt/gluten && \ - mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \ + mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Pspark-ut -DargLine="-Dspark.test.home=/opt/spark331" 
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,io.glutenproject.tags.UDFTest,io.glutenproject.tags.SkipTestTags && \ mvn test -Pspark-3.4 -Pbackends-velox -DtagsToExclude=None -DtagsToInclude=io.glutenproject.tags.UDFTest' - name: Exit docker container if: ${{ always() }} @@ -280,7 +280,7 @@ jobs: run: | docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests' + mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests' - name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2 run: | docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \ @@ -307,7 +307,7 @@ jobs: run: | docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -Piceberg -DskipTests' + mvn clean install -Pspark-3.3 -Pbackends-velox -Prss -DskipTests' - name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.3 run: | docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \ @@ -320,7 +320,7 @@ jobs: run: | docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -Piceberg -DskipTests' + mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -DskipTests' - name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.4 run: | docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \ @@ -360,7 +360,7 @@ jobs: run: | docker exec centos8-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests' + mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests' - name: TPC-H SF1.0 && TPC-DS SF30.0 Parquet local spark3.2 run: | docker exec centos8-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \ @@ -411,7 +411,7 @@ jobs: run: | docker exec centos7-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests' + mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests' - name: TPC-H SF1.0 && TPC-DS SF30.0 Parquet local spark3.2 run: | docker exec centos7-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \ @@ -493,7 +493,7 @@ jobs: run: | docker exec static-build-test-$GITHUB_RUN_ID bash -c ' cd /opt/gluten && \ - mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -Piceberg -DskipTests && \ + mvn clean install -Pspark-3.2 -Pbackends-velox -Prss -DskipTests && \ cd /opt/gluten/tools/gluten-it && \ mvn clean install -Pspark-3.2' - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 (centos 8) diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala index 1f7dbb9cd9055..ad65c65e64e07 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala @@ -217,6 +217,4 @@ object CHBackendSettings extends BackendSettingsApi with Logging { override def needOutputSchemaForPlan(): Boolean = true override def allowDecimalArithmetic: Boolean = !SQLConf.get.decimalOperationsAllowPrecisionLoss - - override def requiredInputFilePaths(): Boolean = true } diff --git a/backends-velox/src/main/scala/io/glutenproject/execution/FilterExecTransformer.scala 
b/backends-velox/src/main/scala/io/glutenproject/execution/FilterExecTransformer.scala index 2d23943fd55d5..1c9dc1f9041fc 100644 --- a/backends-velox/src/main/scala/io/glutenproject/execution/FilterExecTransformer.scala +++ b/backends-velox/src/main/scala/io/glutenproject/execution/FilterExecTransformer.scala @@ -66,8 +66,10 @@ case class FilterExecTransformer(condition: Expression, child: SparkPlan) private def getLeftCondition: Expression = { val scanFilters = child match { // Get the filters including the manually pushed down ones. - case basicScanExecTransformer: BasicScanExecTransformer => - basicScanExecTransformer.filterExprs() + case batchScanTransformer: BatchScanExecTransformer => + batchScanTransformer.filterExprs() + case fileScanTransformer: FileSourceScanExecTransformer => + fileScanTransformer.filterExprs() // For fallback scan, we need to keep original filter. case _ => Seq.empty[Expression] diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index 4a2ef341af65f..bd056c2e3e410 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -260,23 +260,6 @@ After the two steps, you can query delta table by gluten/velox without scan's fa Gluten with velox backends also support the column mapping of delta tables. About column mapping, see more [here](https://docs.delta.io/latest/delta-column-mapping.html). -## Iceberg Support - -Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported. - -### How to use - -First of all, compile gluten-iceberg module by a `iceberg` profile, as follows: - -``` -mvn clean package -Pbackends-velox -Pspark-3.3 -Piceberg -DskipTests -``` - -Then, put the additional gluten-iceberg jar to the class path (usually it's `$SPARK_HOME/jars`). -The gluten-iceberg jar is in `gluten-iceberg/target` directory. - -After the two steps, you can query iceberg table by gluten/velox without scan's fallback. - # Coverage Spark3.3 has 387 functions in total. ~240 are commonly used. Velox's functions have two category, Presto and Spark. Presto has 124 functions implemented. Spark has 62 functions. Spark functions are verified to have the same result as Vanilla Spark. Some Presto functions have the same result as Vanilla Spark but some others have different. Gluten prefer to use Spark functions firstly. If it's not in Spark's list but implemented in Presto, we currently offload to Presto one until we noted some result mismatch, then we need to reimplement the function in Spark category. Gluten currently offloads 94 functions and 14 operators, more details refer to [Velox Backend's Supported Operators & Functions](../velox-backend-support-progress.md). 
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala index a6443060a4ed7..fcd1bbfe84533 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala @@ -109,6 +109,4 @@ trait BackendSettingsApi { def requiredChildOrderingForWindow(): Boolean = false def staticPartitionWriteOnly(): Boolean = false - - def requiredInputFilePaths(): Boolean = false } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BaseDataSource.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BaseDataSource.scala deleted file mode 100644 index bd5f4e2940ebc..0000000000000 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BaseDataSource.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.glutenproject.execution - -import org.apache.spark.sql.connector.read.InputPartition -import org.apache.spark.sql.types.StructType - -trait BaseDataSource { - - /** Returns the actual schema of this data source scan. */ - def getDataSchema: StructType - - /** Returns the required partition schema, used to generate partition column. */ - def getPartitionSchema: StructType - - /** Returns the partitions generated by this data source scan. */ - def getPartitions: Seq[InputPartition] - - /** Returns the input file paths, used to validate the partition column path */ - def getInputFilePathsInternal: Seq[String] -} diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala index 29654d99d7096..a5cef1a219213 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala @@ -19,7 +19,9 @@ package io.glutenproject.execution import io.glutenproject.backendsapi.BackendsApiManager import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer} import io.glutenproject.extension.{GlutenPlan, ValidationResult} +import io.glutenproject.extension.columnar.TransformHints import io.glutenproject.metrics.MetricsUpdater +import io.glutenproject.sql.shims.SparkShimLoader import io.glutenproject.substrait.`type`.TypeBuilder import io.glutenproject.substrait.SubstraitContext import io.glutenproject.substrait.extensions.ExtensionBuilder @@ -414,15 +416,53 @@ object FilterHandler { // Separate and compare the filter conditions in Scan and Filter. 
// Push down the left conditions in Filter into Scan. - def applyFilterPushdownToScan(filter: FilterExec, reuseSubquery: Boolean): GlutenPlan = - filter.child match { + def applyFilterPushdownToScan(plan: FilterExec, reuseSubquery: Boolean): SparkPlan = + plan.child match { case fileSourceScan: FileSourceScanExec => val leftFilters = - getLeftFilters(fileSourceScan.dataFilters, flattenCondition(filter.condition)) - ScanTransformerFactory.createFileSourceScanTransformer( - fileSourceScan, - reuseSubquery, - extraFilters = leftFilters) + getLeftFilters(fileSourceScan.dataFilters, flattenCondition(plan.condition)) + // transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in partitionFilters + val newPartitionFilters = + ExpressionConverter.transformDynamicPruningExpr( + fileSourceScan.partitionFilters, + reuseSubquery) + new FileSourceScanExecTransformer( + fileSourceScan.relation, + fileSourceScan.output, + fileSourceScan.requiredSchema, + newPartitionFilters, + fileSourceScan.optionalBucketSet, + fileSourceScan.optionalNumCoalescedBuckets, + fileSourceScan.dataFilters ++ leftFilters, + fileSourceScan.tableIdentifier, + fileSourceScan.disableBucketedScan + ) + case batchScan: BatchScanExec => + batchScan.scan match { + case scan: FileScan => + val leftFilters = + getLeftFilters(scan.dataFilters, flattenCondition(plan.condition)) + val newPartitionFilters = + ExpressionConverter.transformDynamicPruningExpr(scan.partitionFilters, reuseSubquery) + new BatchScanExecTransformer( + batchScan.output, + scan, + leftFilters ++ newPartitionFilters, + table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScan)) + case _ => + if (batchScan.runtimeFilters.isEmpty) { + throw new UnsupportedOperationException( + s"${batchScan.scan.getClass.toString} is not supported.") + } else { + // If filter expressions aren't empty, we need to transform the inner operators.
+ val newSource = batchScan.copy(runtimeFilters = ExpressionConverter + .transformDynamicPruningExpr(batchScan.runtimeFilters, reuseSubquery)) + TransformHints.tagNotTransformable( + newSource, + "The scan in BatchScanExec is not a FileScan") + newSource + } + } case other => throw new UnsupportedOperationException(s"${other.getClass.toString} is not supported.") } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala index 822d656ff1821..7bb32df6f7cfd 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala @@ -20,45 +20,43 @@ import io.glutenproject.backendsapi.BackendsApiManager import io.glutenproject.expression.{ConverterUtils, ExpressionConverter} import io.glutenproject.extension.ValidationResult import io.glutenproject.substrait.`type`.ColumnTypeNode -import io.glutenproject.substrait.SubstraitContext +import io.glutenproject.substrait.{SubstraitContext, SupportFormat} import io.glutenproject.substrait.plan.PlanBuilder import io.glutenproject.substrait.rel.{ReadRelNode, RelBuilder, SplitInfo} -import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.execution.InSubqueryExec +import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import com.google.common.collect.Lists import scala.collection.JavaConverters._ -trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource { +trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat { + + // The key of merge schema option in Parquet reader. + protected val mergeSchemaOptionKey = "mergeschema" - /** Returns the filters that can be pushed down to native file scan */ def filterExprs(): Seq[Expression] def outputAttributes(): Seq[Attribute] - /** This can be used to report FileFormat for a file based scan operator. */ - val fileFormat: ReadFileFormat + def getPartitions: Seq[InputPartition] + + def getPartitionSchemas: StructType + + def getDataSchemas: StructType // TODO: Remove this expensive call when CH support scan custom partition location. - def getInputFilePaths: Seq[String] = { - // This is a heavy operation, and only the required backend executes the corresponding logic. - if (BackendsApiManager.getSettings.requiredInputFilePaths()) { - getInputFilePathsInternal - } else { - Seq.empty - } - } + def getInputFilePaths: Seq[String] - /** Returns the split infos that will be processed by the underlying native engine. 
*/ - def getSplitInfos: Seq[SplitInfo] = { + def getSplitInfos: Seq[SplitInfo] = getPartitions.map( BackendsApiManager.getIteratorApiInstance - .genSplitInfo(_, getPartitionSchema, fileFormat)) - } + .genSplitInfo(_, getPartitionSchemas, fileFormat)) def doExecuteColumnarInternal(): RDD[ColumnarBatch] = { val numOutputRows = longMetric("outputRows") @@ -87,12 +85,13 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource .supportFileFormatRead( fileFormat, schema.fields, - getPartitionSchema.nonEmpty, + getPartitionSchemas.nonEmpty, getInputFilePaths) ) { return ValidationResult.notOk( s"Not supported file format or complex type for scan: $fileFormat") } + val substraitContext = new SubstraitContext val relNode = doTransform(substraitContext).root @@ -103,9 +102,10 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource val output = outputAttributes() val typeNodes = ConverterUtils.collectAttributeTypeNodes(output) val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output) + val partitionSchemas = getPartitionSchemas val columnTypeNodes = output.map { attr => - if (getPartitionSchema.exists(_.name.equals(attr.name))) { + if (partitionSchemas.exists(_.name.equals(attr.name))) { new ColumnTypeNode(1) } else { new ColumnTypeNode(0) @@ -125,7 +125,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource exprNode, context, context.nextOperatorId(this.nodeName)) - relNode.asInstanceOf[ReadRelNode].setDataSchema(getDataSchema) + relNode.asInstanceOf[ReadRelNode].setDataSchema(getDataSchemas) TransformContext(output, output, relNode) } + + def executeInSubqueryForDynamicPruningExpression(inSubquery: InSubqueryExec): Unit = { + if (inSubquery.values().isEmpty) inSubquery.updateResult() + } } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala index 051ba9779084d..1d4551afa4b2a 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BatchScanExecTransformer.scala @@ -68,17 +68,17 @@ class BatchScanExecTransformer( override def getPartitions: Seq[InputPartition] = filteredFlattenPartitions - override def getPartitionSchema: StructType = scan match { + override def getPartitionSchemas: StructType = scan match { case fileScan: FileScan => fileScan.readPartitionSchema case _ => new StructType() } - override def getDataSchema: StructType = scan match { + override def getDataSchemas: StructType = scan match { case fileScan: FileScan => fileScan.readDataSchema case _ => new StructType() } - override def getInputFilePathsInternal: Seq[String] = { + override def getInputFilePaths: Seq[String] = { scan match { case fileScan: FileScan => fileScan.fileIndex.inputFiles.toSeq case _ => Seq.empty diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/DataSourceV2TransformerRegister.scala b/gluten-core/src/main/scala/io/glutenproject/execution/DataSourceV2TransformerRegister.scala deleted file mode 100644 index da0305acd148d..0000000000000 --- a/gluten-core/src/main/scala/io/glutenproject/execution/DataSourceV2TransformerRegister.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.glutenproject.execution - -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.datasources.v2.BatchScanExec - -/** - * Data sources v2 transformer should implement this trait so that they can register an alias to - * their data source v2 transformer. This allows users to give the data source v2 transformer alias - * as the format type over the fully qualified class name. - */ -trait DataSourceV2TransformerRegister { - - /** - * The scan class name that this data source v2 transformer provider adapts. This is overridden by - * children to provide a alias for the data source v2 transformer. For example: - * - * {{{ - * override def scanClassName(): String = "org.apache.iceberg.spark.source.SparkBatchQueryScan" - * }}} - */ - def scanClassName(): String - - def createDataSourceV2Transformer( - batchScan: BatchScanExec, - partitionFilters: Seq[Expression]): BatchScanExecTransformer -} diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala index 80e5e7e6478bc..822f98c479861 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/FileSourceScanExecTransformer.scala @@ -38,14 +38,14 @@ import org.apache.spark.util.collection.BitSet import scala.collection.JavaConverters class FileSourceScanExecTransformer( - @transient override val relation: HadoopFsRelation, + @transient relation: HadoopFsRelation, output: Seq[Attribute], requiredSchema: StructType, partitionFilters: Seq[Expression], optionalBucketSet: Option[BitSet], optionalNumCoalescedBuckets: Option[Int], dataFilters: Seq[Expression], - override val tableIdentifier: Option[TableIdentifier], + tableIdentifier: Option[TableIdentifier], disableBucketedScan: Boolean = false) extends FileSourceScanExecShim( relation, @@ -57,7 +57,7 @@ class FileSourceScanExecTransformer( dataFilters, tableIdentifier, disableBucketedScan) - with DatasourceScanTransformer { + with BasicScanExecTransformer { // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. 
@transient override lazy val metrics: Map[String, SQLMetric] = @@ -89,11 +89,11 @@ class FileSourceScanExecTransformer( optionalNumCoalescedBuckets, disableBucketedScan) - override def getPartitionSchema: StructType = relation.partitionSchema + override def getPartitionSchemas: StructType = relation.partitionSchema - override def getDataSchema: StructType = relation.dataSchema + override def getDataSchemas: StructType = relation.dataSchema - override def getInputFilePathsInternal: Seq[String] = { + override def getInputFilePaths: Seq[String] = { relation.location.inputFiles.toSeq } @@ -150,7 +150,7 @@ class FileSourceScanExecTransformer( } val readRelNode = transformCtx.root.asInstanceOf[ReadRelNode] - readRelNode.setDataSchema(getDataSchema) + readRelNode.setDataSchema(getDataSchemas) readRelNode.setProperties(JavaConverters.mapAsJavaMap(options)) } transformCtx diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala index 10bfadf307e67..e69de29bb2d1d 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/ScanTransformerFactory.scala @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.glutenproject.execution - -import io.glutenproject.expression.ExpressionConverter -import io.glutenproject.sql.shims.SparkShimLoader - -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.connector.read.Scan -import org.apache.spark.sql.execution.FileSourceScanExec -import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} - -import java.util.ServiceLoader -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.JavaConverters._ - -object ScanTransformerFactory { - - private val dataSourceV2TransformerMap = new ConcurrentHashMap[String, Class[_]]() - - def createFileSourceScanTransformer( - scanExec: FileSourceScanExec, - reuseSubquery: Boolean, - extraFilters: Seq[Expression] = Seq.empty, - validation: Boolean = false): FileSourceScanExecTransformer = { - // transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in partitionFilters - val newPartitionFilters = if (validation) { - scanExec.partitionFilters - } else { - ExpressionConverter.transformDynamicPruningExpr(scanExec.partitionFilters, reuseSubquery) - } - new FileSourceScanExecTransformer( - scanExec.relation, - scanExec.output, - scanExec.requiredSchema, - newPartitionFilters, - scanExec.optionalBucketSet, - scanExec.optionalNumCoalescedBuckets, - scanExec.dataFilters ++ extraFilters, - scanExec.tableIdentifier, - scanExec.disableBucketedScan - ) - } - - def createBatchScanTransformer( - batchScanExec: BatchScanExec, - reuseSubquery: Boolean, - validation: Boolean = false): BatchScanExecTransformer = { - val newPartitionFilters = if (validation) { - batchScanExec.runtimeFilters - } else { - ExpressionConverter.transformDynamicPruningExpr(batchScanExec.runtimeFilters, reuseSubquery) - } - val scan = batchScanExec.scan - scan match { - case _ if dataSourceV2TransformerExists(scan.getClass.getName) => - val cls = lookupDataSourceV2Transformer(scan.getClass.getName) - cls - .getDeclaredConstructor() - .newInstance() - .asInstanceOf[DataSourceV2TransformerRegister] - .createDataSourceV2Transformer(batchScanExec, newPartitionFilters) - case _ => - new BatchScanExecTransformer( - batchScanExec.output, - batchScanExec.scan, - newPartitionFilters, - table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)) - } - } - - def supportedBatchScan(scan: Scan): Boolean = scan match { - case _: FileScan => true - case _ if dataSourceV2TransformerExists(scan.getClass.getName) => true - case _ => false - } - - private def lookupDataSourceV2Transformer(scanClassName: String): Class[_] = { - dataSourceV2TransformerMap.computeIfAbsent( - scanClassName, - _ => { - val loader = Option(Thread.currentThread().getContextClassLoader) - .getOrElse(getClass.getClassLoader) - val serviceLoader = ServiceLoader.load(classOf[DataSourceV2TransformerRegister], loader) - serviceLoader.asScala - .filter(_.scanClassName().equalsIgnoreCase(scanClassName)) - .toList match { - case head :: Nil => - // there is exactly one registered alias - head.getClass - case _ => null - } - } - ) - } - - private def dataSourceV2TransformerExists(scanClassName: String): Boolean = { - lookupDataSourceV2Transformer(scanClassName) != null - } -} diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala index a73527d2f3136..9dd6b2e0ed1e0 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala +++ 
b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala @@ -226,7 +226,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f context } - /** Find all BasicScanExecTransformer in one WholeStageTransformer */ + /** Find all BasicScanExecTransformers in one WholeStageTransformer */ private def findAllScanTransformers(): Seq[BasicScanExecTransformer] = { val basicScanExecTransformers = new mutable.ListBuffer[BasicScanExecTransformer]() diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala index 4a9e48c6aea18..7fbc34e973bc1 100644 --- a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala +++ b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.expressions.{BinaryArithmetic, _} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero -import org.apache.spark.sql.execution.{ScalarSubquery, _} +import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec import org.apache.spark.sql.hive.HiveSimpleUDFTransformer @@ -455,9 +455,7 @@ object ExpressionConverter extends SQLConfHelper with Logging { * Transform BroadcastExchangeExec to ColumnarBroadcastExchangeExec in DynamicPruningExpression. * * @param partitionFilters - * The partition filter of Scan * @return - * Transformed partition filter */ def transformDynamicPruningExpr( partitionFilters: Seq[Expression], @@ -470,13 +468,15 @@ object ExpressionConverter extends SQLConfHelper with Logging { case c2r: ColumnarToRowExecBase => c2r.child // in fallback case case plan: UnaryExecNode if !plan.isInstanceOf[GlutenPlan] => - plan.child match { - case _: ColumnarToRowExec => - val wholeStageTransformer = exchange.find(_.isInstanceOf[WholeStageTransformer]) - wholeStageTransformer.getOrElse( - BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan)) - case _ => + if (plan.child.isInstanceOf[ColumnarToRowExec]) { + val wholeStageTransformer = exchange.find(_.isInstanceOf[WholeStageTransformer]) + if (wholeStageTransformer.nonEmpty) { + wholeStageTransformer.get + } else { BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan) + } + } else { + BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan) } } ColumnarBroadcastExchangeExec(exchange.mode, newChild) @@ -486,7 +486,7 @@ object ExpressionConverter extends SQLConfHelper with Logging { // Disable ColumnarSubqueryBroadcast for scan-only execution. partitionFilters } else { - val newPartitionFilters = partitionFilters.map { + partitionFilters.map { case dynamicPruning: DynamicPruningExpression => dynamicPruning.transform { // Lookup inside subqueries for duplicate exchanges. @@ -516,12 +516,13 @@ object ExpressionConverter extends SQLConfHelper with Logging { // On the other hand, it needs to use // the AdaptiveSparkPlanExec.AdaptiveExecutionContext to hold the reused map // for each query. - newIn.child match { - case a: AdaptiveSparkPlanExec if reuseSubquery => - // When AQE is on and reuseSubquery is on. 
- a.context.subqueryCache - .update(newIn.canonicalized, transformSubqueryBroadcast) - case _ => + if (newIn.child.isInstanceOf[AdaptiveSparkPlanExec] && reuseSubquery) { + // When AQE is on and reuseSubquery is on. + newIn.child + .asInstanceOf[AdaptiveSparkPlanExec] + .context + .subqueryCache + .update(newIn.canonicalized, transformSubqueryBroadcast) } in.copy(plan = transformSubqueryBroadcast.asInstanceOf[BaseSubqueryExec]) case r: ReusedSubqueryExec if r.child.isInstanceOf[SubqueryBroadcastExec] => @@ -544,7 +545,7 @@ object ExpressionConverter extends SQLConfHelper with Logging { logWarning(errMsg) throw new UnsupportedOperationException(errMsg) } - case _ => + case other => val errMsg = "Can not get the reused ColumnarSubqueryBroadcastExec" + "by the ${newIn.canonicalized}" logWarning(errMsg) @@ -555,25 +556,6 @@ object ExpressionConverter extends SQLConfHelper with Logging { } case e: Expression => e } - updateSubqueryResult(newPartitionFilters) - newPartitionFilters - } - } - - private def updateSubqueryResult(partitionFilters: Seq[Expression]): Unit = { - // When it includes some DynamicPruningExpression, - // it needs to execute InSubqueryExec first, - // because doTransform path can't execute 'doExecuteColumnar' which will - // execute prepare subquery first. - partitionFilters.foreach { - case DynamicPruningExpression(inSubquery: InSubqueryExec) => - if (inSubquery.values().isEmpty) inSubquery.updateResult() - case e: Expression => - e.foreach { - case s: ScalarSubquery => s.updateResult() - case _ => - } - case _ => } } } diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala index 84a0dc58bc35d..e655014b3c2a6 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala @@ -22,6 +22,7 @@ import io.glutenproject.execution._ import io.glutenproject.expression.ExpressionConverter import io.glutenproject.extension.columnar._ import io.glutenproject.metrics.GlutenTimeMetric +import io.glutenproject.sql.shims.SparkShimLoader import io.glutenproject.utils.{LogLevelUtil, PhysicalPlanSelector} import org.apache.spark.api.python.EvalPythonExecTransformer @@ -136,20 +137,30 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean) private def genFilterExec(plan: FilterExec): SparkPlan = { // FIXME: Filter push-down should be better done by Vanilla Spark's planner or by // a individual rule. - // Push down the left conditions in Filter into FileSourceScan. - val newChild: SparkPlan = plan.child match { - case scan: FileSourceScanExec => - TransformHints.getHint(scan) match { + // Push down the left conditions in Filter into Scan. 
+ val newChild: SparkPlan = + if ( + plan.child.isInstanceOf[FileSourceScanExec] || + plan.child.isInstanceOf[BatchScanExec] + ) { + TransformHints.getHint(plan.child) match { case TRANSFORM_SUPPORTED() => val newScan = FilterHandler.applyFilterPushdownToScan(plan, reuseSubquery) newScan match { - case ts: TransformSupport if ts.doValidate().isValid => ts - case _ => replaceWithTransformerPlan(scan) + case ts: TransformSupport => + if (ts.doValidate().isValid) { + ts + } else { + replaceWithTransformerPlan(plan.child) + } + case p: SparkPlan => p } - case _ => replaceWithTransformerPlan(scan) + case _ => + replaceWithTransformerPlan(plan.child) } - case _ => replaceWithTransformerPlan(plan.child) - } + } else { + replaceWithTransformerPlan(plan.child) + } logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") BackendsApiManager.getSparkPlanExecApiInstance .genFilterExecTransformer(plan.condition, newChild) @@ -522,39 +533,52 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean) */ def applyScanTransformer(plan: SparkPlan): SparkPlan = plan match { case plan: FileSourceScanExec => - val transformer = ScanTransformerFactory.createFileSourceScanTransformer(plan, reuseSubquery) + val newPartitionFilters = + ExpressionConverter.transformDynamicPruningExpr(plan.partitionFilters, reuseSubquery) + val transformer = new FileSourceScanExecTransformer( + plan.relation, + plan.output, + plan.requiredSchema, + newPartitionFilters, + plan.optionalBucketSet, + plan.optionalNumCoalescedBuckets, + plan.dataFilters, + plan.tableIdentifier, + plan.disableBucketedScan + ) val validationResult = transformer.doValidate() if (validationResult.isValid) { logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") transformer } else { logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - val newSource = plan.copy(partitionFilters = transformer.partitionFilters) + val newSource = plan.copy(partitionFilters = newPartitionFilters) TransformHints.tagNotTransformable(newSource, validationResult.reason.get) newSource } case plan: BatchScanExec => - if (ScanTransformerFactory.supportedBatchScan(plan.scan)) { - val transformer = ScanTransformerFactory.createBatchScanTransformer(plan, reuseSubquery) - val validationResult = transformer.doValidate() - if (validationResult.isValid) { - logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - transformer - } else { - logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") - val newSource = plan.copy(runtimeFilters = transformer.runtimeFilters) - TransformHints.tagNotTransformable(newSource, validationResult.reason.get) - newSource - } + val newPartitionFilters: Seq[Expression] = plan.scan match { + case scan: FileScan => + ExpressionConverter.transformDynamicPruningExpr(scan.partitionFilters, reuseSubquery) + case _ => + ExpressionConverter.transformDynamicPruningExpr(plan.runtimeFilters, reuseSubquery) + } + val transformer = new BatchScanExecTransformer( + plan.output, + plan.scan, + newPartitionFilters, + table = SparkShimLoader.getSparkShims.getBatchScanExecTable(plan)) + + val validationResult = transformer.doValidate() + if (validationResult.isValid) { + logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") + transformer } else { - // If filter expressions aren't empty, we need to transform the inner operators, - // and fallback the BatchScanExec itself. 
- val newSource = plan.copy(runtimeFilters = ExpressionConverter - .transformDynamicPruningExpr(plan.runtimeFilters, reuseSubquery)) - TransformHints.tagNotTransformable(newSource, "The scan in BatchScanExec is not supported.") + logDebug(s"Columnar Processing for ${plan.getClass} is currently unsupported.") + val newSource = plan.copy(runtimeFilters = newPartitionFilters) + TransformHints.tagNotTransformable(newSource, validationResult.reason.get) newSource } - case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) => // TODO: Add DynamicPartitionPruningHiveScanSuite.scala val newPartitionFilters: Seq[Expression] = ExpressionConverter.transformDynamicPruningExpr( diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala index c15c538b89d53..9aa7a887a8a81 100644 --- a/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala +++ b/gluten-core/src/main/scala/io/glutenproject/extension/columnar/TransformHintRule.scala @@ -379,11 +379,11 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { if (plan.runtimeFilters.nonEmpty) { TransformHints.tagTransformable(plan) } else { - val transformer = - ScanTransformerFactory.createBatchScanTransformer( - plan, - reuseSubquery = false, - validation = true) + val transformer = new BatchScanExecTransformer( + plan.output, + plan.scan, + plan.runtimeFilters, + table = SparkShimLoader.getSparkShims.getBatchScanExecTable(plan)) TransformHints.tag(plan, transformer.doValidate().toTransformHint) } } @@ -397,11 +397,17 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { if (plan.partitionFilters.nonEmpty) { TransformHints.tagTransformable(plan) } else { - val transformer = - ScanTransformerFactory.createFileSourceScanTransformer( - plan, - reuseSubquery = false, - validation = true) + val transformer = new FileSourceScanExecTransformer( + plan.relation, + plan.output, + plan.requiredSchema, + plan.partitionFilters, + plan.optionalBucketSet, + plan.optionalNumCoalescedBuckets, + plan.dataFilters, + plan.tableIdentifier, + plan.disableBucketedScan + ) TransformHints.tag(plan, transformer.doValidate().toTransformHint) } } diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/DatasourceScanTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/substrait/SupportFormat.scala similarity index 68% rename from gluten-core/src/main/scala/io/glutenproject/execution/DatasourceScanTransformer.scala rename to gluten-core/src/main/scala/io/glutenproject/substrait/SupportFormat.scala index e4c73436afb72..d0ea8648d494b 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/DatasourceScanTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/substrait/SupportFormat.scala @@ -14,16 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.glutenproject.execution +package io.glutenproject.substrait -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.sources.BaseRelation +import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat -trait DatasourceScanTransformer extends BasicScanExecTransformer { - - /** The file-based relation to scan. */ - val relation: BaseRelation - - /** Identifier for the table in the metastore. */ - val tableIdentifier: Option[TableIdentifier] +/** + * A mix-in interface for BasicScanExecTransformer. 
This can be used to report FileFormat for a file + * based scan operator. + */ +trait SupportFormat { + @transient val fileFormat: ReadFileFormat } diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala index c98745fd1f645..17a9518601bfc 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala @@ -132,13 +132,13 @@ case class ColumnarCollapseTransformStages( * When it's the ClickHouse backend, BasicScanExecTransformer will not be included in * WholeStageTransformer. */ - private def isSeparateBaseScanExecTransformer(plan: SparkPlan): Boolean = plan match { + private def isSeparateBasicScanExecTransformer(plan: SparkPlan): Boolean = plan match { case _: BasicScanExecTransformer if separateScanRDD => true case _ => false } private def supportTransform(plan: SparkPlan): Boolean = plan match { - case plan: TransformSupport if !isSeparateBaseScanExecTransformer(plan) => true + case plan: TransformSupport if !isSeparateBasicScanExecTransformer(plan) => true case _ => false } diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala index 9f5e07e2940dd..bf8af495af973 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala @@ -70,11 +70,11 @@ class HiveTableScanExecTransformer( override def getPartitions: Seq[InputPartition] = partitions - override def getPartitionSchema: StructType = relation.tableMeta.partitionSchema + override def getPartitionSchemas: StructType = relation.tableMeta.partitionSchema - override def getDataSchema: StructType = relation.tableMeta.dataSchema + override def getDataSchemas: StructType = relation.tableMeta.dataSchema - override def getInputFilePathsInternal: Seq[String] = { + override def getInputFilePaths: Seq[String] = { // FIXME how does a hive table expose file paths? 
Seq.empty } @@ -172,7 +172,7 @@ class HiveTableScanExecTransformer( case (_, _) => } val readRelNode = transformCtx.root.asInstanceOf[ReadRelNode] - readRelNode.setDataSchema(getDataSchema) + readRelNode.setDataSchema(getDataSchemas) readRelNode.setProperties(JavaConverters.mapAsJavaMap(options)) } transformCtx diff --git a/gluten-delta/pom.xml b/gluten-delta/pom.xml index e2b47f8e1792b..76e64e9e3b4cd 100755 --- a/gluten-delta/pom.xml +++ b/gluten-delta/pom.xml @@ -93,12 +93,6 @@ ${hadoop.version} test - - com.google.protobuf - protobuf-java - ${protobuf.version} - test - org.scalatest scalatest_${scala.binary.version} diff --git a/gluten-iceberg/pom.xml b/gluten-iceberg/pom.xml deleted file mode 100644 index 8cae66d7ead26..0000000000000 --- a/gluten-iceberg/pom.xml +++ /dev/null @@ -1,168 +0,0 @@ - - - - gluten-parent - io.glutenproject - 1.1.0-SNAPSHOT - ../pom.xml - - 4.0.0 - - gluten-iceberg - jar - Gluten Iceberg - - - ${project.basedir}/src/main/resources - - - - - io.glutenproject - gluten-core - ${project.version} - provided - - - org.apache.spark - spark-core_${scala.binary.version} - provided - - - org.apache.spark - spark-sql_${scala.binary.version} - provided - - - org.apache.iceberg - iceberg-spark-${sparkbundle.version}_${scala.binary.version} - ${iceberg.version} - provided - - - - - io.glutenproject - gluten-core - ${project.version} - test-jar - test - - - io.glutenproject - backends-velox - ${project.version} - test - - - io.glutenproject - backends-velox - ${project.version} - test-jar - test - - - org.apache.spark - spark-core_${scala.binary.version} - - - org.apache.spark - spark-core_${scala.binary.version} - test-jar - - - org.apache.spark - spark-sql_${scala.binary.version} - test-jar - - - org.apache.spark - spark-catalyst_${scala.binary.version} - test-jar - - - org.apache.spark - spark-hive_${scala.binary.version} - ${spark.version} - test - - - org.apache.iceberg - iceberg-spark-runtime-${sparkbundle.version}_${scala.binary.version} - ${iceberg.version} - test - - - org.apache.hadoop - hadoop-client - ${hadoop.version} - test - - - com.google.protobuf - protobuf-java - ${protobuf.version} - test - - - org.scalatest - scalatest_${scala.binary.version} - test - - - - - target/scala-${scala.binary.version}/classes - target/scala-${scala.binary.version}/test-classes - - - ${resource.dir} - - - - - net.alchim31.maven - scala-maven-plugin - - - org.apache.maven.plugins - maven-compiler-plugin - - - org.scalastyle - scalastyle-maven-plugin - - - org.apache.maven.plugins - maven-checkstyle-plugin - - - com.diffplug.spotless - spotless-maven-plugin - - - org.scalatest - scalatest-maven-plugin - - - org.apache.maven.plugins - maven-jar-plugin - - - prepare-test-jar - test-compile - - test-jar - - - - - - org.apache.maven.plugins - maven-resources-plugin - - - - diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java deleted file mode 100644 index 3452836cfd833..0000000000000 --- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.glutenproject.substrait.rel; - -import java.util.List; -import java.util.Map; - -public class IcebergLocalFilesBuilder { - - // TODO: Add makeIcebergLocalFiles for MOR iceberg table - - public static IcebergLocalFilesNode makeIcebergLocalFiles( - Integer index, - List paths, - List starts, - List lengths, - List> partitionColumns, - LocalFilesNode.ReadFileFormat fileFormat, - List preferredLocations) { - return new IcebergLocalFilesNode( - index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations); - } -} diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java deleted file mode 100644 index c763a46b19157..0000000000000 --- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.glutenproject.substrait.rel; - -import java.util.List; -import java.util.Map; - -public class IcebergLocalFilesNode extends LocalFilesNode { - - class DeleteFile { - private final String path; - private final Integer fileContent; - private final ReadFileFormat fileFormat; - private final Long fileSize; - private final Long recordCount; - private final Map lowerBounds; - private final Map upperBounds; - - DeleteFile( - String path, - Integer fileContent, - ReadFileFormat fileFormat, - Long fileSize, - Long recordCount, - Map lowerBounds, - Map upperBounds) { - this.path = path; - this.fileContent = fileContent; - this.fileFormat = fileFormat; - this.fileSize = fileSize; - this.recordCount = recordCount; - this.lowerBounds = lowerBounds; - this.upperBounds = upperBounds; - } - } - - // TODO: Add delete file support for MOR iceberg table - - IcebergLocalFilesNode( - Integer index, - List paths, - List starts, - List lengths, - List> partitionColumns, - ReadFileFormat fileFormat, - List preferredLocations) { - super(index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations); - } -} diff --git a/gluten-iceberg/src/main/resources/META-INF/services/io.glutenproject.execution.DataSourceV2TransformerRegister b/gluten-iceberg/src/main/resources/META-INF/services/io.glutenproject.execution.DataSourceV2TransformerRegister deleted file mode 100644 index 658967bb99b6c..0000000000000 --- a/gluten-iceberg/src/main/resources/META-INF/services/io.glutenproject.execution.DataSourceV2TransformerRegister +++ /dev/null @@ -1 +0,0 @@ -io.glutenproject.execution.IcebergTransformerProvider \ No newline at end of file diff --git a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala deleted file mode 100644 index ee4835ddc20e4..0000000000000 --- a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.glutenproject.execution - -import io.glutenproject.sql.shims.SparkShimLoader -import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat -import io.glutenproject.substrait.rel.SplitInfo - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} -import org.apache.spark.sql.connector.catalog.Table -import org.apache.spark.sql.connector.read.Scan -import org.apache.spark.sql.execution.datasources.v2.BatchScanExec -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.ColumnarBatch - -import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil - -class IcebergScanTransformer( - output: Seq[AttributeReference], - @transient scan: Scan, - runtimeFilters: Seq[Expression], - @transient table: Table) - extends BatchScanExecTransformer( - output = output, - scan = scan, - runtimeFilters = runtimeFilters, - table = table) { - - override def filterExprs(): Seq[Expression] = Seq.empty - - override def getPartitionSchema: StructType = new StructType() - - override def getDataSchema: StructType = new StructType() - - override def getInputFilePathsInternal: Seq[String] = Seq.empty - - override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan) - - override def doExecuteColumnar(): RDD[ColumnarBatch] = throw new UnsupportedOperationException() - - override def getSplitInfos: Seq[SplitInfo] = { - getPartitions.zipWithIndex.map { - case (p, index) => GlutenIcebergSourceUtil.genSplitInfo(p, index) - } - } -} - -object IcebergScanTransformer { - def apply(batchScan: BatchScanExec, partitionFilters: Seq[Expression]): IcebergScanTransformer = { - new IcebergScanTransformer( - batchScan.output, - batchScan.scan, - partitionFilters, - table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScan)) - } -} diff --git a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala deleted file mode 100644 index 17d146da2021d..0000000000000 --- a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
diff --git a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala
deleted file mode 100644
index 17d146da2021d..0000000000000
--- a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergTransformerProvider.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.glutenproject.execution
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-
-class IcebergTransformerProvider extends DataSourceV2TransformerRegister {
-
-  override def scanClassName(): String = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
-
-  override def createDataSourceV2Transformer(
-      batchScan: BatchScanExec,
-      partitionFilters: Seq[Expression]): BatchScanExecTransformer = {
-    IcebergScanTransformer.apply(batchScan, partitionFilters)
-  }
-}
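Note: the provider above was wired up through the META-INF/services entry deleted earlier in this patch, so it was presumably discovered via java.util.ServiceLoader. A minimal sketch of that loader-side lookup, assuming a registry keyed by scanClassName(); the object and method names below are illustrative, not Gluten's actual API:

import io.glutenproject.execution.{BatchScanExecTransformer, DataSourceV2TransformerRegister}

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

import java.util.ServiceLoader

import scala.collection.JavaConverters._

// Illustrative only: find the registered provider whose scanClassName() matches
// the concrete Scan held by a BatchScanExec, and let it build the transformer.
object DataSourceV2TransformerLookup {
  private lazy val registers: Map[String, DataSourceV2TransformerRegister] =
    ServiceLoader
      .load(classOf[DataSourceV2TransformerRegister])
      .asScala
      .map(r => r.scanClassName() -> r)
      .toMap

  def transformerFor(
      batchScan: BatchScanExec,
      partitionFilters: Seq[Expression]): Option[BatchScanExecTransformer] =
    registers
      .get(batchScan.scan.getClass.getName)
      .map(_.createDataSourceV2Transformer(batchScan, partitionFilters))
}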
$currentFileFormat") - } - } - val preferredLoc = SoftAffinityUtil.getFilePartitionLocations( - paths.asScala.toArray, - inputPartition.preferredLocations()) - IcebergLocalFilesBuilder.makeIcebergLocalFiles( - index, - paths, - starts, - lengths, - partitionColumns, - fileFormat, - preferredLoc.toList.asJava - ) - case _ => - throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.") - } - - def getFileFormat(sparkScan: Scan): ReadFileFormat = sparkScan match { - case scan: SparkBatchQueryScan => - val tasks = scan.tasks().asScala - asFileScanTask(tasks.toList).foreach { - task => - task.file().format() match { - case FileFormat.PARQUET => return ReadFileFormat.ParquetReadFormat - case FileFormat.ORC => return ReadFileFormat.OrcReadFormat - case _ => - } - } - throw new UnsupportedOperationException("Iceberg Only support parquet and orc file format.") - case _ => - throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.") - } - - private def asFileScanTask(tasks: List[ScanTask]): List[FileScanTask] = { - if (tasks.forall(_.isFileScanTask)) { - tasks.map(_.asFileScanTask()) - } else if (tasks.forall(_.isInstanceOf[CombinedScanTask])) { - tasks.flatMap(_.asCombinedScanTask().tasks().asScala) - } else { - throw new UnsupportedOperationException( - "Only support iceberg CombinedScanTask and FileScanTask.") - } - } -} diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala deleted file mode 100644 index 97c590dce212a..0000000000000 --- a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala
deleted file mode 100644
index 97c590dce212a..0000000000000
--- a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.glutenproject.execution
-
-import org.apache.spark.SparkConf
-
-class VeloxIcebergSuite extends WholeStageTransformerSuite {
-
-  protected val rootPath: String = getClass.getResource("/").getPath
-  override protected val backend: String = "velox"
-  override protected val resourcePath: String = "/tpch-data-parquet-velox"
-  override protected val fileFormat: String = "parquet"
-
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
-      .set("spark.sql.files.maxPartitionBytes", "1g")
-      .set("spark.sql.shuffle.partitions", "1")
-      .set("spark.memory.offHeap.size", "2g")
-      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
-      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
-      .set(
-        "spark.sql.extensions",
-        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
-      .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
-      .set("spark.sql.catalog.spark_catalog.type", "hadoop")
-      .set("spark.sql.catalog.spark_catalog.warehouse", s"file://$rootPath/tpch-data-iceberg-velox")
-  }
-
-  test("iceberg transformer exists") {
-    spark.sql("""
-                |create table iceberg_tb using iceberg as
-                |(select 1 as col1, 2 as col2, 3 as col3)
-                |""".stripMargin)
-
-    runQueryAndCompare("""
-                         |select * from iceberg_tb;
-                         |""".stripMargin) {
-      checkOperatorMatch[IcebergScanTransformer]
-    }
-  }
-}
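The deleted suite also documents the session configuration the Iceberg integration relied on. Outside the test harness, roughly the same setup looks like this (a sketch: the local master, warehouse path, and table name are illustrative, and the Gluten-specific settings that WholeStageTransformerSuite adds are omitted):

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[2]")
  .config(
    "spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hadoop")
  .config("spark.sql.catalog.spark_catalog.warehouse", "file:///tmp/iceberg-warehouse")
  .getOrCreate()

// Same shape as the deleted test: create a small Iceberg table and read it back.
spark.sql("create table iceberg_tb using iceberg as (select 1 as col1, 2 as col2, 3 as col3)")
spark.sql("select * from iceberg_tb").show()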
diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala
deleted file mode 100644
index b8693a48ccab8..0000000000000
--- a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.glutenproject.execution
-
-import org.apache.spark.SparkConf
-
-import java.io.File
-
-class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
-
-  protected val tpchBasePath: String = new File(
-    "../backends-velox/src/test/resources").getAbsolutePath
-
-  override protected val resourcePath: String =
-    new File(tpchBasePath, "tpch-data-parquet-velox").getCanonicalPath
-
-  override protected val veloxTPCHQueries: String =
-    new File(tpchBasePath, "tpch-queries-velox").getCanonicalPath
-
-  override protected val queriesResults: String =
-    new File(tpchBasePath, "queries-output").getCanonicalPath
-
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .set(
-        "spark.sql.extensions",
-        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
-      .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
-      .set("spark.sql.catalog.spark_catalog.type", "hadoop")
-      .set("spark.sql.catalog.spark_catalog.warehouse", s"file://$rootPath/tpch-data-iceberg-velox")
-  }
-
-  override protected def createTPCHNotNullTables(): Unit = {
-    TPCHTables = TPCHTableNames.map {
-      table =>
-        val tablePath = new File(resourcePath, table).getAbsolutePath
-        val tableDF = spark.read.format(fileFormat).load(tablePath)
-        tableDF.write.format("iceberg").mode("append").saveAsTable(table)
-        (table, tableDF)
-    }.toMap
-  }
-
-  test("iceberg transformer exists") {
-    runQueryAndCompare("""
-                         |SELECT
-                         |  l_orderkey,
-                         |  o_orderdate
-                         |FROM
-                         |  orders,
-                         |  lineitem
-                         |WHERE
-                         |  l_orderkey = o_orderkey
-                         |ORDER BY
-                         |  l_orderkey,
-                         |  o_orderdate
-                         |LIMIT
-                         |  10;
-                         |""".stripMargin) {
-      df =>
-        {
-          assert(
-            getExecutedPlan(df).count(
-              plan => {
-                plan.isInstanceOf[IcebergScanTransformer]
-              }) == 2)
-        }
-    }
-  }
-}
diff --git a/pom.xml b/pom.xml
index 9f98a856e34ff..78c4c2e6f3691 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,7 +40,6 @@
     2.9.3
     2.0.1
-    <iceberg.version>1.3.1</iceberg.version>
     20
     2.12
     2.12.15
@@ -208,15 +207,6 @@
         gluten-delta
-    <profile>
-      <id>iceberg</id>
-      <activation>
-        <activeByDefault>false</activeByDefault>
-      </activation>
-      <modules>
-        <module>gluten-iceberg</module>
-      </modules>
-    </profile>
      backends-velox
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index d380b0bd3e5ce..9e32b35b8f3bc 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -121,6 +121,20 @@ class FileSourceScanExecShim(
     val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter)
     val selected = if (dynamicPartitionFilters.nonEmpty) {
+      // When it includes some DynamicPruningExpression,
+      // it needs to execute InSubqueryExec first,
+      // because doTransform path can't execute 'doExecuteColumnar' which will
+      // execute prepare subquery first.
+      dynamicPartitionFilters.foreach {
+        case DynamicPruningExpression(inSubquery: InSubqueryExec) =>
+          if (inSubquery.values().isEmpty) inSubquery.updateResult()
+        case e: Expression =>
+          e.foreach {
+            case s: ScalarSubquery => s.updateResult()
+            case _ =>
+          }
+        case _ =>
+      }
       GlutenTimeMetric.withMillisTime {
         // call the file index for the files matching all filters except dynamic partition filters
         val predicate = dynamicPartitionFilters.reduce(And)
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index b8a14701bb90d..cfbf91bc2188b 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -122,6 +122,20 @@ class FileSourceScanExecShim(
     val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter)
     val selected = if (dynamicPartitionFilters.nonEmpty) {
+      // When it includes some DynamicPruningExpression,
+      // it needs to execute InSubqueryExec first,
+      // because doTransform path can't execute 'doExecuteColumnar' which will
+      // execute prepare subquery first.
+      dynamicPartitionFilters.foreach {
+        case DynamicPruningExpression(inSubquery: InSubqueryExec) =>
+          if (inSubquery.values().isEmpty) inSubquery.updateResult()
+        case e: Expression =>
+          e.foreach {
+            case s: ScalarSubquery => s.updateResult()
+            case _ =>
+          }
+        case _ =>
+      }
       GlutenTimeMetric.withMillisTime {
         // call the file index for the files matching all filters except dynamic partition filters
         val predicate = dynamicPartitionFilters.reduce(And)
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index ede6570b07774..0a62c41a69df9 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -88,6 +88,20 @@ class FileSourceScanExecShim(
     val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter)
     val selected = if (dynamicPartitionFilters.nonEmpty) {
+      // When it includes some DynamicPruningExpression,
+      // it needs to execute InSubqueryExec first,
+      // because doTransform path can't execute 'doExecuteColumnar' which will
+      // execute prepare subquery first.
+      dynamicPartitionFilters.foreach {
+        case DynamicPruningExpression(inSubquery: InSubqueryExec) =>
+          if (inSubquery.values().isEmpty) inSubquery.updateResult()
+        case e: Expression =>
+          e.foreach {
+            case s: ScalarSubquery => s.updateResult()
+            case _ =>
+          }
+        case _ =>
+      }
       GlutenTimeMetric.withMillisTime {
         // call the file index for the files matching all filters except dynamic partition filters
         val predicate = dynamicPartitionFilters.reduce(And)
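The three hunks above add the same pre-step to each Spark shim: before the file index is asked for the pruned partitions, the dynamic-pruning subqueries are forced to produce their values, because the transformer path never calls doExecuteColumnar(), which is where Spark would normally prepare them. Pulled out as a helper, the pattern looks like this (a sketch of the same logic, not the shims' exact shape):

import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.execution.{InSubqueryExec, ScalarSubquery}

// Materialize dynamic-pruning subqueries eagerly so that partition pruning can
// use their results on the transformer (non-doExecuteColumnar) code path.
def prepareDynamicPruningSubqueries(dynamicPartitionFilters: Seq[Expression]): Unit =
  dynamicPartitionFilters.foreach {
    case DynamicPruningExpression(inSubquery: InSubqueryExec) =>
      if (inSubquery.values().isEmpty) inSubquery.updateResult()
    case e: Expression =>
      e.foreach {
        case s: ScalarSubquery => s.updateResult()
        case _ =>
      }
  }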