Skip to content

Commit

Permalink
[Kernel] Minor cleanup of APIs used for checking read/write supported…
Browse files Browse the repository at this point in the history
… protocol (#4149)

## Description
Currently, we pass an optional `Metadata` to `validateReadSupportedTable`,
which is used only to check whether it has a supported column mapping
mode. We don't need that check: if an unknown mode is passed, it will
fail later on anyway. Also, if a new mode is added, it will be part
of a new table feature.

Also, for `validateWriteSupportedTable`, we pass the `schema` and
`metadata` separately. Passing just the `metadata` (which contains
the `schema`) is sufficient.

The implementation of these APIs is going to change in subsequent PRs.

## How was this patch tested?
Existing tests.
  • Loading branch information
vkorukanti authored Feb 12, 2025
1 parent b02edc8 commit c3f2cb0
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.StructType;
import java.util.*;
Expand Down Expand Up @@ -73,13 +72,11 @@ public class TableFeatures {
// Helper Methods //
////////////////////

public static void validateReadSupportedTable(
Protocol protocol, String tablePath, Optional<Metadata> metadata) {
public static void validateReadSupportedTable(Protocol protocol, String tablePath) {
switch (protocol.getMinReaderVersion()) {
case 1:
break;
case 2:
metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode);
break;
case 3:
List<String> readerFeatures = protocol.getReaderFeatures();
Expand All @@ -88,9 +85,6 @@ public static void validateReadSupportedTable(
unsupportedFeatures.removeAll(SUPPORTED_READER_FEATURES);
throw DeltaErrors.unsupportedReaderFeature(tablePath, unsupportedFeatures);
}
if (readerFeatures.contains("columnMapping")) {
metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode);
}
break;
default:
throw DeltaErrors.unsupportedReaderProtocol(tablePath, protocol.getMinReaderVersion());
Expand All @@ -111,18 +105,17 @@ public static void validateReadSupportedTable(
*
* @param protocol Table protocol
* @param metadata Table metadata
* @param tableSchema Table schema
*/
public static void validateWriteSupportedTable(
Protocol protocol, Metadata metadata, StructType tableSchema, String tablePath) {
Protocol protocol, Metadata metadata, String tablePath) {
int minWriterVersion = protocol.getMinWriterVersion();
switch (minWriterVersion) {
case 1:
break;
case 2:
// Append-only and column invariants are the writer features added in version 2
// Append-only is supported, but not the invariants
validateNoInvariants(tableSchema);
validateNoInvariants(metadata.getSchema());
break;
case 3:
// Check constraints are added in version 3
Expand All @@ -141,7 +134,7 @@ public static void validateWriteSupportedTable(
if (writerFeature.equals(INVARIANTS_FEATURE_NAME)) {
// For version 7, we allow 'invariants' to be present in the protocol's writerFeatures
// to unblock certain use cases, provided that no invariants are defined in the schema.
validateNoInvariants(tableSchema);
validateNoInvariants(metadata.getSchema());
} else if (!SUPPORTED_WRITER_FEATURES.contains(writerFeature)) {
throw unsupportedWriterFeature(tablePath, writerFeature);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -197,8 +196,7 @@ public CloseableIterator<ColumnarBatch> getChanges(
for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) {
if (!protocolVector.isNullAt(rowId)) {
Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId);
TableFeatures.validateReadSupportedTable(
protocol, getDataPath().toString(), Optional.empty());
TableFeatures.validateReadSupportedTable(protocol, getDataPath().toString());
}
}
if (shouldDropProtocolColumn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ public Transaction build(Engine engine) {
protocol = protocol.withNewWriterFeatures(newWriterFeatures);
List<String> curWriterFeatures = protocol.getWriterFeatures();
checkArgument(!Objects.equals(oldWriterFeatures, curWriterFeatures));
TableFeatures.validateWriteSupportedTable(
protocol, metadata, metadata.getSchema(), table.getPath(engine));
TableFeatures.validateWriteSupportedTable(protocol, metadata, table.getPath(engine));
}
}

Expand All @@ -184,10 +183,7 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
String tablePath = table.getPath(engine);
// Validate the table has no features that Kernel doesn't yet support writing into it.
TableFeatures.validateWriteSupportedTable(
snapshot.getProtocol(),
snapshot.getMetadata(),
snapshot.getMetadata().getSchema(),
tablePath);
snapshot.getProtocol(), snapshot.getMetadata(), tablePath);

if (!isNewTable) {
if (schema.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ public static void checkpoint(Engine engine, Clock clock, SnapshotImpl snapshot)

// Check if writing to the given table protocol version/features is supported in Kernel
validateWriteSupportedTable(
snapshot.getProtocol(),
snapshot.getMetadata(),
snapshot.getSchema(),
snapshot.getDataPath().toString());
snapshot.getProtocol(), snapshot.getMetadata(), snapshot.getDataPath().toString());

final Path checkpointPath = FileNames.checkpointFileSingular(logPath, version);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(

if (protocol != null) {
// Stop since we have found the latest Protocol and Metadata.
TableFeatures.validateReadSupportedTable(
protocol, dataPath.toString(), Optional.of(metadata));
TableFeatures.validateReadSupportedTable(protocol, dataPath.toString());
return new Tuple2<>(protocol, metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,6 @@ public static ColumnMappingMode getColumnMappingMode(Map<String, String> configu
.orElse(ColumnMappingMode.NONE);
}

/**
* Checks if the given column mapping mode in the given table metadata is supported. Throws on
* unsupported modes.
*
* @param metadata Metadata of the table
*/
public static void throwOnUnsupportedColumnMappingMode(Metadata metadata) {
getColumnMappingMode(metadata.getConfiguration());
}

/**
* Helper method that converts the logical schema (requested by the connector) to physical schema
* of the data stored in data files based on the table's column mapping mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,13 @@ class TableFeaturesSuite extends AnyFunSuite {
test("validateWriteSupported: protocol 2 with invariants") {
checkUnsupported(
createTestProtocol(minWriterVersion = 2),
metadata = createTestMetadata(),
schema = createTestSchema(includeInvariant = true))
metadata = createTestMetadata(includeVariant = true))
}

test("validateWriteSupported: protocol 2, with appendOnly and invariants") {
checkUnsupported(
createTestProtocol(minWriterVersion = 2),
metadata = createTestMetadata(),
schema = createTestSchema(includeInvariant = true))
metadata = createTestMetadata(includeVariant = true))
}

Seq(3, 4, 5, 6).foreach { minWriterVersion =>
Expand Down Expand Up @@ -90,33 +88,28 @@ class TableFeaturesSuite extends AnyFunSuite {

test("validateWriteSupported: protocol 7 with invariants, schema doesn't contain invariants") {
checkSupported(
createTestProtocol(minWriterVersion = 7, "invariants"),
metadata = createTestMetadata(),
schema = createTestSchema(includeInvariant = false)
createTestProtocol(minWriterVersion = 7, "invariants")
)
}

test("validateWriteSupported: protocol 7 with invariants, schema contains invariants") {
checkUnsupported(
createTestProtocol(minWriterVersion = 7, "invariants"),
metadata = createTestMetadata(),
schema = createTestSchema(includeInvariant = true)
metadata = createTestMetadata(includeVariant = true)
)
}

def checkSupported(
protocol: Protocol,
metadata: Metadata = null,
schema: StructType = createTestSchema()): Unit = {
validateWriteSupportedTable(protocol, metadata, schema, "/test/table")
metadata: Metadata = createTestMetadata()): Unit = {
validateWriteSupportedTable(protocol, metadata, "/test/table")
}

def checkUnsupported(
protocol: Protocol,
metadata: Metadata = null,
schema: StructType = createTestSchema()): Unit = {
metadata: Metadata = createTestMetadata()): Unit = {
intercept[KernelException] {
validateWriteSupportedTable(protocol, metadata, schema, "/test/table")
validateWriteSupportedTable(protocol, metadata, "/test/table")
}
}

Expand All @@ -131,18 +124,20 @@ class TableFeaturesSuite extends AnyFunSuite {
)
}

def createTestMetadata(withAppendOnly: Boolean = false): Metadata = {
def createTestMetadata(
withAppendOnly: Boolean = false, includeVariant: Boolean = false): Metadata = {
var config: Map[String, String] = Map()
if (withAppendOnly) {
config = Map("delta.appendOnly" -> "true");
}
val testSchema = createTestSchema(includeVariant);
new Metadata(
"id",
Optional.of("name"),
Optional.of("description"),
new Format("parquet", Collections.emptyMap()),
"sss",
new StructType(),
testSchema.toJson,
testSchema,
new ArrayValue() { // partitionColumns
override def getSize = 1

Expand Down

0 comments on commit c3f2cb0

Please sign in to comment.