Skip to content

Commit

Permalink
[VL] fix wrong result for delta deletion vector (#4614)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhli1142015 authored Feb 2, 2024
1 parent a58ebd6 commit e9bc041
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package io.glutenproject.execution

import io.glutenproject.extension.ValidationResult
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.sql.catalyst.TableIdentifier
Expand Down Expand Up @@ -49,6 +50,14 @@ class DeltaScanTransformer(

// Delta data files are Parquet on disk, so the native scan uses the Parquet read format.
override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat

/**
 * Validates whether this Delta scan can be offloaded to the native engine.
 *
 * When a Delta table is read with deletion vectors enabled, Spark adds the internal
 * metadata column `__delta_internal_is_row_deleted` to the required schema. The native
 * reader does not understand deletion vectors, so such scans must fall back to vanilla
 * Spark — otherwise deleted rows would wrongly appear in results. Every other validation
 * check is delegated to the parent transformer.
 */
override protected def doValidateInternal(): ValidationResult = {
  val usesDeletionVectors =
    requiredSchema.fields.exists(_.name == "__delta_internal_is_row_deleted")
  if (usesDeletionVectors) {
    ValidationResult.notOk("Deletion vector is not supported in native.")
  } else {
    super.doValidateInternal()
  }
}

}

object DeltaScanTransformer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {
}
}

testWithSpecifiedSparkVersion("column mapping with complex type") {
test("column mapping with complex type") {
withTable("t1") {
val simpleNestedSchema = new StructType()
.add("a", StringType, true)
Expand Down Expand Up @@ -183,4 +183,21 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {
)
}
}

testWithSpecifiedSparkVersion("deletion vector", Some("3.4.2")) {
  withTempPath {
    dir =>
      import testImplicits._
      val tablePath = dir.getCanonicalPath
      // Rows expected to survive the DELETE below.
      val keptDf = Seq(1, 2, 3, 4, 5).toDF("id")
      // Ids that the DELETE statement will remove.
      val deletedIds = Seq(6, 7, 8, 9, 10)
      val deletedDf = deletedIds.toDF("id")
      // Write everything into a single file (coalesce(1)), presumably so the DELETE
      // is tracked via a deletion vector rather than a file rewrite — TODO confirm.
      keptDf.union(deletedDf).coalesce(1).write.format("delta").save(tablePath)
      spark.sql(
        s"ALTER TABLE delta.`$tablePath` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)")
      // Sanity check: every row is visible before any deletion happens.
      checkAnswer(spark.read.format("delta").load(tablePath), keptDf.union(deletedDf))
      spark.sql(s"DELETE FROM delta.`$tablePath` WHERE id IN (${deletedIds.mkString(", ")})")
      // After the DELETE, only the kept rows should remain.
      checkAnswer(spark.read.format("delta").load(tablePath), keptDf)
  }
}
}

0 comments on commit e9bc041

Please sign in to comment.