Add DeltaLake DeletionVector Scan support for Databricks 14.3 [databricks] #11964
base: branch-25.02
Conversation
Signed-off-by: Raza Jafri <[email protected]>
… which will also be based off of 350
…ases end in db
Revert the change in pom to remove 350db143 shim
This commit xfails the delta-lake tests that fail on databricks 14.3. This is for the sake of temporary expediency. These tests will be revisited and triaged for actual fixes. Signed-off-by: MithunR <[email protected]>
Force-pushed from e6fde7b to c34d4e1
…tests XFail delta-lake tests failing on databricks 14.3
Signed-off-by: Tim Liu <[email protected]>
build
Enabled the pre-merge CI to run DB14.3 integration tests. Please feel free to revert it if you do not want to enable it, thanks! @razajafri
Force Delta Lake reads on Databricks 14.3 to bypass perFile check
This reverts commit 160b746.
build
@@ -18,7 +18,6 @@
{"spark": "330db"}
{"spark": "332db"}
{"spark": "341db"}
{"spark": "350db143"}
Update copyrights
@@ -1,5 +1,5 @@
/*
 * Copyright (c) 2024, NVIDIA CORPORATION.
 * Copyright (c) 2022-2024, NVIDIA CORPORATION.
Update copyrights
I have not finished my review yet. I see a lot of formatting changes and function ordering changes between the open source delta lake file for 31x and our new one, which makes it difficult to see exactly what is changed/added vs open source. Not the end of the world, it just makes the review go more slowly. I have also been comparing it to the 24x version in our repo, so it is kind of slow going.
@@ -32,8 +32,8 @@ abstract class DeltaProviderImplBase extends DeltaProvider {
    ),
    GpuOverrides.exec[RapidsDeltaWriteExec](
      "GPU write into a Delta Lake table",
      ExecChecks.hiddenHack(),
      (wrapped, conf, p, r) => new RapidsDeltaWriteExecMeta(wrapped, conf, p, r)).invisible()
      ExecChecks(TypeSig.all, TypeSig.all),
Do we really support all of the types that this can support?
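If TypeSig.all turns out to be broader than what the GPU write path actually handles, the checks could be spelled out explicitly instead. A rough sketch of what a narrower signature might look like, using the usual spark-rapids TypeSig combinators; the concrete type set here is an assumption, not something verified for this PR:

```scala
// Hypothetical narrowing of the write exec's checks. The set of types GPU
// Delta writes actually support is an assumption and would need to be
// confirmed before adopting something like this.
GpuOverrides.exec[RapidsDeltaWriteExec](
  "GPU write into a Delta Lake table",
  ExecChecks(
    (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.NULL +
      TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
    TypeSig.all),
  (wrapped, conf, p, r) => new RapidsDeltaWriteExecMeta(wrapped, conf, p, r))
```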
@@ -127,10 +127,10 @@ object GpuDeltaParquetFileFormatUtils {
      }
    }

    withResource(table) { _ =>
nit: Why make this change at all? It is just white space.
@@ -0,0 +1,102 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Copyright (c) 2023-2025 NVIDIA CORPORATION.
nit: I know this was mostly copied from another place, but this shows up as a new file and so the copyright should technically start in 2025.
override def tagSupportForGpuFileSourceScan(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
  val format = meta.wrapped.relation.fileFormat
  if (format.getClass == classOf[DeltaParquetFileFormat]) {
    // val deltaFormat = format.asInstanceOf[DeltaParquetFileFormat]
nit: These comments need to be removed. Similarly, are there plans to backport deletion vector support to the Databricks Delta Lake versions that support them, like 23x+?
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-jdk-profiles_2.12</artifactId>
<version>25.02.0-SNAPSHOT</version>
<relativePath>../../jdk-profiles/pom.xml</relativePath>
Most versions of the pom.xml have this (spark-rapids/delta-lake/delta-20x/pom.xml, line 26 in c82a2ea):

<relativePath>../../jdk-profiles/pom.xml</relativePath>

and 23x has a different path (spark-rapids/delta-lake/delta-23x/pom.xml, line 26 in c82a2ea):

<relativePath>../../pom.xml</relativePath>

Is that a bug in 23x?
  options,
  hadoopConf,
  metrics)
In 24x there was some code here:

val delVecs = broadcastDvMap
val maxDelVecScatterBatchSize = RapidsConf
  .DELTA_LOW_SHUFFLE_MERGE_SCATTER_DEL_VECTOR_BATCH_SIZE
  .get(sparkSession.sessionState.conf)
val delVecScatterTimeMetric = metrics(GpuMetric.DELETION_VECTOR_SCATTER_TIME)
val delVecSizeMetric = metrics(GpuMetric.DELETION_VECTOR_SIZE)

Did that move somewhere else? Is it not needed any more? I get that this has to do with low shuffle merge, I just want to understand what is going on with it.
val schemaWithIndices = requiredSchema.fields.zipWithIndex
def findColumn(name: String): Option[ColumnMetadata] = {
  val results = schemaWithIndices.filter(_._1.name == name)
Are case insensitive column names a thing we need to worry about with this? I don't think so with how it is used, but I would like to understand.
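If case-insensitive name resolution ever becomes a concern here, the lookup could go through Spark's configured resolver rather than plain string equality. A minimal sketch, reusing the schemaWithIndices and ColumnMetadata names from the snippet above; the ColumnMetadata constructor arguments are an assumption:

```scala
import org.apache.spark.sql.internal.SQLConf

// Sketch only: honor spark.sql.caseSensitive by matching column names
// through the session resolver instead of ==.
def findColumn(name: String): Option[ColumnMetadata] = {
  val resolver = SQLConf.get.resolver
  schemaWithIndices.collectFirst {
    case (field, index) if resolver(field.name, name) =>
      ColumnMetadata(index, field) // constructor shape is an assumption
  }
}
```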
// We don't have any additional columns to generate, just return the original reader as is.
if (isRowDeletedColumn.isEmpty && rowIndexColumn.isEmpty) return dataReader
require(!isSplittable, "Cannot generate row index related metadata with file splitting")
require(disablePushDowns, "Cannot generate row index related metadata with filter pushdown")
This is the same thing as my comment above. Are we planning to force these to be set correctly? If not then we need to fall back to the CPU if we cannot support it. If so where is that happening?
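One way to avoid tripping these require() calls at read time would be to tag the scan for CPU fallback during planning when splitting or pushdown cannot be disabled. A hedged sketch of that pattern, built on the tagSupportForGpuFileSourceScan snippet earlier in this review; the canDisableSplittingAndPushdowns helper is hypothetical, not something in this PR:

```scala
override def tagSupportForGpuFileSourceScan(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
  val format = meta.wrapped.relation.fileFormat
  if (format.getClass == classOf[DeltaParquetFileFormat]) {
    val deltaFormat = format.asInstanceOf[DeltaParquetFileFormat]
    // Sketch only: if the scan cannot be forced to read whole files with
    // pushdowns disabled, fall back to the CPU instead of failing later.
    if (!canDisableSplittingAndPushdowns(deltaFormat)) { // hypothetical helper
      meta.willNotWorkOnGpu("deletion vector metadata generation requires " +
        "disabling file splitting and filter pushdown for this scan")
    }
  }
}
```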
s"${IS_ROW_DELETED_COLUMN_NAME} in the schema") | ||
} | ||
|
||
// val serializableHadoopConf = new SerializableConfiguration(hadoopConf) |
nit: Please delete this if it is not needed.
import org.apache.spark.util.SerializableConfiguration

case class GpuDelta31xParquetFileFormat(
    override val columnMappingMode: DeltaColumnMappingMode,
nit: the open source 3.1.0 and the 24x version of the plugin pass in a metadata class instead of these values directly. Is there a reason we have changed this?
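For comparison, the metadata-passing shape used by the open source 3.1.0 format and the 24x shim looks roughly like the sketch below. It assumes Delta's actions.Metadata API, where the column mapping mode and reference schema are derived from the metadata rather than passed as individual constructor arguments; this is illustrative only, not the PR's code:

```scala
import org.apache.spark.sql.delta.DeltaColumnMappingMode
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.types.StructType

// Sketch: derive the column mapping mode and reference schema from the
// table metadata instead of taking them as separate constructor args.
case class GpuDelta31xParquetFileFormatSketch(metadata: Metadata, isSplittable: Boolean) {
  val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
  val referenceSchema: StructType = metadata.schema
}
```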
This PR adds support for reading Delta Lake deletion vectors with the PERFILE reader.

Contributes to #8654
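For context, the idea is that the PERFILE reader materializes an "is row deleted" flag for each row from the file's deletion vector, and the scan then drops the flagged rows. The snippet below is a simplified, CPU-side sketch of that mapping using a plain Set[Long] as a stand-in for the deserialized deletion vector; the actual implementation works on GPU columnar batches and Delta's deletion vector bitmaps:

```scala
// Conceptual sketch only, not the PR's code: map each file row index to a
// boolean saying whether the deletion vector marks it as deleted. Rows
// flagged true (the IS_ROW_DELETED_COLUMN_NAME column) are filtered out of
// the scan's output.
def markDeletedRows(
    rowIndexes: Iterator[Long],
    deletedRows: Set[Long]): Iterator[(Long, Boolean)] = {
  rowIndexes.map(idx => idx -> deletedRows.contains(idx))
}
```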