diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala
index 3c2f0cf0d81..3895850f365 100644
--- a/iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala
+++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala
@@ -20,6 +20,7 @@ import org.apache.spark.sql.delta.DeltaColumnMapping
 import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 
 import org.apache.iceberg.Schema
 import org.apache.iceberg.spark.SparkSchemaUtil
+import org.apache.iceberg.types.TypeUtil
 
 import org.apache.spark.sql.types.{MetadataBuilder, StructType}
@@ -29,12 +30,18 @@ object IcebergSchemaUtils {
    * Given an iceberg schema, convert it to a Spark schema. This conversion will keep the Iceberg
    * column IDs (used to read Parquet files) in the field metadata
    *
-   * @param icebergSchema
-   * @return StructType for the converted schema
+   * @param icebergSchema the Iceberg schema
+   * @param castTimeType whether to cast the Iceberg TIME type to Spark Long
+   * @return the Spark schema converted from the Iceberg schema
    */
-  def convertIcebergSchemaToSpark(icebergSchema: Schema): StructType = {
+  def convertIcebergSchemaToSpark(icebergSchema: Schema,
+      castTimeType: Boolean = false): StructType = {
     // Convert from Iceberg schema to Spark schema but without the column IDs
-    val baseConvertedSchema = SparkSchemaUtil.convert(icebergSchema)
+    val baseConvertedSchema = if (castTimeType) {
+      TypeUtil.visit(icebergSchema, new TypeToSparkTypeWithCustomCast()).asInstanceOf[StructType]
+    } else {
+      SparkSchemaUtil.convert(icebergSchema)
+    }
 
     // For each field, find the column ID (fieldId) and add to the StructField metadata
     SchemaMergingUtils.transformColumns(baseConvertedSchema) { (path, field, _) =>
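Note: a minimal sketch of what the new flag does for callers, assuming the Iceberg Types/NestedField helpers used later in this patch; the schema and field id below are illustrative, not part of the change.

  import scala.collection.JavaConverters._
  import org.apache.iceberg.Schema
  import org.apache.iceberg.types.Types
  import org.apache.iceberg.types.Types.NestedField

  // One-column Iceberg schema holding a TIME field.
  val icebergSchema = new Schema(
    Seq[NestedField](NestedField.required(1, "event_time", Types.TimeType.get)).asJava)

  // The default path (castTimeType = false) delegates to SparkSchemaUtil.convert, which is
  // expected to throw for TIME because Spark has no TIME type; with castTimeType = true the
  // custom visitor maps TIME to LongType instead.
  val sparkSchema =
    IcebergSchemaUtils.convertIcebergSchemaToSpark(icebergSchema, castTimeType = true)
  // sparkSchema("event_time").dataType == org.apache.spark.sql.types.LongType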
diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala
index 635596b064e..5e233ec3bd8 100644
--- a/iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala
+++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala
@@ -20,7 +20,7 @@ import java.util.Locale
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaColumnMappingMode, DeltaConfigs, IdMapping, SerializableFileStatus}
+import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaColumnMappingMode, DeltaConfigs, IdMapping, SerializableFileStatus, Snapshot}
 import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.iceberg.{Table, TableProperties}
@@ -36,7 +36,7 @@ import org.apache.spark.sql.types.StructType
  * A target Iceberg table for conversion to a Delta table.
  *
  * @param icebergTable the Iceberg table underneath.
- * @param existingSchema schema used for incremental update, none for initial conversion.
+ * @param deltaSnapshot the Delta snapshot used for incremental update, none for initial conversion.
  * @param convertStats flag for disabling convert iceberg stats directly into Delta stats.
  *                     If you wonder why we need this flag, you are not alone.
  *                     This flag is only used by the old, obsolete, legacy command
@@ -49,23 +49,31 @@ class IcebergTable(
     spark: SparkSession,
     icebergTable: Table,
-    existingSchema: Option[StructType],
+    deltaSnapshot: Option[Snapshot],
     convertStats: Boolean) extends ConvertTargetTable {
 
-  def this(spark: SparkSession, basePath: String, existingSchema: Option[StructType],
+  def this(spark: SparkSession, basePath: String, deltaTable: Option[Snapshot],
       convertStats: Boolean = true) =
     // scalastyle:off deltahadoopconfiguration
     this(spark, new HadoopTables(spark.sessionState.newHadoopConf).load(basePath),
-      existingSchema, convertStats)
+      deltaTable, convertStats)
     // scalastyle:on deltahadoopconfiguration
 
+  protected val existingSchema: Option[StructType] = deltaSnapshot.map(_.schema)
+
   private val partitionEvolutionEnabled =
     spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_PARTITION_EVOLUTION_ENABLED)
 
   private val bucketPartitionEnabled =
     spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_BUCKET_PARTITION_ENABLED)
 
-  private val fieldPathToPhysicalName =
+  // When a table is CLONED/federated with the session conf ON, it will have the table property
+  // set, and it will keep casting the TIME type even if the session conf is later turned OFF.
+  private val castTimeType =
+    spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_CAST_TIME_TYPE) ||
+      deltaSnapshot.exists(s => DeltaConfigs.CAST_ICEBERG_TIME_TYPE.fromMetaData(s.metadata))
+
+  protected val fieldPathToPhysicalName =
     existingSchema.map {
       SchemaMergingUtils.explode(_).collect {
         case (path, field) if DeltaColumnMapping.hasPhysicalName(field) =>
@@ -76,7 +84,7 @@ class IcebergTable(
   private val convertedSchema = {
     // Reuse physical names of existing columns.
     val mergedSchema = DeltaColumnMapping.setPhysicalNames(
-      IcebergSchemaUtils.convertIcebergSchemaToSpark(icebergTable.schema()),
+      IcebergSchemaUtils.convertIcebergSchemaToSpark(icebergTable.schema(), castTimeType),
       fieldPathToPhysicalName)
 
     // Assign physical names to new columns.
@@ -88,8 +96,13 @@ class IcebergTable(
   override val properties: Map[String, String] = {
     val maxSnapshotAgeMs = PropertyUtil.propertyAsLong(icebergTable.properties,
       TableProperties.MAX_SNAPSHOT_AGE_MS, TableProperties.MAX_SNAPSHOT_AGE_MS_DEFAULT)
+    val castTimeTypeConf = if (castTimeType) {
+      Some((DeltaConfigs.CAST_ICEBERG_TIME_TYPE.key -> "true"))
+    } else {
+      None
+    }
     icebergTable.properties().asScala.toMap + (DeltaConfigs.COLUMN_MAPPING_MODE.key -> "id") +
-      (DeltaConfigs.LOG_RETENTION.key -> s"$maxSnapshotAgeMs millisecond")
+      (DeltaConfigs.LOG_RETENTION.key -> s"$maxSnapshotAgeMs millisecond") ++ castTimeTypeConf
   }
 
   override val partitionSchema: StructType = {
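Note: a hedged sketch of why the table property matters for later incremental converts; the path and variable names are illustrative and assume the clone was created while the cast was enabled.

  // The decision is re-derived from the table's own metadata, not from the current session conf,
  // so it stays in effect even if the session conf is later turned off.
  val snapshot = DeltaLog.forTable(spark, "/path/to/cloned/table").update()
  val castEnabled = DeltaConfigs.CAST_ICEBERG_TIME_TYPE.fromMetaData(snapshot.metadata)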
diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/TypeToSparkTypeWithCustomCast.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/TypeToSparkTypeWithCustomCast.scala
new file mode 100644
index 00000000000..67530cd6fe1
--- /dev/null
+++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/TypeToSparkTypeWithCustomCast.scala
@@ -0,0 +1,127 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.commands.convert
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.iceberg.MetadataColumns
+import org.apache.iceberg.Schema
+import org.apache.iceberg.relocated.com.google.common.collect.Lists
+import org.apache.iceberg.types.Type
+import org.apache.iceberg.types.Type.TypeID._
+import org.apache.iceberg.types.Types
+import org.apache.iceberg.types.TypeUtil
+
+import org.apache.spark.sql.types.ArrayType
+import org.apache.spark.sql.types.BinaryType
+import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.DateType
+import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.types.DoubleType
+import org.apache.spark.sql.types.FloatType
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.LongType
+import org.apache.spark.sql.types.MapType
+import org.apache.spark.sql.types.Metadata
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.TimestampNTZType
+import org.apache.spark.sql.types.TimestampType
+
+/**
+ * This class is copied from [[org.apache.iceberg.spark.TypeToSparkType]] to
+ * add custom type casting. Currently, it supports the following casts:
+ *   * Iceberg TIME -> Spark Long
+ */
+class TypeToSparkTypeWithCustomCast extends TypeUtil.SchemaVisitor[DataType] {
+
+  val METADATA_COL_ATTR_KEY = "__metadata_col"
+
+  override def schema(schema: Schema, structType: DataType): DataType = structType
+
+  override def struct(struct: Types.StructType, fieldResults: util.List[DataType]): DataType = {
+    val fields = struct.fields()
+    val sparkFields: util.List[StructField] =
+      Lists.newArrayListWithExpectedSize(fieldResults.size())
+    for (i <- 0 until fields.size()) {
+      val field = fields.get(i)
+      val `type` = fieldResults.get(i)
+      val metadata = fieldMetadata(field.fieldId())
+      var sparkField = StructField.apply(field.name(), `type`, field.isOptional(), metadata)
+      if (field.doc() != null) {
+        sparkField = sparkField.withComment(field.doc())
+      }
+      sparkFields.add(sparkField)
+    }
+
+    StructType.apply(sparkFields)
+  }
+
+  override def field(field: Types.NestedField, fieldResult: DataType): DataType = fieldResult
+
+  override def list(list: Types.ListType, elementResult: DataType): DataType =
+    ArrayType.apply(elementResult, list.isElementOptional())
+
+  override def map(map: Types.MapType, keyResult: DataType, valueResult: DataType): DataType =
+    MapType.apply(keyResult, valueResult, map.isValueOptional())
+
+  override def primitive(primitive: Type.PrimitiveType): DataType = {
+    primitive.typeId() match {
+      case BOOLEAN => BooleanType
+      case INTEGER => IntegerType
+      case LONG => LongType
+      case FLOAT => FloatType
+      case DOUBLE => DoubleType
+      case DATE => DateType
+      // This line is changed to allow casting TIME to Spark Long.
+      // The result is microseconds since midnight.
+      case TIME => LongType
+      case TIMESTAMP =>
+        val ts = primitive.asInstanceOf[Types.TimestampType]
+        if (ts.shouldAdjustToUTC()) {
+          TimestampType
+        } else {
+          TimestampNTZType
+        }
+      case STRING => StringType
+      case UUID => // use String
+        StringType
+      case FIXED => BinaryType
+      case BINARY => BinaryType
+      case DECIMAL =>
+        val decimal = primitive.asInstanceOf[Types.DecimalType]
+        DecimalType.apply(decimal.precision(), decimal.scale())
+      case _ =>
+        throw new UnsupportedOperationException(
+          "Cannot convert unknown type to Spark: " + primitive)
+    }
+  }
+
+  private def fieldMetadata(fieldId: Int): Metadata = {
+    if (MetadataColumns.metadataFieldIds().contains(fieldId)) {
+      return new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, value = true).build()
+    }
+
+    Metadata.empty
+  }
+}
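Note: the Long produced for a TIME value is microseconds since midnight. A small worked example of the arithmetic, matching the expectation used in the new test further down:

  import java.time.LocalTime

  val t = LocalTime.of(14, 30, 11)
  val micros = t.toNanoOfDay / 1000L
  // 14 * 3600 + 30 * 60 + 11 = 52211 seconds since midnight -> 52211000000 microseconds
  assert(micros == (14 * 3600 + 30 * 60 + 11) * 1000000L)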
diff --git a/iceberg/src/test/scala/org/apache/spark/sql/delta/CloneIcebergSuite.scala b/iceberg/src/test/scala/org/apache/spark/sql/delta/CloneIcebergSuite.scala
index 083b7c8683f..ed561685d61 100644
--- a/iceberg/src/test/scala/org/apache/spark/sql/delta/CloneIcebergSuite.scala
+++ b/iceberg/src/test/scala/org/apache/spark/sql/delta/CloneIcebergSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta
 
 // scalastyle:off import.ordering.noEmptyLine
 import java.sql.Date
+import java.time.LocalTime
 
 import scala.collection.JavaConverters._
 import scala.util.Try
@@ -26,7 +27,10 @@ import org.apache.spark.sql.delta.commands.convert.ConvertUtils
 import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.stats.StatisticsCollection
+import org.apache.iceberg.Schema
 import org.apache.iceberg.hadoop.HadoopTables
+import org.apache.iceberg.types.Types
+import org.apache.iceberg.types.Types.NestedField
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
@@ -451,6 +455,58 @@ class CloneIcebergByPathSuite extends CloneIcebergSuiteBase
   }
 }
 
+/**
+ * This suite tests Iceberg features that are not directly supported by Spark.
+ * See also [[NonSparkIcebergTestUtils]].
+ * We do not put these tests in or extend from [[CloneIcebergSuiteBase]] because they
+ * use a non-Spark way to create test data.
+ */
+class CloneNonSparkIcebergByPathSuite extends QueryTest
+  with ConvertIcebergToDeltaUtils {
+
+  protected val cloneTable = "clone"
+
+  private def sourceIdentifier: String = s"iceberg.`$tablePath`"
+
+  private def runCreateOrReplace(mode: String, source: String): DataFrame = {
+    Try(spark.sql(s"DELETE FROM $cloneTable"))
+    spark.sql(s"CREATE OR REPLACE TABLE $cloneTable $mode CLONE $source")
+  }
+
+  private val mode = "SHALLOW"
+
+  test("cast Iceberg TIME to Spark long") {
+    withTable(table, cloneTable) {
+      val schema = new Schema(
+        Seq[NestedField](
+          NestedField.required(1, "id", Types.IntegerType.get),
+          NestedField.required(2, "event_time", Types.TimeType.get)
+        ).asJava
+      )
+      val rows = Seq(
+        Map(
+          "id" -> 1,
+          "event_time" -> LocalTime.of(14, 30, 11)
+        )
+      )
+      NonSparkIcebergTestUtils.createIcebergTable(spark, tablePath, schema, rows)
+      intercept[UnsupportedOperationException] {
+        runCreateOrReplace(mode, sourceIdentifier)
+      }
+      withSQLConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_CAST_TIME_TYPE.key -> "true") {
+        runCreateOrReplace(mode, sourceIdentifier)
+        val expectedMicrosec = (14 * 3600 + 30 * 60 + 11) * 1000000L
+        checkAnswer(spark.table(cloneTable), Row(1, expectedMicrosec) :: Nil)
+        val clonedDeltaTable = DeltaLog.forTable(
+          spark,
+          spark.sessionState.catalog.getTableMetadata(TableIdentifier(cloneTable))
+        )
+        assert(DeltaConfigs.CAST_ICEBERG_TIME_TYPE.fromMetaData(clonedDeltaTable.update().metadata))
+      }
+    }
+  }
+}
+
 class CloneIcebergByNameSuite extends CloneIcebergSuiteBase
 {
   override def sourceIdentifier: String = table
diff --git a/iceberg/src/test/scala/org/apache/spark/sql/delta/NonSparkIcebergTestUtils.scala b/iceberg/src/test/scala/org/apache/spark/sql/delta/NonSparkIcebergTestUtils.scala
new file mode 100644
index 00000000000..95f608f0aad
--- /dev/null
+++ b/iceberg/src/test/scala/org/apache/spark/sql/delta/NonSparkIcebergTestUtils.scala
@@ -0,0 +1,89 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import scala.collection.JavaConverters._
+
+import org.apache.iceberg.{DataFile, DataFiles, Files, PartitionSpec, Schema, Table}
+import org.apache.iceberg.data.GenericRecord
+import org.apache.iceberg.data.parquet.GenericParquetWriter
+import org.apache.iceberg.hadoop.HadoopTables
+import org.apache.iceberg.io.FileAppender
+import org.apache.iceberg.parquet.Parquet
+import org.apache.iceberg.types.Types
+import org.apache.iceberg.types.Types.NestedField
+
+import org.apache.spark.sql.SparkSession
+
+object NonSparkIcebergTestUtils {
+
+  /**
+   * Create an Iceberg table with formats/data types not supported by Spark.
+   * This is primarily used for compatibility tests. It currently covers the following:
+   *   * TIME data type, which is not supported by Spark.
+   * @param location Iceberg table root path
+   * @param schema Iceberg table schema
+   * @param rows Data rows we write into the table
+   */
+  def createIcebergTable(
+      spark: SparkSession,
+      location: String,
+      schema: Schema,
+      rows: Seq[Map[String, Any]]): Table = {
+    // scalastyle:off deltahadoopconfiguration
+    val tables = new HadoopTables(spark.sessionState.newHadoopConf())
+    // scalastyle:on deltahadoopconfiguration
+    val table = tables.create(
+      schema,
+      PartitionSpec.unpartitioned(),
+      location
+    )
+
+    val records = rows.map { row =>
+      val record = GenericRecord.create(schema)
+      row.foreach {
+        case (key, value) => record.setField(key, value)
+      }
+      record
+    }
+
+    val parquetLocation = location + "/data/001.parquet"
+
+    val fileAppender: FileAppender[GenericRecord] = Parquet
+      .write(table.io().newOutputFile(parquetLocation))
+      .schema(schema)
+      .createWriterFunc(GenericParquetWriter.buildWriter)
+      .overwrite()
+      .build()
+    try {
+      fileAppender.addAll(records.asJava)
+    } finally {
+      fileAppender.close()
+    }
+
+    val dataFile = DataFiles.builder(PartitionSpec.unpartitioned())
+      .withInputFile(table.io().newInputFile(parquetLocation))
+      .withMetrics(fileAppender.metrics())
+      .build()
+
+    table
+      .newAppend()
+      .appendFile(dataFile)
+      .commit()
+    table
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
index 0c567eebac0..10d3b6b0e9f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -296,7 +296,7 @@ class DeltaAnalysis(session: SparkSession)
       resolveCloneCommand(
         cloneStatement.target,
         CloneIcebergSource(
-          table.tableIdentifier, sparkTable = None, tableSchema = None, session),
+          table.tableIdentifier, sparkTable = None, deltaSnapshot = None, session),
         cloneStatement)
 
     case DataSourceV2Relation(table, _, _, _, _)
@@ -313,7 +313,7 @@ class DeltaAnalysis(session: SparkSession)
       }
       resolveCloneCommand(
         cloneStatement.target,
-        CloneIcebergSource(tableIdent, Some(table), tableSchema = None, session),
+        CloneIcebergSource(tableIdent, Some(table), deltaSnapshot = None, session),
         cloneStatement)
 
     case u: UnresolvedRelation =>
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
index 23deca4ba9c..b70ca428b39 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -755,6 +755,13 @@ trait DeltaConfigsBase extends DeltaLogging {
     helpMessage = "needs to be a boolean."
   )
 
+  val CAST_ICEBERG_TIME_TYPE = buildConfig[Boolean](
+    key = "castIcebergTimeType",
+    defaultValue = "false",
+    fromString = _.toBoolean,
+    validationFunction = _ => true,
+    helpMessage = "needs to be a boolean."
+  )
   /**
    * Enable optimized writes into a Delta table. Optimized writes adds an adaptive shuffle before
    * the write to write compacted files into a Delta table during a write.
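Note: buildConfig registers a per-table property. Assuming DeltaConfigs applies its usual "delta." key prefix (as it does for the other entries in this file), a table converted with the cast enabled would carry the property sketched below.

  // Sketch only; the key spelling assumes the standard "delta." prefix.
  assert(DeltaConfigs.CAST_ICEBERG_TIME_TYPE.key == "delta.castIcebergTimeType")
  // IcebergTable.properties then adds ("delta.castIcebergTimeType" -> "true") to the clone.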
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala
index bbfded0256a..94f6afbdf9f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala
@@ -282,11 +282,11 @@ case class CloneParquetSource(
 
 case class CloneIcebergSource(
     tableIdentifier: TableIdentifier,
     sparkTable: Option[Table],
-    tableSchema: Option[StructType],
+    deltaSnapshot: Option[Snapshot],
     spark: SparkSession) extends CloneConvertedSource(spark) {
 
   override lazy val convertTargetTable: ConvertTargetTable =
-    ConvertUtils.getIcebergTable(spark, tableIdentifier.table, sparkTable, tableSchema)
+    ConvertUtils.getIcebergTable(spark, tableIdentifier.table, sparkTable, deltaSnapshot)
 
   override def format: String = CloneSourceFormat.ICEBERG
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala
index 482f165fcfa..0fabda5c3e9 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta.commands.convert
 
 import java.lang.reflect.InvocationTargetException
 
-import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaErrors, SerializableFileStatus}
+import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaErrors, SerializableFileStatus, Snapshot}
 import org.apache.spark.sql.delta.actions.AddFile
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema.SchemaMergingUtils
@@ -70,7 +70,7 @@ trait ConvertUtilsBase extends DeltaLogging {
    * @param spark: the spark session to use.
    * @param targetDir: the target directory of the Iceberg table.
    * @param sparkTable: the optional V2 table interface of the Iceberg table.
-   * @param tableSchema: the existing converted Delta table schema (if exists) of the Iceberg table.
+   * @param deltaSnapshot: the snapshot of the existing converted Delta table, if one exists.
    * @param collectStats: collect column stats on convert
    * @return a target Iceberg table.
    */
@@ -78,7 +78,7 @@ trait ConvertUtilsBase extends DeltaLogging {
       spark: SparkSession,
       targetDir: String,
       sparkTable: Option[Table],
-      tableSchema: Option[StructType],
+      deltaSnapshot: Option[Snapshot],
       collectStats: Boolean = true): ConvertTargetTable = {
     try {
       val convertIcebergStats = collectStats &&
@@ -88,18 +88,18 @@ trait ConvertUtilsBase extends DeltaLogging {
         val constFromTable = clazz.getConstructor(
           classOf[SparkSession],
           Utils.classForName(icebergLibTableClassPath),
-          classOf[Option[StructType]],
+          classOf[Option[Snapshot]],
           java.lang.Boolean.TYPE
         )
         val method = sparkTable.get.getClass.getMethod("table")
-        constFromTable.newInstance(spark, method.invoke(sparkTable.get), tableSchema,
+        constFromTable.newInstance(spark, method.invoke(sparkTable.get), deltaSnapshot,
           java.lang.Boolean.valueOf(convertIcebergStats))
       } else {
         val baseDir = getQualifiedPath(spark, new Path(targetDir)).toString
         val constFromPath = clazz.getConstructor(
-          classOf[SparkSession], classOf[String], classOf[Option[StructType]],
+          classOf[SparkSession], classOf[String], classOf[Option[Snapshot]],
           java.lang.Boolean.TYPE)
-        constFromPath.newInstance(spark, baseDir, tableSchema,
+        constFromPath.newInstance(spark, baseDir, deltaSnapshot,
           java.lang.Boolean.valueOf(convertIcebergStats))
       }
     } catch {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 3a65a3660b8..d43dcf0cb34 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1553,6 +1553,14 @@ trait DeltaSQLConfBase {
     .booleanConf
     .createWithDefault(false)
 
+  val DELTA_CONVERT_ICEBERG_CAST_TIME_TYPE = {
+    buildConf("convert.iceberg.castTimeType")
+      .internal()
+      .doc("Cast Iceberg TIME type to Spark Long when converting to Delta")
+      .booleanConf
+      .createWithDefault(false)
+  }
+
   final object NonDeterministicPredicateWidening {
     final val OFF = "off"
     final val LOGGING = "logging"
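Note: an end-to-end usage sketch mirroring the new test. The full conf key assumes DeltaSQLConf's usual "spark.databricks.delta." prefix, and the Iceberg table path is illustrative.

  spark.conf.set("spark.databricks.delta.convert.iceberg.castTimeType", "true")
  spark.sql("CREATE OR REPLACE TABLE clone SHALLOW CLONE iceberg.`/tmp/iceberg/events`")
  // The Iceberg TIME column surfaces as a Spark LongType (microseconds since midnight), and the
  // clone records the castIcebergTimeType table property, so later incremental converts keep
  // casting even if the session conf is unset.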