diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index f2eabdb88d06..61ac309d8a6f 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -173,6 +173,7 @@ import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE; import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.APPEND_ONLY_CONFIGURATION_KEY; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractColumnMetadata; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; @@ -1395,6 +1396,9 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); } checkUnsupportedGeneratedColumns(handle.getMetadataEntry()); + if (changeDataFeedEnabled(handle.getMetadataEntry())) { + throw new TrinoException(NOT_SUPPORTED, "Writing to tables with Change Data Feed enabled is not supported"); + } checkSupportedWriterVersion(session, handle.getSchemaTableName()); return DeltaLakeTableHandle.forDelete( @@ -1459,6 +1463,9 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); } checkUnsupportedGeneratedColumns(handle.getMetadataEntry()); + if (changeDataFeedEnabled(handle.getMetadataEntry())) { + throw new TrinoException(NOT_SUPPORTED, "Writing to tables with Change Data Feed enabled is not supported"); + } checkSupportedWriterVersion(session, handle.getSchemaTableName()); List updatedColumnHandles = updatedColumns.stream() @@ -1530,6 +1537,9 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); } checkUnsupportedGeneratedColumns(handle.getMetadataEntry()); + if (changeDataFeedEnabled(handle.getMetadataEntry())) { + throw new TrinoException(NOT_SUPPORTED, "Writing to tables with Change Data Feed enabled is not supported"); + } checkSupportedWriterVersion(session, handle.getSchemaTableName()); ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index ed1e4dc00335..ac4511642d64 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -70,6 +70,7 @@ import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.VarbinaryType.VARBINARY; import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.lang.Boolean.parseBoolean; import static java.lang.String.format; import static java.util.Locale.ENGLISH; @@ -106,7 +107,7 @@ public enum ColumnMappingMode public static boolean isAppendOnly(MetadataEntry metadataEntry) { - return Boolean.parseBoolean(metadataEntry.getConfiguration().getOrDefault(APPEND_ONLY_CONFIGURATION_KEY, "false")); + return parseBoolean(metadataEntry.getConfiguration().getOrDefault(APPEND_ONLY_CONFIGURATION_KEY, "false")); } public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata) @@ -434,6 +435,12 @@ public static Map getCheckConstraints(MetadataEntry metadataEntr .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); } + public static boolean changeDataFeedEnabled(MetadataEntry metadataEntry) + { + String enableChangeDataFeed = metadataEntry.getConfiguration().getOrDefault("delta.enableChangeDataFeed", "false"); + return parseBoolean(enableChangeDataFeed); + } + public static Map> getColumnsMetadata(MetadataEntry metadataEntry) { return getColumnProperties(metadataEntry, node -> OBJECT_MAPPER.convertValue(node.get("metadata"), new TypeReference<>(){})); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java index db244d30337f..2354da0f33bc 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksInsertCompatibility.java @@ -541,4 +541,29 @@ public void testWritesToTableWithGeneratedColumnFails() onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); } } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + public void testWritesToTableWithCDFFails() + { + String tableName = "test_writes_into_table_with_CDF_" + randomTableSuffix(); + try { + onDelta().executeQuery("CREATE TABLE default." + tableName + " (a INT, b INT) " + + "USING DELTA " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + "TBLPROPERTIES (delta.enableChangeDataFeed = true)"); + + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 2)")) + .hasMessageMatching(".* Table .* requires Delta Lake writer version 4 which is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3 WHERE b = 3")) + .hasMessageContaining("Writing to tables with Change Data Feed enabled is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a = 3")) + .hasMessageContaining("Writing to tables with Change Data Feed enabled is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " + + "ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = 42")) + .hasMessageContaining("Writing to tables with Change Data Feed enabled is not supported"); + } + finally { + onDelta().executeQuery("DROP TABLE IF EXISTS default." + tableName); + } + } }