[Spark] Remove Generated Column metadata from DataFrame read schema
tomvanbussel committed Mar 3, 2025
1 parent fd6f7cd commit 70b66aa
Showing 6 changed files with 73 additions and 12 deletions.
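The commit carries no description, but the diff applies one pattern across every read path: each schema handed to Spark is now additionally wrapped in DeltaTableUtils.removeInternalWriterMetadata (next to the existing DeltaColumnMapping.dropColumnMappingMetadata), so writer-internal field metadata, such as a generated column's expression, no longer leaks into DataFrame read schemas. A minimal sketch of the idea in plain Spark types; the "delta.generationExpression" key is assumed to match Delta's internal constant, and wiping all metadata is a simplification of what the helper actually removes:

    import org.apache.spark.sql.types._

    // A generated column carries its expression in the StructField metadata.
    // The key below is an assumption about Delta's internal constant.
    val generated = StructField(
      "c2_g",
      LongType,
      nullable = true,
      new MetadataBuilder().putString("delta.generationExpression", "c1 + 1").build())

    // What this commit makes readers see: the same field with the
    // writer-internal metadata stripped (simplified stand-in for the helper).
    val exposed = generated.copy(metadata = Metadata.empty)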
@@ -410,7 +410,12 @@ private[sharing] class DeltaSharingDataSource
       location = fileIndex,
       // This is copied from DeltaLog.buildHadoopFsRelationWithFileIndex.
       // Dropping column mapping metadata because it is not relevant for partition schema.
-      partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(fileIndex.partitionSchema),
+      partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(
+        TahoeDeltaTableUtils.removeInternalWriterMetadata(
+          spark,
+          SchemaUtils.dropNullTypeColumns(deltaSharingTableMetadata.metadata.schema)
+        )
+      ),
       // This is copied from DeltaLog.buildHadoopFsRelationWithFileIndex, original comment:
       // We pass all table columns as `dataSchema` so that Spark will preserve the partition
       // column locations. Otherwise, for any partition columns not in `dataSchema`, Spark would
@@ -623,7 +623,8 @@ class DeltaLog private(
     HadoopFsRelation(
       fileIndex,
       partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(
-        snapshot.metadata.partitionSchema),
+        DeltaTableUtils.removeInternalWriterMetadata(spark, snapshot.metadata.partitionSchema)
+      ),
       // We pass all table columns as `dataSchema` so that Spark will preserve the partition
       // column locations. Otherwise, for any partition columns not in `dataSchema`, Spark would
       // just append them to the end of `dataSchema`.
@@ -202,10 +202,10 @@ class DeltaTableV2 private[delta](
   }
 
   private lazy val tableSchema: StructType = {
-    val baseSchema = cdcRelation.map(_.schema).getOrElse {
-      DeltaTableUtils.removeInternalWriterMetadata(spark, initialSnapshot.schema)
-    }
-    DeltaColumnMapping.dropColumnMappingMetadata(baseSchema)
+    val baseSchema = cdcRelation.map(_.schema).getOrElse(initialSnapshot.schema)
+    DeltaColumnMapping.dropColumnMappingMetadata(
+      DeltaTableUtils.removeInternalWriterMetadata(spark, baseSchema)
+    )
   }
 
   override def schema(): StructType = tableSchema
@@ -150,7 +150,15 @@ object CDCReader extends CDCReaderImpl
       snapshotWithSchemaMode.snapshot
     }
 
-    override val schema: StructType = cdcReadSchema(snapshotForBatchSchema.metadata.schema)
+    override val schema: StructType = {
+      cdcReadSchema(
+        DeltaColumnMapping.dropColumnMappingMetadata(
+          DeltaTableUtils.removeInternalWriterMetadata(
+            sqlContext.sparkSession, snapshotForBatchSchema.metadata.schema
+          )
+        )
+      )
+    }
 
     override def unhandledFilters(filters: Array[Filter]): Array[Filter] = Array.empty
 
@@ -225,13 +225,13 @@ trait DeltaSourceBase extends Source
   @volatile protected var hasCheckedReadIncompatibleSchemaChangesOnStreamStart: Boolean = false
 
   override val schema: StructType = {
-    val readSchema = DeltaTableUtils.removeInternalWriterMetadata(spark, readSchemaAtSourceInit)
     val readSchemaWithCdc = if (options.readChangeFeed) {
-      CDCReader.cdcReadSchema(readSchema)
+      CDCReader.cdcReadSchema(readSchemaAtSourceInit)
     } else {
-      readSchema
+      readSchemaAtSourceInit
     }
-    DeltaColumnMapping.dropColumnMappingMetadata(readSchemaWithCdc)
+    DeltaColumnMapping.dropColumnMappingMetadata(
+      DeltaTableUtils.removeInternalWriterMetadata(spark, readSchemaWithCdc))
   }
 
   // A dummy empty dataframe that can be returned at various point during streaming
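Note the reordering in the streaming source: the CDC columns are appended first, and both cleanup steps now run over the full schema, so the CDC columns themselves pass through the same stripping. A standalone sketch of that ordering; addCdcColumns and stripFieldMetadata are simplified stand-ins for CDCReader.cdcReadSchema and the metadata strippers, not Delta's implementations:

    import org.apache.spark.sql.types._

    // Stand-in for CDCReader.cdcReadSchema: append the three CDC columns.
    def addCdcColumns(base: StructType): StructType = base
      .add("_change_type", StringType)
      .add("_commit_version", LongType)
      .add("_commit_timestamp", TimestampType)

    // Stand-in for the strippers: wipe per-field metadata across the schema.
    def stripFieldMetadata(schema: StructType): StructType =
      StructType(schema.fields.map(_.copy(metadata = Metadata.empty)))

    // New ordering: CDC columns first, then strip metadata from every field.
    val readSchema = stripFieldMetadata(addCdcColumns(new StructType().add("id", LongType)))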
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
 // scalastyle:off typedlit
 // scalastyle:off import.ordering.noEmptyLine
 import java.io.PrintWriter
+import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConverters._
 
@@ -39,7 +40,6 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.MemoryStream
-
 import org.apache.spark.sql.functions.{current_timestamp, lit, struct, typedLit}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger}
@@ -2031,6 +2031,53 @@ trait GeneratedColumnSuiteBase
       }
     }
   }
+
+  test("generated column metadata is not exposed in schema") {
+    val tableName = "table"
+    withTable(tableName) {
+      createDefaultTestTable(tableName)
+      Seq((1L, "foo", Timestamp.valueOf("2020-10-11 12:30:30"), 100, Date.valueOf("2020-11-12")))
+        .toDF("c1", "c3_p", "c5", "c6", "c8")
+        .write.format("delta").mode("append").saveAsTable(tableName)
+
+      val expectedSchema = new StructType()
+        .add("c1", LongType)
+        .add("c2_g", LongType)
+        .add("c3_p", StringType)
+        .add("c4_g_p", DateType)
+        .add("c5", TimestampType)
+        .add("c6", IntegerType)
+        .add("c7_g_p", IntegerType)
+        .add("c8", DateType)
+
+      assert(spark.read.table(tableName).schema === expectedSchema)
+
+      val ttDf = spark.read.option(DeltaOptions.VERSION_AS_OF, 0).table(tableName)
+      assert(ttDf.schema === expectedSchema)
+
+      val cdcDf = spark.read
+        .option(DeltaOptions.CDC_READ_OPTION, true)
+        .option(DeltaOptions.STARTING_VERSION_OPTION, 0)
+        .table(tableName)
+      assert(cdcDf.schema === expectedSchema
+        .add("_change_type", StringType)
+        .add("_commit_version", LongType)
+        .add("_commit_timestamp", TimestampType)
+      )
+
+      assert(spark.readStream.table(tableName).schema === expectedSchema)
+
+      val cdcStreamDf = spark.readStream
+        .option(DeltaOptions.CDC_READ_OPTION, true)
+        .option(DeltaOptions.STARTING_VERSION_OPTION, 0)
+        .table(tableName)
+      assert(cdcStreamDf.schema === expectedSchema
+        .add("_change_type", StringType)
+        .add("_commit_version", LongType)
+        .add("_commit_timestamp", TimestampType)
+      )
+    }
+  }
 }
 
 class GeneratedColumnSuite extends GeneratedColumnSuiteBase
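As a quick end-to-end check outside this suite, a reader can assert that no field in a table's read schema still exposes the generation-expression key. The table name and the metadata key below are illustrative assumptions, and spark is assumed to be a SparkSession in scope:

    // Hypothetical spot check on a Delta table with generated columns:
    // after this change, no read-schema field should carry the internal key.
    val leaked = spark.read.table("events").schema.fields
      .filter(_.metadata.contains("delta.generationExpression"))
    assert(leaked.isEmpty, s"metadata leaked on: ${leaked.map(_.name).mkString(", ")}")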