From 6aae232617a908c7e971b486a72520a866e2a0c5 Mon Sep 17 00:00:00 2001 From: Venki Korukanti Date: Wed, 5 Feb 2025 02:00:01 -0800 Subject: [PATCH 1/5] Table feature interfaces --- .../io/delta/kernel/internal/TableConfig.java | 62 +++++ .../io/delta/kernel/internal/TableImpl.java | 1 + .../internal/TransactionBuilderImpl.java | 1 + .../kernel/internal/TransactionImpl.java | 1 + .../kernel/internal/actions/Protocol.java | 2 +- .../internal/replay/ConflictChecker.java | 1 + .../kernel/internal/replay/LogReplay.java | 2 +- .../internal/rowtracking/RowTracking.java | 2 +- .../internal/snapshot/SnapshotManager.java | 2 +- .../FeatureAutoEnablementByMetadata.java | 49 ++++ .../internal/tablefeatures/TableFeature.java | 148 ++++++++++++ .../{ => tablefeatures}/TableFeatures.java | 222 +++++++++++++++++- .../internal/util/DomainMetadataUtils.java | 2 +- .../kernel/internal/TableFeaturesSuite.scala | 2 +- 14 files changed, 486 insertions(+), 11 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnablementByMetadata.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java rename kernel/kernel-api/src/main/java/io/delta/kernel/internal/{ => tablefeatures}/TableFeatures.java (53%) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java index bcd0512b1aa..b14bf6a3bbd 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java @@ -34,6 +34,55 @@ public class TableConfig { // TableConfigs // ////////////////// + /** + * Whether this Delta table is append-only. Files can't be deleted, or values can't be updated. + */ + public static final TableConfig APPEND_ONLY = + new TableConfig<>( + "delta.appendOnly", + "false", + Boolean::valueOf, + value -> true, + "needs to be a boolean.", + true); + + /** + * Enable change data feed output. When enabled, DELETE, UPDATE, and MERGE INTO operations will + * need to do additional work to output their change data in an efficiently readable format. + */ + public static final TableConfig CHANGE_DATA_FEED = + new TableConfig<>( + "delta.enableChangeDataFeed", + "false", + Boolean::valueOf, + value -> true, + "needs to be a boolean.", + true); + + /** Whether commands modifying this Delta table are allowed to create new deletion vectors. */ + public static final TableConfig ENABLE_DELETION_VECTORS_CREATION = + new TableConfig<>( + "delta.enableDeletionVectors", + "false", + Boolean::valueOf, + value -> true, + "needs to be a boolean.", + true); + + /** + * Indicates whether Row Tracking is enabled on the table. When this flag is turned on, all rows + * are guaranteed to have Row IDs and Row Commit Versions assigned to them, and writers are + * expected to preserve them by materializing them to hidden columns in the data files. + */ + public static final TableConfig ROW_TRACKING_ENABLED = + new TableConfig<>( + "delta.enableRowTracking", + "false", + Boolean::valueOf, + value -> true, + "needs to be a boolean.", + true); + /** * The shortest duration we have to keep logically deleted data files around before deleting them * physically. @@ -170,6 +219,19 @@ public class TableConfig { "needs to be a boolean.", true); + /** + * Whether widening the type of an existing column or field is allowed, either manually using + * ALTER TABLE CHANGE COLUMN or automatically if automatic schema evolution is enabled. + */ + public static final TableConfig ENABLE_TYPE_WIDENING = + new TableConfig<>( + "delta.enableTypeWidening", + "false", + Boolean::valueOf, + value -> true, + "needs to be a boolean.", + true); + /** All the valid properties that can be set on the table. */ private static final Map> VALID_PROPERTIES = Collections.unmodifiableMap( diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java index 0691c24ab82..0d9844d9841 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java @@ -29,6 +29,7 @@ import io.delta.kernel.internal.metrics.SnapshotQueryContext; import io.delta.kernel.internal.metrics.SnapshotReportImpl; import io.delta.kernel.internal.snapshot.SnapshotManager; +import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.internal.util.Clock; import io.delta.kernel.metrics.SnapshotReport; import io.delta.kernel.types.StructField; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java index fd16f04ce12..29752160bbe 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java @@ -36,6 +36,7 @@ import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.SnapshotHint; +import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.internal.util.ColumnMapping; import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode; import io.delta.kernel.internal.util.SchemaUtils; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 44b0b49ccd9..3937d1a24e5 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -35,6 +35,7 @@ import io.delta.kernel.internal.replay.ConflictChecker; import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState; import io.delta.kernel.internal.rowtracking.RowTracking; +import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.internal.util.*; import io.delta.kernel.metrics.TransactionReport; import io.delta.kernel.types.StructType; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java index e71036935dd..fa35d020c4b 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java @@ -18,8 +18,8 @@ import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue; import io.delta.kernel.data.*; -import io.delta.kernel.internal.TableFeatures; import io.delta.kernel.internal.data.GenericRow; +import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.internal.util.VectorUtils; import io.delta.kernel.types.ArrayType; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java index 42fa784a7ce..01eb61840de 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java @@ -35,6 +35,7 @@ import io.delta.kernel.internal.actions.SetTransaction; import io.delta.kernel.internal.rowtracking.RowTracking; import io.delta.kernel.internal.rowtracking.RowTrackingMetadataDomain; +import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.internal.util.DomainMetadataUtils; import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.utils.CloseableIterable; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index fd00ad9a2a1..4c681aa0785 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -26,7 +26,6 @@ import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.engine.Engine; import io.delta.kernel.expressions.Predicate; -import io.delta.kernel.internal.TableFeatures; import io.delta.kernel.internal.actions.*; import io.delta.kernel.internal.checkpoints.SidecarFile; import io.delta.kernel.internal.fs.Path; @@ -35,6 +34,7 @@ import io.delta.kernel.internal.metrics.SnapshotMetrics; import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.SnapshotHint; +import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.internal.util.DomainMetadataUtils; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.types.StringType; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java index 9e54b6e1fa2..2a07c5aebec 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/rowtracking/RowTracking.java @@ -20,8 +20,8 @@ import io.delta.kernel.data.Row; import io.delta.kernel.internal.DeltaErrors; import io.delta.kernel.internal.SnapshotImpl; -import io.delta.kernel.internal.TableFeatures; import io.delta.kernel.internal.actions.*; +import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.utils.CloseableIterable; import io.delta.kernel.utils.CloseableIterator; import java.io.IOException; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 40d8bb411e3..11774741e8a 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -19,9 +19,9 @@ import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO; import static io.delta.kernel.internal.TableConfig.EXPIRED_LOG_CLEANUP_ENABLED; import static io.delta.kernel.internal.TableConfig.LOG_RETENTION; -import static io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable; import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable; import static io.delta.kernel.internal.snapshot.MetadataCleanup.cleanupExpiredLogs; +import static io.delta.kernel.internal.tablefeatures.TableFeatures.validateWriteSupportedTable; import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static java.lang.String.format; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnablementByMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnablementByMetadata.java new file mode 100644 index 00000000000..72323527eec --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnablementByMetadata.java @@ -0,0 +1,49 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.internal.tablefeatures; + +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.actions.Protocol; + +/** + * Defines behavior for {@link TableFeature} that can be automatically enabled via a change in a + * table's metadata, e.g., through setting particular values of certain feature-specific table + * properties. + * + *

When the feature's metadata requirements are satisfied for new tables, or for + * existing tables when {@link #automaticallyUpdateProtocolOfExistingTables()} set to + * `true`, the client will silently add the feature to the protocol's `readerFeatures` + * and/or `writerFeatures`. Otherwise, a proper protocol version bump must be present in the same + * transaction. + */ +public interface FeatureAutoEnablementByMetadata { + /** + * Whether the feature can automatically update the protocol of an existing table when the + * metadata requirements are satisfied. As a rule of thumb, a table feature that requires explicit + * operations (e.g., turning on a table property) should set this flag to `true`, while features + * that are used implicitly (e.g., when using a new data type) should set this flag to `false`. + */ + default boolean automaticallyUpdateProtocolOfExistingTables() { + return true; + } + + /** + * Determine whether the feature must be supported and enabled because its metadata requirements + * are satisfied. + */ + boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java new file mode 100644 index 00000000000..c6e1ed5d917 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java @@ -0,0 +1,148 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.tablefeatures; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** + * Base class for table features. + * + *

A feature can be explicitly supported by a table's protocol when the protocol contains + * a feature's `name`. Writers (for writer-only features) or readers and writers (for reader-writer + * features) must recognize supported features and must handle them appropriately. + * + *

A table feature that released before Delta Table Features (reader version 3 and writer version + * 7) is considered as a legacy feature. Legacy features are implicitly + * supported when (a) the protocol does not support table features, i.e., has reader + * version less than 3 or writer version less than 7 and (b) the feature's minimum reader/writer + * version is less than or equal to the current protocol's reader/writer version. + * + *

Separately, a feature can be automatically supported by a table's metadata when certain + * feature-specific table properties are set. For example, `changeDataFeed` is automatically + * supported when there's a table property `delta.enableChangeDataFeed=true`. This is independent of + * the table's enabled features. When a feature is supported (explicitly or implicitly) by the table + * protocol but its metadata requirements are not satisfied, then clients still have to understand + * the feature (at least to the extent that they can read and preserve the existing data in the + * table that uses the feature). + */ +public class TableFeature { + private final String featureName; + private final int minReaderVersion; + private final int minWriterVersion; + private final boolean readerWriterFeature; + private final boolean isLegacyFeature; + private final List requiredFeatures; + private final Optional featureAutoEnablementByMetadata; + + /** + * Create a new table feature. + * + * @param featureName a globally-unique string indicator to represent the feature. All characters + * must be letters (a-z, A-Z), digits (0-9), '-', or '_'. Words must be in camelCase. + * @param minReaderVersion the minimum reader version this feature requires. For a feature that + * can only be explicitly supported, this is either `0` or `3` (the reader protocol version + * that supports table features), depending on the feature is writer-only or reader-writer. + * For a legacy feature that can be implicitly supported, this is the first protocol version + * which the feature is introduced. + * @param minWriterVersion the minimum writer version this feature requires. For a feature that + * can only be explicitly supported, this is the writer protocol `7` that supports table + * features. For a legacy feature that can be implicitly supported, this is the first protocol + * version which the feature is introduced. + * @param isLegacyTableFeature this feature is a legacy feature? i.e a feature that released + * before Delta Table Features (reader version 3 and writer version 7). + * @param requiredFeatures Set of table features that this table feature depends on. I.e. the set + * of features that need to be enabled if this table feature is enabled. + */ + public TableFeature( + String featureName, + int minReaderVersion, + int minWriterVersion, + boolean readerWriterFeature, + boolean isLegacyTableFeature, + List requiredFeatures, + Optional featureAutoEnablementByMetadata) { + this.featureName = requireNonNull(featureName, "name is null"); + checkArgument(!featureName.isEmpty(), "name is empty"); + checkArgument( + featureName.chars().allMatch(c -> Character.isLetterOrDigit(c) || c == '-' || c == '_'), + "name contains invalid characters: " + featureName); + this.minReaderVersion = minReaderVersion; + this.minWriterVersion = minWriterVersion; + if (!readerWriterFeature) { + checkArgument(minReaderVersion == 0, "Writer-only feature must have minReaderVersion=0"); + } + this.readerWriterFeature = readerWriterFeature; + this.isLegacyFeature = isLegacyTableFeature; + this.requiredFeatures = + Collections.unmodifiableList(requireNonNull(requiredFeatures, "requiredFeatures is null")); + this.featureAutoEnablementByMetadata = + requireNonNull(featureAutoEnablementByMetadata, "featureAutoEnablementByMetadata is null"); + + featureAutoEnablementByMetadata.ifPresent( + autoEnablementByMetadata -> { + checkArgument( + isLegacyFeature() + && autoEnablementByMetadata.automaticallyUpdateProtocolOfExistingTables(), + "Legacy feature must be auto-update capable."); + }); + } + /** @return the name of the table feature. */ + String featureName() { + return featureName; + } + + /** + * @return true if this feature is applicable to both reader and writer, false if it is + * writer-only. + */ + boolean isReaderWriterFeature() { + return readerWriterFeature; + } + + /** @return the minimum reader version this feature requires */ + int minReaderVersion() { + return minReaderVersion; + } + + /** @return the minimum writer version that this feature requires. */ + int minWriterVersion() { + return minWriterVersion; + } + + /** @return if this feature is a legacy feature? */ + boolean isLegacyFeature() { + return isLegacyFeature; + } + + /** + * Set of table features that this table feature depends on. I.e. the set of features that need to + * be enabled if this table feature is enabled. + * + * @return the list of table features that this table feature depends on. + */ + List requiredFeatures() { + return requiredFeatures; + } + + Optional featureAutoEnablementByMetadata() { + return featureAutoEnablementByMetadata; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java similarity index 53% rename from kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java index 812c7dc87dc..b50812d9d0b 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java @@ -14,11 +14,16 @@ * limitations under the License. */ -package io.delta.kernel.internal; +package io.delta.kernel.internal.tablefeatures; import static io.delta.kernel.internal.DeltaErrors.*; import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED; +import static io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode.NONE; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import io.delta.kernel.internal.DeltaErrors; +import io.delta.kernel.internal.TableConfig; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.util.ColumnMapping; @@ -30,6 +35,204 @@ /** Contains utility methods related to the Delta table feature support in protocol. */ public class TableFeatures { + ////////////////////////////////////////////////////////////////////// + /// Define the {@link TableFeature}s that are supported by Kernel /// + ////////////////////////////////////////////////////////////////////// + static TableFeature APPEND_ONLY_FEATURE = + new TableFeature( + /* featureName = */ "appendOnly", + /* minReaderVersion = */ 0, + /* minWriterVersion = */ 2, + /* readerWriterFeature = */ false, + /* isLegacyFeature = */ true, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> TableConfig.APPEND_ONLY.fromMetadata(metadata))); + + static TableFeature INVARIANTS_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "invariants", + /* minReaderVersion = */ 0, + /* minWriterVersion = */ 2, + /* readerWriterFeature = */ false, + /* isLegacyFeature = */ true, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> hasInvariants(metadata.getSchema()))); + + static TableFeature CHECK_CONSTRAINTS_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "checkConstraints", + /* minReaderVersion = */ 0, + /* minWriterVersion = */ 3, + /* readerWriterFeature = */ false, + /* isLegacyFeature = */ true, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> hasCheckConstraints(metadata))); + + static TableFeature CHANGE_DATA_FEED_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "changeDataFeed", + /* minReaderVersion = */ 0, + /* minWriterVersion = */ 3, + /* readerWriterFeature = */ false, + /* isLegacyFeature = */ true, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> TableConfig.CHANGE_DATA_FEED.fromMetadata(metadata))); + + static TableFeature COLUMN_MAPPING_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "columnMapping", + /* minReaderVersion = */ 2, + /* minWriterVersion = */ 5, + /* readerWriterFeature = */ true, + /* isLegacyFeature = */ true, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> + TableConfig.COLUMN_MAPPING_MODE.fromMetadata(metadata) != NONE)); + + static TableFeature TIMESTAMP_NTZ_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "timestampNtz", + /* minReaderVersion = */ 3, + /* minWriterVersion = */ 7, + /* readerWriterFeature = */ true, + /* isLegacyFeature = */ false, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> { + // TODO: check schema recursively for variant types + return true; + })); + + static TableFeature VARIANT_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "variantType", + /* minReaderVersion = */ 3, + /* minWriterVersion = */ 7, + /* readerWriterFeature = */ true, + /* isLegacyFeature = */ false, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> { + // TODO: check schema recursively for variant types + return true; + })); + + static TableFeature VARIANT_PREVIEW_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "variantType-preview", + /* minReaderVersion = */ 3, + /* minWriterVersion = */ 7, + /* readerWriterFeature = */ true, + /* isLegacyFeature = */ false, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> { + // TODO: check schema recursively for variant types + return true; + })); + + static TableFeature DELETION_VECTORS_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "deletionVectors", + /* minReaderVersion = */ 3, + /* minWriterVersion = */ 7, + /* readerWriterFeature = */ true, + /* isLegacyFeature = */ false, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> + TableConfig.ENABLE_DELETION_VECTORS_CREATION.fromMetadata(metadata))); + + static TableFeature DOMAIN_METADATA_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "domainMetadata", + /* minReaderVersion = */ 3, + /* minWriterVersion = */ 7, + /* readerWriterFeature = */ false, + /* isLegacyFeature = */ false, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.empty()); + + static TableFeature ROW_TRACKING_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "rowTracking", + /* minReaderVersion = */ 3, + /* minWriterVersion = */ 7, + /* readerWriterFeature = */ false, + /* isLegacyFeature = */ false, + /* requiredFeatures = */ asList(DOMAIN_METADATA_TABLE_FEATURE), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> TableConfig.ROW_TRACKING_ENABLED.fromMetadata(metadata))); + + static TableFeature ICEBERG_COMPAT_V2_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "icebergCompatV2", + /* minReaderVersion = */ 3, + /* minWriterVersion = */ 7, + /* readerWriterFeature = */ false, + /* isLegacyFeature = */ false, + /* requiredFeatures = */ asList(COLUMN_MAPPING_TABLE_FEATURE), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> + TableConfig.ICEBERG_COMPAT_V2_ENABLED.fromMetadata(metadata))); + + static TableFeature TYPE_WIDENING_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "typeWidening", + /* minReaderVersion = */ 3, + /* minWriterVersion = */ 7, + /* readerWriterFeature = */ true, + /* isLegacyFeature = */ false, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> { + return TableConfig.ENABLE_TYPE_WIDENING.fromMetadata(metadata); + // TODO: there is more. Need to check if any of the fields + // have the metadata field "delta.typeWidening" + })); + + static TableFeature TYPE_WIDENING_PREVIEW_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "typeWidening", + /* minReaderVersion = */ 3, + /* minWriterVersion = */ 7, + /* readerWriterFeature = */ true, + /* isLegacyFeature = */ false, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> { + return TableConfig.ENABLE_TYPE_WIDENING.fromMetadata(metadata); + // TODO: there is more. Need to check if any of the fields + // have the metadata field "delta.typeWidening" + })); + + static TableFeature IN_COMMIT_TIMESTAMP_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "inCommitTimestamp", + /* minReaderVersion = */ 3, + /* minWriterVersion = */ 7, + /* readerWriterFeature = */ false, + /* isLegacyFeature = */ false, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.of( + (protocol, metadata) -> + TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata))); + + static TableFeature VACUUM_PROTOCOL_CHECK_TABLE_FEATURE = + new TableFeature( + /* featureName = */ "vacuumProtocolCheck", + /* minReaderVersion = */ 3, + /* minWriterVersion = */ 7, + /* readerWriterFeature = */ true, + /* isLegacyFeature = */ false, + /* requiredFeatures = */ emptyList(), + /* featureAutoEnablementByMetadata = */ Optional.empty()); + private static final Set SUPPORTED_WRITER_FEATURES = Collections.unmodifiableSet( new HashSet() { @@ -268,14 +471,23 @@ private static boolean metadataRequiresWriterFeatureToBeEnabled( } private static void validateNoInvariants(StructType tableSchema) { - boolean hasInvariants = - tableSchema.fields().stream() - .anyMatch(field -> field.getMetadata().contains("delta.invariants")); - if (hasInvariants) { + if (hasInvariants(tableSchema)) { throw columnInvariantsNotSupported(); } } + private static boolean hasInvariants(StructType tableSchema) { + return tableSchema.fields().stream() + .anyMatch(field -> field.getMetadata().contains("delta.invariants")); + } + + private static boolean hasCheckConstraints(Metadata metadata) { + return metadata.getConfiguration().entrySet().stream() + .findAny() + .map(entry -> entry.getKey().equals("delta.constraints.")) + .orElse(false); + } + private static boolean isWriterFeatureSupported(Protocol protocol, String featureName) { List writerFeatures = protocol.getWriterFeatures(); if (writerFeatures == null) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java index 454d1a2b33a..528b61bbb22 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java @@ -18,9 +18,9 @@ import io.delta.kernel.data.ColumnVector; import io.delta.kernel.internal.DeltaErrors; -import io.delta.kernel.internal.TableFeatures; import io.delta.kernel.internal.actions.DomainMetadata; import io.delta.kernel.internal.actions.Protocol; +import io.delta.kernel.internal.tablefeatures.TableFeatures; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala index ef0dcd376c6..967b4b4d606 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala @@ -17,7 +17,7 @@ package io.delta.kernel.internal import io.delta.kernel.data.{ArrayValue, ColumnVector, MapValue} import io.delta.kernel.exceptions.KernelException -import io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable +import io.delta.kernel.internal.tablefeatures.TableFeatures.validateWriteSupportedTable import io.delta.kernel.internal.actions.{Format, Metadata, Protocol} import io.delta.kernel.internal.util.InternalUtils.singletonStringColumnVector import io.delta.kernel.types._ From a3e08c268e800a18449001d42925eecfed1979d4 Mon Sep 17 00:00:00 2001 From: Venki Korukanti Date: Fri, 7 Feb 2025 13:19:58 -0800 Subject: [PATCH 2/5] interface updates --- .../tablefeatures/BaseTableFeatureImpl.java | 91 ++++ ...java => FeatureAutoEnabledByMetadata.java} | 9 +- .../internal/tablefeatures/TableFeature.java | 111 ++--- .../internal/tablefeatures/TableFeatures.java | 423 +++++++++--------- .../delta/actions/TableFeatureSupport.scala | 4 + 5 files changed, 338 insertions(+), 300 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/BaseTableFeatureImpl.java rename kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/{FeatureAutoEnablementByMetadata.java => FeatureAutoEnabledByMetadata.java} (86%) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/BaseTableFeatureImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/BaseTableFeatureImpl.java new file mode 100644 index 00000000000..fbee94567b6 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/BaseTableFeatureImpl.java @@ -0,0 +1,91 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.tablefeatures; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.internal.tablefeatures.TableFeatures.LegacyFeatureType; +import io.delta.kernel.internal.tablefeatures.TableFeatures.ReaderWriterFeatureType; +import java.util.Collections; +import java.util.Set; + +/** Base class for table features. See {@link TableFeature} for more information. */ +public abstract class BaseTableFeatureImpl implements TableFeature { + private final String featureName; + private final int minReaderVersion; + private final int minWriterVersion; + + /** + * Constructor. Does validations to make sure: + * + *

    + *
  • Feature name is not null or empty and has valid characters + *
  • minReaderVersion is always 0 for writer features + *
  • all legacy features can be auto applied based on the metadata and protocol + * + * @param featureName a globally-unique string indicator to represent the feature. All characters + * must be letters (a-z, A-Z), digits (0-9), '-', or '_'. Words must be in camelCase. + * @param minReaderVersion the minimum reader version this feature requires. For a feature that + * can only be explicitly supported, this is either `0` or `3` (the reader protocol version + * that supports table features), depending on the feature is writer-only or reader-writer. + * For a legacy feature that can be implicitly supported, this is the first protocol version + * which the feature is introduced. + * @param minWriterVersion the minimum writer version this feature requires. For a feature that + * can only be explicitly supported, this is the writer protocol `7` that supports table + * features. For a legacy feature that can be implicitly supported, this is the first protocol + * version which the feature is introduced. + */ + public BaseTableFeatureImpl(String featureName, int minReaderVersion, int minWriterVersion) { + this.featureName = requireNonNull(featureName, "name is null"); + checkArgument(!featureName.isEmpty(), "name is empty"); + checkArgument( + featureName.chars().allMatch(c -> Character.isLetterOrDigit(c) || c == '-' || c == '_'), + "name contains invalid characters: " + featureName); + this.minReaderVersion = minReaderVersion; + this.minWriterVersion = minWriterVersion; + + validate(); + } + + @Override + public String featureName() { + return featureName; + } + + @Override + public boolean isReaderWriterFeature() { + return this instanceof ReaderWriterFeatureType; + } + + @Override + public int minReaderVersion() { + return minReaderVersion; + } + + public int minWriterVersion() { + return minWriterVersion; + } + + public boolean isLegacyFeature() { + return this instanceof LegacyFeatureType; + } + + @Override + public Set requiredFeatures() { + return Collections.emptySet(); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnablementByMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnabledByMetadata.java similarity index 86% rename from kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnablementByMetadata.java rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnabledByMetadata.java index 72323527eec..1e1e4605954 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnablementByMetadata.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnabledByMetadata.java @@ -30,20 +30,21 @@ * and/or `writerFeatures`. Otherwise, a proper protocol version bump must be present in the same * transaction. */ -public interface FeatureAutoEnablementByMetadata { +public interface FeatureAutoEnabledByMetadata extends TableFeature { /** * Whether the feature can automatically update the protocol of an existing table when the * metadata requirements are satisfied. As a rule of thumb, a table feature that requires explicit * operations (e.g., turning on a table property) should set this flag to `true`, while features * that are used implicitly (e.g., when using a new data type) should set this flag to `false`. */ - default boolean automaticallyUpdateProtocolOfExistingTables() { - return true; - } + boolean automaticallyUpdateProtocolOfExistingTables(); /** * Determine whether the feature must be supported and enabled because its metadata requirements * are satisfied. + * + * @param protocol the protocol of the table for features that are already enabled. + * @param metadata the metadata of the table for properties that can enable the feature. */ boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java index c6e1ed5d917..04316de087d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java @@ -16,11 +16,8 @@ package io.delta.kernel.internal.tablefeatures; import static io.delta.kernel.internal.util.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.Set; /** * Base class for table features. @@ -43,106 +40,50 @@ * the feature (at least to the extent that they can read and preserve the existing data in the * table that uses the feature). */ -public class TableFeature { - private final String featureName; - private final int minReaderVersion; - private final int minWriterVersion; - private final boolean readerWriterFeature; - private final boolean isLegacyFeature; - private final List requiredFeatures; - private final Optional featureAutoEnablementByMetadata; +public interface TableFeature { - /** - * Create a new table feature. - * - * @param featureName a globally-unique string indicator to represent the feature. All characters - * must be letters (a-z, A-Z), digits (0-9), '-', or '_'. Words must be in camelCase. - * @param minReaderVersion the minimum reader version this feature requires. For a feature that - * can only be explicitly supported, this is either `0` or `3` (the reader protocol version - * that supports table features), depending on the feature is writer-only or reader-writer. - * For a legacy feature that can be implicitly supported, this is the first protocol version - * which the feature is introduced. - * @param minWriterVersion the minimum writer version this feature requires. For a feature that - * can only be explicitly supported, this is the writer protocol `7` that supports table - * features. For a legacy feature that can be implicitly supported, this is the first protocol - * version which the feature is introduced. - * @param isLegacyTableFeature this feature is a legacy feature? i.e a feature that released - * before Delta Table Features (reader version 3 and writer version 7). - * @param requiredFeatures Set of table features that this table feature depends on. I.e. the set - * of features that need to be enabled if this table feature is enabled. - */ - public TableFeature( - String featureName, - int minReaderVersion, - int minWriterVersion, - boolean readerWriterFeature, - boolean isLegacyTableFeature, - List requiredFeatures, - Optional featureAutoEnablementByMetadata) { - this.featureName = requireNonNull(featureName, "name is null"); - checkArgument(!featureName.isEmpty(), "name is empty"); - checkArgument( - featureName.chars().allMatch(c -> Character.isLetterOrDigit(c) || c == '-' || c == '_'), - "name contains invalid characters: " + featureName); - this.minReaderVersion = minReaderVersion; - this.minWriterVersion = minWriterVersion; - if (!readerWriterFeature) { - checkArgument(minReaderVersion == 0, "Writer-only feature must have minReaderVersion=0"); - } - this.readerWriterFeature = readerWriterFeature; - this.isLegacyFeature = isLegacyTableFeature; - this.requiredFeatures = - Collections.unmodifiableList(requireNonNull(requiredFeatures, "requiredFeatures is null")); - this.featureAutoEnablementByMetadata = - requireNonNull(featureAutoEnablementByMetadata, "featureAutoEnablementByMetadata is null"); - - featureAutoEnablementByMetadata.ifPresent( - autoEnablementByMetadata -> { - checkArgument( - isLegacyFeature() - && autoEnablementByMetadata.automaticallyUpdateProtocolOfExistingTables(), - "Legacy feature must be auto-update capable."); - }); - } /** @return the name of the table feature. */ - String featureName() { - return featureName; - } + String featureName(); /** * @return true if this feature is applicable to both reader and writer, false if it is * writer-only. */ - boolean isReaderWriterFeature() { - return readerWriterFeature; - } + boolean isReaderWriterFeature(); /** @return the minimum reader version this feature requires */ - int minReaderVersion() { - return minReaderVersion; - } + int minReaderVersion(); /** @return the minimum writer version that this feature requires. */ - int minWriterVersion() { - return minWriterVersion; - } + int minWriterVersion(); /** @return if this feature is a legacy feature? */ - boolean isLegacyFeature() { - return isLegacyFeature; - } + boolean isLegacyFeature(); /** * Set of table features that this table feature depends on. I.e. the set of features that need to * be enabled if this table feature is enabled. * - * @return the list of table features that this table feature depends on. + * @return the set of table features that this table feature depends on. */ - List requiredFeatures() { - return requiredFeatures; - } + Set requiredFeatures(); + + /** + * Validate the table feature. This method should throw an exception if the table feature + * properties are invalid. Should be called after the object deriving the {@link TableFeature} is + * constructed. + */ + default void validate() { + if (!isReaderWriterFeature()) { + checkArgument(minReaderVersion() == 0, "Writer-only feature must have minReaderVersion=0"); + } - Optional featureAutoEnablementByMetadata() { - return featureAutoEnablementByMetadata; + if (isLegacyFeature()) { + checkArgument( + this instanceof FeatureAutoEnabledByMetadata + && ((FeatureAutoEnabledByMetadata) this) + .automaticallyUpdateProtocolOfExistingTables(), + "Legacy feature must be auto-enable capable based on metadata"); + } } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java index b50812d9d0b..b3f5bd6692d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java @@ -17,10 +17,11 @@ package io.delta.kernel.internal.tablefeatures; import static io.delta.kernel.internal.DeltaErrors.*; +import static io.delta.kernel.internal.TableConfig.APPEND_ONLY; +import static io.delta.kernel.internal.TableConfig.COLUMN_MAPPING_MODE; import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED; +import static io.delta.kernel.internal.TableConfig.ROW_TRACKING_ENABLED; import static io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode.NONE; -import static java.util.Arrays.asList; -import static java.util.Collections.emptyList; import io.delta.kernel.internal.DeltaErrors; import io.delta.kernel.internal.TableConfig; @@ -38,200 +39,211 @@ public class TableFeatures { ////////////////////////////////////////////////////////////////////// /// Define the {@link TableFeature}s that are supported by Kernel /// ////////////////////////////////////////////////////////////////////// - static TableFeature APPEND_ONLY_FEATURE = - new TableFeature( - /* featureName = */ "appendOnly", - /* minReaderVersion = */ 0, - /* minWriterVersion = */ 2, - /* readerWriterFeature = */ false, - /* isLegacyFeature = */ true, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> TableConfig.APPEND_ONLY.fromMetadata(metadata))); - - static TableFeature INVARIANTS_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "invariants", - /* minReaderVersion = */ 0, - /* minWriterVersion = */ 2, - /* readerWriterFeature = */ false, - /* isLegacyFeature = */ true, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> hasInvariants(metadata.getSchema()))); - - static TableFeature CHECK_CONSTRAINTS_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "checkConstraints", - /* minReaderVersion = */ 0, - /* minWriterVersion = */ 3, - /* readerWriterFeature = */ false, - /* isLegacyFeature = */ true, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> hasCheckConstraints(metadata))); - - static TableFeature CHANGE_DATA_FEED_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "changeDataFeed", - /* minReaderVersion = */ 0, - /* minWriterVersion = */ 3, - /* readerWriterFeature = */ false, - /* isLegacyFeature = */ true, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> TableConfig.CHANGE_DATA_FEED.fromMetadata(metadata))); - - static TableFeature COLUMN_MAPPING_TABLE_FEATURE = - new TableFeature( + /** An interface to indicate a feature is legacy, i.e., released before Table Features. */ + public interface LegacyFeatureType extends FeatureAutoEnabledByMetadata { + @Override + default boolean automaticallyUpdateProtocolOfExistingTables() { + return true; // legacy features can always be applied to existing tables + } + } + + /** An interface to indicate a feature applies to readers and writers. */ + public interface ReaderWriterFeatureType {} + + /** A base class for all table legacy writer-only features. */ + public abstract static class LegacyWriterFeature extends BaseTableFeatureImpl + implements LegacyFeatureType { + public LegacyWriterFeature(String featureName, int minWriterVersion) { + super(featureName, /* minReaderVersion = */ 0, minWriterVersion); + } + } + + /** A base class for all table legacy reader-writer features. */ + public abstract static class LegacyReaderWriterFeature extends BaseTableFeatureImpl + implements LegacyFeatureType { + public LegacyReaderWriterFeature( + String featureName, int minReaderVersion, int minWriterVersion) { + super(featureName, minReaderVersion, minWriterVersion); + } + } + + /** A base class for all non-legacy table writer features. */ + public static class WriterFeature extends BaseTableFeatureImpl { + public WriterFeature(String featureName, int minWriterVersion) { + super(featureName, /* minReaderVersion = */ 0, minWriterVersion); + } + } + + /** A base class for all non-legacy table reader-writer features. */ + public static class ReaderWriterFeature extends BaseTableFeatureImpl + implements ReaderWriterFeatureType { + public ReaderWriterFeature(String featureName, int minReaderVersion, int minWriterVersion) { + super(featureName, minReaderVersion, minWriterVersion); + } + } + + public static final TableFeature APPEND_ONLY_FEATURE = new AppendOnlyFeature(); + + private static class AppendOnlyFeature extends LegacyWriterFeature { + AppendOnlyFeature() { + super(/* featureName = */ "appendOnly", /* minWriterVersion = */ 2); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return APPEND_ONLY.fromMetadata(metadata); + } + } + + public static final TableFeature INVARIANTS_FEATURE = new InvariantsFeature(); + + private static class InvariantsFeature extends LegacyWriterFeature { + InvariantsFeature() { + super(/* featureName = */ "invariants", /* minWriterVersion = */ 2); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return hasInvariants(metadata.getSchema()); + } + } + + public static final TableFeature COLUMN_MAPPING_FEATURE = new ColumnMappingFeature(); + + private static class ColumnMappingFeature extends LegacyReaderWriterFeature { + ColumnMappingFeature() { + super( /* featureName = */ "columnMapping", /* minReaderVersion = */ 2, - /* minWriterVersion = */ 5, - /* readerWriterFeature = */ true, - /* isLegacyFeature = */ true, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> - TableConfig.COLUMN_MAPPING_MODE.fromMetadata(metadata) != NONE)); - - static TableFeature TIMESTAMP_NTZ_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "timestampNtz", - /* minReaderVersion = */ 3, - /* minWriterVersion = */ 7, - /* readerWriterFeature = */ true, - /* isLegacyFeature = */ false, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> { - // TODO: check schema recursively for variant types - return true; - })); - - static TableFeature VARIANT_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "variantType", - /* minReaderVersion = */ 3, - /* minWriterVersion = */ 7, - /* readerWriterFeature = */ true, - /* isLegacyFeature = */ false, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> { - // TODO: check schema recursively for variant types - return true; - })); - - static TableFeature VARIANT_PREVIEW_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "variantType-preview", - /* minReaderVersion = */ 3, - /* minWriterVersion = */ 7, - /* readerWriterFeature = */ true, - /* isLegacyFeature = */ false, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> { - // TODO: check schema recursively for variant types - return true; - })); - - static TableFeature DELETION_VECTORS_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "deletionVectors", - /* minReaderVersion = */ 3, - /* minWriterVersion = */ 7, - /* readerWriterFeature = */ true, - /* isLegacyFeature = */ false, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> - TableConfig.ENABLE_DELETION_VECTORS_CREATION.fromMetadata(metadata))); - - static TableFeature DOMAIN_METADATA_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "domainMetadata", - /* minReaderVersion = */ 3, - /* minWriterVersion = */ 7, - /* readerWriterFeature = */ false, - /* isLegacyFeature = */ false, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.empty()); - - static TableFeature ROW_TRACKING_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "rowTracking", - /* minReaderVersion = */ 3, - /* minWriterVersion = */ 7, - /* readerWriterFeature = */ false, - /* isLegacyFeature = */ false, - /* requiredFeatures = */ asList(DOMAIN_METADATA_TABLE_FEATURE), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> TableConfig.ROW_TRACKING_ENABLED.fromMetadata(metadata))); - - static TableFeature ICEBERG_COMPAT_V2_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "icebergCompatV2", - /* minReaderVersion = */ 3, - /* minWriterVersion = */ 7, - /* readerWriterFeature = */ false, - /* isLegacyFeature = */ false, - /* requiredFeatures = */ asList(COLUMN_MAPPING_TABLE_FEATURE), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> - TableConfig.ICEBERG_COMPAT_V2_ENABLED.fromMetadata(metadata))); - - static TableFeature TYPE_WIDENING_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "typeWidening", - /* minReaderVersion = */ 3, - /* minWriterVersion = */ 7, - /* readerWriterFeature = */ true, - /* isLegacyFeature = */ false, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> { - return TableConfig.ENABLE_TYPE_WIDENING.fromMetadata(metadata); - // TODO: there is more. Need to check if any of the fields - // have the metadata field "delta.typeWidening" - })); - - static TableFeature TYPE_WIDENING_PREVIEW_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "typeWidening", - /* minReaderVersion = */ 3, - /* minWriterVersion = */ 7, - /* readerWriterFeature = */ true, - /* isLegacyFeature = */ false, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> { - return TableConfig.ENABLE_TYPE_WIDENING.fromMetadata(metadata); - // TODO: there is more. Need to check if any of the fields - // have the metadata field "delta.typeWidening" - })); - - static TableFeature IN_COMMIT_TIMESTAMP_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "inCommitTimestamp", - /* minReaderVersion = */ 3, - /* minWriterVersion = */ 7, - /* readerWriterFeature = */ false, - /* isLegacyFeature = */ false, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.of( - (protocol, metadata) -> - TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata))); - - static TableFeature VACUUM_PROTOCOL_CHECK_TABLE_FEATURE = - new TableFeature( - /* featureName = */ "vacuumProtocolCheck", - /* minReaderVersion = */ 3, - /* minWriterVersion = */ 7, - /* readerWriterFeature = */ true, - /* isLegacyFeature = */ false, - /* requiredFeatures = */ emptyList(), - /* featureAutoEnablementByMetadata = */ Optional.empty()); + /* minWriterVersion = */ 5); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return COLUMN_MAPPING_MODE.fromMetadata(metadata) != NONE; + } + } + + public static final TableFeature VARIANT_FEATURE = new VariantTypeTableFeature("variantType"); + public static final TableFeature VARIANT_PREVIEW_FEATURE = + new VariantTypeTableFeature("variantType-preview"); + + static class VariantTypeTableFeature extends ReaderWriterFeature + implements FeatureAutoEnabledByMetadata { + VariantTypeTableFeature(String featureName) { + super( + /* featureName = */ featureName, /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + } + + @Override + public boolean automaticallyUpdateProtocolOfExistingTables() { + return true; + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + // TODO: check schema recursively for variant types + return false; + } + } + + public static final TableFeature DOMAIN_METADATA_FEATURE = + new WriterFeature("domainMetadata", /* minWriterVersion = */ 7); + + public static final TableFeature ROW_TRACKING_FEATURE = new RowTrackingFeature(); + + private static class RowTrackingFeature extends WriterFeature + implements FeatureAutoEnabledByMetadata { + RowTrackingFeature() { + super("rowTracking", /* minWriterVersion = */ 7); + } + + @Override + public boolean automaticallyUpdateProtocolOfExistingTables() { + return true; + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return ROW_TRACKING_ENABLED.fromMetadata(metadata); + } + + @Override + public Set requiredFeatures() { + return Collections.singleton(DOMAIN_METADATA_FEATURE); + } + } + + public static final TableFeature ICEBERG_COMPAT_V2_TABLE_FEATURE = + new IcebergCompatV2TableFeature(); + + private static class IcebergCompatV2TableFeature extends WriterFeature + implements FeatureAutoEnabledByMetadata { + IcebergCompatV2TableFeature() { + super("icebergCompatV2", /* minWriterVersion = */ 7); + } + + @Override + public boolean automaticallyUpdateProtocolOfExistingTables() { + return true; + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return TableConfig.ICEBERG_COMPAT_V2_ENABLED.fromMetadata(metadata); + } + + public @Override Set requiredFeatures() { + return Collections.singleton(COLUMN_MAPPING_FEATURE); + } + } + + public static final TableFeature TYPE_WIDENING_TABLE_FEATURE = + new TypeWideningTableFeature("typeWidening"); + public static final TableFeature TYPE_WIDENING_PREVIEW_TABLE_FEATURE = + new TypeWideningTableFeature("typeWidening-preview"); + + private static class TypeWideningTableFeature extends ReaderWriterFeature + implements FeatureAutoEnabledByMetadata { + TypeWideningTableFeature(String featureName) { + super(featureName, /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + } + + @Override + public boolean automaticallyUpdateProtocolOfExistingTables() { + return true; + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return TableConfig.ENABLE_TYPE_WIDENING.fromMetadata(metadata); + // TODO: there is more. Need to check if any of the fields + } + } + + public static final TableFeature IN_COMMIT_TIMESTAMP_FEATURE = + new InCommitTimestampTableFeature(); + + private static class InCommitTimestampTableFeature extends WriterFeature + implements FeatureAutoEnabledByMetadata { + InCommitTimestampTableFeature() { + super("inCommitTimestamp", /* minWriterVersion = */ 7); + } + + @Override + public boolean automaticallyUpdateProtocolOfExistingTables() { + return true; + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata); + } + } + + public static final TableFeature VACUUM_PROTOCOL_CHECK_TABLE_FEATURE = + new ReaderWriterFeature( + "vacuumProtocolCheck", /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); private static final Set SUPPORTED_WRITER_FEATURES = Collections.unmodifiableSet( @@ -242,8 +254,8 @@ public class TableFeatures { add("columnMapping"); add("typeWidening-preview"); add("typeWidening"); - add(DOMAIN_METADATA_FEATURE_NAME); - add(ROW_TRACKING_FEATURE_NAME); + add("domainMetadata"); + add("rowTracking"); } }); @@ -263,19 +275,9 @@ public class TableFeatures { } }); - public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata"; - - public static final String ROW_TRACKING_FEATURE_NAME = "rowTracking"; - - public static final String INVARIANTS_FEATURE_NAME = "invariants"; - - /** The minimum writer version required to support table features. */ - public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7; - //////////////////// // Helper Methods // //////////////////// - public static void validateReadSupportedTable( Protocol protocol, String tablePath, Optional metadata) { switch (protocol.getMinReaderVersion()) { @@ -341,7 +343,7 @@ public static void validateWriteSupportedTable( throw unsupportedWriterProtocol(tablePath, minWriterVersion); case 7: for (String writerFeature : protocol.getWriterFeatures()) { - if (writerFeature.equals(INVARIANTS_FEATURE_NAME)) { + if (writerFeature.equals("invariants")) { // For version 7, we allow 'invariants' to be present in the protocol's writerFeatures // to unblock certain use cases, provided that no invariants are defined in the schema. validateNoInvariants(tableSchema); @@ -409,7 +411,7 @@ public static Set extractAutomaticallyEnabledWriterFeatures( * @return true if the "domainMetadata" feature is supported, false otherwise */ public static boolean isDomainMetadataSupported(Protocol protocol) { - return isWriterFeatureSupported(protocol, DOMAIN_METADATA_FEATURE_NAME); + return isWriterFeatureSupported(protocol, "domainMetadata"); } /** @@ -419,7 +421,7 @@ public static boolean isDomainMetadataSupported(Protocol protocol) { * @return true if the protocol supports row tracking, false otherwise */ public static boolean isRowTrackingSupported(Protocol protocol) { - return isWriterFeatureSupported(protocol, ROW_TRACKING_FEATURE_NAME); + return isWriterFeatureSupported(protocol, "rowTracking"); } /** @@ -493,7 +495,6 @@ private static boolean isWriterFeatureSupported(Protocol protocol, String featur if (writerFeatures == null) { return false; } - return writerFeatures.contains(featureName) - && protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION; + return writerFeatures.contains(featureName) && protocol.getMinWriterVersion() >= 7; } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala index 7b75ac42c80..390191477e1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala @@ -367,6 +367,10 @@ trait TableFeatureSupport { this: Protocol => newProtocol.implicitlyAndExplicitlySupportedFeatures) { newProtocol } else { + // means we have some that is added after table feature support. + // Whatever the feature (reader or readerWriter), it is always going to + // be have minWriterVersion as 7. Overall required minReaderVersion + // should be based on the supported feature requirements. Protocol(minReaderVersion, TABLE_FEATURES_MIN_WRITER_VERSION) .withFeatures(readerAndWriterFeatures) } From f00cd11af06f7d0acb8da43587cf2556fd424949 Mon Sep 17 00:00:00 2001 From: Venki Korukanti Date: Tue, 11 Feb 2025 01:23:08 -0800 Subject: [PATCH 3/5] utility methods --- .../internal/TransactionBuilderImpl.java | 4 +- .../kernel/internal/actions/Protocol.java | 256 +++++++++++++++++- .../tablefeatures/BaseTableFeatureImpl.java | 91 ------- .../FeatureAutoEnabledByMetadata.java | 2 +- .../internal/tablefeatures/TableFeature.java | 71 ++++- .../internal/tablefeatures/TableFeatures.java | 207 +++++++++++++- .../internal/util/CaseInsensitiveMap.java | 106 ++++++++ 7 files changed, 613 insertions(+), 124 deletions(-) delete mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/BaseTableFeatureImpl.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CaseInsensitiveMap.java diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java index 29752160bbe..1d8206eca0f 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java @@ -155,9 +155,9 @@ public Transaction build(Engine engine) { if (!newWriterFeatures.isEmpty()) { logger.info("Automatically enabling writer features: {}", newWriterFeatures); shouldUpdateProtocol = true; - List oldWriterFeatures = protocol.getWriterFeatures(); + Set oldWriterFeatures = protocol.getWriterFeatures(); protocol = protocol.withNewWriterFeatures(newWriterFeatures); - List curWriterFeatures = protocol.getWriterFeatures(); + Set curWriterFeatures = protocol.getWriterFeatures(); checkArgument(!Objects.equals(oldWriterFeatures, curWriterFeatures)); TableFeatures.validateWriteSupportedTable( protocol, metadata, metadata.getSchema(), table.getPath(engine)); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java index fa35d020c4b..feb002f25b9 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java @@ -15,10 +15,13 @@ */ package io.delta.kernel.internal.actions; +import static io.delta.kernel.internal.tablefeatures.TableFeatures.TABLE_FEATURES_MIN_WRITER_VERSION; import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue; +import static java.util.Collections.emptySet; import io.delta.kernel.data.*; import io.delta.kernel.internal.data.GenericRow; +import io.delta.kernel.internal.tablefeatures.TableFeature; import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.internal.util.VectorUtils; @@ -27,6 +30,7 @@ import io.delta.kernel.types.StringType; import io.delta.kernel.types.StructType; import java.util.*; +import java.util.stream.Collectors; public class Protocol { @@ -39,11 +43,11 @@ public static Protocol fromColumnVector(ColumnVector vector, int rowId) { vector.getChild(0).getInt(rowId), vector.getChild(1).getInt(rowId), vector.getChild(2).isNullAt(rowId) - ? Collections.emptyList() - : VectorUtils.toJavaList(vector.getChild(2).getArray(rowId)), + ? emptySet() + : new HashSet<>(VectorUtils.toJavaList(vector.getChild(2).getArray(rowId))), vector.getChild(3).isNullAt(rowId) - ? Collections.emptyList() - : VectorUtils.toJavaList(vector.getChild(3).getArray(rowId))); + ? emptySet() + : new HashSet<>(VectorUtils.toJavaList(vector.getChild(3).getArray(rowId)))); } public static final StructType FULL_SCHEMA = @@ -55,14 +59,18 @@ public static Protocol fromColumnVector(ColumnVector vector, int rowId) { private final int minReaderVersion; private final int minWriterVersion; - private final List readerFeatures; - private final List writerFeatures; + private final Set readerFeatures; + private final Set writerFeatures; + + public Protocol(int minReaderVersion, int minWriterVersion) { + this(minReaderVersion, minWriterVersion, emptySet(), emptySet()); + } public Protocol( int minReaderVersion, int minWriterVersion, - List readerFeatures, - List writerFeatures) { + Set readerFeatures, + Set writerFeatures) { this.minReaderVersion = minReaderVersion; this.minWriterVersion = minWriterVersion; this.readerFeatures = readerFeatures; @@ -77,14 +85,41 @@ public int getMinWriterVersion() { return minWriterVersion; } - public List getReaderFeatures() { + public Set getReaderFeatures() { return readerFeatures; } - public List getWriterFeatures() { + public Set getWriterFeatures() { return writerFeatures; } + public Set getImplicitlyEnabledFeatures() { + if (supportsReaderFeatures() && supportsWriterFeatures()) { + return emptySet(); + } else { + return TableFeatures.TABLE_FEATURES.stream() + .filter(f -> !supportsReaderFeatures() && f.minReaderVersion() <= minReaderVersion) + .filter(f -> !supportsReaderFeatures() && f.minWriterVersion() <= minWriterVersion) + .collect(Collectors.toSet()); + } + } + + public Set getExplicitlyEnabledFeatures() { + return TableFeatures.TABLE_FEATURES.stream() + .filter( + f -> + readerFeatures.contains(f.featureName()) + || writerFeatures.contains(f.featureName())) + .collect(Collectors.toSet()); + } + + public Set getImplicitAndExplicitlyEnabledFeatures() { + Set enabledFeatures = new HashSet<>(); + enabledFeatures.addAll(getImplicitlyEnabledFeatures()); + enabledFeatures.addAll(getExplicitlyEnabledFeatures()); + return enabledFeatures; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("Protocol{"); @@ -105,8 +140,8 @@ public Row toRow() { Map protocolMap = new HashMap<>(); protocolMap.put(0, minReaderVersion); protocolMap.put(1, minWriterVersion); - protocolMap.put(2, stringArrayValue(readerFeatures)); - protocolMap.put(3, stringArrayValue(writerFeatures)); + protocolMap.put(2, stringArrayValue(new ArrayList<>(readerFeatures))); + protocolMap.put(3, stringArrayValue(new ArrayList<>(writerFeatures))); return new GenericRow(Protocol.FULL_SCHEMA, protocolMap); } @@ -114,14 +149,207 @@ public Row toRow() { public Protocol withNewWriterFeatures(Set writerFeatures) { Tuple2 newProtocolVersions = TableFeatures.minProtocolVersionFromAutomaticallyEnabledFeatures(writerFeatures); - List newWriterFeatures = new ArrayList<>(writerFeatures); + Set newWriterFeatures = new HashSet<>(writerFeatures); if (this.writerFeatures != null) { newWriterFeatures.addAll(this.writerFeatures); } return new Protocol( newProtocolVersions._1, newProtocolVersions._2, - this.readerFeatures == null ? null : new ArrayList<>(this.readerFeatures), + this.readerFeatures == null ? null : new HashSet<>(this.readerFeatures), + newWriterFeatures); + } + + /** Create a new {@link Protocol} object with the given {@link TableFeature} enabled. */ + public Protocol withFeatures(Iterable newFeatures) { + Protocol result = this; + for (TableFeature feature : newFeatures) { + result = result.withFeature(feature); + } + return result; + } + + /** + * Get a new Protocol object that has `feature` supported. Writer-only features will be added to + * `writerFeatures` field, and reader-writer features will be added to `readerFeatures` and + * `writerFeatures` fields. + * + *

    If `feature` is already implicitly supported in the current protocol's legacy reader or + * writer protocol version, the new protocol will not modify the original protocol version, i.e., + * the feature will not be explicitly added to the protocol's `readerFeatures` or + * `writerFeatures`. This is to avoid unnecessary protocol upgrade for feature that it already + * supports. + */ + public Protocol withFeature(TableFeature feature) { + // Add required dependencies of the feature + Protocol protocolWithDependencies = withFeatures(feature.requiredFeatures()); + + if (feature.minReaderVersion() > protocolWithDependencies.minReaderVersion) { + throw new UnsupportedOperationException( + "TableFeature requires higher reader protocol version"); + } + + if (feature.minWriterVersion() > protocolWithDependencies.minWriterVersion) { + throw new UnsupportedOperationException( + "TableFeature requires higher writer protocol version"); + } + + boolean shouldAddToReaderFeatures = + feature.isReaderWriterFeature() + && + // protocol already has support for `readerFeatures` set and the new feature + // can be explicitly added to the protocol's `readerFeatures` + supportsReaderFeatures(); + + boolean shouldAddToWriterFeatures = supportsWriterFeatures(); + + Set newReaderFeatures = protocolWithDependencies.readerFeatures; + Set newWriterFeatures = protocolWithDependencies.writerFeatures; + + if (shouldAddToReaderFeatures) { + newReaderFeatures = new HashSet<>(protocolWithDependencies.readerFeatures); + newReaderFeatures.add(feature.featureName()); + } + + if (shouldAddToWriterFeatures) { + newWriterFeatures = new HashSet<>(protocolWithDependencies.writerFeatures); + newWriterFeatures.add(feature.featureName()); + } + + return new Protocol( + protocolWithDependencies.minReaderVersion, + protocolWithDependencies.minWriterVersion, + newReaderFeatures, newWriterFeatures); } + + /** + * Determine whether this protocol can be safely upgraded to a new protocol `to`. This means: - + * all features supported by this protocol are supported by `to`. + * + *

    Examples regarding feature status: - from `[appendOnly]` to `[appendOnly]` => allowed. - + * from `[appendOnly, changeDataFeed]` to `[appendOnly]` => not allowed. - from `[appendOnly]` to + * `[appendOnly, changeDataFeed]` => allowed. + */ + public boolean canUpgradeTo(Protocol to) { + // All features supported by `this` are supported by `to`. + return to.getImplicitAndExplicitlyEnabledFeatures() + .containsAll(this.getImplicitAndExplicitlyEnabledFeatures()); + } + + /** + * Protocol normalization is the process of converting a table features protocol to the weakest + * possible form. This primarily refers to converting a table features protocol to a legacy + * protocol. A Table Features protocol can be represented with the legacy representation only when + * the features set of the former exactly matches a legacy protocol. + * + *

    Normalization can also decrease the reader version of a table features protocol when it is + * higher than necessary. + * + *

    For example: (1, 7, AppendOnly, Invariants, CheckConstraints) -> (1, 3) (3, 7, RowTracking) + * -> (1, 7, RowTracking) + */ + public Protocol normalized() { + // Normalization can only be applied to table feature protocols. + if (!supportsWriterFeatures()) { + return this; + } + + Tuple2 versions = + TableFeatures.minimumRequiredVersions(getExplicitlyEnabledFeatures()); + int minReaderVersion = versions._1; + int minWriterVersion = versions._2; + Protocol newProtocol = new Protocol(minReaderVersion, minWriterVersion); + + if (this.getImplicitAndExplicitlyEnabledFeatures() + .equals(newProtocol.getImplicitAndExplicitlyEnabledFeatures())) { + return newProtocol; + } else { + // means we have some that is added after table feature support. + // Whatever the feature (reader or readerWriter), it is always going to + // be have minWriterVersion as 7. Overall required minReaderVersion + // should be based on the supported feature requirements. + return new Protocol(minReaderVersion, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeatures(getExplicitlyEnabledFeatures()); + } + } + + /** + * Protocol denormalization is the process of converting a legacy protocol to the equivalent table + * features protocol. This is the inverse of protocol normalization. It can be used to allow + * operations on legacy protocols that yield results which cannot be represented anymore by a + * legacy protocol. + */ + public Protocol denormalized() { + // Denormalization can only be applied to legacy protocols. + if (supportsWriterFeatures()) { + return this; + } + + Tuple2 versions = + TableFeatures.minimumRequiredVersions(getImplicitlyEnabledFeatures()); + int minReaderVersion = versions._1; + + return new Protocol(minReaderVersion, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeatures(getImplicitlyEnabledFeatures()); + } + + /** + * Helper method that applies both denormalization and normalization. This can be used to + * normalize invalid legacy protocols such as (2, 3), (1, 5). A legacy protocol is invalid when + * the version numbers are higher than required to support the implied feature set. + */ + public Protocol denormalizedNormalized() { + return this.denormalized().normalized(); + } + + /** + * Merge this protocol with multiple `protocols` to have the highest reader and writer versions + * plus all explicitly and implicitly supported features. + */ + public Protocol merge(Protocol... others) { + List protocols = new ArrayList<>(); + protocols.add(this); + protocols.addAll(Arrays.asList(others)); + + int mergedReaderVersion = + protocols.stream().mapToInt(Protocol::getMinReaderVersion).max().orElse(0); + + int mergedWriterVersion = + protocols.stream().mapToInt(Protocol::getMinWriterVersion).max().orElse(0); + + Set mergedReaderFeatures = + protocols.stream().flatMap(p -> p.readerFeatures.stream()).collect(Collectors.toSet()); + + Set mergedWriterFeatures = + protocols.stream().flatMap(p -> p.writerFeatures.stream()).collect(Collectors.toSet()); + + Set mergedImplicitFeatures = + protocols.stream() + .flatMap(p -> p.getImplicitlyEnabledFeatures().stream()) + .collect(Collectors.toSet()); + + Protocol mergedProtocol = + new Protocol( + mergedReaderVersion, + mergedWriterVersion, + mergedReaderFeatures, + mergedWriterFeatures) + .withFeatures(mergedImplicitFeatures); + + // The merged protocol is always normalized in order to represent the protocol + // with the weakest possible form. This enables backward compatibility. + // This is preceded by a denormalization step. This allows to fix invalid legacy Protocols. + // For example, (2, 3) is normalized to (1, 3). This is because there is no legacy feature + // in the set with reader version 2 unless the writer version is at least 5. + return mergedProtocol.denormalizedNormalized(); + } + + private boolean supportsReaderFeatures() { + return TableFeatures.supportsReaderFeatures(minReaderVersion); + } + + private boolean supportsWriterFeatures() { + return TableFeatures.supportsWriterFeatures(minWriterVersion); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/BaseTableFeatureImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/BaseTableFeatureImpl.java deleted file mode 100644 index fbee94567b6..00000000000 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/BaseTableFeatureImpl.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (2024) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.delta.kernel.internal.tablefeatures; - -import static io.delta.kernel.internal.util.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; - -import io.delta.kernel.internal.tablefeatures.TableFeatures.LegacyFeatureType; -import io.delta.kernel.internal.tablefeatures.TableFeatures.ReaderWriterFeatureType; -import java.util.Collections; -import java.util.Set; - -/** Base class for table features. See {@link TableFeature} for more information. */ -public abstract class BaseTableFeatureImpl implements TableFeature { - private final String featureName; - private final int minReaderVersion; - private final int minWriterVersion; - - /** - * Constructor. Does validations to make sure: - * - *

      - *
    • Feature name is not null or empty and has valid characters - *
    • minReaderVersion is always 0 for writer features - *
    • all legacy features can be auto applied based on the metadata and protocol - * - * @param featureName a globally-unique string indicator to represent the feature. All characters - * must be letters (a-z, A-Z), digits (0-9), '-', or '_'. Words must be in camelCase. - * @param minReaderVersion the minimum reader version this feature requires. For a feature that - * can only be explicitly supported, this is either `0` or `3` (the reader protocol version - * that supports table features), depending on the feature is writer-only or reader-writer. - * For a legacy feature that can be implicitly supported, this is the first protocol version - * which the feature is introduced. - * @param minWriterVersion the minimum writer version this feature requires. For a feature that - * can only be explicitly supported, this is the writer protocol `7` that supports table - * features. For a legacy feature that can be implicitly supported, this is the first protocol - * version which the feature is introduced. - */ - public BaseTableFeatureImpl(String featureName, int minReaderVersion, int minWriterVersion) { - this.featureName = requireNonNull(featureName, "name is null"); - checkArgument(!featureName.isEmpty(), "name is empty"); - checkArgument( - featureName.chars().allMatch(c -> Character.isLetterOrDigit(c) || c == '-' || c == '_'), - "name contains invalid characters: " + featureName); - this.minReaderVersion = minReaderVersion; - this.minWriterVersion = minWriterVersion; - - validate(); - } - - @Override - public String featureName() { - return featureName; - } - - @Override - public boolean isReaderWriterFeature() { - return this instanceof ReaderWriterFeatureType; - } - - @Override - public int minReaderVersion() { - return minReaderVersion; - } - - public int minWriterVersion() { - return minWriterVersion; - } - - public boolean isLegacyFeature() { - return this instanceof LegacyFeatureType; - } - - @Override - public Set requiredFeatures() { - return Collections.emptySet(); - } -} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnabledByMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnabledByMetadata.java index 1e1e4605954..e5430336903 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnabledByMetadata.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnabledByMetadata.java @@ -30,7 +30,7 @@ * and/or `writerFeatures`. Otherwise, a proper protocol version bump must be present in the same * transaction. */ -public interface FeatureAutoEnabledByMetadata extends TableFeature { +public interface FeatureAutoEnabledByMetadata { /** * Whether the feature can automatically update the protocol of an existing table when the * metadata requirements are satisfied. As a rule of thumb, a table feature that requires explicit diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java index 04316de087d..a6e08a92fdd 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java @@ -16,7 +16,9 @@ package io.delta.kernel.internal.tablefeatures; import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; +import java.util.Collections; import java.util.Set; /** @@ -34,31 +36,77 @@ * *

      Separately, a feature can be automatically supported by a table's metadata when certain * feature-specific table properties are set. For example, `changeDataFeed` is automatically - * supported when there's a table property `delta.enableChangeDataFeed=true`. This is independent of + * supported when there's a table property `delta.enableChangeDataFeed=true`. See {@link + * FeatureAutoEnabledByMetadata} for details on how to define such features. This is independent of * the table's enabled features. When a feature is supported (explicitly or implicitly) by the table * protocol but its metadata requirements are not satisfied, then clients still have to understand * the feature (at least to the extent that they can read and preserve the existing data in the * table that uses the feature). */ -public interface TableFeature { +public abstract class TableFeature { + private final String featureName; + private final int minReaderVersion; + private final int minWriterVersion; + + /** + * Constructor. Does validations to make sure: + * + *

        + *
      • Feature name is not null or empty and has valid characters + *
      • minReaderVersion is always 0 for writer features + *
      • all legacy features can be auto applied based on the metadata and protocol + * + * @param featureName a globally-unique string indicator to represent the feature. All characters + * must be letters (a-z, A-Z), digits (0-9), '-', or '_'. Words must be in camelCase. + * @param minReaderVersion the minimum reader version this feature requires. For a feature that + * can only be explicitly supported, this is either `0` or `3` (the reader protocol version + * that supports table features), depending on the feature is writer-only or reader-writer. + * For a legacy feature that can be implicitly supported, this is the first protocol version + * which the feature is introduced. + * @param minWriterVersion the minimum writer version this feature requires. For a feature that + * can only be explicitly supported, this is the writer protocol `7` that supports table + * features. For a legacy feature that can be implicitly supported, this is the first protocol + * version which the feature is introduced. + */ + public TableFeature(String featureName, int minReaderVersion, int minWriterVersion) { + this.featureName = requireNonNull(featureName, "name is null"); + checkArgument(!featureName.isEmpty(), "name is empty"); + checkArgument( + featureName.chars().allMatch(c -> Character.isLetterOrDigit(c) || c == '-' || c == '_'), + "name contains invalid characters: " + featureName); + this.minReaderVersion = minReaderVersion; + this.minWriterVersion = minWriterVersion; + + validate(); + } /** @return the name of the table feature. */ - String featureName(); + public String featureName() { + return featureName; + } /** * @return true if this feature is applicable to both reader and writer, false if it is * writer-only. */ - boolean isReaderWriterFeature(); + public boolean isReaderWriterFeature() { + return this instanceof TableFeatures.ReaderWriterFeatureType; + } /** @return the minimum reader version this feature requires */ - int minReaderVersion(); + public int minReaderVersion() { + return minReaderVersion; + } /** @return the minimum writer version that this feature requires. */ - int minWriterVersion(); + public int minWriterVersion() { + return minWriterVersion; + } /** @return if this feature is a legacy feature? */ - boolean isLegacyFeature(); + public boolean isLegacyFeature() { + return this instanceof TableFeatures.LegacyFeatureType; + } /** * Set of table features that this table feature depends on. I.e. the set of features that need to @@ -66,14 +114,16 @@ public interface TableFeature { * * @return the set of table features that this table feature depends on. */ - Set requiredFeatures(); + public Set requiredFeatures() { + return Collections.emptySet(); + } /** * Validate the table feature. This method should throw an exception if the table feature * properties are invalid. Should be called after the object deriving the {@link TableFeature} is * constructed. */ - default void validate() { + private void validate() { if (!isReaderWriterFeature()) { checkArgument(minReaderVersion() == 0, "Writer-only feature must have minReaderVersion=0"); } @@ -86,4 +136,7 @@ default void validate() { "Legacy feature must be auto-enable capable based on metadata"); } } + + // Important note: uses the default implementation of `equals` and `hashCode` methods. + // We expect that the feature instances are singletons, so we don't need to compare the fields. } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java index b3f5bd6692d..c8f306c31d3 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java @@ -27,6 +27,7 @@ import io.delta.kernel.internal.TableConfig; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; +import io.delta.kernel.internal.util.CaseInsensitiveMap; import io.delta.kernel.internal.util.ColumnMapping; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.types.StructType; @@ -36,6 +37,12 @@ /** Contains utility methods related to the Delta table feature support in protocol. */ public class TableFeatures { + /** Min reader version that supports reader features. */ + public static final int TABLE_FEATURES_MIN_READER_VERSION = 3; + + /** Min reader version that supports writer features. */ + public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7; + ////////////////////////////////////////////////////////////////////// /// Define the {@link TableFeature}s that are supported by Kernel /// ////////////////////////////////////////////////////////////////////// @@ -51,7 +58,7 @@ default boolean automaticallyUpdateProtocolOfExistingTables() { public interface ReaderWriterFeatureType {} /** A base class for all table legacy writer-only features. */ - public abstract static class LegacyWriterFeature extends BaseTableFeatureImpl + public abstract static class LegacyWriterFeature extends TableFeature implements LegacyFeatureType { public LegacyWriterFeature(String featureName, int minWriterVersion) { super(featureName, /* minReaderVersion = */ 0, minWriterVersion); @@ -59,7 +66,7 @@ public LegacyWriterFeature(String featureName, int minWriterVersion) { } /** A base class for all table legacy reader-writer features. */ - public abstract static class LegacyReaderWriterFeature extends BaseTableFeatureImpl + public abstract static class LegacyReaderWriterFeature extends TableFeature implements LegacyFeatureType { public LegacyReaderWriterFeature( String featureName, int minReaderVersion, int minWriterVersion) { @@ -68,15 +75,14 @@ public LegacyReaderWriterFeature( } /** A base class for all non-legacy table writer features. */ - public static class WriterFeature extends BaseTableFeatureImpl { + public static class WriterFeature extends TableFeature { public WriterFeature(String featureName, int minWriterVersion) { super(featureName, /* minReaderVersion = */ 0, minWriterVersion); } } /** A base class for all non-legacy table reader-writer features. */ - public static class ReaderWriterFeature extends BaseTableFeatureImpl - implements ReaderWriterFeatureType { + public static class ReaderWriterFeature extends TableFeature implements ReaderWriterFeatureType { public ReaderWriterFeature(String featureName, int minReaderVersion, int minWriterVersion) { super(featureName, minReaderVersion, minWriterVersion); } @@ -245,6 +251,193 @@ public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata me new ReaderWriterFeature( "vacuumProtocolCheck", /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + public static final List TABLE_FEATURES = + Arrays.asList( + APPEND_ONLY_FEATURE, + INVARIANTS_FEATURE, + COLUMN_MAPPING_FEATURE, + VARIANT_FEATURE, + VARIANT_PREVIEW_FEATURE, + DOMAIN_METADATA_FEATURE, + ROW_TRACKING_FEATURE, + ICEBERG_COMPAT_V2_TABLE_FEATURE, + TYPE_WIDENING_TABLE_FEATURE, + TYPE_WIDENING_PREVIEW_TABLE_FEATURE, + IN_COMMIT_TIMESTAMP_FEATURE, + VACUUM_PROTOCOL_CHECK_TABLE_FEATURE); + + public static final CaseInsensitiveMap TABLE_FEATURE_MAP = + new CaseInsensitiveMap() { + { + for (TableFeature feature : TABLE_FEATURES) { + put(feature.featureName(), feature); + } + } + }; + + /** + * Does this protocol reader version support specifying the explicit reader feature set in + * protocol? + */ + public static boolean supportsReaderFeatures(int minReaderVersion) { + return minReaderVersion >= TABLE_FEATURES_MIN_READER_VERSION; + } + + /** + * Does this protocol writer version support specifying the explicit writer feature set in + * protocol? + */ + public static boolean supportsWriterFeatures(int minWriterVersion) { + return minWriterVersion >= TABLE_FEATURES_MIN_WRITER_VERSION; + } + + /** Returns the minimum reader/writer versions required to support all provided features. */ + public static Tuple2 minimumRequiredVersions(Set features) { + int minReaderVersion = + features.stream().mapToInt(TableFeature::minReaderVersion).max().orElse(0); + + int minWriterVersion = + features.stream().mapToInt(TableFeature::minWriterVersion).max().orElse(0); + + return new Tuple2<>(Math.max(minReaderVersion, 1), Math.max(minWriterVersion, 2)); + } + + /** + * Upgrade the current protocol to satisfy all auto-update capable features required by the table + * metadata. A Delta error will be thrown if a non-auto-update capable feature is required by the + * metadata and not in the resulting protocol, in such a case the user must run `ALTER TABLE` to + * add support for this feature beforehand using the `delta.feature.featureName` table property. + * + *

        Refer to {@link FeatureAutoEnabledByMetadata#automaticallyUpdateProtocolOfExistingTables()} + * to know more about "auto-update capable" features. + * + *

        Note: this method only considers metadata-enabled features. To avoid confusion, the caller + * must apply and remove protocol-related table properties from the metadata before calling this + * method. + */ + public Optional upgradeProtocolFromMetadataForExistingTable( + Metadata metadata, Protocol current) { + + Protocol required = + new Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeatures(extractAutomaticallyEnabledFeatures(metadata, current)) + .normalized(); + + if (!required.canUpgradeTo(current)) { + // When the current protocol does not satisfy metadata requirement, some additional features + // must be supported by the protocol. We assert those features can actually perform the + // auto-update. + assertMetadataTableFeaturesAutomaticallySupported( + current.getImplicitAndExplicitlyEnabledFeatures(), + required.getImplicitAndExplicitlyEnabledFeatures()); + return Optional.of(required.merge(current)); + } else { + return Optional.empty(); + } + } + + /** + * Extracts all table features that are enabled by the given metadata and the optional protocol. + * This includes all already enabled features (if a protocol is provided), the features enabled + * directly by metadata, and all of their (transitive) dependencies. + */ + public Set extractAutomaticallyEnabledFeatures( + Metadata metadata, Protocol protocol) { + Set protocolEnabledFeatures = + protocol.getWriterFeatures().stream() + .map(TABLE_FEATURE_MAP::get) + .map( + f -> { + if (f == null) { + throw new IllegalArgumentException("Unknown feature in protocol: " + f); + } + return f; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + Set metadataEnabledFeatures = + TableFeatures.TABLE_FEATURES.stream() + .filter(f -> f instanceof FeatureAutoEnabledByMetadata) + .filter( + f -> + ((FeatureAutoEnabledByMetadata) f) + .metadataRequiresFeatureToBeEnabled(protocol, metadata)) + .collect(Collectors.toSet()); + + Set combinedFeatures = new HashSet<>(protocolEnabledFeatures); + combinedFeatures.addAll(metadataEnabledFeatures); + + // Qn for Paddy: Why do we need to add the dependencies here and also in `Protocol.withFeature` + return getDependencyClosure(combinedFeatures); + } + + /** + * Returns the smallest set of table features that contains `features` and that also contains all + * dependencies of all features in the returned set. + */ + private Set getDependencyClosure(Set features) { + Set requiredFeatures = new HashSet<>(features); + features.forEach(feature -> requiredFeatures.addAll(feature.requiredFeatures())); + + if (features.equals(requiredFeatures)) { + return features; + } else { + return getDependencyClosure(requiredFeatures); + } + } + + /** + * Ensure all features listed in `currentFeatures` are also listed in `requiredFeatures`, or, if + * one is not listed, it must be capable to auto-update a protocol. + * + *

        Refer to FeatureAutomaticallyEnabledByMetadata.automaticallyUpdateProtocolOfExistingTables + * to know more about "auto-update capable" features. + * + *

        Note: Caller must make sure `requiredFeatures` is obtained from a min protocol that + * satisfies a table metadata. + */ + private void assertMetadataTableFeaturesAutomaticallySupported( + Set currentFeatures, Set requiredFeatures) { + + Set newFeatures = new HashSet<>(requiredFeatures); + newFeatures.removeAll(currentFeatures); + + Set autoUpdateCapableFeatures = new HashSet<>(); + Set nonAutoUpdateCapableFeatures = new HashSet<>(); + + for (TableFeature feature : newFeatures) { + if (feature instanceof FeatureAutoEnabledByMetadata) { + FeatureAutoEnabledByMetadata autoFeature = (FeatureAutoEnabledByMetadata) feature; + if (autoFeature.automaticallyUpdateProtocolOfExistingTables()) { + autoUpdateCapableFeatures.add(autoFeature); + } else { + nonAutoUpdateCapableFeatures.add(autoFeature); + } + } + } + + if (!nonAutoUpdateCapableFeatures.isEmpty()) { + // The "current features" we give to the user are from the original protocol, plus + // features newly supported by table properties in the current transaction, plus + // metadata-enabled features that are auto-update capable. The first two are provided by + // `currentFeatures`. + Set allCurrentFeatures = new HashSet<>(currentFeatures); + allCurrentFeatures.addAll( + autoUpdateCapableFeatures.stream() + .map(f -> (TableFeature) f) + .collect(Collectors.toSet())); + + // TODO: fix this error message to be more informative + throw new UnsupportedOperationException( + "The current protocol does not support the following features: " + + nonAutoUpdateCapableFeatures + + ". The protocol must be updated to support these features. " + + "The current protocol supports the following features: " + + allCurrentFeatures); + } + } + private static final Set SUPPORTED_WRITER_FEATURES = Collections.unmodifiableSet( new HashSet() { @@ -287,7 +480,7 @@ public static void validateReadSupportedTable( metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode); break; case 3: - List readerFeatures = protocol.getReaderFeatures(); + Set readerFeatures = protocol.getReaderFeatures(); if (!SUPPORTED_READER_FEATURES.containsAll(readerFeatures)) { Set unsupportedFeatures = new HashSet<>(readerFeatures); unsupportedFeatures.removeAll(SUPPORTED_READER_FEATURES); @@ -491,7 +684,7 @@ private static boolean hasCheckConstraints(Metadata metadata) { } private static boolean isWriterFeatureSupported(Protocol protocol, String featureName) { - List writerFeatures = protocol.getWriterFeatures(); + Set writerFeatures = protocol.getWriterFeatures(); if (writerFeatures == null) { return false; } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CaseInsensitiveMap.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CaseInsensitiveMap.java new file mode 100644 index 00000000000..77f947e4299 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CaseInsensitiveMap.java @@ -0,0 +1,106 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.util; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +/** + * A map that is case-insensitive in its keys. This map is not thread-safe, and key is + * case-insensitive and of string type. + * + * @param + */ +public class CaseInsensitiveMap implements Map { + private final Map innerMap = new HashMap<>(); + + @Override + public V get(Object key) { + return innerMap.get(toLowerCase(key)); + } + + @Override + public V put(String key, V value) { + return innerMap.put(toLowerCase(key), value); + } + + @Override + public void putAll(Map m) { + // behavior of this method is not defined on how to handle duplicates + // don't support this use case, as it is not needed in Kernel + throw new UnsupportedOperationException("putAll"); + } + + @Override + public V remove(Object key) { + return innerMap.remove(toLowerCase(key)); + } + + @Override + public boolean containsKey(Object key) { + return innerMap.containsKey(toLowerCase(key)); + } + + @Override + public boolean containsValue(Object value) { + return innerMap.containsValue(value); + } + + @Override + public Set keySet() { + // no need to convert to lower case here as the inserted keys are already in lower case + return innerMap.keySet(); + } + + @Override + public Set> entrySet() { + // no need to convert to lower case here as the inserted keys are already in lower case + return innerMap.entrySet(); + } + + @Override + public Collection values() { + return innerMap.values(); + } + + @Override + public int size() { + return innerMap.size(); + } + + @Override + public boolean isEmpty() { + return innerMap.isEmpty(); + } + + @Override + public void clear() { + innerMap.clear(); + } + + private String toLowerCase(Object key) { + if (key == null) { + return null; + } + checkArgument(key instanceof String, "Key must be a string"); + return ((String) key).toLowerCase(Locale.ROOT); + } +} From 6a816fafa6af3ffe2db6e9a3f666731d3ad130d1 Mon Sep 17 00:00:00 2001 From: Venki Korukanti Date: Tue, 11 Feb 2025 14:24:33 -0800 Subject: [PATCH 4/5] fix --- .../io/delta/kernel/internal/tablefeatures/TableFeatures.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java index c8f306c31d3..5a7a1b589c3 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java @@ -59,7 +59,7 @@ public interface ReaderWriterFeatureType {} /** A base class for all table legacy writer-only features. */ public abstract static class LegacyWriterFeature extends TableFeature - implements LegacyFeatureType { + implements ReaderWriterFeatureType, LegacyFeatureType { public LegacyWriterFeature(String featureName, int minWriterVersion) { super(featureName, /* minReaderVersion = */ 0, minWriterVersion); } From c905fa0fa0ff4012be70f16d170020a2b044549a Mon Sep 17 00:00:00 2001 From: Venki Korukanti Date: Wed, 12 Feb 2025 00:39:39 -0800 Subject: [PATCH 5/5] updates --- .../io/delta/kernel/internal/TableConfig.java | 17 + .../io/delta/kernel/internal/TableImpl.java | 4 +- .../internal/TransactionBuilderImpl.java | 30 +- .../kernel/internal/actions/Protocol.java | 48 +- .../kernel/internal/replay/LogReplay.java | 3 +- .../internal/snapshot/SnapshotManager.java | 2 +- .../internal/tablefeatures/TableFeature.java | 18 + .../internal/tablefeatures/TableFeatures.java | 516 ++++++++++-------- .../kernel/internal/TableFeaturesSuite.scala | 8 +- 9 files changed, 357 insertions(+), 289 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java index b14bf6a3bbd..eebb635f88f 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java @@ -232,6 +232,15 @@ public class TableConfig { "needs to be a boolean.", true); + public static final TableConfig CHECKPOINT_POLICY = + new TableConfig<>( + "delta.checkpointPolicy", + "classic", + v -> v, + value -> value.equals("classic") || value.equals("v2"), + "needs to be a string and one of 'classic' or 'v2'.", + true); + /** All the valid properties that can be set on the table. */ private static final Map> VALID_PROPERTIES = Collections.unmodifiableMap( @@ -245,6 +254,14 @@ public class TableConfig { addConfig(this, COLUMN_MAPPING_MODE); addConfig(this, ICEBERG_COMPAT_V2_ENABLED); addConfig(this, COLUMN_MAPPING_MAX_COLUMN_ID); + addConfig(this, APPEND_ONLY); + addConfig(this, CHANGE_DATA_FEED); + addConfig(this, ENABLE_DELETION_VECTORS_CREATION); + addConfig(this, ROW_TRACKING_ENABLED); + addConfig(this, LOG_RETENTION); + addConfig(this, EXPIRED_LOG_CLEANUP_ENABLED); + addConfig(this, ENABLE_TYPE_WIDENING); + addConfig(this, CHECKPOINT_POLICY); } }); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java index 0d9844d9841..2b2a3d2cadf 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java @@ -40,7 +40,6 @@ import java.io.UncheckedIOException; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -193,8 +192,7 @@ public CloseableIterator getChanges( for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) { if (!protocolVector.isNullAt(rowId)) { Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId); - TableFeatures.validateReadSupportedTable( - protocol, getDataPath().toString(), Optional.empty()); + TableFeatures.validateReadSupportedTable(protocol, getDataPath().toString()); } } if (shouldDropProtocolColumn) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java index 1d8206eca0f..a483e8dc583 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java @@ -19,6 +19,7 @@ import static io.delta.kernel.internal.DeltaErrors.tableAlreadyExists; import static io.delta.kernel.internal.TransactionImpl.DEFAULT_READ_VERSION; import static io.delta.kernel.internal.TransactionImpl.DEFAULT_WRITE_VERSION; +import static io.delta.kernel.internal.tablefeatures.TableFeatures.extractAutomaticallyEnabledFeatures; import static io.delta.kernel.internal.util.ColumnMapping.isColumnMappingModeEnabled; import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames; @@ -36,6 +37,7 @@ import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.SnapshotHint; +import io.delta.kernel.internal.tablefeatures.TableFeature; import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.internal.util.ColumnMapping; import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode; @@ -136,6 +138,7 @@ public Transaction build(Engine engine) { boolean shouldUpdateProtocol = false; Metadata metadata = snapshot.getMetadata(); Protocol protocol = snapshot.getProtocol(); + if (tableProperties.isPresent()) { Map validatedProperties = TableConfig.validateDeltaProperties(tableProperties.get()); @@ -150,17 +153,17 @@ public Transaction build(Engine engine) { metadata = metadata.withNewConfiguration(newProperties); } - Set newWriterFeatures = - TableFeatures.extractAutomaticallyEnabledWriterFeatures(metadata, protocol); - if (!newWriterFeatures.isEmpty()) { - logger.info("Automatically enabling writer features: {}", newWriterFeatures); - shouldUpdateProtocol = true; - Set oldWriterFeatures = protocol.getWriterFeatures(); - protocol = protocol.withNewWriterFeatures(newWriterFeatures); - Set curWriterFeatures = protocol.getWriterFeatures(); - checkArgument(!Objects.equals(oldWriterFeatures, curWriterFeatures)); - TableFeatures.validateWriteSupportedTable( - protocol, metadata, metadata.getSchema(), table.getPath(engine)); + Set enabledFeatures = extractAutomaticallyEnabledFeatures(metadata, protocol); + if (!enabledFeatures.isEmpty()) { + logger.info("Automatically enabling features: {}", enabledFeatures); + Protocol oldProtocol = protocol; + protocol = protocol.withFeatures(enabledFeatures); + + if (protocol != oldProtocol) { + logger.info("Updated protocol: {}", protocol); + shouldUpdateProtocol = true; + } + TableFeatures.validateWriteSupportedTable(protocol, metadata, table.getPath(engine)); } } @@ -185,10 +188,7 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable) String tablePath = table.getPath(engine); // Validate the table has no features that Kernel doesn't yet support writing into it. TableFeatures.validateWriteSupportedTable( - snapshot.getProtocol(), - snapshot.getMetadata(), - snapshot.getMetadata().getSchema(), - tablePath); + snapshot.getProtocol(), snapshot.getMetadata(), tablePath); if (!isNewTable) { if (schema.isPresent()) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java index feb002f25b9..56a29a348d1 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java @@ -43,10 +43,10 @@ public static Protocol fromColumnVector(ColumnVector vector, int rowId) { vector.getChild(0).getInt(rowId), vector.getChild(1).getInt(rowId), vector.getChild(2).isNullAt(rowId) - ? emptySet() + ? null : new HashSet<>(VectorUtils.toJavaList(vector.getChild(2).getArray(rowId))), vector.getChild(3).isNullAt(rowId) - ? emptySet() + ? null : new HashSet<>(VectorUtils.toJavaList(vector.getChild(3).getArray(rowId)))); } @@ -63,7 +63,7 @@ public static Protocol fromColumnVector(ColumnVector vector, int rowId) { private final Set writerFeatures; public Protocol(int minReaderVersion, int minWriterVersion) { - this(minReaderVersion, minWriterVersion, emptySet(), emptySet()); + this(minReaderVersion, minWriterVersion, null, null); } public Protocol( @@ -105,6 +105,7 @@ public Set getImplicitlyEnabledFeatures() { } public Set getExplicitlyEnabledFeatures() { + // TODO: Should we throw an exception if we encouter a feature that is not known to Kernel yet? return TableFeatures.TABLE_FEATURES.stream() .filter( f -> @@ -113,7 +114,7 @@ public Set getExplicitlyEnabledFeatures() { .collect(Collectors.toSet()); } - public Set getImplicitAndExplicitlyEnabledFeatures() { + public Set getImplicitlyAndExplicitlyEnabledFeatures() { Set enabledFeatures = new HashSet<>(); enabledFeatures.addAll(getImplicitlyEnabledFeatures()); enabledFeatures.addAll(getExplicitlyEnabledFeatures()); @@ -146,20 +147,6 @@ public Row toRow() { return new GenericRow(Protocol.FULL_SCHEMA, protocolMap); } - public Protocol withNewWriterFeatures(Set writerFeatures) { - Tuple2 newProtocolVersions = - TableFeatures.minProtocolVersionFromAutomaticallyEnabledFeatures(writerFeatures); - Set newWriterFeatures = new HashSet<>(writerFeatures); - if (this.writerFeatures != null) { - newWriterFeatures.addAll(this.writerFeatures); - } - return new Protocol( - newProtocolVersions._1, - newProtocolVersions._2, - this.readerFeatures == null ? null : new HashSet<>(this.readerFeatures), - newWriterFeatures); - } - /** Create a new {@link Protocol} object with the given {@link TableFeature} enabled. */ public Protocol withFeatures(Iterable newFeatures) { Protocol result = this; @@ -233,8 +220,8 @@ public Protocol withFeature(TableFeature feature) { */ public boolean canUpgradeTo(Protocol to) { // All features supported by `this` are supported by `to`. - return to.getImplicitAndExplicitlyEnabledFeatures() - .containsAll(this.getImplicitAndExplicitlyEnabledFeatures()); + return to.getImplicitlyAndExplicitlyEnabledFeatures() + .containsAll(this.getImplicitlyAndExplicitlyEnabledFeatures()); } /** @@ -261,8 +248,8 @@ public Protocol normalized() { int minWriterVersion = versions._2; Protocol newProtocol = new Protocol(minReaderVersion, minWriterVersion); - if (this.getImplicitAndExplicitlyEnabledFeatures() - .equals(newProtocol.getImplicitAndExplicitlyEnabledFeatures())) { + if (this.getImplicitlyAndExplicitlyEnabledFeatures() + .equals(newProtocol.getImplicitlyAndExplicitlyEnabledFeatures())) { return newProtocol; } else { // means we have some that is added after table feature support. @@ -352,4 +339,21 @@ private boolean supportsReaderFeatures() { private boolean supportsWriterFeatures() { return TableFeatures.supportsWriterFeatures(minWriterVersion); } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + Protocol protocol = (Protocol) o; + return minReaderVersion == protocol.minReaderVersion + && minWriterVersion == protocol.minWriterVersion + && Objects.equals(readerFeatures, protocol.readerFeatures) + && Objects.equals(writerFeatures, protocol.writerFeatures); + } + + @Override + public int hashCode() { + return Objects.hash(minReaderVersion, minWriterVersion, readerFeatures, writerFeatures); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index 4c681aa0785..3c0a79f3bc0 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -297,8 +297,7 @@ protected Tuple2 loadTableProtocolAndMetadata( if (protocol != null) { // Stop since we have found the latest Protocol and Metadata. - TableFeatures.validateReadSupportedTable( - protocol, dataPath.toString(), Optional.of(metadata)); + TableFeatures.validateReadSupportedTable(protocol, dataPath.toString()); return new Tuple2<>(protocol, metadata); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 11774741e8a..6fcf3aa016f 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -161,7 +161,7 @@ public void checkpoint(Engine engine, Clock clock, long version) // Check if writing to the given table protocol version/features is supported in Kernel validateWriteSupportedTable( - snapshot.getProtocol(), snapshot.getMetadata(), snapshot.getSchema(), tablePath.toString()); + snapshot.getProtocol(), snapshot.getMetadata(), tablePath.toString()); Path checkpointPath = FileNames.checkpointFileSingular(logPath, version); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java index a6e08a92fdd..ba4580e5e7d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java @@ -18,6 +18,7 @@ import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import io.delta.kernel.internal.actions.Metadata; import java.util.Collections; import java.util.Set; @@ -118,6 +119,23 @@ public Set requiredFeatures() { return Collections.emptySet(); } + /** + * Does Kernel has support to read a table containing this feature? If the feature is a + * writer-only feature, this method should always return true. + * + * @return true if Kernel has support to read a table containing this feature. + */ + public abstract boolean hasKernelReadSupport(); + + /** + * Does Kernel has support to write a table containing this feature? + * + * @param metadata the metadata of the table. Sometimes checking the metadata is necessary to know + * the Kernel can write the table or not. + * @return true if Kernel has support to write a table containing this feature. + */ + public abstract boolean hasKernelWriteSupport(Metadata metadata); + /** * Validate the table feature. This method should throw an exception if the table feature * properties are invalid. Should be called after the object deriving the {@link TableFeature} is diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java index 5a7a1b589c3..8663f4d09f4 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java @@ -22,17 +22,15 @@ import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED; import static io.delta.kernel.internal.TableConfig.ROW_TRACKING_ENABLED; import static io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode.NONE; +import static java.util.stream.Collectors.toSet; -import io.delta.kernel.internal.DeltaErrors; import io.delta.kernel.internal.TableConfig; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.util.CaseInsensitiveMap; -import io.delta.kernel.internal.util.ColumnMapping; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.types.StructType; import java.util.*; -import java.util.stream.Collectors; /** Contains utility methods related to the Delta table feature support in protocol. */ public class TableFeatures { @@ -63,6 +61,11 @@ public abstract static class LegacyWriterFeature extends TableFeature public LegacyWriterFeature(String featureName, int minWriterVersion) { super(featureName, /* minReaderVersion = */ 0, minWriterVersion); } + + @Override + public boolean hasKernelReadSupport() { + return true; + } } /** A base class for all table legacy reader-writer features. */ @@ -75,14 +78,20 @@ public LegacyReaderWriterFeature( } /** A base class for all non-legacy table writer features. */ - public static class WriterFeature extends TableFeature { + public abstract static class WriterFeature extends TableFeature { public WriterFeature(String featureName, int minWriterVersion) { super(featureName, /* minReaderVersion = */ 0, minWriterVersion); } + + @Override + public boolean hasKernelReadSupport() { + return true; + } } /** A base class for all non-legacy table reader-writer features. */ - public static class ReaderWriterFeature extends TableFeature implements ReaderWriterFeatureType { + public abstract static class ReaderWriterFeature extends TableFeature + implements ReaderWriterFeatureType { public ReaderWriterFeature(String featureName, int minReaderVersion, int minWriterVersion) { super(featureName, minReaderVersion, minWriterVersion); } @@ -99,6 +108,11 @@ private static class AppendOnlyFeature extends LegacyWriterFeature { public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { return APPEND_ONLY.fromMetadata(metadata); } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return true; + } } public static final TableFeature INVARIANTS_FEATURE = new InvariantsFeature(); @@ -112,6 +126,31 @@ private static class InvariantsFeature extends LegacyWriterFeature { public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { return hasInvariants(metadata.getSchema()); } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + // If there is no invariant, then the table is supported + return !hasInvariants(metadata.getSchema()); + } + } + + public static final TableFeature CONSTRAINTS_FEATURE = new ConstraintsFeature(); + + private static class ConstraintsFeature extends LegacyWriterFeature { + ConstraintsFeature() { + super("constraints", /* minWriterVersion = */ 3); + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + // If there is no check constraint, then the table is supported + return hasCheckConstraints(metadata); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return hasCheckConstraints(metadata); + } } public static final TableFeature COLUMN_MAPPING_FEATURE = new ColumnMappingFeature(); @@ -128,6 +167,34 @@ private static class ColumnMappingFeature extends LegacyReaderWriterFeature { public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { return COLUMN_MAPPING_MODE.fromMetadata(metadata) != NONE; } + + @Override + public boolean hasKernelReadSupport() { + return true; + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return true; + } + } + + public static final TableFeature IDENTITY_COLUMNS_FEATURE = new IdentityColumnsFeature(); + + private static class IdentityColumnsFeature extends LegacyWriterFeature { + IdentityColumnsFeature() { + super("identity", /* minWriterVersion = */ 6); + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return !hasIdentityColumns(metadata); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return hasIdentityColumns(metadata); + } } public static final TableFeature VARIANT_FEATURE = new VariantTypeTableFeature("variantType"); @@ -151,10 +218,30 @@ public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata me // TODO: check schema recursively for variant types return false; } + + @Override + public boolean hasKernelReadSupport() { + return true; + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return false; // TODO: yet to be implemented in Kernel + } } - public static final TableFeature DOMAIN_METADATA_FEATURE = - new WriterFeature("domainMetadata", /* minWriterVersion = */ 7); + public static final TableFeature DOMAIN_METADATA_FEATURE = new DomainMetadataFeature(); + + private static class DomainMetadataFeature extends WriterFeature { + DomainMetadataFeature() { + super("domainMetadata", /* minWriterVersion = */ 7); + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return true; + } + } public static final TableFeature ROW_TRACKING_FEATURE = new RowTrackingFeature(); @@ -178,6 +265,43 @@ public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata me public Set requiredFeatures() { return Collections.singleton(DOMAIN_METADATA_FEATURE); } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return true; + } + } + + public static final TableFeature DELETION_VECTORS_FEATURE = new DeletionVectorsTableFeature(); + + private static class DeletionVectorsTableFeature extends ReaderWriterFeature + implements FeatureAutoEnabledByMetadata { + DeletionVectorsTableFeature() { + super("deletionVectors", /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + } + + @Override + public boolean hasKernelReadSupport() { + return true; + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + // We currently only support blind appends. So we don't need to do anything special for + // writing + // into a table with deletion vectors enabled. + return true; + } + + @Override + public boolean automaticallyUpdateProtocolOfExistingTables() { + return true; + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return TableConfig.ENABLE_DELETION_VECTORS_CREATION.fromMetadata(metadata); + } } public static final TableFeature ICEBERG_COMPAT_V2_TABLE_FEATURE = @@ -202,6 +326,11 @@ public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata me public @Override Set requiredFeatures() { return Collections.singleton(COLUMN_MAPPING_FEATURE); } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return true; + } } public static final TableFeature TYPE_WIDENING_TABLE_FEATURE = @@ -225,6 +354,16 @@ public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata me return TableConfig.ENABLE_TYPE_WIDENING.fromMetadata(metadata); // TODO: there is more. Need to check if any of the fields } + + @Override + public boolean hasKernelReadSupport() { + return true; + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return false; // TODO: yet to support it. + } } public static final TableFeature IN_COMMIT_TIMESTAMP_FEATURE = @@ -245,11 +384,94 @@ public boolean automaticallyUpdateProtocolOfExistingTables() { public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { return IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata); } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return true; + } + } + + public static final TableFeature TIMESTAMP_NTZ_FEATURE = new TimestampNtzTableFeature(); + + private static class TimestampNtzTableFeature extends ReaderWriterFeature + implements FeatureAutoEnabledByMetadata { + TimestampNtzTableFeature() { + super("timestampNtz", /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + } + + @Override + public boolean hasKernelReadSupport() { + return true; + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return false; // TODO: yet to be implemented in Kernel + } + + @Override + public boolean automaticallyUpdateProtocolOfExistingTables() { + return false; // we can't allow feature upgrade without manually adding the feature + // through `delta.feature.timestampNtz = supported` table property. + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + // TODO: check schema recursively for timestamp_ntz types + return false; + } } - public static final TableFeature VACUUM_PROTOCOL_CHECK_TABLE_FEATURE = - new ReaderWriterFeature( - "vacuumProtocolCheck", /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + public static final TableFeature CHECKPOINT_V2_FEATURE = new CheckpointV2TableFeature(); + + private static class CheckpointV2TableFeature extends ReaderWriterFeature + implements FeatureAutoEnabledByMetadata { + CheckpointV2TableFeature() { + super("v2Checkpoint", /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + } + + @Override + public boolean hasKernelReadSupport() { + return true; + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return true; // In order to commit, there is no extra work required. This affects + // the checkpoint format only. When v2 is enabled, writing classic checkpoints is + // still allowed. + } + + @Override + public boolean automaticallyUpdateProtocolOfExistingTables() { + return true; + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + // TODO: define an enum for checkpoint policy when we start supporting writing v2 checkpoints + return "v2".equals(TableConfig.CHECKPOINT_POLICY.fromMetadata(metadata)); + } + } + + public static final TableFeature VACUUM_PROTOCOL_CHECK_FEATURE = + new VacuumProtocolCheckTableFeature(); + + private static class VacuumProtocolCheckTableFeature extends ReaderWriterFeature { + VacuumProtocolCheckTableFeature() { + super("vacuumProtocolCheck", /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + } + + @Override + public boolean hasKernelReadSupport() { + return true; + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return true; + } + } public static final List TABLE_FEATURES = Arrays.asList( @@ -260,11 +482,16 @@ public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata me VARIANT_PREVIEW_FEATURE, DOMAIN_METADATA_FEATURE, ROW_TRACKING_FEATURE, + IDENTITY_COLUMNS_FEATURE, + CONSTRAINTS_FEATURE, ICEBERG_COMPAT_V2_TABLE_FEATURE, TYPE_WIDENING_TABLE_FEATURE, TYPE_WIDENING_PREVIEW_TABLE_FEATURE, IN_COMMIT_TIMESTAMP_FEATURE, - VACUUM_PROTOCOL_CHECK_TABLE_FEATURE); + TIMESTAMP_NTZ_FEATURE, + VACUUM_PROTOCOL_CHECK_FEATURE, + DELETION_VECTORS_FEATURE, + CHECKPOINT_V2_FEATURE); public static final CaseInsensitiveMap TABLE_FEATURE_MAP = new CaseInsensitiveMap() { @@ -328,8 +555,8 @@ public Optional upgradeProtocolFromMetadataForExistingTable( // must be supported by the protocol. We assert those features can actually perform the // auto-update. assertMetadataTableFeaturesAutomaticallySupported( - current.getImplicitAndExplicitlyEnabledFeatures(), - required.getImplicitAndExplicitlyEnabledFeatures()); + current.getImplicitlyAndExplicitlyEnabledFeatures(), + required.getImplicitlyAndExplicitlyEnabledFeatures()); return Optional.of(required.merge(current)); } else { return Optional.empty(); @@ -341,7 +568,7 @@ public Optional upgradeProtocolFromMetadataForExistingTable( * This includes all already enabled features (if a protocol is provided), the features enabled * directly by metadata, and all of their (transitive) dependencies. */ - public Set extractAutomaticallyEnabledFeatures( + public static Set extractAutomaticallyEnabledFeatures( Metadata metadata, Protocol protocol) { Set protocolEnabledFeatures = protocol.getWriterFeatures().stream() @@ -353,8 +580,8 @@ public Set extractAutomaticallyEnabledFeatures( } return f; }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); + .filter(Objects::nonNull) // TODO: throw error if we can't find them + .collect(toSet()); Set metadataEnabledFeatures = TableFeatures.TABLE_FEATURES.stream() @@ -363,7 +590,7 @@ public Set extractAutomaticallyEnabledFeatures( f -> ((FeatureAutoEnabledByMetadata) f) .metadataRequiresFeatureToBeEnabled(protocol, metadata)) - .collect(Collectors.toSet()); + .collect(toSet()); Set combinedFeatures = new HashSet<>(protocolEnabledFeatures); combinedFeatures.addAll(metadataEnabledFeatures); @@ -376,7 +603,7 @@ public Set extractAutomaticallyEnabledFeatures( * Returns the smallest set of table features that contains `features` and that also contains all * dependencies of all features in the returned set. */ - private Set getDependencyClosure(Set features) { + private static Set getDependencyClosure(Set features) { Set requiredFeatures = new HashSet<>(features); features.forEach(feature -> requiredFeatures.addAll(feature.requiredFeatures())); @@ -424,9 +651,7 @@ private void assertMetadataTableFeaturesAutomaticallySupported( // `currentFeatures`. Set allCurrentFeatures = new HashSet<>(currentFeatures); allCurrentFeatures.addAll( - autoUpdateCapableFeatures.stream() - .map(f -> (TableFeature) f) - .collect(Collectors.toSet())); + autoUpdateCapableFeatures.stream().map(f -> (TableFeature) f).collect(toSet())); // TODO: fix this error message to be more informative throw new UnsupportedOperationException( @@ -438,237 +663,48 @@ private void assertMetadataTableFeaturesAutomaticallySupported( } } - private static final Set SUPPORTED_WRITER_FEATURES = - Collections.unmodifiableSet( - new HashSet() { - { - add("appendOnly"); - add("inCommitTimestamp"); - add("columnMapping"); - add("typeWidening-preview"); - add("typeWidening"); - add("domainMetadata"); - add("rowTracking"); - } - }); - - private static final Set SUPPORTED_READER_FEATURES = - Collections.unmodifiableSet( - new HashSet() { - { - add("columnMapping"); - add("deletionVectors"); - add("timestampNtz"); - add("typeWidening-preview"); - add("typeWidening"); - add("vacuumProtocolCheck"); - add("variantType"); - add("variantType-preview"); - add("v2Checkpoint"); - } - }); - //////////////////// // Helper Methods // - //////////////////// - public static void validateReadSupportedTable( - Protocol protocol, String tablePath, Optional metadata) { - switch (protocol.getMinReaderVersion()) { - case 1: - break; - case 2: - metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode); - break; - case 3: - Set readerFeatures = protocol.getReaderFeatures(); - if (!SUPPORTED_READER_FEATURES.containsAll(readerFeatures)) { - Set unsupportedFeatures = new HashSet<>(readerFeatures); - unsupportedFeatures.removeAll(SUPPORTED_READER_FEATURES); - throw DeltaErrors.unsupportedReaderFeature(tablePath, unsupportedFeatures); - } - if (readerFeatures.contains("columnMapping")) { - metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode); - } - break; - default: - throw DeltaErrors.unsupportedReaderProtocol(tablePath, protocol.getMinReaderVersion()); + /// ///////////////// + public static void validateReadSupportedTable(Protocol protocol, String tablePath) { + Set unsupportedFeatures = + protocol.getImplicitlyAndExplicitlyEnabledFeatures().stream() + .filter(f -> !f.hasKernelReadSupport()) + .collect(toSet()); + + if (!unsupportedFeatures.isEmpty()) { + throw unsupportedReaderFeature( + tablePath, unsupportedFeatures.stream().map(TableFeature::featureName).collect(toSet())); } } /** * Utility method to validate whether the given table is supported for writing from Kernel. - * Currently, the support is as follows: - * - *

          - *
        • protocol writer version 1. - *
        • protocol writer version 2 only with appendOnly feature enabled. - *
        • protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code - * columnMapping}, {@code typeWidening}, {@code domainMetadata}, {@code rowTracking} feature - * enabled. - *
        * * @param protocol Table protocol * @param metadata Table metadata - * @param tableSchema Table schema */ public static void validateWriteSupportedTable( - Protocol protocol, Metadata metadata, StructType tableSchema, String tablePath) { - int minWriterVersion = protocol.getMinWriterVersion(); - switch (minWriterVersion) { - case 1: - break; - case 2: - // Append-only and column invariants are the writer features added in version 2 - // Append-only is supported, but not the invariants - validateNoInvariants(tableSchema); - break; - case 3: - // Check constraints are added in version 3 - throw unsupportedWriterProtocol(tablePath, minWriterVersion); - case 4: - // CDF and generated columns are writer features added in version 4 - throw unsupportedWriterProtocol(tablePath, minWriterVersion); - case 5: - // Column mapping is the only one writer feature added in version 5 - throw unsupportedWriterProtocol(tablePath, minWriterVersion); - case 6: - // Identity is the only one writer feature added in version 6 - throw unsupportedWriterProtocol(tablePath, minWriterVersion); - case 7: - for (String writerFeature : protocol.getWriterFeatures()) { - if (writerFeature.equals("invariants")) { - // For version 7, we allow 'invariants' to be present in the protocol's writerFeatures - // to unblock certain use cases, provided that no invariants are defined in the schema. - validateNoInvariants(tableSchema); - } else if (!SUPPORTED_WRITER_FEATURES.contains(writerFeature)) { - throw unsupportedWriterFeature(tablePath, writerFeature); - } - } - - // Eventually we may have a way to declare and enforce dependencies between features. - // By putting this check for row tracking here, it makes it easier to spot that row - // tracking defines such a dependency that can be implicitly checked. - if (isRowTrackingSupported(protocol) && !isDomainMetadataSupported(protocol)) { - throw DeltaErrors.rowTrackingSupportedWithDomainMetadataUnsupported(); - } - break; - default: - throw unsupportedWriterProtocol(tablePath, minWriterVersion); - } - } - - /** - * Given the automatically enabled features from Delta table metadata, returns the minimum - * required reader and writer version that satisfies all enabled table features in the metadata. - * - * @param enabledFeatures the automatically enabled features from the Delta table metadata - * @return the minimum required reader and writer version that satisfies all enabled table - */ - public static Tuple2 minProtocolVersionFromAutomaticallyEnabledFeatures( - Set enabledFeatures) { - - int readerVersion = 0; - int writerVersion = 0; - - for (String feature : enabledFeatures) { - readerVersion = Math.max(readerVersion, getMinReaderVersion(feature)); - writerVersion = Math.max(writerVersion, getMinWriterVersion(feature)); - } - - return new Tuple2<>(readerVersion, writerVersion); - } - - /** - * Extract the writer features that should be enabled automatically based on the metadata which - * are not already enabled. For example, the {@code inCommitTimestamp} feature should be enabled - * when the delta property name (delta.enableInCommitTimestamps) is set to true in the metadata if - * it is not already enabled. - * - * @param metadata the metadata of the table - * @param protocol the protocol of the table - * @return the writer features that should be enabled automatically - */ - public static Set extractAutomaticallyEnabledWriterFeatures( - Metadata metadata, Protocol protocol) { - return TableFeatures.SUPPORTED_WRITER_FEATURES.stream() - .filter(f -> metadataRequiresWriterFeatureToBeEnabled(metadata, f)) - .filter( - f -> protocol.getWriterFeatures() == null || !protocol.getWriterFeatures().contains(f)) - .collect(Collectors.toSet()); - } - - /** - * Checks if the table protocol supports the "domainMetadata" writer feature. - * - * @param protocol the protocol to check - * @return true if the "domainMetadata" feature is supported, false otherwise - */ + Protocol protocol, Metadata metadata, String tablePath) { + // TODO: should we check the table is writer and reader support? + protocol + .getImplicitlyAndExplicitlyEnabledFeatures() + .forEach( + f -> { + if (!f.hasKernelWriteSupport(metadata)) { + throw unsupportedWriterFeature(tablePath, f.featureName()); + } + }); + } + + /** Checks if the table protocol supports the "domainMetadata" writer feature. */ public static boolean isDomainMetadataSupported(Protocol protocol) { - return isWriterFeatureSupported(protocol, "domainMetadata"); + return protocol.getImplicitlyAndExplicitlyEnabledFeatures().contains(DOMAIN_METADATA_FEATURE); } - /** - * Check if the table protocol supports the "rowTracking" writer feature. - * - * @param protocol the protocol to check - * @return true if the protocol supports row tracking, false otherwise - */ + /** Check if the table protocol supports the "rowTracking" writer feature. */ public static boolean isRowTrackingSupported(Protocol protocol) { - return isWriterFeatureSupported(protocol, "rowTracking"); - } - - /** - * Get the minimum reader version required for a feature. - * - * @param feature the feature - * @return the minimum reader version required for the feature - */ - private static int getMinReaderVersion(String feature) { - switch (feature) { - case "inCommitTimestamp": - return 3; - default: - return 1; - } - } - - /** - * Get the minimum writer version required for a feature. - * - * @param feature the feature - * @return the minimum writer version required for the feature - */ - private static int getMinWriterVersion(String feature) { - switch (feature) { - case "inCommitTimestamp": - return 7; - default: - return 2; - } - } - - /** - * Determine whether a writer feature must be supported and enabled to satisfy the metadata - * requirements. - * - * @param metadata the table metadata - * @param feature the writer feature to check - * @return whether the writer feature must be enabled - */ - private static boolean metadataRequiresWriterFeatureToBeEnabled( - Metadata metadata, String feature) { - switch (feature) { - case "inCommitTimestamp": - return IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata); - default: - return false; - } - } - - private static void validateNoInvariants(StructType tableSchema) { - if (hasInvariants(tableSchema)) { - throw columnInvariantsNotSupported(); - } + return protocol.getImplicitlyAndExplicitlyEnabledFeatures().contains(ROW_TRACKING_FEATURE); } private static boolean hasInvariants(StructType tableSchema) { @@ -683,11 +719,7 @@ private static boolean hasCheckConstraints(Metadata metadata) { .orElse(false); } - private static boolean isWriterFeatureSupported(Protocol protocol, String featureName) { - Set writerFeatures = protocol.getWriterFeatures(); - if (writerFeatures == null) { - return false; - } - return writerFeatures.contains(featureName) && protocol.getMinWriterVersion() >= 7; + private static boolean hasIdentityColumns(Metadata metadata) { + return false; // TODO: implement. } } diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala index 967b4b4d606..ecf79ab91bd 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala @@ -108,7 +108,7 @@ class TableFeaturesSuite extends AnyFunSuite { protocol: Protocol, metadata: Metadata = null, schema: StructType = createTestSchema()): Unit = { - validateWriteSupportedTable(protocol, metadata, schema, "/test/table") + validateWriteSupportedTable(protocol, metadata, "/test/table") } def checkUnsupported( @@ -116,7 +116,7 @@ class TableFeaturesSuite extends AnyFunSuite { metadata: Metadata = null, schema: StructType = createTestSchema()): Unit = { intercept[KernelException] { - validateWriteSupportedTable(protocol, metadata, schema, "/test/table") + validateWriteSupportedTable(protocol, metadata, "/test/table") } } @@ -126,8 +126,8 @@ class TableFeaturesSuite extends AnyFunSuite { 0, minWriterVersion, // reader features - it doesn't matter as the read fails anyway before the writer check - Collections.emptyList(), - writerFeatures.toSeq.asJava + null, + writerFeatures.toSet.asJava ) }