diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java index 8c3a5deecd7..d8d2cc3e391 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java @@ -485,9 +485,7 @@ public static void validateKernelCanReadTheTable(Protocol protocol, String table public static void validateKernelCanWriteToTable( Protocol protocol, Metadata metadata, String tablePath) { - if (protocol.getMinReaderVersion() > TABLE_FEATURES_MIN_READER_VERSION) { - throw DeltaErrors.unsupportedReaderProtocol(tablePath, protocol.getMinReaderVersion()); - } + validateKernelCanReadTheTable(protocol, tablePath); if (protocol.getMinWriterVersion() > TABLE_FEATURES_MIN_WRITER_VERSION) { throw unsupportedWriterProtocol(tablePath, protocol.getMinWriterVersion()); @@ -495,10 +493,7 @@ public static void validateKernelCanWriteToTable( Set unsupportedFeatures = protocol.getImplicitlyAndExplicitlySupportedFeatures().stream() - .filter( - f -> - !f.hasKernelWriteSupport(metadata) - || (f.isReaderWriterFeature() && !f.hasKernelReadSupport())) + .filter(f -> !f.hasKernelWriteSupport(metadata)) .collect(toSet()); if (!unsupportedFeatures.isEmpty()) { @@ -538,11 +533,8 @@ private static Set extractAutomaticallyEnabledNewFeatures( .metadataRequiresFeatureToBeEnabled(currentProtocol, newMetadata)) .collect(toSet()); - Set combinedFeatures = new HashSet<>(protocolSupportedFeatures); - combinedFeatures.addAll(metadataEnabledFeatures); - // Each feature may have dependencies that are not yet enabled in the protocol. - Set newFeatures = getDependencyFeatures(combinedFeatures); + Set newFeatures = getDependencyFeatures(metadataEnabledFeatures); newFeatures.removeAll(protocolSupportedFeatures); return newFeatures; diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala index bb1013bc561..67f1af1e863 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala @@ -208,8 +208,8 @@ class TableFeaturesSuite extends AnyFunSuite { .filter(_.hasKernelWriteSupport(testMetadata())) .collect(toList()).asScala - // checkConstraints, generatedColumns, identityColumns, invariants, changeDataFeed, - // timestampNtz are writable because the metadata has not been set the info that + // checkConstraints, generatedColumns, identityColumns, invariants and changeDataFeed + // are writable because the metadata has not been set the info that // these features are enabled val expected = Seq( "columnMapping", @@ -281,7 +281,42 @@ class TableFeaturesSuite extends AnyFunSuite { } } + // Read is supported when all table readerWriter features are supported by the Kernel, + // but the table has writeOnly table feature unknonw to Kernel + test("validateKernelCanReadTheTable: with writeOnly feature unknown to Kernel") { + + // legacy reader protocol version + val protocol1 = new Protocol(1, 7, emptySet(), singleton("unknownFeature")) + validateKernelCanReadTheTable(protocol1, "/test/table") + + // table feature supported reader version + val protocol2 = new Protocol( + 3, + 7, + Set("columnMapping", "timestampNtz").asJava, + Set("columnMapping", "timestampNtz", "unknownFeature").asJava) + validateKernelCanReadTheTable(protocol2, "/test/table") + } + + test("validateKernelCanReadTheTable: reader version > 3") { + val protocol = new Protocol(4, 7, emptySet(), singleton("unknownFeature")) + val ex = intercept[KernelException] { + validateKernelCanReadTheTable(protocol, "/test/table") + } + assert(ex.getMessage.contains( + "requires reader version 4 which is unsupported by this version of Delta Kernel")) + } + // Writes + checkWriteUnsupported( + "validateKernelCanWriteToTable: protocol 8", // beyond the table feature writer version + new Protocol(3, 8)) + + checkWriteUnsupported( + // beyond the table feature reader/writer version + "validateKernelCanWriteToTable: protocol 4, 8", + new Protocol(4, 8)) + checkWriteSupported( "validateKernelCanWriteToTable: protocol 1", new Protocol(1, 1), @@ -471,7 +506,7 @@ class TableFeaturesSuite extends AnyFunSuite { s"validateKernelCanWriteToTable: protocol 7 with $feature, " + s"metadata contains $feature", new Protocol(3, 7, singleton(feature), singleton(feature)), - testMetadata(tblProps = Map("delta.enableTypeWidening" -> "true"))) + testMetadata(includeInvariant = true)) } checkWriteSupported( diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala index 7b581beaeb1..eabca9523b8 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala @@ -432,6 +432,8 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { assertCheckpointReadiness(result, expIsReadyForCheckpoint) } + // TODO: Change this to use the table metadata and protocol and + // not rely on DESCRIBE which adds some properties based on the protocol. def verifyTableProperties( tablePath: String, expProperties: ListMap[String, Any], diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index eaf500cef6f..1774d8a5882 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -360,7 +360,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa test("create table with all supported types") { withTempDirAndEngine { (tablePath, engine) => val parquetAllTypes = goldenTablePath("parquet-all-types") - val schema = removeUnsupportedTypes(tableSchema(parquetAllTypes)) + val schema = removeTimestampNtzTypeColumns(tableSchema(parquetAllTypes)) val table = Table.forPath(engine, tablePath) val txnBuilder = table.createTransactionBuilder(engine, testEngineInfo, CREATE_TABLE) @@ -628,7 +628,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa test("insert into table - all supported types data") { withTempDirAndEngine { (tblPath, engine) => val parquetAllTypes = goldenTablePath("parquet-all-types") - val schema = removeUnsupportedTypes(tableSchema(parquetAllTypes)) + val schema = removeTimestampNtzTypeColumns(tableSchema(parquetAllTypes)) val data = readTableUsingKernel(engine, parquetAllTypes, schema).to[Seq] val dataWithPartInfo = Seq(Map.empty[String, Literal] -> data) @@ -656,10 +656,14 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa Seq(true, false).foreach { includeTimestampNtz => test(s"insert into partitioned table - all supported partition column types data - " + - s"timesatmp_ntz included = $includeTimestampNtz") { + s"timestamp_ntz included = $includeTimestampNtz") { withTempDirAndEngine { (tblPath, engine) => val parquetAllTypes = goldenTablePath("parquet-all-types") - val schema = removeUnsupportedTypes(tableSchema(parquetAllTypes)) + val tableSchema = tableSchema(parquetAllTypes) + val schema = if (includeTimestampNtz) { + removeTimestampNtzTypeColumns(tableSchema(parquetAllTypes)) + } else tableSchema + val partCols = Seq( "byteType", "shortType", @@ -987,7 +991,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa } } - def removeUnsupportedTypes(structType: StructType): StructType = { + def removeTimestampNtzTypeColumns(structType: StructType): StructType = { def process(dataType: DataType): Option[DataType] = dataType match { case a: ArrayType => val newElementType = process(a.getElementType) @@ -1000,9 +1004,9 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa Some(new MapType(newKeyType, newValueType, m.isValueContainsNull)) case _ => None } - // case _: TimestampNTZType => None // ignore + case _: TimestampNTZType => None // ignore case s: StructType => - val newType = removeUnsupportedTypes(s); + val newType = removeTimestampNtzTypeColumns(s); if (newType.length() > 0) { Some(newType) } else { diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala index 865a8fce887..4094acc884b 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala @@ -404,6 +404,8 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase { tablePath, ListMap( // appendOnly, invariants implicitly supported as the protocol is upgraded from 2 to 7 + // These properties are not set in the table properties, but are generated by the + // Spark describe IN_COMMIT_TIMESTAMPS_ENABLED.getKey -> true, "delta.feature.appendOnly" -> "supported", "delta.feature.inCommitTimestamp" -> "supported",