Add explicit check for modifying data in delta table with CDF enabled
homar authored and findepi committed Oct 10, 2022
1 parent 481c21b commit 17bf26c
Showing 3 changed files with 43 additions and 1 deletion.
io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -173,6 +173,7 @@
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE;
import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.APPEND_ONLY_CONFIGURATION_KEY;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractColumnMetadata;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
@@ -1395,6 +1396,9 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkUnsupportedGeneratedColumns(handle.getMetadataEntry());
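// New in this commit: fail fast when Change Data Feed is enabled (the same guard is added in beginUpdate and beginMerge below)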
if (changeDataFeedEnabled(handle.getMetadataEntry())) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with Change Data Feed enabled is not supported");
}
checkSupportedWriterVersion(session, handle.getSchemaTableName());

return DeltaLakeTableHandle.forDelete(
@@ -1459,6 +1463,9 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkUnsupportedGeneratedColumns(handle.getMetadataEntry());
if (changeDataFeedEnabled(handle.getMetadataEntry())) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with Change Data Feed enabled is not supported");
}
checkSupportedWriterVersion(session, handle.getSchemaTableName());

List<DeltaLakeColumnHandle> updatedColumnHandles = updatedColumns.stream()
@@ -1530,6 +1537,9 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkUnsupportedGeneratedColumns(handle.getMetadataEntry());
if (changeDataFeedEnabled(handle.getMetadataEntry())) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with Change Data Feed enabled is not supported");
}
checkSupportedWriterVersion(session, handle.getSchemaTableName());

ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
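The guard added to beginDelete is repeated verbatim in beginUpdate and beginMerge above. A minimal sketch of how the shared pre-write checks could be consolidated; checkWriteSupported is a hypothetical helper, not part of this commit, and it assumes the members and static imports already visible in this file:

private void checkWriteSupported(ConnectorSession session, DeltaLakeTableHandle handle)
{
    // Checks repeated in beginDelete, beginUpdate, and beginMerge in this commit
    checkUnsupportedGeneratedColumns(handle.getMetadataEntry());
    if (changeDataFeedEnabled(handle.getMetadataEntry())) {
        throw new TrinoException(NOT_SUPPORTED, "Writing to tables with Change Data Feed enabled is not supported");
    }
    checkSupportedWriterVersion(session, handle.getSchemaTableName());
}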
io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java

@@ -70,6 +70,7 @@
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.Boolean.parseBoolean;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;

@@ -106,7 +107,7 @@ public enum ColumnMappingMode

public static boolean isAppendOnly(MetadataEntry metadataEntry)
{
- return Boolean.parseBoolean(metadataEntry.getConfiguration().getOrDefault(APPEND_ONLY_CONFIGURATION_KEY, "false"));
+ return parseBoolean(metadataEntry.getConfiguration().getOrDefault(APPEND_ONLY_CONFIGURATION_KEY, "false"));
}

public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata)
@@ -434,6 +435,12 @@ public static Map<String, String> getCheckConstraints(MetadataEntr
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}

public static boolean changeDataFeedEnabled(MetadataEntry metadataEntry)
{
String enableChangeDataFeed = metadataEntry.getConfiguration().getOrDefault("delta.enableChangeDataFeed", "false");
return parseBoolean(enableChangeDataFeed);
}

public static Map<String, Map<String, Object>> getColumnsMetadata(MetadataEntry metadataEntry)
{
return getColumnProperties(metadataEntry, node -> OBJECT_MAPPER.convertValue(node.get("metadata"), new TypeReference<>(){}));
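For reference, a self-contained sketch (hypothetical class, illustrative values) of what the new helper evaluates: the property defaults to false when absent, and parseBoolean accepts only a case-insensitive "true":

import java.util.Map;

import static java.lang.Boolean.parseBoolean;

public class ChangeDataFeedCheckSketch
{
    // Mirrors changeDataFeedEnabled above: a missing property means CDF is disabled
    static boolean changeDataFeedEnabled(Map<String, String> configuration)
    {
        return parseBoolean(configuration.getOrDefault("delta.enableChangeDataFeed", "false"));
    }

    public static void main(String[] args)
    {
        System.out.println(changeDataFeedEnabled(Map.of("delta.enableChangeDataFeed", "true")));  // true
        System.out.println(changeDataFeedEnabled(Map.of("delta.enableChangeDataFeed", "TRUE")));  // true: parseBoolean is case-insensitive
        System.out.println(changeDataFeedEnabled(Map.of("delta.enableChangeDataFeed", "1")));     // false: only "true" parses to true
        System.out.println(changeDataFeedEnabled(Map.of()));                                      // false: property absent
    }
}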
io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java

@@ -541,4 +541,29 @@ public void testWritesToTableWithGeneratedColumnFails()
onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName);
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
public void testWritesToTableWithCDFFails()
{
String tableName = "test_writes_into_table_with_CDF_" + randomTableSuffix();
try {
onDelta().executeQuery("CREATE TABLE default." + tableName + " (a INT, b INT) " +
"USING DELTA " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES (delta.enableChangeDataFeed = true)");

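// CDF tables require Delta Lake writer version 4, which this connector does not support,
// so INSERT is rejected by the existing writer-version check; UPDATE, DELETE, and MERGE
// below hit the new explicit CDF check, which runs before the writer-version check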
assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2)"))
.hasMessageMatching(".* Table .* requires Delta Lake writer version 4 which is not supported");
assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3 WHERE b = 3"))
.hasMessageContaining("Writing to tables with Change Data Feed enabled is not supported");
assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a = 3"))
.hasMessageContaining("Writing to tables with Change Data Feed enabled is not supported");
assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " +
"ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = 42"))
.hasMessageContaining("Writing to tables with Change Data Feed enabled is not supported");
}
finally {
onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName);
}
}
}
