Add DeltaLake DeletionVector Scan support for Databricks 14.3 [databricks] #11964

Draft · wants to merge 55 commits into branch-25.02 from SP-10661-db-14.3-deletion-vectors
Conversation

@razajafri (Collaborator) commented on Jan 14, 2025

This PR adds support for reading deletion vectors for the PERFILE reader.

Contributes to #8654.

Signed-off-by: Raza Jafri <[email protected]>
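For context, a minimal self-contained sketch of what applying a deletion vector during a scan means, assuming the Delta convention of tracking deleted row positions per file (all names here are illustrative; this is not the PR's GPU implementation):

```scala
// Illustrative only: a deletion vector marks row positions within a file as
// deleted, and the reader must filter those rows out while scanning.
object DeletionVectorSketch extends App {
  case class Row(id: Int, value: String)

  val fileRows = Seq(Row(0, "a"), Row(1, "b"), Row(2, "c"))
  // Hypothetical deletion vector: the set of deleted row positions in this file.
  val deletionVector = Set(1L)

  // A reader applying the vector keeps only rows whose position is not marked.
  val surviving = fileRows.zipWithIndex.collect {
    case (row, idx) if !deletionVector.contains(idx.toLong) => row
  }
  println(surviving) // List(Row(0,a), Row(2,c))
}
```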
…ases end in db

Revert the change in pom to remove 350db143 shim
@razajafri razajafri changed the title Add deletion vectors read support for Databricks 14.3 [databricks] Add DeltaLake DeletionVector Scan support for Databricks 14.3 [databricks] Jan 14, 2025
razajafri and others added 3 commits January 17, 2025 03:26
This commit xfails the Delta Lake tests that fail on Databricks 14.3.

This is for the sake of temporary expediency. These tests will be
revisited and triaged for actual fixes.

Signed-off-by: MithunR <[email protected]>
@razajafri razajafri force-pushed the SP-10661-db-14.3-deletion-vectors branch 2 times, most recently from e6fde7b to c34d4e1 on January 19, 2025 at 21:52
@pxLi (Collaborator) commented on Jan 23, 2025

build

@NvTimLiu (Collaborator) commented on Jan 23, 2025

Enabled the pre-merge CI to run DB 14.3 integration tests: 160b746

https://nvidia.slack.com/archives/C021GR1KTDY/p1737601230184499?thread_ts=1737422476.340569&cid=C021GR1KTDY

Please feel free to revert it if you do not want to enable it. Thanks! @razajafri

@razajafri (Collaborator, Author) commented

build

@@ -18,7 +18,6 @@
{"spark": "330db"}
{"spark": "332db"}
{"spark": "341db"}
{"spark": "350db143"}

Update copyrights

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.

Update copyrights

@revans2 (Collaborator) left a comment

I have not finished my review yet. I see a lot of formatting changes and function-ordering changes between the open source Delta Lake file for 31x

https://github.com/delta-io/delta/blob/v3.1.0/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala

and our new one, which makes it difficult to see exactly what was changed or added relative to open source. Not the end of the world; it just makes the review go more slowly. I have also been comparing it to the 24x version in our repo, so it is slow going.

@@ -32,8 +32,8 @@ abstract class DeltaProviderImplBase extends DeltaProvider {
     ),
     GpuOverrides.exec[RapidsDeltaWriteExec](
       "GPU write into a Delta Lake table",
-      ExecChecks.hiddenHack(),
-      (wrapped, conf, p, r) => new RapidsDeltaWriteExecMeta(wrapped, conf, p, r)).invisible()
+      ExecChecks(TypeSig.all, TypeSig.all),
+      (wrapped, conf, p, r) => new RapidsDeltaWriteExecMeta(wrapped, conf, p, r))

Do we really support all of the types that this now claims to support?
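If the answer turns out to be no, here is a hedged sketch of a narrower signature, using TypeSig combinators from the plugin's GpuOverrides API (the specific type set chosen here is illustrative, not a recommendation; which set is correct is exactly the question above):

```scala
import com.nvidia.spark.rapids.{ExecChecks, GpuOverrides, TypeSig}

// Sketch only: advertise a specific nested type set rather than TypeSig.all.
GpuOverrides.exec[RapidsDeltaWriteExec](
  "GPU write into a Delta Lake table",
  ExecChecks(
    (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.NULL +
      TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
    TypeSig.all),
  (wrapped, conf, p, r) => new RapidsDeltaWriteExecMeta(wrapped, conf, p, r))
```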

@@ -127,10 +127,10 @@ object GpuDeltaParquetFileFormatUtils {
}
}

withResource(table) { _ =>

nit: Why make this change at all? It is just whitespace.

@@ -0,0 +1,102 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2023-2025 NVIDIA CORPORATION.

nit: I know this was mostly copied from another place, but it shows up as a new file, so the copyright should technically start in 2025.

override def tagSupportForGpuFileSourceScan(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
val format = meta.wrapped.relation.fileFormat
if (format.getClass == classOf[DeltaParquetFileFormat]) {
// val deltaFormat = format.asInstanceOf[DeltaParquetFileFormat]

nit: These comments need to be removed. Similarly, are there plans to backport deletion vector support to the Databricks Delta Lake versions that support them, like 23x+?

<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-jdk-profiles_2.12</artifactId>
<version>25.02.0-SNAPSHOT</version>
<relativePath>../../jdk-profiles/pom.xml</relativePath>

Most versions of the pom.xml have this

<relativePath>../../jdk-profiles/pom.xml</relativePath>

and 23x has a different path

<relativePath>../../pom.xml</relativePath>

Is that a bug in 23x?

options,
hadoopConf,
metrics)


In 24x there was some code here:

    val delVecs = broadcastDvMap
    val maxDelVecScatterBatchSize = RapidsConf
      .DELTA_LOW_SHUFFLE_MERGE_SCATTER_DEL_VECTOR_BATCH_SIZE
      .get(sparkSession.sessionState.conf)
    val delVecScatterTimeMetric = metrics(GpuMetric.DELETION_VECTOR_SCATTER_TIME)
    val delVecSizeMetric = metrics(GpuMetric.DELETION_VECTOR_SIZE)

Did that move somewhere else? Is it not needed anymore? I get that this has to do with low shuffle merge; I just want to understand what is going on with it.


val schemaWithIndices = requiredSchema.fields.zipWithIndex
def findColumn(name: String): Option[ColumnMetadata] = {
val results = schemaWithIndices.filter(_._1.name == name)

Do we need to worry about case-insensitive column names here? I don't think so, given how this is used, but I would like to understand.
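If case sensitivity did matter here, a minimal self-contained sketch of a lookup that honors spark.sql.caseSensitive via Spark's standard resolver might look like this (a toy, not the PR's code):

```scala
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// SQLConf.get.resolver compares names case-insensitively by default and
// exactly when spark.sql.caseSensitive is enabled.
object CaseSensitivitySketch extends App {
  val requiredSchema = StructType(Seq(StructField("RowId", LongType)))
  val schemaWithIndices = requiredSchema.fields.zipWithIndex
  val resolver = SQLConf.get.resolver

  def findColumn(name: String): Option[(StructField, Int)] =
    schemaWithIndices.find { case (field, _) => resolver(field.name, name) }

  println(findColumn("rowid")) // Some((StructField(RowId,...),0)) by default
}
```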

// We don't have any additional columns to generate, just return the original reader as is.
if (isRowDeletedColumn.isEmpty && rowIndexColumn.isEmpty) return dataReader
require(!isSplittable, "Cannot generate row index related metadata with file splitting")
require(disablePushDowns, "Cannot generate row index related metadata with filter pushdown")

This is the same thing as my comment above. Are we planning to force these to be set correctly? If not, then we need to fall back to the CPU when we cannot support it. If so, where is that happening?
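If forcing them is not possible, here is a sketch of what a CPU fallback could look like in the tagging hook, using the plugin's standard willNotWorkOnGpu mechanism. The formatAllowsSplitting and formatDisablesPushDowns helpers are hypothetical stand-ins for whatever the format actually exposes:

```scala
// Sketch only: fall back to the CPU when the scan cannot satisfy the
// requirements that the require(...) calls above assert.
override def tagSupportForGpuFileSourceScan(
    meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
  meta.wrapped.relation.fileFormat match {
    case fmt: DeltaParquetFileFormat =>
      // Hypothetical predicates; the real checks would come from the format.
      if (formatAllowsSplitting(fmt)) {
        meta.willNotWorkOnGpu(
          "deletion vector row-index metadata requires non-splittable scans")
      }
      if (!formatDisablesPushDowns(fmt)) {
        meta.willNotWorkOnGpu(
          "deletion vector row-index metadata requires filter pushdown disabled")
      }
    case _ => // not a Delta scan; nothing to tag
  }
}
```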

s"${IS_ROW_DELETED_COLUMN_NAME} in the schema")
}

// val serializableHadoopConf = new SerializableConfiguration(hadoopConf)

nit: Please delete this if it is not needed.

import org.apache.spark.util.SerializableConfiguration

case class GpuDelta31xParquetFileFormat(
override val columnMappingMode: DeltaColumnMappingMode,

nit: The open source 3.1.0 and the 24x version of the plugin pass in a metadata class instead of these values directly. Is there a reason we changed this?

https://github.com/delta-io/delta/blob/71b09f0027c2940806ad2022a6b9fcd10505f3fd/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala#L57
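To make the trade-off concrete, a self-contained toy contrasting the two constructor shapes (stand-in types, not the real Delta classes): taking the metadata action keeps the constructor stable as more derived fields are needed, while taking derived values makes each dependency explicit.

```scala
// Toy types standing in for Delta's Metadata / DeltaColumnMappingMode.
case class Metadata(columnMappingMode: String, schemaJson: String)

// Shape used by open source Delta 3.1.0: take the whole metadata action and
// derive what is needed inside the format.
case class FormatTakingMetadata(metadata: Metadata) {
  val columnMappingMode: String = metadata.columnMappingMode
}

// Shape used in this PR: take the already-derived values directly.
case class FormatTakingFields(columnMappingMode: String, referenceSchemaJson: String)

object ConstructorShapes extends App {
  val md = Metadata("name", """{"type":"struct"}""")
  println(FormatTakingMetadata(md).columnMappingMode)                          // name
  println(FormatTakingFields(md.columnMappingMode, md.schemaJson).columnMappingMode)
}
```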
