From 70b66aac219839a64bbba57039ad7cda646d2cd4 Mon Sep 17 00:00:00 2001
From: Tom van Bussel
Date: Mon, 3 Mar 2025 12:43:29 +0100
Subject: [PATCH] [Spark] Remove Generated Column metadata from DataFrame read
 schema

---
 .../spark/DeltaSharingDataSource.scala        |  7 ++-
 .../org/apache/spark/sql/delta/DeltaLog.scala |  3 +-
 .../sql/delta/catalog/DeltaTableV2.scala      |  8 +--
 .../sql/delta/commands/cdc/CDCReader.scala    | 10 +++-
 .../spark/sql/delta/sources/DeltaSource.scala |  8 +--
 .../sql/delta/GeneratedColumnSuite.scala      | 49 ++++++++++++++++++-
 6 files changed, 73 insertions(+), 12 deletions(-)

diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala
index 512c835a387..3217b9f0ff3 100644
--- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala
+++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala
@@ -410,7 +410,12 @@ private[sharing] class DeltaSharingDataSource
         location = fileIndex,
         // This is copied from DeltaLog.buildHadoopFsRelationWithFileIndex.
         // Dropping column mapping metadata because it is not relevant for partition schema.
-        partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(fileIndex.partitionSchema),
+        partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(
+          TahoeDeltaTableUtils.removeInternalWriterMetadata(
+            spark,
+            SchemaUtils.dropNullTypeColumns(deltaSharingTableMetadata.metadata.schema)
+          )
+        ),
         // This is copied from DeltaLog.buildHadoopFsRelationWithFileIndex, original comment:
         // We pass all table columns as `dataSchema` so that Spark will preserve the partition
         // column locations. Otherwise, for any partition columns not in `dataSchema`, Spark would
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index 821aa3edbd0..9a11ec70cfd 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -623,7 +623,8 @@ class DeltaLog private(
     HadoopFsRelation(
       fileIndex,
       partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(
-        snapshot.metadata.partitionSchema),
+        DeltaTableUtils.removeInternalWriterMetadata(spark, snapshot.metadata.partitionSchema)
+      ),
       // We pass all table columns as `dataSchema` so that Spark will preserve the partition
       // column locations. Otherwise, for any partition columns not in `dataSchema`, Spark would
       // just append them to the end of `dataSchema`.
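
Note for readers following the call sites above: a minimal sketch of what removing internal writer metadata amounts to. The key "delta.generationExpression" is the metadata key Delta attaches to generated columns; the identity key listed next to it and the helper body below are simplified assumptions for illustration, not the actual implementation of DeltaTableUtils.removeInternalWriterMetadata.

import org.apache.spark.sql.types.{MetadataBuilder, StructType}

object WriterMetadataSketch {
  // Assumed key set for illustration; the real helper also handles
  // identity-column and default-value metadata.
  private val internalKeys =
    Seq("delta.generationExpression", "delta.identity.start")

  // Return a copy of `schema` with the internal keys removed from the
  // metadata of every top-level field.
  def removeInternalWriterMetadata(schema: StructType): StructType = {
    StructType(schema.fields.map { field =>
      val builder = new MetadataBuilder().withMetadata(field.metadata)
      internalKeys.foreach(builder.remove)
      field.copy(metadata = builder.build())
    })
  }
}
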
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
index cafa3ec2d99..29339d5f19b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -202,10 +202,10 @@ class DeltaTableV2 private[delta](
   }
 
   private lazy val tableSchema: StructType = {
-    val baseSchema = cdcRelation.map(_.schema).getOrElse {
-      DeltaTableUtils.removeInternalWriterMetadata(spark, initialSnapshot.schema)
-    }
-    DeltaColumnMapping.dropColumnMappingMetadata(baseSchema)
+    val baseSchema = cdcRelation.map(_.schema).getOrElse(initialSnapshot.schema)
+    DeltaColumnMapping.dropColumnMappingMetadata(
+      DeltaTableUtils.removeInternalWriterMetadata(spark, baseSchema)
+    )
   }
 
   override def schema(): StructType = tableSchema
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
index a35dc75a82a..fdb58b02d1d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
@@ -150,7 +150,15 @@ object CDCReader extends CDCReaderImpl
       snapshotWithSchemaMode.snapshot
     }
 
-  override val schema: StructType = cdcReadSchema(snapshotForBatchSchema.metadata.schema)
+  override val schema: StructType = {
+    cdcReadSchema(
+      DeltaColumnMapping.dropColumnMappingMetadata(
+        DeltaTableUtils.removeInternalWriterMetadata(
+          sqlContext.sparkSession, snapshotForBatchSchema.metadata.schema
+        )
+      )
+    )
+  }
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = Array.empty
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
index 7d926999c86..8aff0189f80 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -225,13 +225,13 @@ trait DeltaSourceBase extends Source
   @volatile protected var hasCheckedReadIncompatibleSchemaChangesOnStreamStart: Boolean = false
 
   override val schema: StructType = {
-    val readSchema = DeltaTableUtils.removeInternalWriterMetadata(spark, readSchemaAtSourceInit)
     val readSchemaWithCdc = if (options.readChangeFeed) {
-      CDCReader.cdcReadSchema(readSchema)
+      CDCReader.cdcReadSchema(readSchemaAtSourceInit)
     } else {
-      readSchema
+      readSchemaAtSourceInit
     }
-    DeltaColumnMapping.dropColumnMappingMetadata(readSchemaWithCdc)
+    DeltaColumnMapping.dropColumnMappingMetadata(
+      DeltaTableUtils.removeInternalWriterMetadata(spark, readSchemaWithCdc))
   }
 
   // A dummy empty dataframe that can be returned at various point during streaming
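
For orientation before the test changes below: the CDC paths above wrap the cleaned schema in cdcReadSchema, which appends the change-tracking columns the new test asserts. A minimal sketch of that augmentation, with column names taken from the test itself (the helper name here is illustrative, not the real cdcReadSchema):

import org.apache.spark.sql.types.{LongType, StringType, StructType, TimestampType}

// Sketch: a CDC read schema is the cleaned table schema plus the three
// change-tracking columns asserted in the test below.
def cdcReadSchemaSketch(tableSchema: StructType): StructType = {
  tableSchema
    .add("_change_type", StringType)
    .add("_commit_version", LongType)
    .add("_commit_timestamp", TimestampType)
}
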
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala
index 47d6eaffbb4..62e3d15c5b0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
 // scalastyle:off typedlit
 // scalastyle:off import.ordering.noEmptyLine
 import java.io.PrintWriter
+import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConverters._
 
@@ -39,7 +40,6 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.MemoryStream
-
 import org.apache.spark.sql.functions.{current_timestamp, lit, struct, typedLit}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger}
@@ -2031,6 +2031,53 @@ trait GeneratedColumnSuiteBase
       }
     }
   }
+
+  test("generated column metadata is not exposed in schema") {
+    val tableName = "table"
+    withTable(tableName) {
+      createDefaultTestTable(tableName)
+      Seq((1L, "foo", Timestamp.valueOf("2020-10-11 12:30:30"), 100, Date.valueOf("2020-11-12")))
+        .toDF("c1", "c3_p", "c5", "c6", "c8")
+        .write.format("delta").mode("append").saveAsTable(tableName)
+
+      val expectedSchema = new StructType()
+        .add("c1", LongType)
+        .add("c2_g", LongType)
+        .add("c3_p", StringType)
+        .add("c4_g_p", DateType)
+        .add("c5", TimestampType)
+        .add("c6", IntegerType)
+        .add("c7_g_p", IntegerType)
+        .add("c8", DateType)
+
+      assert(spark.read.table(tableName).schema === expectedSchema)
+
+      val ttDf = spark.read.option(DeltaOptions.VERSION_AS_OF, 0).table(tableName)
+      assert(ttDf.schema === expectedSchema)
+
+      val cdcDf = spark.read
+        .option(DeltaOptions.CDC_READ_OPTION, true)
+        .option(DeltaOptions.STARTING_VERSION_OPTION, 0)
+        .table(tableName)
+      assert(cdcDf.schema === expectedSchema
+        .add("_change_type", StringType)
+        .add("_commit_version", LongType)
+        .add("_commit_timestamp", TimestampType)
+      )
+
+      assert(spark.readStream.table(tableName).schema === expectedSchema)
+
+      val cdcStreamDf = spark.readStream
+        .option(DeltaOptions.CDC_READ_OPTION, true)
+        .option(DeltaOptions.STARTING_VERSION_OPTION, 0)
+        .table(tableName)
+      assert(cdcStreamDf.schema === expectedSchema
+        .add("_change_type", StringType)
+        .add("_commit_version", LongType)
+        .add("_commit_timestamp", TimestampType)
+      )
+    }
+  }
 }
 
 class GeneratedColumnSuite extends GeneratedColumnSuiteBase
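
As a usage note, a quick manual check of the behavior this patch enforces; the table name "events" and column name "gen_col" are hypothetical placeholders:

// After this patch, the read schema should no longer carry the internal
// generation-expression key on a generated column's field metadata.
val field = spark.read.table("events").schema("gen_col")
assert(!field.metadata.contains("delta.generationExpression"))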