Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Feb 12, 2025
1 parent 6a816fa commit c905fa0
Show file tree
Hide file tree
Showing 9 changed files with 357 additions and 289 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,15 @@ public class TableConfig<T> {
"needs to be a boolean.",
true);

public static final TableConfig<String> CHECKPOINT_POLICY =
new TableConfig<>(
"delta.checkpointPolicy",
"classic",
v -> v,
value -> value.equals("classic") || value.equals("v2"),
"needs to be a string and one of 'classic' or 'v2'.",
true);

/** All the valid properties that can be set on the table. */
private static final Map<String, TableConfig<?>> VALID_PROPERTIES =
Collections.unmodifiableMap(
Expand All @@ -245,6 +254,14 @@ public class TableConfig<T> {
addConfig(this, COLUMN_MAPPING_MODE);
addConfig(this, ICEBERG_COMPAT_V2_ENABLED);
addConfig(this, COLUMN_MAPPING_MAX_COLUMN_ID);
addConfig(this, APPEND_ONLY);
addConfig(this, CHANGE_DATA_FEED);
addConfig(this, ENABLE_DELETION_VECTORS_CREATION);
addConfig(this, ROW_TRACKING_ENABLED);
addConfig(this, LOG_RETENTION);
addConfig(this, EXPIRED_LOG_CLEANUP_ENABLED);
addConfig(this, ENABLE_TYPE_WIDENING);
addConfig(this, CHECKPOINT_POLICY);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -193,8 +192,7 @@ public CloseableIterator<ColumnarBatch> getChanges(
for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) {
if (!protocolVector.isNullAt(rowId)) {
Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId);
TableFeatures.validateReadSupportedTable(
protocol, getDataPath().toString(), Optional.empty());
TableFeatures.validateReadSupportedTable(protocol, getDataPath().toString());
}
}
if (shouldDropProtocolColumn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static io.delta.kernel.internal.DeltaErrors.tableAlreadyExists;
import static io.delta.kernel.internal.TransactionImpl.DEFAULT_READ_VERSION;
import static io.delta.kernel.internal.TransactionImpl.DEFAULT_WRITE_VERSION;
import static io.delta.kernel.internal.tablefeatures.TableFeatures.extractAutomaticallyEnabledFeatures;
import static io.delta.kernel.internal.util.ColumnMapping.isColumnMappingModeEnabled;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames;
Expand All @@ -36,6 +37,7 @@
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.SnapshotHint;
import io.delta.kernel.internal.tablefeatures.TableFeature;
import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode;
Expand Down Expand Up @@ -136,6 +138,7 @@ public Transaction build(Engine engine) {
boolean shouldUpdateProtocol = false;
Metadata metadata = snapshot.getMetadata();
Protocol protocol = snapshot.getProtocol();

if (tableProperties.isPresent()) {
Map<String, String> validatedProperties =
TableConfig.validateDeltaProperties(tableProperties.get());
Expand All @@ -150,17 +153,17 @@ public Transaction build(Engine engine) {
metadata = metadata.withNewConfiguration(newProperties);
}

Set<String> newWriterFeatures =
TableFeatures.extractAutomaticallyEnabledWriterFeatures(metadata, protocol);
if (!newWriterFeatures.isEmpty()) {
logger.info("Automatically enabling writer features: {}", newWriterFeatures);
shouldUpdateProtocol = true;
Set<String> oldWriterFeatures = protocol.getWriterFeatures();
protocol = protocol.withNewWriterFeatures(newWriterFeatures);
Set<String> curWriterFeatures = protocol.getWriterFeatures();
checkArgument(!Objects.equals(oldWriterFeatures, curWriterFeatures));
TableFeatures.validateWriteSupportedTable(
protocol, metadata, metadata.getSchema(), table.getPath(engine));
Set<TableFeature> enabledFeatures = extractAutomaticallyEnabledFeatures(metadata, protocol);
if (!enabledFeatures.isEmpty()) {
logger.info("Automatically enabling features: {}", enabledFeatures);
Protocol oldProtocol = protocol;
protocol = protocol.withFeatures(enabledFeatures);

if (protocol != oldProtocol) {
logger.info("Updated protocol: {}", protocol);
shouldUpdateProtocol = true;
}
TableFeatures.validateWriteSupportedTable(protocol, metadata, table.getPath(engine));
}
}

Expand All @@ -185,10 +188,7 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
String tablePath = table.getPath(engine);
// Validate the table has no features that Kernel doesn't yet support writing into it.
TableFeatures.validateWriteSupportedTable(
snapshot.getProtocol(),
snapshot.getMetadata(),
snapshot.getMetadata().getSchema(),
tablePath);
snapshot.getProtocol(), snapshot.getMetadata(), tablePath);

if (!isNewTable) {
if (schema.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ public static Protocol fromColumnVector(ColumnVector vector, int rowId) {
vector.getChild(0).getInt(rowId),
vector.getChild(1).getInt(rowId),
vector.getChild(2).isNullAt(rowId)
? emptySet()
? null
: new HashSet<>(VectorUtils.toJavaList(vector.getChild(2).getArray(rowId))),
vector.getChild(3).isNullAt(rowId)
? emptySet()
? null
: new HashSet<>(VectorUtils.toJavaList(vector.getChild(3).getArray(rowId))));
}

Expand All @@ -63,7 +63,7 @@ public static Protocol fromColumnVector(ColumnVector vector, int rowId) {
private final Set<String> writerFeatures;

public Protocol(int minReaderVersion, int minWriterVersion) {
this(minReaderVersion, minWriterVersion, emptySet(), emptySet());
this(minReaderVersion, minWriterVersion, null, null);
}

public Protocol(
Expand Down Expand Up @@ -105,6 +105,7 @@ public Set<TableFeature> getImplicitlyEnabledFeatures() {
}

public Set<TableFeature> getExplicitlyEnabledFeatures() {
// TODO: Should we throw an exception if we encouter a feature that is not known to Kernel yet?
return TableFeatures.TABLE_FEATURES.stream()
.filter(
f ->
Expand All @@ -113,7 +114,7 @@ public Set<TableFeature> getExplicitlyEnabledFeatures() {
.collect(Collectors.toSet());
}

public Set<TableFeature> getImplicitAndExplicitlyEnabledFeatures() {
public Set<TableFeature> getImplicitlyAndExplicitlyEnabledFeatures() {
Set<TableFeature> enabledFeatures = new HashSet<>();
enabledFeatures.addAll(getImplicitlyEnabledFeatures());
enabledFeatures.addAll(getExplicitlyEnabledFeatures());
Expand Down Expand Up @@ -146,20 +147,6 @@ public Row toRow() {
return new GenericRow(Protocol.FULL_SCHEMA, protocolMap);
}

public Protocol withNewWriterFeatures(Set<String> writerFeatures) {
Tuple2<Integer, Integer> newProtocolVersions =
TableFeatures.minProtocolVersionFromAutomaticallyEnabledFeatures(writerFeatures);
Set<String> newWriterFeatures = new HashSet<>(writerFeatures);
if (this.writerFeatures != null) {
newWriterFeatures.addAll(this.writerFeatures);
}
return new Protocol(
newProtocolVersions._1,
newProtocolVersions._2,
this.readerFeatures == null ? null : new HashSet<>(this.readerFeatures),
newWriterFeatures);
}

/** Create a new {@link Protocol} object with the given {@link TableFeature} enabled. */
public Protocol withFeatures(Iterable<TableFeature> newFeatures) {
Protocol result = this;
Expand Down Expand Up @@ -233,8 +220,8 @@ public Protocol withFeature(TableFeature feature) {
*/
public boolean canUpgradeTo(Protocol to) {
// All features supported by `this` are supported by `to`.
return to.getImplicitAndExplicitlyEnabledFeatures()
.containsAll(this.getImplicitAndExplicitlyEnabledFeatures());
return to.getImplicitlyAndExplicitlyEnabledFeatures()
.containsAll(this.getImplicitlyAndExplicitlyEnabledFeatures());
}

/**
Expand All @@ -261,8 +248,8 @@ public Protocol normalized() {
int minWriterVersion = versions._2;
Protocol newProtocol = new Protocol(minReaderVersion, minWriterVersion);

if (this.getImplicitAndExplicitlyEnabledFeatures()
.equals(newProtocol.getImplicitAndExplicitlyEnabledFeatures())) {
if (this.getImplicitlyAndExplicitlyEnabledFeatures()
.equals(newProtocol.getImplicitlyAndExplicitlyEnabledFeatures())) {
return newProtocol;
} else {
// means we have some that is added after table feature support.
Expand Down Expand Up @@ -352,4 +339,21 @@ private boolean supportsReaderFeatures() {
private boolean supportsWriterFeatures() {
return TableFeatures.supportsWriterFeatures(minWriterVersion);
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
Protocol protocol = (Protocol) o;
return minReaderVersion == protocol.minReaderVersion
&& minWriterVersion == protocol.minWriterVersion
&& Objects.equals(readerFeatures, protocol.readerFeatures)
&& Objects.equals(writerFeatures, protocol.writerFeatures);
}

@Override
public int hashCode() {
return Objects.hash(minReaderVersion, minWriterVersion, readerFeatures, writerFeatures);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,7 @@ protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(

if (protocol != null) {
// Stop since we have found the latest Protocol and Metadata.
TableFeatures.validateReadSupportedTable(
protocol, dataPath.toString(), Optional.of(metadata));
TableFeatures.validateReadSupportedTable(protocol, dataPath.toString());
return new Tuple2<>(protocol, metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void checkpoint(Engine engine, Clock clock, long version)

// Check if writing to the given table protocol version/features is supported in Kernel
validateWriteSupportedTable(
snapshot.getProtocol(), snapshot.getMetadata(), snapshot.getSchema(), tablePath.toString());
snapshot.getProtocol(), snapshot.getMetadata(), tablePath.toString());

Path checkpointPath = FileNames.checkpointFileSingular(logPath, version);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.internal.actions.Metadata;
import java.util.Collections;
import java.util.Set;

Expand Down Expand Up @@ -118,6 +119,23 @@ public Set<TableFeature> requiredFeatures() {
return Collections.emptySet();
}

/**
* Does Kernel has support to read a table containing this feature? If the feature is a
* writer-only feature, this method should always return true.
*
* @return true if Kernel has support to read a table containing this feature.
*/
public abstract boolean hasKernelReadSupport();

/**
* Does Kernel has support to write a table containing this feature?
*
* @param metadata the metadata of the table. Sometimes checking the metadata is necessary to know
* the Kernel can write the table or not.
* @return true if Kernel has support to write a table containing this feature.
*/
public abstract boolean hasKernelWriteSupport(Metadata metadata);

/**
* Validate the table feature. This method should throw an exception if the table feature
* properties are invalid. Should be called after the object deriving the {@link TableFeature} is
Expand Down
Loading

0 comments on commit c905fa0

Please sign in to comment.