Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Minor cleanup of APIs used for checking read/write supported protocol #4149

Merged
merged 2 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.StructType;
import java.util.*;
Expand Down Expand Up @@ -73,13 +72,11 @@ public class TableFeatures {
// Helper Methods //
////////////////////

public static void validateReadSupportedTable(
Protocol protocol, String tablePath, Optional<Metadata> metadata) {
public static void validateReadSupportedTable(Protocol protocol, String tablePath) {
switch (protocol.getMinReaderVersion()) {
case 1:
break;
case 2:
metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode);
break;
case 3:
List<String> readerFeatures = protocol.getReaderFeatures();
Expand All @@ -88,9 +85,6 @@ public static void validateReadSupportedTable(
unsupportedFeatures.removeAll(SUPPORTED_READER_FEATURES);
throw DeltaErrors.unsupportedReaderFeature(tablePath, unsupportedFeatures);
}
if (readerFeatures.contains("columnMapping")) {
metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode);
}
break;
default:
throw DeltaErrors.unsupportedReaderProtocol(tablePath, protocol.getMinReaderVersion());
Expand All @@ -111,18 +105,17 @@ public static void validateReadSupportedTable(
*
* @param protocol Table protocol
* @param metadata Table metadata
* @param tableSchema Table schema
*/
public static void validateWriteSupportedTable(
Protocol protocol, Metadata metadata, StructType tableSchema, String tablePath) {
Protocol protocol, Metadata metadata, String tablePath) {
int minWriterVersion = protocol.getMinWriterVersion();
switch (minWriterVersion) {
case 1:
break;
case 2:
// Append-only and column invariants are the writer features added in version 2
// Append-only is supported, but not the invariants
validateNoInvariants(tableSchema);
validateNoInvariants(metadata.getSchema());
break;
case 3:
// Check constraints are added in version 3
Expand All @@ -141,7 +134,7 @@ public static void validateWriteSupportedTable(
if (writerFeature.equals(INVARIANTS_FEATURE_NAME)) {
// For version 7, we allow 'invariants' to be present in the protocol's writerFeatures
// to unblock certain use cases, provided that no invariants are defined in the schema.
validateNoInvariants(tableSchema);
validateNoInvariants(metadata.getSchema());
} else if (!SUPPORTED_WRITER_FEATURES.contains(writerFeature)) {
throw unsupportedWriterFeature(tablePath, writerFeature);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -197,8 +196,7 @@ public CloseableIterator<ColumnarBatch> getChanges(
for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) {
if (!protocolVector.isNullAt(rowId)) {
Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId);
TableFeatures.validateReadSupportedTable(
protocol, getDataPath().toString(), Optional.empty());
TableFeatures.validateReadSupportedTable(protocol, getDataPath().toString());
}
}
if (shouldDropProtocolColumn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ public Transaction build(Engine engine) {
protocol = protocol.withNewWriterFeatures(newWriterFeatures);
List<String> curWriterFeatures = protocol.getWriterFeatures();
checkArgument(!Objects.equals(oldWriterFeatures, curWriterFeatures));
TableFeatures.validateWriteSupportedTable(
protocol, metadata, metadata.getSchema(), table.getPath(engine));
TableFeatures.validateWriteSupportedTable(protocol, metadata, table.getPath(engine));
}
}

Expand All @@ -184,10 +183,7 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
String tablePath = table.getPath(engine);
// Validate the table has no features that Kernel doesn't yet support writing into it.
TableFeatures.validateWriteSupportedTable(
snapshot.getProtocol(),
snapshot.getMetadata(),
snapshot.getMetadata().getSchema(),
tablePath);
snapshot.getProtocol(), snapshot.getMetadata(), tablePath);

if (!isNewTable) {
if (schema.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ public static void checkpoint(Engine engine, Clock clock, SnapshotImpl snapshot)

// Check if writing to the given table protocol version/features is supported in Kernel
validateWriteSupportedTable(
snapshot.getProtocol(),
snapshot.getMetadata(),
snapshot.getSchema(),
snapshot.getDataPath().toString());
snapshot.getProtocol(), snapshot.getMetadata(), snapshot.getDataPath().toString());

final Path checkpointPath = FileNames.checkpointFileSingular(logPath, version);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(

if (protocol != null) {
// Stop since we have found the latest Protocol and Metadata.
TableFeatures.validateReadSupportedTable(
protocol, dataPath.toString(), Optional.of(metadata));
TableFeatures.validateReadSupportedTable(protocol, dataPath.toString());
return new Tuple2<>(protocol, metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,6 @@ public static ColumnMappingMode getColumnMappingMode(Map<String, String> configu
.orElse(ColumnMappingMode.NONE);
}

/**
* Checks if the given column mapping mode in the given table metadata is supported. Throws on
* unsupported modes.
*
* @param metadata Metadata of the table
*/
public static void throwOnUnsupportedColumnMappingMode(Metadata metadata) {
getColumnMappingMode(metadata.getConfiguration());
}

/**
* Helper method that converts the logical schema (requested by the connector) to physical schema
* of the data stored in data files based on the table's column mapping mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,13 @@ class TableFeaturesSuite extends AnyFunSuite {
test("validateWriteSupported: protocol 2 with invariants") {
checkUnsupported(
createTestProtocol(minWriterVersion = 2),
metadata = createTestMetadata(),
schema = createTestSchema(includeInvariant = true))
metadata = createTestMetadata(includeVariant = true))
}

test("validateWriteSupported: protocol 2, with appendOnly and invariants") {
checkUnsupported(
createTestProtocol(minWriterVersion = 2),
metadata = createTestMetadata(),
schema = createTestSchema(includeInvariant = true))
metadata = createTestMetadata(includeVariant = true))
}

Seq(3, 4, 5, 6).foreach { minWriterVersion =>
Expand Down Expand Up @@ -90,33 +88,28 @@ class TableFeaturesSuite extends AnyFunSuite {

test("validateWriteSupported: protocol 7 with invariants, schema doesn't contain invariants") {
checkSupported(
createTestProtocol(minWriterVersion = 7, "invariants"),
metadata = createTestMetadata(),
schema = createTestSchema(includeInvariant = false)
createTestProtocol(minWriterVersion = 7, "invariants")
)
}

test("validateWriteSupported: protocol 7 with invariants, schema contains invariants") {
checkUnsupported(
createTestProtocol(minWriterVersion = 7, "invariants"),
metadata = createTestMetadata(),
schema = createTestSchema(includeInvariant = true)
metadata = createTestMetadata(includeVariant = true)
)
}

def checkSupported(
protocol: Protocol,
metadata: Metadata = null,
schema: StructType = createTestSchema()): Unit = {
validateWriteSupportedTable(protocol, metadata, schema, "/test/table")
metadata: Metadata = createTestMetadata()): Unit = {
validateWriteSupportedTable(protocol, metadata, "/test/table")
}

def checkUnsupported(
protocol: Protocol,
metadata: Metadata = null,
schema: StructType = createTestSchema()): Unit = {
metadata: Metadata = createTestMetadata()): Unit = {
intercept[KernelException] {
validateWriteSupportedTable(protocol, metadata, schema, "/test/table")
validateWriteSupportedTable(protocol, metadata, "/test/table")
}
}

Expand All @@ -131,18 +124,20 @@ class TableFeaturesSuite extends AnyFunSuite {
)
}

def createTestMetadata(withAppendOnly: Boolean = false): Metadata = {
def createTestMetadata(
withAppendOnly: Boolean = false, includeVariant: Boolean = false): Metadata = {
var config: Map[String, String] = Map()
if (withAppendOnly) {
config = Map("delta.appendOnly" -> "true");
}
val testSchema = createTestSchema(includeVariant);
new Metadata(
"id",
Optional.of("name"),
Optional.of("description"),
new Format("parquet", Collections.emptyMap()),
"sss",
new StructType(),
testSchema.toJson,
testSchema,
new ArrayValue() { // partitionColumns
override def getSize = 1

Expand Down
Loading