Skip to content

Commit

Permalink
Add support for branching in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Dec 21, 2024
1 parent 23a612f commit 2a6ee7f
Show file tree
Hide file tree
Showing 18 changed files with 853 additions and 31 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider;
import io.trino.plugin.iceberg.procedure.AddFilesTableFromTableProcedure;
import io.trino.plugin.iceberg.procedure.AddFilesTableProcedure;
import io.trino.plugin.iceberg.procedure.CreateBranchProcedure;
import io.trino.plugin.iceberg.procedure.DropBranchProcedure;
import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure;
import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure;
import io.trino.plugin.iceberg.procedure.FastForwardProcedure;
import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure;
import io.trino.plugin.iceberg.procedure.RegisterTableProcedure;
import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure;
Expand Down Expand Up @@ -134,6 +137,9 @@ public void configure(Binder binder)
tableProcedures.addBinding().toProvider(RemoveOrphanFilesTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(AddFilesTableFromTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(CreateBranchProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(DropBranchProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(FastForwardProcedure.class).in(Scopes.SINGLETON);

newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
binder.bind(FunctionProvider.class).to(IcebergFunctionProvider.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
case REMOVE_ORPHAN_FILES:
case ADD_FILES:
case ADD_FILES_FROM_TABLE:
case CREATE_BRANCH:
case DROP_BRANCH:
case FAST_FORWARD:
// handled via ConnectorMetadata.executeTableExecute
}
throw new IllegalArgumentException("Unknown procedure: " + executeHandle.procedureId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class IcebergTableHandle
private final int formatVersion;
private final String tableLocation;
private final Map<String, String> storageProperties;
private final Optional<String> branch;

// Filter used during split generation and table scan, but not required to be strictly enforced by Iceberg Connector
private final TupleDomain<IcebergColumnHandle> unenforcedPredicate;
Expand Down Expand Up @@ -92,7 +93,8 @@ public static IcebergTableHandle fromJsonForDeserializationOnly(
@JsonProperty("projectedColumns") Set<IcebergColumnHandle> projectedColumns,
@JsonProperty("nameMappingJson") Optional<String> nameMappingJson,
@JsonProperty("tableLocation") String tableLocation,
@JsonProperty("storageProperties") Map<String, String> storageProperties)
@JsonProperty("storageProperties") Map<String, String> storageProperties,
@JsonProperty("branch") Optional<String> branch)
{
return new IcebergTableHandle(
catalog,
Expand All @@ -111,6 +113,7 @@ public static IcebergTableHandle fromJsonForDeserializationOnly(
tableLocation,
storageProperties,
Optional.empty(),
branch,
false,
Optional.empty(),
ImmutableSet.of(),
Expand All @@ -134,6 +137,7 @@ public IcebergTableHandle(
String tableLocation,
Map<String, String> storageProperties,
Optional<IcebergTablePartitioning> tablePartitioning,
Optional<String> branch,
boolean recordScannedFiles,
Optional<DataSize> maxScannedFileSize,
Set<IcebergColumnHandle> constraintColumns,
Expand All @@ -155,6 +159,7 @@ public IcebergTableHandle(
this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
this.tablePartitioning = requireNonNull(tablePartitioning, "tablePartitioning is null");
this.branch = requireNonNull(branch, "branch is null");
this.recordScannedFiles = recordScannedFiles;
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null"));
Expand Down Expand Up @@ -261,6 +266,12 @@ public Optional<IcebergTablePartitioning> getTablePartitioning()
return tablePartitioning;
}

@JsonProperty
public Optional<String> getBranch()
{
return branch;
}

@JsonIgnore
public boolean isRecordScannedFiles()
{
Expand Down Expand Up @@ -314,6 +325,7 @@ public IcebergTableHandle withProjectedColumns(Set<IcebergColumnHandle> projecte
tableLocation,
storageProperties,
tablePartitioning,
branch,
recordScannedFiles,
maxScannedFileSize,
constraintColumns,
Expand All @@ -339,6 +351,7 @@ public IcebergTableHandle forAnalyze()
tableLocation,
storageProperties,
tablePartitioning,
branch,
recordScannedFiles,
maxScannedFileSize,
constraintColumns,
Expand All @@ -364,6 +377,7 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc
tableLocation,
storageProperties,
tablePartitioning,
branch,
recordScannedFiles,
Optional.of(maxScannedFileSize),
constraintColumns,
Expand All @@ -389,6 +403,7 @@ public IcebergTableHandle withTablePartitioning(Optional<IcebergTablePartitionin
tableLocation,
storageProperties,
requiredTablePartitioning,
branch,
recordScannedFiles,
maxScannedFileSize,
constraintColumns,
Expand Down Expand Up @@ -422,6 +437,7 @@ public boolean equals(Object o)
Objects.equals(nameMappingJson, that.nameMappingJson) &&
Objects.equals(tableLocation, that.tableLocation) &&
Objects.equals(storageProperties, that.storageProperties) &&
Objects.equals(branch, that.branch) &&
Objects.equals(maxScannedFileSize, that.maxScannedFileSize) &&
Objects.equals(constraintColumns, that.constraintColumns) &&
Objects.equals(forAnalyze, that.forAnalyze);
Expand All @@ -446,6 +462,7 @@ public int hashCode()
nameMappingJson,
tableLocation,
storageProperties,
branch,
recordScannedFiles,
maxScannedFileSize,
constraintColumns,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class IcebergTableProperties
public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
public static final String OBJECT_STORE_LAYOUT_ENABLED_PROPERTY = "object_store_layout_enabled";
public static final String DATA_LOCATION_PROPERTY = "data_location";
public static final String TARGET_BRANCH_PROPERTY = "target_branch";
public static final String EXTRA_PROPERTIES_PROPERTY = "extra_properties";

public static final Set<String> SUPPORTED_PROPERTIES = ImmutableSet.<String>builder()
Expand All @@ -73,6 +74,7 @@ public class IcebergTableProperties
.add(ORC_BLOOM_FILTER_FPP_PROPERTY)
.add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY)
.add(DATA_LOCATION_PROPERTY)
.add(TARGET_BRANCH_PROPERTY)
.add(EXTRA_PROPERTIES_PROPERTY)
.add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY)
.build();
Expand Down Expand Up @@ -190,6 +192,11 @@ public IcebergTableProperties(
"File system location URI for the table's data files",
null,
false))
.add(stringProperty(
TARGET_BRANCH_PROPERTY,
"Target branch name",
null,
true))
.build();

checkState(SUPPORTED_PROPERTIES.containsAll(tableProperties.stream()
Expand Down Expand Up @@ -274,6 +281,11 @@ public static Optional<String> getDataLocation(Map<String, Object> tableProperti
return Optional.ofNullable((String) tableProperties.get(DATA_LOCATION_PROPERTY));
}

public static Optional<String> getTargetBranch(Map<String, Object> tableProperties)
{
return Optional.ofNullable((String) tableProperties.get(TARGET_BRANCH_PROPERTY));
}

public static Optional<Map<String, String>> getExtraProperties(Map<String, Object> tableProperties)
{
return Optional.ofNullable((Map<String, String>) tableProperties.get(EXTRA_PROPERTIES_PROPERTY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
Expand All @@ -37,7 +38,8 @@ public record IcebergWritableTableHandle(
IcebergFileFormat fileFormat,
Map<String, String> storageProperties,
RetryMode retryMode,
Map<String, String> fileIoProperties)
Map<String, String> fileIoProperties,
Optional<String> branch)
implements ConnectorInsertTableHandle, ConnectorOutputTableHandle
{
public IcebergWritableTableHandle
Expand All @@ -53,6 +55,7 @@ public record IcebergWritableTableHandle(
requireNonNull(retryMode, "retryMode is null");
checkArgument(partitionsSpecsAsJson.containsKey(partitionSpecId), "partitionSpecId missing from partitionSpecs");
fileIoProperties = ImmutableMap.copyOf(requireNonNull(fileIoProperties, "fileIoProperties is null"));
requireNonNull(branch, "branch is null");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.session.PropertyMetadata;

import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.CREATE_BRANCH;
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
import static io.trino.spi.session.PropertyMetadata.stringProperty;

public class CreateBranchProcedure
implements Provider<TableProcedureMetadata>
{
@Override
public TableProcedureMetadata get()
{
return new TableProcedureMetadata(
CREATE_BRANCH.name(),
coordinatorOnly(),
ImmutableList.<PropertyMetadata<?>>builder()
.add(stringProperty(
"name",
"Branch name",
null,
false))
.build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.session.PropertyMetadata;

import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DROP_BRANCH;
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
import static io.trino.spi.session.PropertyMetadata.stringProperty;

public class DropBranchProcedure
implements Provider<TableProcedureMetadata>
{
@Override
public TableProcedureMetadata get()
{
return new TableProcedureMetadata(
DROP_BRANCH.name(),
coordinatorOnly(),
ImmutableList.<PropertyMetadata<?>>builder()
.add(stringProperty(
"name",
"Branch name",
null,
false))
.build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.session.PropertyMetadata;

import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.FAST_FORWARD;
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
import static io.trino.spi.session.PropertyMetadata.stringProperty;

public class FastForwardProcedure
implements Provider<TableProcedureMetadata>
{
@Override
public TableProcedureMetadata get()
{
return new TableProcedureMetadata(
FAST_FORWARD.name(),
coordinatorOnly(),
ImmutableList.<PropertyMetadata<?>>builder()
.add(stringProperty(
"from",
"Branch to fast-forward",
null,
false))
.add(stringProperty(
"to",
"Ref for the from branch to be fast forwarded to",
null,
false))
.build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

import static java.util.Objects.requireNonNull;

public record IcebergCreateBranchHandle(String name)
implements IcebergProcedureHandle
{
public IcebergCreateBranchHandle
{
requireNonNull(name, "name is null");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.procedure;

import static java.util.Objects.requireNonNull;

public record IcebergDropBranchHandle(String name)
implements IcebergProcedureHandle
{
public IcebergDropBranchHandle
{
requireNonNull(name, "name is null");
}
}
Loading

0 comments on commit 2a6ee7f

Please sign in to comment.