diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 0f92016badcf..92af4b99e2d7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -49,8 +49,11 @@ import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.procedure.IcebergAddFilesFromTableHandle; import io.trino.plugin.iceberg.procedure.IcebergAddFilesHandle; +import io.trino.plugin.iceberg.procedure.IcebergCreateBranchHandle; +import io.trino.plugin.iceberg.procedure.IcebergDropBranchHandle; import io.trino.plugin.iceberg.procedure.IcebergDropExtendedStatsHandle; import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle; +import io.trino.plugin.iceberg.procedure.IcebergFastForwardHandle; import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle; import io.trino.plugin.iceberg.procedure.IcebergRemoveOrphanFilesHandle; import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle; @@ -89,6 +92,7 @@ import io.trino.spi.connector.DiscretePredicates; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.PointerType; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.RelationCommentMetadata; @@ -228,10 +232,12 @@ import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.Maps.transformValues; import static com.google.common.collect.Sets.difference; +import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.base.filter.UtcConstraintExtractor.extractTupleDomain; import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns; import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables; import static io.trino.plugin.base.util.ExecutorUtil.processWithAdditionalThreads; +import static io.trino.plugin.base.util.Functions.checkFunctionArgument; import static io.trino.plugin.base.util.Procedures.checkProcedureArgument; import static io.trino.plugin.hive.HiveMetadata.TRANSACTIONAL; import static io.trino.plugin.hive.HiveTimestampPrecision.DEFAULT_PRECISION; @@ -318,17 +324,22 @@ import static io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog.TRINO_QUERY_START_TIME; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.ADD_FILES_FROM_TABLE; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.CREATE_BRANCH; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_BRANCH; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_EXTENDED_STATS; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS; +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.FAST_FORWARD; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.REMOVE_ORPHAN_FILES; import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFiles; import static io.trino.plugin.iceberg.procedure.MigrationUtils.addFilesFromTable; +import static 
io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.COLUMN_ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.COLUMN_NOT_FOUND; import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY; import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; +import static io.trino.spi.StandardErrorCode.NOT_FOUND; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED; import static io.trino.spi.StandardErrorCode.QUERY_REJECTED; @@ -360,6 +371,7 @@ import static java.util.stream.Collectors.joining; import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations; import static org.apache.iceberg.ReachableFileUtil.statisticsFilesLocations; +import static org.apache.iceberg.SnapshotRef.MAIN_BRANCH; import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP; @@ -473,7 +485,8 @@ public ConnectorTableHandle getTableHandle( ConnectorSession session, SchemaTableName tableName, Optional<ConnectorTableVersion> startVersion, - Optional<ConnectorTableVersion> endVersion) + Optional<ConnectorTableVersion> endVersion, + Optional<String> branch) { if (startVersion.isPresent()) { throw new TrinoException(NOT_SUPPORTED, "Read table with start version is not supported"); } @@ -494,7 +507,7 @@ public ConnectorTableHandle getTableHandle( BaseTable storageTable = catalog.getMaterializedViewStorageTable(session, materializedViewName) .orElseThrow(() -> new TrinoException(TABLE_NOT_FOUND, "Storage table metadata not found for materialized view " + tableName)); - return tableHandleForCurrentSnapshot(session, tableName, storageTable); + return tableHandleForCurrentSnapshot(session, tableName, storageTable, Optional.empty()); } if (!isDataTable(tableName.getTableName())) { @@ -518,20 +531,28 @@ public ConnectorTableHandle getTableHandle( throw e; } - if (endVersion.isPresent()) { - long snapshotId = getSnapshotIdFromVersion(session, table, endVersion.get()); + if (endVersion.isPresent() || branch.isPresent()) { + checkArgument(endVersion.isEmpty() || branch.isEmpty(), "Cannot specify both end version and branch"); + ConnectorTableVersion version = endVersion.orElseGet(() -> new ConnectorTableVersion(PointerType.TARGET_ID, VARCHAR, utf8Slice(branch.get()))); + long snapshotId = getSnapshotIdFromVersion(session, table, version); + Optional<PartitionSpec> partitionSpec = Optional.empty(); + if (branch.isPresent()) { + int schemaId = table.snapshot(snapshotId).schemaId(); + partitionSpec = Optional.of(table.specs().get(schemaId)); + } return tableHandleForSnapshot( session, tableName, table, Optional.of(snapshotId), schemaFor(table, snapshotId), - Optional.empty()); + partitionSpec, + branch); } - return tableHandleForCurrentSnapshot(session, tableName, table); + return tableHandleForCurrentSnapshot(session, tableName, table, branch); } - private IcebergTableHandle tableHandleForCurrentSnapshot(ConnectorSession session, SchemaTableName tableName, BaseTable table) + private IcebergTableHandle tableHandleForCurrentSnapshot(ConnectorSession session, SchemaTableName tableName, BaseTable table, Optional<String> branch) { return tableHandleForSnapshot( session, @@ -539,7 +560,8 @@ private IcebergTableHandle tableHandleForCurrentSnapshot(ConnectorSessio table, Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId), table.schema(), - 
Optional.of(table.spec())); + Optional.of(table.spec()), + branch); } private IcebergTableHandle tableHandleForSnapshot( @@ -548,7 +570,8 @@ private IcebergTableHandle tableHandleForSnapshot( BaseTable table, Optional<Long> tableSnapshotId, Schema tableSchema, - Optional<PartitionSpec> partitionSpec) + Optional<PartitionSpec> partitionSpec, + Optional<String> branch) { Map<String, String> tableProperties = table.properties(); return new IcebergTableHandle( @@ -568,6 +591,7 @@ private IcebergTableHandle tableHandleForSnapshot( table.location(), table.properties(), getTablePartitioning(session, table), + branch, false, Optional.empty(), ImmutableSet.of(), @@ -606,7 +630,7 @@ private Optional getTablePartitioning(ConnectorSession IntStream.range(0, partitioningHandle.partitionFunctions().size()).boxed().collect(toImmutableList()))); } - private static long getSnapshotIdFromVersion(ConnectorSession session, Table table, ConnectorTableVersion version) + public static long getSnapshotIdFromVersion(ConnectorSession session, Table table, ConnectorTableVersion version) { io.trino.spi.type.Type versionType = version.getVersionType(); return switch (version.getPointerType()) { @@ -1073,7 +1097,7 @@ public void dropSchema(ConnectorSession session, String schemaName, boolean casc dropView(session, viewName); } for (SchemaTableName tableName : listTables(session, Optional.of(schemaName))) { - dropTable(session, getTableHandle(session, tableName, Optional.empty(), Optional.empty())); + dropTable(session, getTableHandle(session, tableName, Optional.empty(), Optional.empty(), Optional.empty())); } } catalog.dropNamespace(session, schemaName); @@ -1183,7 +1207,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con String tableLocation = null; if (replace) { - ConnectorTableHandle tableHandle = getTableHandle(session, tableMetadata.getTableSchema().getTable(), Optional.empty(), Optional.empty()); + ConnectorTableHandle tableHandle = getTableHandle(session, tableMetadata.getTableSchema().getTable(), Optional.empty(), Optional.empty(), Optional.empty()); if (tableHandle != null) { checkValidTableHandle(tableHandle); IcebergTableHandle table = (IcebergTableHandle) tableHandle; @@ -1211,7 +1235,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con "Cannot create a table on a non-empty location: %s, set 'iceberg.unique-table-location=true' in your Iceberg catalog properties " + "to use unique table locations for every table.", location)); } - return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), retryMode); + return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), SchemaParser.toJson(transaction.table().schema()), Optional.empty(), retryMode); } catch (IOException e) { throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking new table's location: " + location, e); } @@ -1313,12 +1337,15 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto { IcebergTableHandle table = (IcebergTableHandle) tableHandle; Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + Optional<String> branch = table.getBranch(); - validateNotModifyingOldSnapshot(table, icebergTable); + branch.ifPresentOrElse( + name -> checkBranch(icebergTable, name), + () -> validateNotModifyingOldSnapshot(table, icebergTable)); beginTransaction(icebergTable); - return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); + return newWritableTableHandle(table.getSchemaTableName(), icebergTable, table.getTableSchemaJson(), branch, 
retryMode); } private List<String> getChildNamespaces(ConnectorSession session, String parentNamespace) @@ -1334,20 +1361,22 @@ private List<String> getChildNamespaces(ConnectorSession session, String parentN .collect(toImmutableList()); } - private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name, Table table, RetryMode retryMode) + private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName schemaTableName, Table table, String schemaAsJson, Optional<String> branch, RetryMode retryMode) { + Schema schema = SchemaParser.fromJson(schemaAsJson); return new IcebergWritableTableHandle( - name, - SchemaParser.toJson(table.schema()), + schemaTableName, + schemaAsJson, transformValues(table.specs(), PartitionSpecParser::toJson), table.spec().specId(), - getSupportedSortFields(table.schema(), table.sortOrder()), - getProjectedColumns(table.schema(), typeManager), + getSupportedSortFields(schema, table.sortOrder()), + getProjectedColumns(schema, typeManager), table.location(), getFileFormat(table), table.properties(), retryMode, - table.io().properties()); + table.io().properties(), + branch); } private static List<TrinoSortField> getSupportedSortFields(Schema schema, SortOrder sortOrder) @@ -1417,6 +1446,7 @@ public Optional<ConnectorOutputMetadata> finishInsert( } appendFiles.appendFile(builder.build()); + table.branch().ifPresent(appendFiles::toBranch); writtenFiles.add(task.path()); } @@ -1427,7 +1457,7 @@ public Optional<ConnectorOutputMetadata> finishInsert( commitUpdateAndTransaction(appendFiles, session, transaction, "insert"); // TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer - long newSnapshotId = transaction.table().currentSnapshot().snapshotId(); + long newSnapshotId = table.branch().isEmpty() ? transaction.table().currentSnapshot().snapshotId() : transaction.table().refs().get(table.branch().get()).snapshotId(); transaction = null; // TODO (https://github.com/trinodb/trino/issues/15439): it would be good to publish data and stats atomically @@ -1577,6 +1607,9 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute( case REMOVE_ORPHAN_FILES -> getTableHandleForRemoveOrphanFiles(session, tableHandle, executeProperties); case ADD_FILES -> getTableHandleForAddFiles(session, accessControl, tableHandle, executeProperties); case ADD_FILES_FROM_TABLE -> getTableHandleForAddFilesFromTable(session, accessControl, tableHandle, executeProperties); + case CREATE_BRANCH -> getTableHandleForCreateBranch(session, accessControl, tableHandle, executeProperties); + case DROP_BRANCH -> getTableHandleForDropBranch(session, accessControl, tableHandle, executeProperties); + case FAST_FORWARD -> getTableHandleForFastForward(session, tableHandle, executeProperties); }; } @@ -1645,6 +1678,51 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForRemoveOrphanFiles icebergTable.io().properties())); } + private Optional<ConnectorTableExecuteHandle> getTableHandleForCreateBranch(ConnectorSession session, ConnectorAccessControl accessControl, IcebergTableHandle tableHandle, Map<String, Object> executeProperties) + { + String name = (String) executeProperties.get("name"); + accessControl.checkCanCreateBranchAndTag(null, tableHandle.getSchemaTableName(), name); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + CREATE_BRANCH, + new IcebergCreateBranchHandle(name), + icebergTable.location(), + icebergTable.io().properties())); + } + + private Optional<ConnectorTableExecuteHandle> getTableHandleForDropBranch(ConnectorSession session, ConnectorAccessControl 
accessControl, IcebergTableHandle tableHandle, Map<String, Object> executeProperties) + { + String name = (String) executeProperties.get("name"); + accessControl.checkCanDropBranchAndTag(null, tableHandle.getSchemaTableName(), name); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + DROP_BRANCH, + new IcebergDropBranchHandle(name), + icebergTable.location(), + icebergTable.io().properties())); + } + + private Optional<ConnectorTableExecuteHandle> getTableHandleForFastForward(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties) + { + String from = (String) executeProperties.get("from"); + String to = (String) executeProperties.get("to"); + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + if (!icebergTable.refs().containsKey(from)) { + throw new TrinoException(NOT_FOUND, "Branch '%s' does not exist".formatted(from)); + } + if (!icebergTable.refs().containsKey(to)) { + throw new TrinoException(NOT_FOUND, "Branch '%s' does not exist".formatted(to)); + } + return Optional.of(new IcebergTableExecuteHandle( + tableHandle.getSchemaTableName(), + FAST_FORWARD, + new IcebergFastForwardHandle(from, to), + icebergTable.location(), + icebergTable.io().properties())); + } + private Optional<ConnectorTableExecuteHandle> getTableHandleForAddFiles(ConnectorSession session, ConnectorAccessControl accessControl, IcebergTableHandle tableHandle, Map<String, Object> executeProperties) { if (!addFilesProcedureEnabled) { @@ -1762,6 +1840,9 @@ public Optional<ConnectorTableLayout> getLayoutForTableExecute(ConnectorSession case REMOVE_ORPHAN_FILES: case ADD_FILES: case ADD_FILES_FROM_TABLE: + case CREATE_BRANCH: + case DROP_BRANCH: + case FAST_FORWARD: // handled via executeTableExecute } throw new IllegalArgumentException("Unknown procedure '" + executeHandle.procedureId() + "'"); @@ -1791,6 +1872,9 @@ public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> readerForManifest(Table table, ManifestFile manifest) { return switch (manifest.content()) { @@ -2652,7 +2806,7 @@ public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Connector return TableStatisticsMetadata.empty(); } - ConnectorTableHandle tableHandle = getTableHandle(session, tableMetadata.getTable(), Optional.empty(), Optional.empty()); + ConnectorTableHandle tableHandle = getTableHandle(session, tableMetadata.getTable(), Optional.empty(), Optional.empty(), Optional.empty()); if (tableHandle == null) { // Assume new table (CTAS), collect NDV stats on all columns return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {}); } @@ -2842,11 +2996,15 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT verifyTableVersionForUpdate(table); Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); - validateNotModifyingOldSnapshot(table, icebergTable); + Optional<String> branch = table.getBranch(); + + branch.ifPresentOrElse( + name -> checkBranch(icebergTable, name), + () -> validateNotModifyingOldSnapshot(table, icebergTable)); beginTransaction(icebergTable); - IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); + IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, table.getTableSchemaJson(), branch, retryMode); return new IcebergMergeTableHandle(table, insertHandle); } @@ -2855,8 +3013,9 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg { 
IcebergMergeTableHandle mergeHandle = (IcebergMergeTableHandle) mergeTableHandle; IcebergTableHandle handle = mergeHandle.getTableHandle(); + Optional<String> branch = mergeHandle.getInsertTableHandle().branch(); RetryMode retryMode = mergeHandle.getInsertTableHandle().retryMode(); - finishWrite(session, handle, fragments, retryMode); + finishWrite(session, handle, branch, fragments, retryMode); } private static void verifyTableVersionForUpdate(IcebergTableHandle table) @@ -2873,7 +3032,7 @@ private static void validateNotModifyingOldSnapshot(IcebergTableHandle table, Ta } } - private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection<Slice> fragments, RetryMode retryMode) + private void finishWrite(ConnectorSession session, IcebergTableHandle table, Optional<String> branch, Collection<Slice> fragments, RetryMode retryMode) { Table icebergTable = transaction.table(); @@ -2890,6 +3049,7 @@ public void finishWrite(ConnectorSession session, IcebergTableHandle table, Col Schema schema = SchemaParser.fromJson(table.getTableSchemaJson()); RowDelta rowDelta = transaction.newRowDelta(); + branch.ifPresent(rowDelta::toBranch); table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId())); TupleDomain<IcebergColumnHandle> dataColumnPredicate = table.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId())); TupleDomain<IcebergColumnHandle> convertibleUnenforcedPredicate = table.getUnenforcedPredicate().filter((_, domain) -> isConvertibleToIcebergExpression(domain)); @@ -3024,6 +3184,10 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle DeleteFiles deleteFiles = icebergTable.newDelete() .deleteFromRowFilter(toIcebergExpression(handle.getEnforcedPredicate())); + handle.getBranch().ifPresent(branch -> { + checkBranch(icebergTable, branch); + deleteFiles.toBranch(branch); + }); commit(deleteFiles, session); Map<String, String> summary = icebergTable.currentSnapshot().summary(); @@ -3082,6 +3246,7 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect table.getTableLocation(), table.getStorageProperties(), table.getTablePartitioning(), + table.getBranch(), table.isRecordScannedFiles(), table.getMaxScannedFileSize(), table.getConstraintColumns(), @@ -3191,6 +3356,7 @@ else if (isMetadataColumnId(columnHandle.getId())) { table.getTableLocation(), table.getStorageProperties(), table.getTablePartitioning(), + table.getBranch(), table.isRecordScannedFiles(), table.getMaxScannedFileSize(), newConstraintColumns, @@ -3342,6 +3508,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab originalHandle.getTableLocation(), originalHandle.getStorageProperties(), Optional.empty(), // requiredTablePartitioning does not affect stats + originalHandle.getBranch(), false, // recordScannedFiles does not affect stats originalHandle.getMaxScannedFileSize(), ImmutableSet.of(), // constraintColumns do not affect stats @@ -3437,7 +3604,7 @@ && getOnlyElement(sourceTableHandles) instanceof IcebergTableHandle handle fromSnapshotForRefresh = Optional.of(Long.parseLong(sourceTable.getValue())); } - return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode); + return newWritableTableHandle(table.getSchemaTableName(), icebergTable, table.getTableSchemaJson(), Optional.empty(), retryMode); } @Override @@ -3637,7 +3804,7 @@ else if (strings.size() != 2) { String schema = strings.get(0); String name = strings.get(1); SchemaTableName schemaTableName = new SchemaTableName(schema, name); - ConnectorTableHandle tableHandle = getTableHandle(session, 
schemaTableName, Optional.empty(), Optional.empty()); + ConnectorTableHandle tableHandle = getTableHandle(session, schemaTableName, Optional.empty(), Optional.empty(), Optional.empty()); if (tableHandle == null || tableHandle instanceof CorruptedIcebergTableHandle) { // Base table is gone or table is corrupted @@ -3852,4 +4019,15 @@ private static TableStatistics mergeColumnStatistics(TableStatistics currentStat newStats.getColumnStatistics().forEach(statisticsBuilder::setColumnStatistics); return statisticsBuilder.build(); } + + private static void checkBranch(Table table, String branch) + { + SnapshotRef ref = table.refs().get(branch); + if (ref == null) { + throw new TrinoException(NOT_FOUND, "Branch does not exist: '%s'".formatted(branch)); + } + if (ref.isTag()) { + throw new TrinoException(NOT_SUPPORTED, "Branch '%s' does not exist, but a tag with that name exists".formatted(branch)); + } + } }
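The metadata changes above route the three new procedure IDs through getTableHandleForExecute; the module wiring below registers their providers. A usage sketch, assuming the ALTER TABLE ... EXECUTE syntax exercised by TestIcebergBranching later in this patch (table name illustrative):

ALTER TABLE orders EXECUTE create_branch('dev');
ALTER TABLE orders EXECUTE fast_forward('main', 'dev');
ALTER TABLE orders EXECUTE drop_branch('dev');

fast_forward advances the 'from' ref to the head of 'to' only when 'from' is an ancestor of 'to'; otherwise it fails, as covered by testFastForwardNotAncestor.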
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java index 0992e51d2375..f7ce2830d85e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java @@ -44,8 +44,11 @@ import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider; import io.trino.plugin.iceberg.procedure.AddFilesTableFromTableProcedure; import io.trino.plugin.iceberg.procedure.AddFilesTableProcedure; +import io.trino.plugin.iceberg.procedure.CreateBranchProcedure; +import io.trino.plugin.iceberg.procedure.DropBranchProcedure; import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure; import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure; +import io.trino.plugin.iceberg.procedure.FastForwardProcedure; import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure; import io.trino.plugin.iceberg.procedure.RegisterTableProcedure; import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure; @@ -134,6 +137,9 @@ public void configure(Binder binder) tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON); tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON); tableProcedures.addBinding().toProvider(AddFilesTableFromTableProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(CreateBranchProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(DropBranchProcedure.class).in(Scopes.SINGLETON); + tableProcedures.addBinding().toProvider(FastForwardProcedure.class).in(Scopes.SINGLETON); newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON); binder.bind(FunctionProvider.class).to(IcebergFunctionProvider.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java index c1f6e4c6de7d..8c8d8ca99974 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java @@ -151,6 +151,9 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa case REMOVE_ORPHAN_FILES: case ADD_FILES: case ADD_FILES_FROM_TABLE: + case CREATE_BRANCH: + case DROP_BRANCH: + case FAST_FORWARD: // handled via ConnectorMetadata.executeTableExecute } throw new IllegalArgumentException("Unknown procedure: " + executeHandle.procedureId()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java index 21e1e0535e1e..c4b41761fce7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java @@ -49,6 +49,7 @@ public class IcebergTableHandle private final int formatVersion; private final String tableLocation; private final Map<String, String> storageProperties; + private final Optional<String> branch; // Filter used during split generation and table scan, but not required to be strictly enforced by Iceberg Connector private final TupleDomain<IcebergColumnHandle> unenforcedPredicate; @@ -92,7 +93,8 @@ public static IcebergTableHandle fromJsonForDeserializationOnly( @JsonProperty("projectedColumns") Set<IcebergColumnHandle> projectedColumns, @JsonProperty("nameMappingJson") Optional<String> nameMappingJson, @JsonProperty("tableLocation") String tableLocation, - @JsonProperty("storageProperties") Map<String, String> storageProperties) + @JsonProperty("storageProperties") Map<String, String> storageProperties, + @JsonProperty("branch") Optional<String> branch) { return new IcebergTableHandle( catalog, @@ -111,6 +113,7 @@ public static IcebergTableHandle fromJsonForDeserializationOnly( tableLocation, storageProperties, Optional.empty(), + branch, false, Optional.empty(), ImmutableSet.of(), @@ -134,6 +137,7 @@ public IcebergTableHandle( String tableLocation, Map<String, String> storageProperties, Optional tablePartitioning, + Optional<String> branch, boolean recordScannedFiles, Optional<DataSize> maxScannedFileSize, Set<IcebergColumnHandle> constraintColumns, @@ -155,6 +159,7 @@ public IcebergTableHandle( this.tableLocation = requireNonNull(tableLocation, "tableLocation is null"); this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null")); this.tablePartitioning = requireNonNull(tablePartitioning, "tablePartitioning is null"); + this.branch = requireNonNull(branch, "branch is null"); this.recordScannedFiles = recordScannedFiles; this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null"); this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null")); @@ -261,6 +266,12 @@ public Optional getTablePartitioning() return tablePartitioning; } + @JsonProperty + public Optional<String> getBranch() + { + return branch; + } + @JsonIgnore public boolean isRecordScannedFiles() { @@ -314,6 +325,7 @@ public IcebergTableHandle withProjectedColumns(Set<IcebergColumnHandle> projecte tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, maxScannedFileSize, constraintColumns, @@ -339,6 +351,7 @@ public IcebergTableHandle forAnalyze() tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, maxScannedFileSize, constraintColumns, @@ -364,6 +377,7 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, Optional.of(maxScannedFileSize), constraintColumns, @@ -389,6 +403,7 @@ public IcebergTableHandle withTablePartitioning(Optional tableLocation, storageProperties, tablePartitioning, + branch, recordScannedFiles, maxScannedFileSize, constraintColumns, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java private static final Set<String> SUPPORTED_PROPERTIES = ImmutableSet.<String>builder() @@ -73,6 +74,7 @@ public class IcebergTableProperties .add(ORC_BLOOM_FILTER_FPP_PROPERTY) .add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY) .add(DATA_LOCATION_PROPERTY) + 
.add(TARGET_BRANCH_PROPERTY) .add(EXTRA_PROPERTIES_PROPERTY) .add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY) .build(); @@ -190,6 +192,11 @@ public IcebergTableProperties( "File system location URI for the table's data files", null, false)) + .add(stringProperty( + TARGET_BRANCH_PROPERTY, + "Target branch name", + null, + true)) .build(); checkState(SUPPORTED_PROPERTIES.containsAll(tableProperties.stream() @@ -274,6 +281,11 @@ public static Optional<String> getDataLocation(Map<String, Object> tableProperti return Optional.ofNullable((String) tableProperties.get(DATA_LOCATION_PROPERTY)); } + public static Optional<String> getTargetBranch(Map<String, Object> tableProperties) + { + return Optional.ofNullable((String) tableProperties.get(TARGET_BRANCH_PROPERTY)); + } + public static Optional<Map<String, String>> getExtraProperties(Map<String, Object> tableProperties) { return Optional.ofNullable((Map<String, String>) tableProperties.get(EXTRA_PROPERTIES_PROPERTY)); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java index 7f347564c3b7..831deb830771 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -37,7 +38,8 @@ public record IcebergWritableTableHandle( IcebergFileFormat fileFormat, Map<String, String> storageProperties, RetryMode retryMode, - Map<String, String> fileIoProperties) + Map<String, String> fileIoProperties, + Optional<String> branch) implements ConnectorInsertTableHandle, ConnectorOutputTableHandle { public IcebergWritableTableHandle @@ -53,6 +55,7 @@ public record IcebergWritableTableHandle( requireNonNull(retryMode, "retryMode is null"); checkArgument(partitionsSpecsAsJson.containsKey(partitionSpecId), "partitionSpecId missing from partitionSpecs"); fileIoProperties = ImmutableMap.copyOf(requireNonNull(fileIoProperties, "fileIoProperties is null")); + requireNonNull(branch, "branch is null"); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java new file mode 100644 index 000000000000..81cdf9f830b6 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/CreateBranchProcedure.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.CREATE_BRANCH; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class CreateBranchProcedure + implements Provider<TableProcedureMetadata> +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + CREATE_BRANCH.name(), + coordinatorOnly(), + ImmutableList.<PropertyMetadata<?>>builder() + .add(stringProperty( + "name", + "Branch name", + null, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java new file mode 100644 index 000000000000..123dbe678b90 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/DropBranchProcedure.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_BRANCH; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class DropBranchProcedure + implements Provider<TableProcedureMetadata> +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + DROP_BRANCH.name(), + coordinatorOnly(), + ImmutableList.<PropertyMetadata<?>>builder() + .add(stringProperty( + "name", + "Branch name", + null, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java new file mode 100644 index 000000000000..e3ddd8af7f39 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/FastForwardProcedure.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Provider; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.session.PropertyMetadata; + +import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.FAST_FORWARD; +import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.stringProperty; + +public class FastForwardProcedure + implements Provider<TableProcedureMetadata> +{ + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + FAST_FORWARD.name(), + coordinatorOnly(), + ImmutableList.<PropertyMetadata<?>>builder() + .add(stringProperty( + "from", + "Branch to fast-forward", + null, + false)) + .add(stringProperty( + "to", + "Ref for the from branch to be fast forwarded to", + null, + false)) + .build()); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java new file mode 100644 index 000000000000..3cd9a0986bfd --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergCreateBranchHandle.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import static java.util.Objects.requireNonNull; + +public record IcebergCreateBranchHandle(String name) + implements IcebergProcedureHandle +{ + public IcebergCreateBranchHandle + { + requireNonNull(name, "name is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java new file mode 100644 index 000000000000..27de2deb8385 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergDropBranchHandle.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.procedure; + +import static java.util.Objects.requireNonNull; + +public record IcebergDropBranchHandle(String name) + implements IcebergProcedureHandle +{ + public IcebergDropBranchHandle + { + requireNonNull(name, "name is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java new file mode 100644 index 000000000000..23dc9daf0e45 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergFastForwardHandle.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import static java.util.Objects.requireNonNull; + +public record IcebergFastForwardHandle(String from, String to) + implements IcebergProcedureHandle +{ + public IcebergFastForwardHandle + { + requireNonNull(from, "from is null"); + requireNonNull(to, "to is null"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java index 1577b867363e..d0f242b6b2e3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergProcedureHandle.java @@ -26,5 +26,8 @@ @JsonSubTypes.Type(value = IcebergRemoveOrphanFilesHandle.class, name = "remove_orphan_files"), @JsonSubTypes.Type(value = IcebergAddFilesHandle.class, name = "add_files"), @JsonSubTypes.Type(value = IcebergAddFilesFromTableHandle.class, name = "add_files_from_table"), + @JsonSubTypes.Type(value = IcebergCreateBranchHandle.class, name = "create_branch"), + @JsonSubTypes.Type(value = IcebergDropBranchHandle.class, name = "drop_branch"), + @JsonSubTypes.Type(value = IcebergFastForwardHandle.class, name = "fast_forward"), }) public interface IcebergProcedureHandle {} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java index 31884f752469..bc555708fab9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergTableProcedureId.java @@ -21,4 +21,7 @@ public enum IcebergTableProcedureId REMOVE_ORPHAN_FILES, ADD_FILES, ADD_FILES_FROM_TABLE, + CREATE_BRANCH, + DROP_BRANCH, + FAST_FORWARD, } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergBranching.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergBranching.java new file mode 100644 index 000000000000..98fccc939007 --- /dev/null +++ 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergBranching.java @@ -0,0 +1,382 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.metastore.HiveMetastore; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import io.trino.testing.sql.TestTable; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.Table; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory; +import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +final class TestIcebergBranching + extends AbstractTestQueryFramework +{ + private HiveMetastore metastore; + private TrinoFileSystemFactory fileSystemFactory; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + QueryRunner queryRunner = IcebergQueryRunner.builder().build(); + metastore = getHiveMetastore(queryRunner); + fileSystemFactory = getFileSystemFactory(queryRunner); + return queryRunner; + } + + @Test + void testCreateBranch() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_create_branch", "(x int)")) { + assertBranch(table.getName(), "main"); + + createBranch(table.getName(), "test-branch"); + assertBranch(table.getName(), "main", "test-branch"); + + createBranch(table.getName(), "TEST-BRANCH"); + assertBranch(table.getName(), "main", "test-branch", "TEST-BRANCH"); + + assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE create_branch('test-branch')", "Branch 'test-branch' already exists"); + } + } + + @Test + void testDropBranch() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_drop_branch", "(x int)")) { + assertBranch(table.getName(), "main"); + + createBranch(table.getName(), "test-branch"); + createBranch(table.getName(), "TEST-BRANCH"); + assertBranch(table.getName(), "main", "test-branch", "TEST-BRANCH"); + + dropBranch(table.getName(), "test-branch"); + dropBranch(table.getName(), "TEST-BRANCH"); + assertBranch(table.getName(), "main"); + + assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE drop_branch('test-branch')", "Branch 'test-branch' does not exist"); + assertQueryFails("ALTER TABLE " + table.getName() + " EXECUTE drop_branch('main')", "Cannot drop 'main' branch"); + } + } + + @Test + void testFastForward() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_fast_forward", "(x int)")) { + assertBranch(table.getName(), "main"); + + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES 1, 2, 3", 3); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(0L); + 
assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(3L); + + fastForward(table.getName(), "main", "test-branch"); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(3L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(3L); + + assertQueryFails( + "ALTER TABLE " + table.getName() + " EXECUTE fast_forward('non-existing-branch', 'main')", + "Branch 'non-existing-branch' does not exist"); + assertQueryFails( + "ALTER TABLE " + table.getName() + " EXECUTE fast_forward('main', 'non-existing-branch')", + "Branch 'non-existing-branch' does not exist"); + } + } + + @Test + void testFastForwardNotAncestor() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_fast_forward", "(x int)")) { + createBranch(table.getName(), "test-branch"); + + assertUpdate("INSERT INTO " + table.getName() + " VALUES 1", 1); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES 1, 2, 3", 3); + + assertQueryFails( + "ALTER TABLE " + table.getName() + " EXECUTE fast_forward('main', 'test-branch')", + "Branch 'main' is not an ancestor of 'test-branch'"); + } + } + + @Test + void testInsert() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_insert_into_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + + // insert into main (default) branch + assertUpdate("INSERT INTO " + table.getName() + " @ 'main' VALUES (1, 2)", 1); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(1L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(0L); + + // insert into another branch + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (10, 20), (30, 40)", 2); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(1L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(2L); + + // insert into another branch with a partial column + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' (x) VALUES 50", 1); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(1L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(3L); + + assertQueryFails( + "INSERT INTO " + table.getName() + " @ 'non-existing' VALUES (1, 2, 3)", + "Cannot find snapshot with reference name: non-existing"); + } + } + + @Test + void testInsertAfterSchemaEvolution() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_insert_into_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 2)", 1); + + // change table definition on main branch + assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN z int"); + + assertQueryFails( + "INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (1, 2, 3)", + "\\Qline 1:1: Insert query has mismatched column types: Table: [integer, integer], Query: [integer, integer, integer]"); + + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' SELECT x + 10, y + 10 FROM " + table.getName(), 1); + assertThat(query("SELECT * FROM " + table.getName())) + .matches("VALUES (1, 2, CAST(NULL AS integer))"); + assertThat(query("SELECT * FROM 
" + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES (11, 12, CAST(NULL AS integer))"); + } + } + + @Test + void testInsertIntoTag() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_tag", "(x int, y int)")) { + createTag(table.getName(), "test-tag"); + assertQueryFails( + "INSERT INTO " + table.getName() + " @ 'test-tag' VALUES (1, 2)", + "Branch 'test-tag' does not exist, but a tag with that name exists"); + } + } + + @Test + void testDelete() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_from_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (1, 10), (2, 20), (3, 30)", 3); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(0L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(3L); + + assertUpdate("DELETE FROM " + table.getName() + " @ 'test-branch'"); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName())) + .isEqualTo(0L); + assertThat(computeScalar("SELECT count(*) FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .isEqualTo(0L); + + assertQueryFails( + "DELETE FROM " + table.getName() + " @ 'non-existing'", + "Cannot find snapshot with reference name: non-existing"); + } + } + + @Test + void testDeleteAfterSchemaEvolution() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_from_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (1, 10), (2, 20), (3, 30)", 3); + + // change table definition on main branch + assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN y"); + + // TODO This should be fixed after once https://github.com/trinodb/trino/issues/23601 is resolved + assertThat(query("DELETE FROM " + table.getName() + " @ 'test-branch' WHERE y = 30")).nonTrinoExceptionFailure() + .hasMessageContaining("Invalid metadata file") + .hasStackTraceContaining("Cannot find field 'y'"); + + // branch returns the latest schema once a new snapshot is created + assertUpdate("DELETE FROM " + table.getName() + " @ 'test-branch' WHERE x = 1", 1); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 2, 3"); + } + } + + @Test + void testDeleteFromTag() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_tag", "(x int, y int)")) { + createTag(table.getName(), "test-tag"); + assertQueryFails( + "DELETE FROM " + table.getName() + " @ 'test-tag'", + "Branch 'test-tag' does not exist, but a tag with that name exists"); + } + } + + @Test + void testUpdate() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_branch", "(x int)")) { + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES 1, 2, 3", 3); + + assertUpdate("UPDATE " + table.getName() + " @ 'test-branch' SET x = x * 2", 3); + assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName()); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 2, 4, 6"); + + assertQueryFails( + "UPDATE " + table.getName() + " @ 'non-existing' SET x = x * 2", + "Cannot find snapshot with reference name: non-existing"); + } + } + + @Test + void 
testUpdateAfterSchemaEvolution() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + assertUpdate("INSERT INTO " + table.getName() + " @ 'test-branch' VALUES (1, 10), (2, 20), (3, 30)", 3); + + // change table definition on main branch + assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN y"); + assertUpdate("UPDATE " + table.getName() + " @ 'test-branch' SET y = 10", 3); + + // branch returns the latest schema once a new snapshot is created + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 1, 2, 3"); + } + } + + @Test + void testUpdateTag() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_tag", "(x int, y int)")) { + createTag(table.getName(), "test-tag"); + assertQueryFails( + "UPDATE " + table.getName() + " @ 'test-tag' SET x = 2", + "Branch 'test-tag' does not exist, but a tag with that name exists"); + } + } + + @Test + void testMerge() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_merge_branch", "(x int)")) { + createBranch(table.getName(), "test-branch"); + + assertUpdate("MERGE INTO " + table.getName() + " @ 'test-branch' USING (VALUES 42) t(dummy) ON false " + + " WHEN NOT MATCHED THEN INSERT VALUES (1)", 1); + assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName()); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 1"); + + assertUpdate("MERGE INTO " + table.getName() + " @ 'test-branch' USING (VALUES 42) t(dummy) ON true " + + " WHEN MATCHED THEN UPDATE SET x = 10", 1); + assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName()); + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 10"); + + assertQueryFails( + "MERGE INTO " + table.getName() + " @ 'not-existing' USING (VALUES 42) t(dummy) ON false " + + " WHEN NOT MATCHED THEN INSERT VALUES (1)", + "Cannot find snapshot with reference name: not-existing"); + } + } + + @Test + void testMergeAfterSchemaEvolution() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_merge_branch", "(x int, y int)")) { + createBranch(table.getName(), "test-branch"); + + // change table definition on main branch + assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN y"); + assertUpdate("MERGE INTO " + table.getName() + " @ 'test-branch' USING (VALUES 42) t(dummy) ON false " + + " WHEN NOT MATCHED THEN INSERT VALUES (1, 2)", 1); + + // branch returns the latest schema once a new snapshot is created + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF 'test-branch'")) + .matches("VALUES 1"); + } + } + + @Test + void testMergeIntoTag() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_tag", "(x int, y int)")) { + createTag(table.getName(), "test-tag"); + assertQueryFails( + "MERGE INTO " + table.getName() + " @ 'test-tag' USING (VALUES 42) t(dummy) ON false WHEN NOT MATCHED THEN INSERT VALUES (1, 2)", + "Branch 'test-tag' does not exist, but a tag with that name exists"); + } + } + + private void createBranch(String table, String branch) + { + assertUpdate("ALTER TABLE " + table + " EXECUTE create_branch('" + branch + "')"); + } + + private void dropBranch(String table, String branch) + { + assertUpdate("ALTER TABLE " + table + " EXECUTE drop_branch('" + branch + "')"); + } + + private void 
fastForward(String table, String from, String to) + { + assertUpdate("ALTER TABLE " + table + " EXECUTE fast_forward('" + from + "', '" + to + "')"); + } + + private void createTag(String table, String tag) + { + BaseTable icebergTable = loadTable(table); + icebergTable.manageSnapshots() + .createTag(tag, icebergTable.currentSnapshot().snapshotId()) + .commit(); + } + + private void assertBranch(String tableName, String... branchNames) + { + Table table = loadTable(tableName); + table.refresh(); + assertThat(table.refs()).containsOnlyKeys(branchNames); + } + + private BaseTable loadTable(String tableName) + { + return IcebergTestUtils.loadTable(tableName, metastore, fileSystemFactory, "iceberg", "tpch"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java index 01bcecfe8d50..e55852b6f8a4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java @@ -172,6 +172,7 @@ public void testDynamicSplitPruningOnUnpartitionedTable() tablePath, ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -233,6 +234,7 @@ public void testDynamicSplitPruningOnUnpartitionedTable() tablePath, ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -344,6 +346,7 @@ public void testDynamicSplitPruningWithExplicitPartitionFilter() tablePath, ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -506,6 +509,7 @@ public void testDynamicSplitPruningWithExplicitPartitionFilterPartitionEvolution tablePath, ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index 62be3d6672c3..8acf4813c473 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -490,6 +490,7 @@ private static IcebergTableHandle createTableHandle(SchemaTableName schemaTableN nationTable.location(), nationTable.properties(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java index 676f4cf1f7d2..fd273b86868c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java @@ -176,6 +176,7 @@ public void testProjectionPushdown() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -261,6 +262,7 @@ public void testPredicatePushdown() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -313,6 +315,7 @@ public void testColumnPruningProjectionPushdown() "", 
ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(), @@ -375,6 +378,7 @@ public void testPushdownWithDuplicateExpressions() "", ImmutableMap.of(), Optional.empty(), + Optional.empty(), false, Optional.empty(), ImmutableSet.of(),
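With the branch field threaded through IcebergTableHandle and the writable handles, DML can target a branch directly while main stays isolated. A sketch of the end-to-end flow, assuming the @ 'branch' write syntax and FOR VERSION AS OF reads exercised by TestIcebergBranching (table and column names are illustrative, matching the tests' (x int, y int) shape):

ALTER TABLE orders EXECUTE create_branch('audit');
INSERT INTO orders @ 'audit' VALUES (1, 10);
UPDATE orders @ 'audit' SET y = y + 1 WHERE x = 1;
SELECT * FROM orders FOR VERSION AS OF 'audit'; -- sees the branch writes
SELECT * FROM orders; -- main is unchanged until fast_forward publishes the branch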