drafted change
ChengJi-db committed Feb 27, 2025
1 parent 0fbe9b9 commit b3df5a7
Showing 7 changed files with 143 additions and 27 deletions.
@@ -195,4 +195,11 @@ object IcebergPartitionUtil {
def hasBucketPartition(partSpec: PartitionSpec): Boolean = {
partSpec.fields.asScala.toSeq.exists(spec => spec.transform().isInstanceOf[Bucket[_]])
}

// Returns true if the partition spec has at least one partition field that is not a bucket transform
def hasNonBucketPartition(partSpec: PartitionSpec): Boolean = {
partSpec.isPartitioned && partSpec.fields().asScala.exists { field =>
!field.transform().isInstanceOf[Bucket[_]]
}
}
}
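
As an aside, a minimal hypothetical sketch (not part of this change) of how the new helper classifies specs, assuming the standard Iceberg PartitionSpec builder and Types APIs:

import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.transforms.IcebergPartitionUtil
import org.apache.iceberg.types.Types

// Hypothetical two-column schema.
val schema = new Schema(
  Types.NestedField.required(1, "id", Types.LongType.get()),
  Types.NestedField.required(2, "category", Types.StringType.get()))

// Bucket-only spec: no non-bucket field, so hasNonBucketPartition is false.
val bucketOnly = PartitionSpec.builderFor(schema).bucket("id", 16).build()
IcebergPartitionUtil.hasNonBucketPartition(bucketOnly)   // false

// Identity partition present, so hasNonBucketPartition is true.
val withIdentity = PartitionSpec.builderFor(schema).identity("category").build()
IcebergPartitionUtil.hasNonBucketPartition(withIdentity) // true

// Unpartitioned spec: isPartitioned is false, so the check short-circuits to false.
IcebergPartitionUtil.hasNonBucketPartition(PartitionSpec.unpartitioned())
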
@@ -17,13 +17,18 @@
package org.apache.spark.sql.delta.commands.convert

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.delta.{DeltaColumnMapping, SerializableFileStatus}
import org.apache.spark.sql.delta.DeltaErrors.cloneFromIcebergSourceWithPartitionEvolution
import org.apache.spark.sql.delta.commands.convert.IcebergTable.ERR_MULTIPLE_PARTITION_SPECS
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.hadoop.fs.Path
import org.apache.iceberg.{BaseTable, DataFile, DataFiles, FileContent, FileFormat, ManifestContent, ManifestFile, ManifestFiles, PartitionData, PartitionSpec, RowLevelOperationMode, Schema, StructLike, Table, TableProperties}
import org.apache.iceberg.transforms.IcebergPartitionUtil

import org.apache.spark.SparkThrowable
import org.apache.spark.internal.{LoggingShims, MDC}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.StructType
@@ -44,6 +49,14 @@ class IcebergFileManifest(

private var _sizeInBytes: Option[Long] = None

private val specIdsToIfSpecHasNonBucketPartition =
table.specs().asScala.map { case (specId, spec) =>
specId.toInt -> IcebergPartitionUtil.hasNonBucketPartition(spec)
}

private val partitionEvolutionEnabled =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_PARTITION_EVOLUTION_ENABLED)

val basePath = table.location()

override def numFiles: Long = {
@@ -93,6 +106,10 @@
}

// Localize variables so we don't need to serialize the file manifest class.
// Context: Spark requires every variable captured in a closure to be serializable,
// and referencing a class member captures the enclosing instance, which would require
// serializing the whole class. Since IcebergFileManifest is not serializable,
// we copy the members we need into local variables instead.
val localTable = table
// We use the latest snapshot timestamp for all generated Delta AddFiles because
// retrieving the timestamp for each DataFile is non-trivially time-consuming. This can be improved
@@ -102,23 +119,35 @@
val shouldConvertPartition = spark.sessionState.conf
.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_USE_NATIVE_PARTITION_VALUES)
val convertPartition = if (shouldConvertPartition) {
new IcebergPartitionConverter(localTable, partitionSchema)
new IcebergPartitionConverter(localTable, partitionSchema, partitionEvolutionEnabled)
} else {
null
}

val shouldConvertStats = convertStats

val shouldCheckPartitionEvolution = !partitionEvolutionEnabled
val specIdsToIfSpecHasNonBucketPartitionMap = specIdsToIfSpecHasNonBucketPartition
val tableSpecsSize = table.specs().size()

val manifestFiles = localTable
.currentSnapshot()
.dataManifests(localTable.io())
.asScala
.map(new ManifestFileWrapper(_))
.toSeq

spark
.createDataset(manifestFiles)
.flatMap(ManifestFiles.read(_, localTable.io()).asScala.map(new DataFileWrapper(_)))
.map { dataFile: DataFileWrapper =>
if (shouldCheckPartitionEvolution) {
IcebergFileManifest.validateLimitedPartitionEvolution(
dataFile.specId,
tableSpecsSize,
specIdsToIfSpecHasNonBucketPartitionMap
)
}
ConvertTargetFile(
SerializableFileStatus(
path = dataFile.path,
@@ -143,3 +172,34 @@
fileSparkResults = None
}
}

object IcebergFileManifest {
// scalastyle:off
/**
* Validates the proposed partitionSpecId against partition evolution.
* We do not support converting tables that have undergone partition evolution.
*
* However, we allow one special case: when every data file is either unpartitioned
* or bucket-partitioned, the table is convertible even if it carries multiple
* partition specs.
*/
// scalastyle:on
private def validateLimitedPartitionEvolution(
partitionSpecId: Int,
tableSpecsSize: Int,
specIdsToIfSpecHasNonBucketPartition: mutable.Map[Int, Boolean]): Unit = {
if (hasPartitionEvolved(
partitionSpecId, tableSpecsSize, specIdsToIfSpecHasNonBucketPartition)
) {
throw cloneFromIcebergSourceWithPartitionEvolution()
}
}

private def hasPartitionEvolved(
partitionSpecID: Int,
tableSpecsSize: Int,
specIdsToIfSpecHasNonBucketPartition: mutable.Map[Int, Boolean]): Boolean = {
val specHasNonBucketPartition = specIdsToIfSpecHasNonBucketPartition(partitionSpecID)
specHasNonBucketPartition && tableSpecsSize > 1
}
}
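
To make the special case above concrete, a small hypothetical sketch of the same predicate hasPartitionEvolved applies: a data file is rejected only when its spec has a non-bucket partition field and the table carries more than one spec.

import scala.collection.mutable

// Hypothetical table state: spec 0 is bucket-only, spec 1 later added an identity partition.
val specHasNonBucketPartition = mutable.Map(0 -> false, 1 -> true)
val tableSpecsSize = 2

def wouldReject(specId: Int): Boolean =
  specHasNonBucketPartition(specId) && tableSpecsSize > 1

wouldReject(0) // false: bucket-partitioned (or unpartitioned) files remain convertible
wouldReject(1) // true: would trigger cloneFromIcebergSourceWithPartitionEvolution()
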
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.DeltaColumnMapping
import org.apache.spark.sql.delta.util.{DateFormatter, TimestampFormatter}
import org.apache.iceberg.{PartitionData, PartitionField, Schema, StructLike, Table}
import org.apache.iceberg.{PartitionData, PartitionField, PartitionSpec, Schema, StructLike, Table}
import org.apache.iceberg.transforms.IcebergPartitionUtil
import org.apache.iceberg.types.{Conversions, Type => IcebergType}
import org.apache.iceberg.types.Type.{PrimitiveType => IcebergPrimitiveType, TypeID}
@@ -69,9 +69,17 @@ case class IcebergPartitionConverter(
val timestampFormatter: TimestampFormatter =
TimestampFormatter(ConvertUtils.timestampPartitionPattern, java.util.TimeZone.getDefault)

def this(table: Table, partitionSchema: StructType) =
def this(table: Table, partitionSchema: StructType, partitionEvolutionEnabled: Boolean) =
this(table.schema(),
IcebergPartitionConverter.physicalNameToPartitionField(table, partitionSchema))
// We only fall back to an empty partition-field map when partition evolution has
// occurred (more than one spec) and partition evolution support is disabled.
// This is an extra safety mechanism: the caller should already have passed a
// partitionSchema without non-bucket fields when the table has more than one spec.
if (table.specs().size() > 1 && !partitionEvolutionEnabled) {
Map.empty[String, PartitionField]
} else {
IcebergPartitionConverter.physicalNameToPartitionField(table, partitionSchema)
}
)

/**
* Convert an Iceberg [[PartitionData]] into a Map of (columnID -> partitionValue) used by Delta
@@ -21,9 +21,10 @@
import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaColumnMappingMode, DeltaConfigs, IdMapping, SerializableFileStatus, Snapshot}
import org.apache.spark.sql.delta.DeltaErrors.{cloneFromIcebergSourceWithoutSpecs, cloneFromIcebergSourceWithPartitionEvolution}
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.iceberg.{Table, TableProperties}
import org.apache.iceberg.{PartitionSpec, Table, TableProperties}
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.transforms.{Bucket, IcebergPartitionUtil}
import org.apache.iceberg.util.PropertyUtil
@@ -76,7 +77,7 @@ class IcebergTable(
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_CAST_TIME_TYPE) ||
deltaSnapshot.exists(s => DeltaConfigs.CAST_ICEBERG_TIME_TYPE.fromMetaData(s.metadata))

protected val fieldPathToPhysicalName =
protected val fieldPathToPhysicalName: Map[Seq[String], String] =
existingSchema.map {
SchemaMergingUtils.explode(_).collect {
case (path, field) if DeltaColumnMapping.hasPhysicalName(field) =>
@@ -115,11 +116,33 @@
bucketPartitionToNonPartition
}

val tablePartitionSpec: PartitionSpec = {
// Validate and select the partition spec from the Iceberg table.
// We do not support converting Iceberg tables that have undergone partition
// evolution, so normally we only allow tables with a single partition spec.
//
// However, we allow one special case: if all data files are either unpartitioned
// or bucket-partitioned, we convert them to an unpartitioned Delta table and use
// an arbitrary spec without non-bucket partition fields as the table's spec.
if (icebergTable.specs().size() == 1 || partitionEvolutionEnabled || !bucketPartitionEnabled) {
icebergTable.spec()
} else if (icebergTable.specs().isEmpty) {
throw cloneFromIcebergSourceWithoutSpecs()
} else {
icebergTable.specs().asScala.values.find(
!IcebergPartitionUtil.hasNonBucketPartition(_)
).getOrElse {
throw cloneFromIcebergSourceWithPartitionEvolution()
}
}
}

override val partitionSchema: StructType = {
// Reuse physical names of existing columns.
val mergedPartitionSchema = DeltaColumnMapping.setPhysicalNames(
StructType(
IcebergPartitionUtil.getPartitionFields(icebergTable.spec(), icebergTable.schema())),
IcebergPartitionUtil.getPartitionFields(tablePartitionSpec, icebergTable.schema())),
fieldPathToPhysicalName)

// Assign physical names to new partition columns.
@@ -148,20 +171,6 @@
override val format: String = "iceberg"

def checkConvertible(): Unit = {
/**
* Having multiple partition specs implies that the Iceberg table has experienced
* partition evolution. (https://iceberg.apache.org/evolution/#partition-evolution)
* We don't support the conversion of such tables right now.
*
* Note that this simple check won't consider the underlying data, so there might be cases
* s.t. the data itself is partitioned using a single spec despite multiple specs created
* in the past. we do not account for that atm due to the complexity of data introspection
*/

if (!partitionEvolutionEnabled && icebergTable.specs().size() > 1) {
throw new UnsupportedOperationException(IcebergTable.ERR_MULTIPLE_PARTITION_SPECS)
}

/**
* If the sql conf bucketPartitionEnabled is true, then convert iceberg table with
* bucket partition to unpartitioned delta table; if bucketPartitionEnabled is false,
@@ -201,11 +210,7 @@
object IcebergTable {
/** Error message constants */
val ERR_MULTIPLE_PARTITION_SPECS =
s"""This Iceberg table has undergone partition evolution. Iceberg tables that had partition
| columns removed can be converted without data loss by setting the SQL configuration
| '${DeltaSQLConf.DELTA_CONVERT_ICEBERG_PARTITION_EVOLUTION_ENABLED.key}' to true. Tables that
| had data columns converted to partition columns will not be able to read the pre-partition
| column values.""".stripMargin
s"Source iceberg table has undergone partition evolution"
val ERR_CUSTOM_NAME_MAPPING = "Cannot convert Iceberg tables with column name mapping"

val ERR_BUCKET_PARTITION = "Cannot convert Iceberg tables with bucket partition"
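
A hypothetical sketch of the selection tablePartitionSpec performs when neither config flag short-circuits it, assuming a table whose spec history is bucket-only first and bucket plus identity later:

import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.transforms.IcebergPartitionUtil
import org.apache.iceberg.types.Types

val schema = new Schema(
  Types.NestedField.required(1, "id", Types.LongType.get()),
  Types.NestedField.required(2, "category", Types.StringType.get()))

// Hypothetical spec history: bucket-only at first, identity("category") added later.
val specs = Seq(
  PartitionSpec.builderFor(schema).bucket("id", 16).build(),
  PartitionSpec.builderFor(schema).bucket("id", 16).identity("category").build())

// Pick any spec without non-bucket fields; such tables convert as unpartitioned Delta tables.
// If no such spec existed, the caller would throw cloneFromIcebergSourceWithPartitionEvolution().
val chosen: Option[PartitionSpec] =
  specs.find(spec => !IcebergPartitionUtil.hasNonBucketPartition(spec))
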
@@ -574,7 +574,7 @@ trait ConvertIcebergToDeltaSuiteBase
readIcebergHadoopTable(tablePath).updateSpec().addField("data2").commit()
spark.sql(s"INSERT INTO $table VALUES (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')")
// partition evolution happens, convert will fail
val e1 = intercept[UnsupportedOperationException] {
val e1 = intercept[DeltaAnalysisException] {
convert(s"iceberg.`$tablePath`")
}
assert(e1.getMessage.contains(IcebergTable.ERR_MULTIPLE_PARTITION_SPECS))
22 changes: 22 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -396,6 +396,28 @@
],
"sqlState" : "42613"
},
"DELTA_CLONE_INCOMPATIBLE_SOURCE" : {
"message" : [
"The clone source has valid format, but has unsupported feature with Delta"
],
"subClass" : {
"ICEBERG_MISSING_PARTITION_SPECS" : {
"message" : [
"Source iceberg table has no partition specs in table"
]
},
"ICEBERG_UNDERGONE_PARTITION_EVOLUTION" : {
"message" : [
"Source iceberg table has undergone partition evolution.",
"Iceberg tables that had partition columns removed can be converted without data loss by setting the SQL configuration",
"'spark.databricks.delta.convert.iceberg.partitionEvolution.enabled' to true.",
"Tables that had data columns converted to partition columns will not",
"be able to read the pre-partition column values."
]
}
},
"sqlState" : "0AKDC"
},
"DELTA_CLONE_UNSUPPORTED_SOURCE" : {
"message" : [
"Unsupported clone source '<name>', whose format is <format>.",
14 changes: 14 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -3042,6 +3042,20 @@ trait DeltaErrorsBase
)
}

def cloneFromIcebergSourceWithPartitionEvolution(): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_CLONE_INCOMPATIBLE_SOURCE.ICEBERG_UNDERGONE_PARTITION_EVOLUTION",
messageParameters = Array()
)
}

def cloneFromIcebergSourceWithoutSpecs(): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_CLONE_INCOMPATIBLE_SOURCE.ICEBERG_MISSING_PARTITION_SPECS",
messageParameters = Array()
)
}

def partitionSchemaInIcebergTables: Throwable = {
new DeltaIllegalArgumentException(errorClass = "DELTA_PARTITION_SCHEMA_IN_ICEBERG_TABLES")
}
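
For illustration only, a hypothetical check of the new helper, assuming DeltaAnalysisException and the error-class rendering used elsewhere in this module:

import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaErrors}

val e = DeltaErrors.cloneFromIcebergSourceWithPartitionEvolution()
assert(e.isInstanceOf[DeltaAnalysisException])
// The message text comes from DELTA_CLONE_INCOMPATIBLE_SOURCE.ICEBERG_UNDERGONE_PARTITION_EVOLUTION
// in delta-error-classes.json, which is what the updated test matches on.
assert(e.getMessage.contains("partition evolution"))
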
