[VL] Enable native write files by default for Spark3.4 in VL backend (#…

ulysses-you authored Jan 12, 2024
1 parent 8b4f86a commit 5189633

Showing 15 changed files with 82 additions and 51 deletions.
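This commit flips the default of the existing session flag spark.gluten.sql.native.writer.enabled on Spark 3.4 with the Velox backend; the flag still works as an explicit override in either direction. A minimal sketch, assuming an ordinary Spark session bootstrap (the application name and chosen values are illustrative):

import org.apache.spark.sql.SparkSession

// Hedged sketch: the config key comes from this commit's diff; setting it is
// optional now that Spark 3.4 + Velox defaults to enabled.
val spark = SparkSession
  .builder()
  .appName("native-writer-toggle-example") // illustrative name
  // Explicit opt-out, overriding the new Spark 3.4 default:
  .config("spark.gluten.sql.native.writer.enabled", "false")
  // Or explicit opt-in on backends/versions where the default stays false:
  // .config("spark.gluten.sql.native.writer.enabled", "true")
  .getOrCreate()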

@@ -277,4 +277,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
options: Map[String, String]): Option[String] = Some("Unsupported")

override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
}
}

@@ -21,6 +21,7 @@ import io.glutenproject.backendsapi._
import io.glutenproject.execution.WriteFilesExecTransformer
import io.glutenproject.expression.WindowFunctionsBuilder
import io.glutenproject.extension.ValidationResult
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat}

@@ -400,4 +401,10 @@ object BackendSettings extends BackendSettingsApi {
override def supportTransformWriteFiles: Boolean = true

override def allowDecimalArithmetic: Boolean = SQLConf.get.decimalOperationsAllowPrecisionLoss

override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.getConf.enableNativeWriter.getOrElse(
SparkShimLoader.getSparkShims.enableNativeWriteFilesByDefault()
)
}
}
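The resolution order above is: the user's spark.gluten.sql.native.writer.enabled setting if present, otherwise the shim default, which is true only for Spark 3.4. A condensed, self-contained sketch of that logic (the function and its arguments are illustrative, not Gluten APIs):

// Hedged sketch of the effective-value resolution implemented above.
def effectiveNativeWrite(userSetting: Option[Boolean], sparkVersion: String): Boolean =
  userSetting.getOrElse(sparkVersion.startsWith("3.4"))

assert(effectiveNativeWrite(None, "3.4.2"))          // new default: enabled
assert(!effectiveNativeWrite(None, "3.3.1"))         // other versions: still disabled
assert(!effectiveNativeWrite(Some(false), "3.4.2"))  // explicit setting always wins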

@@ -243,6 +243,11 @@ class VeloxTPCHV2BhjSuite extends VeloxTPCHSuite {
}

class VeloxPartitionedTableTPCHSuite extends VeloxTPCHSuite {

override protected def sparkConf: SparkConf = {
super.sparkConf.set("spark.memory.offHeap.size", "4g")
}

override protected def createTPCHNotNullTables(): Unit = {
TPCHTableDataFrames = TPCHTables.map {
table =>

@@ -21,7 +21,6 @@ import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.utils.FallbackUtil

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.lit

import org.junit.Assert

@@ -114,15 +113,4 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
Assert.assertTrue(FallbackUtil.hasFallback(df.queryExecution.executedPlan))
}
}

ignore("parquet write with empty dataframe") {
withTempPath {
f =>
val df = spark.emptyDataFrame.select(lit(1).as("i"))
df.write.format("parquet").save(f.getCanonicalPath)
val res = spark.read.parquet(f.getCanonicalPath)
checkAnswer(res, Nil)
assert(res.schema.asNullable == df.schema.asNullable)
}
}
}
6 changes: 5 additions & 1 deletion cpp/velox/compute/WholeStageResultIterator.cc
@@ -69,6 +69,9 @@ const std::string kBloomFilterNumBits = "spark.gluten.sql.columnar.backend.velox
const std::string kBloomFilterMaxNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits";
const std::string kVeloxSplitPreloadPerDriver = "spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver";

// write files
const std::string kMaxPartitions = "spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession";

// metrics
const std::string kDynamicFiltersProduced = "dynamicFiltersProduced";
const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted";
@@ -433,7 +436,8 @@ std::shared_ptr<velox::Config> WholeStageResultIterator::createConnectorConfig()
configs[velox::connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCaseSession] =
veloxCfg_->get<bool>(kCaseSensitive, false) == false ? "true" : "false";
configs[velox::connector::hive::HiveConfig::kArrowBridgeTimestampUnit] = "6";
configs[velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession] = "400";
configs[velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession] =
std::to_string(veloxCfg_->get<int32_t>(kMaxPartitions, 10000));

return std::make_shared<velox::core::MemConfig>(configs);
}
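The previously hard-coded limit of 400 partitions per writer session is replaced by the new spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession config, which defaults to 10000. A small sketch of tuning it from the Spark side (the value 2000 is only an example):

import org.apache.spark.SparkConf

// Hedged sketch: the key and its default (10000) come from this diff; 2000 is illustrative.
val conf = new SparkConf()
  .set("spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession", "2000")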

@@ -122,4 +122,6 @@ trait BackendSettingsApi {
def requiredInputFilePaths(): Boolean = false

def enableBloomFilterAggFallbackRule(): Boolean = true

def enableNativeWriteFiles(): Boolean
}
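Call sites should now read the resolved value through the backend settings rather than the raw, now-optional config, as the changes to AddTransformHintRule and GlutenWriterColumnarRules below do. A brief sketch of that caller-side pattern:

import io.glutenproject.backendsapi.BackendsApiManager

// Hedged sketch mirroring the call sites changed in this commit: the backend
// resolves the optional config against its own default.
val nativeWriteEnabled: Boolean = BackendsApiManager.getSettings.enableNativeWriteFiles()
if (!nativeWriteEnabled) {
  // keep the vanilla Spark write path
}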

@@ -362,7 +362,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
val enableTakeOrderedAndProject: Boolean =
!scanOnly && columnarConf.enableTakeOrderedAndProject &&
enableColumnarSort && enableColumnarLimit && enableColumnarShuffle && enableColumnarProject
val enableColumnarWrite: Boolean = columnarConf.enableNativeWriter
val enableColumnarWrite: Boolean = BackendsApiManager.getSettings.enableNativeWriteFiles()

def apply(plan: SparkPlan): SparkPlan = {
addTransformableTags(plan)

@@ -16,7 +16,6 @@
*/
package org.apache.spark.sql.execution.datasources

import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.execution.ColumnarToRowExecBase
import io.glutenproject.extension.GlutenPlan
@@ -90,7 +89,7 @@ object GlutenWriterColumnarRules {
val parquetHiveFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
val orcHiveFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"

if (!GlutenConfig.getConf.enableNativeWriter) {
if (!BackendsApiManager.getSettings.enableNativeWriteFiles()) {
return None
}


@@ -17,7 +17,6 @@
package org.apache.spark.sql

import io.glutenproject.GlutenConfig
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.utils.{BackendTestUtils, SystemParameters}

import org.apache.spark.SparkConf
@@ -59,14 +58,6 @@ trait GlutenSQLTestsBaseTrait extends SharedSparkSession with GlutenTestsBaseTrait
// .set("spark.sql.optimizer.excludedRules", ConstantFolding.ruleName + "," +
// NullPropagation.ruleName)

if (
BackendTestUtils.isVeloxBackendLoaded() &&
SparkShimLoader.getSparkVersion.startsWith("3.4")
) {
// Enable velox native write in spark 3.4
conf.set("spark.gluten.sql.native.writer.enabled", "true")
}

if (BackendTestUtils.isCHBackendLoaded()) {
conf
.set("spark.io.compression.codec", "LZ4")

@@ -663,8 +663,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenParquetIOSuite]
// Velox doesn't write file metadata into parquet file.
.exclude("Write Spark version into Parquet metadata")
// Spark except exception but not occur in velox.
.exclude("SPARK-7837 Do not close output writer twice when commitTask() fails")
// Disable Spark's vectorized reading tests.
.exclude("Standard mode - fixed-length decimals")
.exclude("Legacy mode - fixed-length decimals")
@@ -708,8 +706,6 @@
.exclude(("Various inferred partition value types"))
enableSuite[GlutenParquetProtobufCompatibilitySuite]
enableSuite[GlutenParquetV1QuerySuite]
// Velox convert the null as minimum value of int, which cause the partition dir is not align with spark.
.exclude("SPARK-11997 parquet with null partition values")
// Only for testing a type mismatch issue caused by hive (before hive 2.2).
// Only reproducible when spark.sql.parquet.enableVectorizedReader=true.
.exclude("SPARK-16632: read Parquet int32 as ByteType and ShortType")
@@ -729,8 +725,6 @@
.exclude(
"SPARK-26677: negated null-safe equality comparison should not filter matched row groups")
enableSuite[GlutenParquetV2QuerySuite]
// Velox convert the null as minimum value of int, which cause the partition dir is not align with spark.
.exclude("SPARK-11997 parquet with null partition values")
// Only for testing a type mismatch issue caused by hive (before hive 2.2).
// Only reproducible when spark.sql.parquet.enableVectorizedReader=true.
.exclude("SPARK-16632: read Parquet int32 as ByteType and ShortType")
@@ -938,7 +932,7 @@
enableSuite[GlutenFilteredScanSuite]
enableSuite[GlutenFiltersSuite]
enableSuite[GlutenInsertSuite]
// Spark except exception but not occur in velox.
// The native write staging dir is different from vanilla Spark for custom partition paths.
.exclude("SPARK-35106: Throw exception when rename custom partition paths returns false")
.exclude("Stop task set if FileAlreadyExistsException was thrown")
.exclude("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them")
@@ -1120,12 +1114,8 @@
// following UT is removed in spark3.3.1
// enableSuite[GlutenSimpleShowCreateTableSuite]
enableSuite[GlutenFileSourceSQLInsertTestSuite]
// velox convert string null as -1583242847, which is not same with spark.
.exclude("SPARK-30844: static partition should also follow StoreAssignmentPolicy")
enableSuite[GlutenDSV2SQLInsertTestSuite]
enableSuite[GlutenSQLQuerySuite]
// Velox doesn't support spark.sql.optimizer.metadataOnly config.
.exclude("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly")
// Decimal precision exceeds.
.exclude("should be able to resolve a persistent view")
// Unstable. Needs to be fixed.
@@ -1151,15 +1141,7 @@
.exclude("SPARK-38173: Quoted column cannot be recognized correctly when quotedRegexColumnNames is true")
enableSuite[GlutenSQLQueryTestSuite]
enableSuite[GlutenStatisticsCollectionSuite]
// The following five unit tests failed after enabling native table write, because the velox convert the null to Timestamp
// with minimum value of int, which will cause overflow when calling toMicro() method.
.exclude("column stats collection for null columns")
.exclude("store and retrieve column stats in different time zones")
.exclude(
"SPARK-38140: describe column stats (min, max) for timestamp column: desc results should be consistent with the written value if writing and desc happen in the same time zone")
.exclude(
"SPARK-38140: describe column stats (min, max) for timestamp column: desc should show different results if writing in UTC and desc in other time zones")
.exclude("Gluten - store and retrieve column stats in different time zones")
// The output byte size of Velox is different
.exclude("SPARK-33687: analyze all tables in a specific database")
enableSuite[GlutenSubquerySuite]
.excludeByPrefix(

@@ -255,7 +255,7 @@ class GlutenSQLQueryTestSuite
"datetime-parsing.sql",
"datetime-special.sql",
// "decimalArithmeticOperations.sql",
"describe-part-after-analyze.sql",
// "describe-part-after-analyze.sql",
"describe-query.sql",
"describe-table-after-alter-table.sql",
"describe-table-column.sql",

@@ -29,7 +29,9 @@ import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.util.QueryExecutionListener

import java.io.File
import org.apache.hadoop.fs.{Path, RawLocalFileSystem}

import java.io.{File, IOException}

class GlutenInsertSuite extends InsertSuite with GlutenSQLTestsBaseTrait {

@@ -208,4 +210,38 @@ class GlutenInsertSuite extends InsertSuite with GlutenSQLTestsBaseTrait {
checkAnswer(spark.sql("SELECT * FROM t"), spark.sql("SELECT * FROM source SORT BY c1"))
}
}

test("Gluten: SPARK-35106: Throw exception when rename custom partition paths returns false") {
withSQLConf(
"fs.file.impl" -> classOf[
GlutenRenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem].getName,
"fs.file.impl.disable.cache" -> "true") {
withTempPath {
path =>
withTable("t") {
sql("""
|create table t(i int, part1 int, part2 int) using parquet
|partitioned by (part1, part2)
""".stripMargin)

sql(s"alter table t add partition(part1=1, part2=1) location '${path.getAbsolutePath}'")

val e = intercept[IOException] {
sql(s"insert into t partition(part1=1, part2=1) select 1")
}
assert(e.getMessage.contains("Failed to rename"))
}
}
}
}
}

class GlutenRenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem extends RawLocalFileSystem {
override def rename(src: Path, dst: Path): Boolean = {
(!isSparkStagingDir(src) || isSparkStagingDir(dst)) && super.rename(src, dst)
}

private def isSparkStagingDir(path: Path): Boolean = {
path.toString.contains("_temporary")
}
}
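The custom filesystem above only refuses renames out of Spark's "_temporary" staging directory into a final location, so the insert into a custom partition path must surface the "Failed to rename" IOException instead of silently succeeding. A tiny sketch of the staging-dir predicate in isolation (paths are illustrative):

import org.apache.hadoop.fs.Path

// Hedged sketch: same check as isSparkStagingDir above, exercised on sample paths.
def isStaging(p: Path): Boolean = p.toString.contains("_temporary")

assert(isStaging(new Path("/tmp/t/_temporary/0/task_0/part-00000")))
assert(!isStaging(new Path("/tmp/t/part1=1/part2=1/part-00000")))
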
14 changes: 12 additions & 2 deletions shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
@@ -309,7 +309,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_PCT)
def abandonFlushableAggregationMinRows: Option[Int] =
conf.getConf(ABANDON_PARTIAL_AGGREGATION_MIN_ROWS)
def enableNativeWriter: Boolean = conf.getConf(NATIVE_WRITER_ENABLED)

// Please use `BackendsApiManager.getSettings.enableNativeWriteFiles()` instead
def enableNativeWriter: Option[Boolean] = conf.getConf(NATIVE_WRITER_ENABLED)

def directorySizeGuess: Option[Int] =
conf.getConf(DIRECTORY_SIZE_GUESS)
@@ -1201,6 +1203,14 @@ object GlutenConfig {
.checkValues(Set("local", "heap-over-local"))
.createWithDefaultString("local")

val MAX_PARTITION_PER_WRITERS_SESSION =
buildConf("spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession")
.internal()
.doc("Maximum number of partitions per a single table writer instance.")
.intConf
.checkValue(_ > 0, "must be a positive number")
.createWithDefault(10000)

val COLUMNAR_CH_SHUFFLE_PREFER_SPILL_ENABLED =
buildConf("spark.gluten.sql.columnar.backend.ch.shuffle.preferSpill")
.internal()
@@ -1307,7 +1317,7 @@
.internal()
.doc("This is config to specify whether to enable the native columnar parquet/orc writer")
.booleanConf
.createWithDefault(false)
.createOptional

val REMOVE_NATIVE_WRITE_FILES_SORT_AND_PROJECT =
buildConf("spark.gluten.sql.removeNativeWriteFilesSortAndProject")

@@ -22,7 +22,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Table
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -101,4 +100,6 @@ trait SparkShims {
iterator: Iterator[InternalRow]): WriteTaskResult = {
throw new UnsupportedOperationException()
}

def enableNativeWriteFilesByDefault(): Boolean = false
}

@@ -176,4 +176,6 @@ class Spark34Shims extends SparkShims {
iterator
)
}

override def enableNativeWriteFilesByDefault(): Boolean = true
}
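Only the Spark 3.4 shim flips the default; every other shim inherits false from the SparkShims trait. A short sketch of checking the resolved default at runtime (assuming Gluten is on the classpath):

import io.glutenproject.sql.shims.SparkShimLoader

// Hedged sketch: returns true under the Spark 3.4 shim shown above,
// false under any shim that keeps the trait default.
val nativeWriteDefault: Boolean =
  SparkShimLoader.getSparkShims.enableNativeWriteFilesByDefault()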
