Skip to content

Commit

Permalink
Preserve enablement for converting Iceberg table with bucket partitio…
Browse files Browse the repository at this point in the history
…n to unpartitioned delta table in table property (#4148)

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

## Description
Once bucket partition to nonpartition is enabled for a table, the clone
from iceberg table will always convert bucket partition to non-partition
for that table, regardless of if the flag is turned off or not in the
future. This is to make those tables forward compatible with future
Deltas where "truly" bucket conversion is supported so they will keep
the old behavior and won't make regressions

## How was this patch tested?
Existing UTs
  • Loading branch information
ChengJi-db authored Feb 13, 2025
1 parent c3f2cb0 commit 35e47fd
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ class IcebergTable(
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_PARTITION_EVOLUTION_ENABLED)

private val bucketPartitionEnabled =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_BUCKET_PARTITION_ENABLED)
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_BUCKET_PARTITION_ENABLED) ||
deltaSnapshot.exists(s =>
DeltaConfigs.IGNORE_ICEBERG_BUCKET_PARTITION.fromMetaData(s.metadata)
)

// When a table is CLONED/federated with the session conf ON, it will have the table property
// set and will continue to support CAST TIME TYPE even when later the session conf is OFF.
Expand Down Expand Up @@ -101,8 +104,15 @@ class IcebergTable(
} else {
None
}
val bucketPartitionToNonPartition = if (bucketPartitionEnabled) {
Some((DeltaConfigs.IGNORE_ICEBERG_BUCKET_PARTITION.key -> "true"))
} else {
None
}
icebergTable.properties().asScala.toMap + (DeltaConfigs.COLUMN_MAPPING_MODE.key -> "id") +
(DeltaConfigs.LOG_RETENTION.key -> s"$maxSnapshotAgeMs millisecond") ++ castTimeTypeConf
(DeltaConfigs.LOG_RETENTION.key -> s"$maxSnapshotAgeMs millisecond") ++
castTimeTypeConf ++
bucketPartitionToNonPartition
}

override val partitionSchema: StructType = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,15 @@ trait DeltaConfigsBase extends DeltaLogging {
validationFunction = _ => true,
helpMessage = "Casting Iceberg TIME type to Spark Long type enabled"
)

val IGNORE_ICEBERG_BUCKET_PARTITION = buildConfig[Boolean](
key = "ignoreIcebergBucketPartition",
defaultValue = "false",
fromString = _.toBoolean,
validationFunction = _ => true,
helpMessage = "Ignore Iceberg bucket partition, which means " +
"converting source iceberg table to a non-partition delta table"
)
/**
* Enable optimized writes into a Delta table. Optimized writes adds an adaptive shuffle before
* the write to write compacted files into a Delta table during a write.
Expand Down

0 comments on commit 35e47fd

Please sign in to comment.