From 35a3203748e0b51d8cd1fa5d529f3dc615fc8358 Mon Sep 17 00:00:00 2001 From: Fred Storage Liu Date: Tue, 25 Feb 2025 10:51:23 -0800 Subject: [PATCH 1/9] impl --- build.sbt | 34 +- .../org/apache/iceberg/PartitionSpec.java | 659 ++++++++++++++++++ 2 files changed, 680 insertions(+), 13 deletions(-) create mode 100644 icebergShaded/src/main/java/org/apache/iceberg/PartitionSpec.java diff --git a/build.sbt b/build.sbt index 94ddfe4a738..77ed9711b41 100644 --- a/build.sbt +++ b/build.sbt @@ -848,8 +848,6 @@ lazy val iceberg = (project in file("iceberg")) ) // scalastyle:on println -lazy val generateIcebergJarsTask = TaskKey[Unit]("generateIcebergJars", "Generate Iceberg JARs") - lazy val icebergShaded = (project in file("icebergShaded")) .dependsOn(spark % "provided") .disablePlugins(JavaFormatterPlugin, ScalafmtPlugin) @@ -857,17 +855,19 @@ lazy val icebergShaded = (project in file("icebergShaded")) name := "iceberg-shaded", commonSettings, skipReleaseSettings, - - // Compile, patch and generated Iceberg JARs - generateIcebergJarsTask := { - import sys.process._ - val scriptPath = baseDirectory.value / "generate_iceberg_jars.py" - // Download iceberg code in `iceberg_src` dir and generate the JARs in `lib` dir - Seq("python3", scriptPath.getPath)! - }, - Compile / unmanagedJars := (Compile / unmanagedJars).dependsOn(generateIcebergJarsTask).value, - cleanFiles += baseDirectory.value / "iceberg_src", - cleanFiles += baseDirectory.value / "lib", + libraryDependencies ++= Seq( + // Fix Iceberg's legacy java.lang.NoClassDefFoundError: scala/jdk/CollectionConverters$ error + // due to legacy scala. + "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1", + "org.apache.iceberg" % "iceberg-core" % "1.8.0" excludeAll ( + ExclusionRule("com.fasterxml.jackson.core"), + ExclusionRule("com.fasterxml.jackson.module") + ), + "org.apache.iceberg" % "iceberg-hive-metastore" % "1.8.0" excludeAll ( + ExclusionRule("com.fasterxml.jackson.core"), + ExclusionRule("com.fasterxml.jackson.module") + ), + ), // Generated shaded Iceberg JARs Compile / packageBin := assembly.value, @@ -877,8 +877,16 @@ lazy val icebergShaded = (project in file("icebergShaded")) assembly / assemblyShadeRules := Seq( ShadeRule.rename("org.apache.iceberg.**" -> "shadedForDelta.@0").inAll, ), + assembly / assemblyMergeStrategy := { + case PathList("shadedForDelta", "org", "apache", "iceberg", "PartitionSpec$Builder.class") => + MergeStrategy.first + case PathList("shadedForDelta", "org", "apache", "iceberg", "PartitionSpec.class") => + MergeStrategy.first + case x => (assemblyMergeStrategy in assembly).value(x) + }, assemblyPackageScala / assembleArtifact := false, // Make the 'compile' invoke the 'assembly' task to generate the uber jar. + Compile / packageBin := assembly.value ) lazy val hudi = (project in file("hudi")) diff --git a/icebergShaded/src/main/java/org/apache/iceberg/PartitionSpec.java b/icebergShaded/src/main/java/org/apache/iceberg/PartitionSpec.java new file mode 100644 index 00000000000..20d3e7b4664 --- /dev/null +++ b/icebergShaded/src/main/java/org/apache/iceberg/PartitionSpec.java @@ -0,0 +1,659 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.transforms.Transforms; +import org.apache.iceberg.transforms.UnknownTransform; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.StructType; + +/** + * Represents how to produce partition data for a table. + * + *
<p>
Partition data is produced by transforming columns in a table. Each column transform is + * represented by a named {@link PartitionField}. + */ +public class PartitionSpec implements Serializable { + // IDs for partition fields start at 1000 + private static final int PARTITION_DATA_ID_START = 1000; + + private final Schema schema; + + // this is ordered so that DataFile has a consistent schema + private final int specId; + private final PartitionField[] fields; + private transient volatile ListMultimap fieldsBySourceId = null; + private transient volatile Class[] lazyJavaClasses = null; + private transient volatile StructType lazyPartitionType = null; + private transient volatile StructType lazyRawPartitionType = null; + private transient volatile List fieldList = null; + private final int lastAssignedFieldId; + + private PartitionSpec( + Schema schema, int specId, List fields, int lastAssignedFieldId) { + this.schema = schema; + this.specId = specId; + this.fields = fields.toArray(new PartitionField[0]); + this.lastAssignedFieldId = lastAssignedFieldId; + } + + /** Returns the {@link Schema} for this spec. */ + public Schema schema() { + return schema; + } + + /** Returns the ID of this spec. */ + public int specId() { + return specId; + } + + /** Returns the list of {@link PartitionField partition fields} for this spec. */ + public List fields() { + return lazyFieldList(); + } + + public boolean isPartitioned() { + return fields.length > 0 && fields().stream().anyMatch(f -> !f.transform().isVoid()); + } + + public boolean isUnpartitioned() { + return !isPartitioned(); + } + + int lastAssignedFieldId() { + return lastAssignedFieldId; + } + + public UnboundPartitionSpec toUnbound() { + UnboundPartitionSpec.Builder builder = UnboundPartitionSpec.builder().withSpecId(specId); + + for (PartitionField field : fields) { + builder.addField( + field.transform().toString(), field.sourceId(), field.fieldId(), field.name()); + } + + return builder.build(); + } + + /** + * Returns the {@link PartitionField field} that partitions the given source field + * + * @param fieldId a field id from the source schema + * @return the {@link PartitionField field} that partitions the given source field + */ + public List getFieldsBySourceId(int fieldId) { + return lazyFieldsBySourceId().get(fieldId); + } + + /** Returns a {@link StructType} for partition data defined by this spec. */ + public StructType partitionType() { + if (lazyPartitionType == null) { + synchronized (this) { + if (lazyPartitionType == null) { + List structFields = Lists.newArrayListWithExpectedSize(fields.length); + + for (PartitionField field : fields) { + Type sourceType = schema.findType(field.sourceId()); + Type resultType = field.transform().getResultType(sourceType); + structFields.add(Types.NestedField.optional(field.fieldId(), field.name(), resultType)); + } + + this.lazyPartitionType = Types.StructType.of(structFields); + } + } + } + + return lazyPartitionType; + } + + /** + * Returns a struct matching partition information as written into manifest files. See {@link + * #partitionType()} for a struct with field ID's potentially re-assigned to avoid conflict. + */ + public StructType rawPartitionType() { + if (schema.idsToOriginal().isEmpty()) { + // not re-assigned. 
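+      // field IDs were never re-assigned from the original schema here, so the struct
+      // returned by partitionType() already matches what was written into manifest files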
+ return partitionType(); + } + if (lazyRawPartitionType == null) { + synchronized (this) { + if (lazyRawPartitionType == null) { + this.lazyRawPartitionType = + StructType.of( + partitionType().fields().stream() + .map(f -> f.withFieldId(schema.idsToOriginal().get(f.fieldId()))) + .collect(Collectors.toList())); + } + } + } + + return lazyRawPartitionType; + } + + public Class[] javaClasses() { + if (lazyJavaClasses == null) { + synchronized (this) { + if (lazyJavaClasses == null) { + Class[] classes = new Class[fields.length]; + for (int i = 0; i < fields.length; i += 1) { + PartitionField field = fields[i]; + if (field.transform() instanceof UnknownTransform) { + classes[i] = Object.class; + } else { + Type sourceType = schema.findType(field.sourceId()); + Type result = field.transform().getResultType(sourceType); + classes[i] = result.typeId().javaClass(); + } + } + + this.lazyJavaClasses = classes; + } + } + } + + return lazyJavaClasses; + } + + @SuppressWarnings("unchecked") + private T get(StructLike data, int pos, Class javaClass) { + return data.get(pos, (Class) javaClass); + } + + private String escape(String string) { + try { + return URLEncoder.encode(string, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + public String partitionToPath(StructLike data) { + StringBuilder sb = new StringBuilder(); + Class[] javaClasses = javaClasses(); + List outputFields = partitionType().fields(); + for (int i = 0; i < javaClasses.length; i += 1) { + PartitionField field = fields[i]; + Type type = outputFields.get(i).type(); + String valueString = field.transform().toHumanString(type, get(data, i, javaClasses[i])); + + if (i > 0) { + sb.append("/"); + } + sb.append(escape(field.name())).append("=").append(escape(valueString)); + } + return sb.toString(); + } + + /** + * Returns true if this spec is equivalent to the other, with partition field ids ignored. That + * is, if both specs have the same number of fields, field order, field name, source columns, and + * transforms. + * + * @param other another PartitionSpec + * @return true if the specs have the same fields, source columns, and transforms. 
+ */ + public boolean compatibleWith(PartitionSpec other) { + if (equals(other)) { + return true; + } + + if (fields.length != other.fields.length) { + return false; + } + + for (int i = 0; i < fields.length; i += 1) { + PartitionField thisField = fields[i]; + PartitionField thatField = other.fields[i]; + if (thisField.sourceId() != thatField.sourceId() + || !thisField.transform().toString().equals(thatField.transform().toString()) + || !thisField.name().equals(thatField.name())) { + return false; + } + } + + return true; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } else if (!(other instanceof PartitionSpec)) { + return false; + } + + PartitionSpec that = (PartitionSpec) other; + if (this.specId != that.specId) { + return false; + } + return Arrays.equals(fields, that.fields); + } + + @Override + public int hashCode() { + return 31 * Integer.hashCode(specId) + Arrays.hashCode(fields); + } + + private List lazyFieldList() { + if (fieldList == null) { + synchronized (this) { + if (fieldList == null) { + this.fieldList = ImmutableList.copyOf(fields); + } + } + } + return fieldList; + } + + private ListMultimap lazyFieldsBySourceId() { + if (fieldsBySourceId == null) { + synchronized (this) { + if (fieldsBySourceId == null) { + ListMultimap multiMap = + Multimaps.newListMultimap( + Maps.newHashMap(), () -> Lists.newArrayListWithCapacity(fields.length)); + for (PartitionField field : fields) { + multiMap.put(field.sourceId(), field); + } + this.fieldsBySourceId = multiMap; + } + } + } + + return fieldsBySourceId; + } + + /** + * Returns the source field ids for identity partitions. + * + * @return a set of source ids for the identity partitions. + */ + public Set identitySourceIds() { + Set sourceIds = Sets.newHashSet(); + for (PartitionField field : fields()) { + if ("identity".equals(field.transform().toString())) { + sourceIds.add(field.sourceId()); + } + } + + return sourceIds; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("["); + for (PartitionField field : fields) { + sb.append("\n"); + sb.append(" ").append(field); + } + if (fields.length > 0) { + sb.append("\n"); + } + sb.append("]"); + return sb.toString(); + } + + private static final PartitionSpec UNPARTITIONED_SPEC = + new PartitionSpec(new Schema(), 0, ImmutableList.of(), unpartitionedLastAssignedId()); + + /** + * Returns a spec for unpartitioned tables. + * + * @return a partition spec with no partitions + */ + public static PartitionSpec unpartitioned() { + return UNPARTITIONED_SPEC; + } + + private static int unpartitionedLastAssignedId() { + return PARTITION_DATA_ID_START - 1; + } + + /** + * Creates a new {@link Builder partition spec builder} for the given {@link Schema}. + * + * @param schema a schema + * @return a partition spec builder for the given schema + */ + public static Builder builderFor(Schema schema) { + return new Builder(schema); + } + + /** + * Used to create valid {@link PartitionSpec partition specs}. + * + *
<p>
Call {@link #builderFor(Schema)} to create a new builder. + */ + public static class Builder { + private final Schema schema; + private final List fields = Lists.newArrayList(); + private final Set partitionNames = Sets.newHashSet(); + private final Map, PartitionField> dedupFields = Maps.newHashMap(); + private int specId = 0; + private final AtomicInteger lastAssignedFieldId = + new AtomicInteger(unpartitionedLastAssignedId()); + // check if there are conflicts between partition and schema field name + private boolean checkConflicts = false; + private boolean caseSensitive = true; + + private Builder(Schema schema) { + this.schema = schema; + } + + private int nextFieldId() { + return lastAssignedFieldId.incrementAndGet(); + } + + private void checkAndAddPartitionName(String name) { + checkAndAddPartitionName(name, null); + } + + Builder checkConflicts(boolean check) { + checkConflicts = check; + return this; + } + + private void checkAndAddPartitionName(String name, Integer sourceColumnId) { + Types.NestedField schemaField = + this.caseSensitive ? schema.findField(name) : schema.caseInsensitiveFindField(name); + if (checkConflicts) { + if (sourceColumnId != null) { + // for identity transform case we allow conflicts between partition and schema field name + // as + // long as they are sourced from the same schema field + Preconditions.checkArgument( + schemaField == null || schemaField.fieldId() == sourceColumnId, + "Cannot create identity partition sourced from different field in schema: %s", + name); + } else { + // for all other transforms we don't allow conflicts between partition name and schema + // field name + Preconditions.checkArgument( + schemaField == null, + "Cannot create partition from name that exists in schema: %s", + name); + } + } + Preconditions.checkArgument(!name.isEmpty(), "Cannot use empty partition name: %s", name); + Preconditions.checkArgument( + !partitionNames.contains(name), "Cannot use partition name more than once: %s", name); + partitionNames.add(name); + } + + private void checkForRedundantPartitions(PartitionField field) { + Map.Entry dedupKey = + new AbstractMap.SimpleEntry<>(field.sourceId(), field.transform().dedupName()); + PartitionField partitionField = dedupFields.get(dedupKey); + Preconditions.checkArgument( + partitionField == null, + "Cannot add redundant partition: %s conflicts with %s", + partitionField, + field); + dedupFields.put(dedupKey, field); + } + + public Builder caseSensitive(boolean sensitive) { + this.caseSensitive = sensitive; + return this; + } + + public Builder withSpecId(int newSpecId) { + this.specId = newSpecId; + return this; + } + + private Types.NestedField findSourceColumn(String sourceName) { + Types.NestedField sourceColumn = + this.caseSensitive + ? 
schema.findField(sourceName) + : schema.caseInsensitiveFindField(sourceName); + Preconditions.checkArgument( + sourceColumn != null, "Cannot find source column: %s", sourceName); + return sourceColumn; + } + + Builder identity(String sourceName, String targetName) { + return identity(findSourceColumn(sourceName), targetName); + } + + private Builder identity(Types.NestedField sourceColumn, String targetName) { + checkAndAddPartitionName(targetName, sourceColumn.fieldId()); + PartitionField field = + new PartitionField( + sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.identity()); + checkForRedundantPartitions(field); + fields.add(field); + return this; + } + + public Builder identity(String sourceName) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + return identity(sourceColumn, schema.findColumnName(sourceColumn.fieldId())); + } + + public Builder year(String sourceName, String targetName) { + return year(findSourceColumn(sourceName), targetName); + } + + private Builder year(Types.NestedField sourceColumn, String targetName) { + checkAndAddPartitionName(targetName); + PartitionField field = + new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.year()); + checkForRedundantPartitions(field); + fields.add(field); + return this; + } + + public Builder year(String sourceName) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return year(sourceColumn, columnName + "_year"); + } + + public Builder month(String sourceName, String targetName) { + return month(findSourceColumn(sourceName), targetName); + } + + private Builder month(Types.NestedField sourceColumn, String targetName) { + checkAndAddPartitionName(targetName); + PartitionField field = + new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.month()); + checkForRedundantPartitions(field); + fields.add(field); + return this; + } + + public Builder month(String sourceName) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return month(sourceColumn, columnName + "_month"); + } + + public Builder day(String sourceName, String targetName) { + return day(findSourceColumn(sourceName), targetName); + } + + private Builder day(Types.NestedField sourceColumn, String targetName) { + checkAndAddPartitionName(targetName); + PartitionField field = + new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.day()); + checkForRedundantPartitions(field); + fields.add(field); + return this; + } + + public Builder day(String sourceName) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return day(sourceColumn, columnName + "_day"); + } + + public Builder hour(String sourceName, String targetName) { + return hour(findSourceColumn(sourceName), targetName); + } + + private Builder hour(Types.NestedField sourceColumn, String targetName) { + checkAndAddPartitionName(targetName); + PartitionField field = + new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.hour()); + checkForRedundantPartitions(field); + fields.add(field); + return this; + } + + public Builder hour(String sourceName) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return hour(sourceColumn, columnName + 
"_hour"); + } + + public Builder bucket(String sourceName, int numBuckets, String targetName) { + return bucket(findSourceColumn(sourceName), numBuckets, targetName); + } + + private Builder bucket(Types.NestedField sourceColumn, int numBuckets, String targetName) { + checkAndAddPartitionName(targetName); + fields.add( + new PartitionField( + sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.bucket(numBuckets))); + return this; + } + + public Builder bucket(String sourceName, int numBuckets) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return bucket(sourceColumn, numBuckets, columnName + "_bucket"); + } + + public Builder truncate(String sourceName, int width, String targetName) { + return truncate(findSourceColumn(sourceName), width, targetName); + } + + private Builder truncate(Types.NestedField sourceColumn, int width, String targetName) { + checkAndAddPartitionName(targetName); + fields.add( + new PartitionField( + sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.truncate(width))); + return this; + } + + public Builder truncate(String sourceName, int width) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return truncate(sourceColumn, width, columnName + "_trunc"); + } + + public Builder alwaysNull(String sourceName, String targetName) { + return alwaysNull(findSourceColumn(sourceName), targetName); + } + + private Builder alwaysNull(Types.NestedField sourceColumn, String targetName) { + checkAndAddPartitionName( + targetName, sourceColumn.fieldId()); // can duplicate a source column name + fields.add( + new PartitionField( + sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.alwaysNull())); + return this; + } + + public Builder alwaysNull(String sourceName) { + Types.NestedField sourceColumn = findSourceColumn(sourceName); + String columnName = schema.findColumnName(sourceColumn.fieldId()); + return alwaysNull(sourceColumn, columnName + "_null"); + } + + // add a partition field with an auto-increment partition field id starting from + // PARTITION_DATA_ID_START + Builder add(int sourceId, String name, Transform transform) { + return add(sourceId, nextFieldId(), name, transform); + } + + Builder add(int sourceId, int fieldId, String name, Transform transform) { + checkAndAddPartitionName(name, sourceId); + fields.add(new PartitionField(sourceId, fieldId, name, transform)); + lastAssignedFieldId.getAndAccumulate(fieldId, Math::max); + return this; + } + + public PartitionSpec build() { + PartitionSpec spec = buildUnchecked(); + checkCompatibility(spec, schema); + return spec; + } + + PartitionSpec buildUnchecked() { + return new PartitionSpec(schema, specId, fields, lastAssignedFieldId.get()); + } + } + + static void checkCompatibility(PartitionSpec spec, Schema schema) { + for (PartitionField field : spec.fields) { + Type sourceType = schema.findType(field.sourceId()); + Transform transform = field.transform(); + // In the case of a Version 1 partition-spec field gets deleted, + // it is replaced with a void transform, see: + // https://iceberg.apache.org/spec/#partition-transforms + // We don't care about the source type since a VoidTransform is always compatible and skip the + // checks + if (!transform.equals(Transforms.alwaysNull())) { + ValidationException.check( + sourceType != null, "Cannot find source column for partition field: %s", field); + 
ValidationException.check( + sourceType.isPrimitiveType(), + "Cannot partition by non-primitive source field: %s", + sourceType); + ValidationException.check( + transform.canTransform(sourceType), + "Invalid source type %s for transform: %s", + sourceType, + transform); + } + } + } + + static boolean hasSequentialIds(PartitionSpec spec) { + for (int i = 0; i < spec.fields.length; i += 1) { + if (spec.fields[i].fieldId() != PARTITION_DATA_ID_START + i) { + return false; + } + } + return true; + } +} From 5a6324b9821eb2f538417f3d68e073cd063080b9 Mon Sep 17 00:00:00 2001 From: Fred Storage Liu Date: Tue, 25 Feb 2025 10:58:32 -0800 Subject: [PATCH 2/9] change conversion transaction --- .../IcebergConversionTransaction.scala | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala index bebdcc72cab..8cee3b99440 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala @@ -34,6 +34,8 @@ import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.conf.Configuration import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PartitionSpec, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction} import shadedForDelta.org.apache.iceberg.ExpireSnapshots +import shadedForDelta.org.apache.iceberg.MetadataUpdate +import shadedForDelta.org.apache.iceberg.MetadataUpdate.{AddPartitionSpec, AddSchema} import shadedForDelta.org.apache.iceberg.mapping.MappingUtil import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser @@ -64,8 +66,10 @@ class IcebergConversionTransaction( protected val postCommitSnapshot: Snapshot, protected val tableOp: IcebergTableOp = WRITE_TABLE, protected val lastConvertedIcebergSnapshotId: Option[Long] = None, - protected val lastConvertedDeltaVersion: Option[Long] = None - ) extends DeltaLogging { + protected val lastConvertedDeltaVersion: Option[Long] = None, + protected val metadataUpdates: java.util.ArrayList[MetadataUpdate] = + new java.util.ArrayList[MetadataUpdate]() + ) extends DeltaLogging { /////////////////////////// // Nested Helper Classes // @@ -307,7 +311,8 @@ class IcebergConversionTransaction( log" Setting new Iceberg schema:\n ${MDC(DeltaLogKeys.SCHEMA, icebergSchema)}") } - txn.setSchema(icebergSchema).commit() + metadataUpdates.add( + new AddSchema(icebergSchema, postCommitSnapshot.metadata.columnMappingMaxId.toInt)) recordDeltaEvent( postCommitSnapshot.deltaLog, @@ -351,12 +356,7 @@ class IcebergConversionTransaction( assert(fileUpdates.forall(_.hasCommitted), "Cannot commit. You have uncommitted changes.") val nameMapping = NameMappingParser.toJson(MappingUtil.create(icebergSchema)) - - // hard code dummy delta version as -1 for CREATE_TABLE, which will be later - // set to correct version in setSchemaTxn. -1 is chosen because it is less than the smallest - // possible legitimate Delta version which is 0. 
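+    // The -1 placeholder is no longer needed: for CREATE_TABLE the Delta schema and partition
+    // spec are now applied through metadataUpdates inside this same transaction (see the
+    // CREATE_TABLE branch of the commit path below), so the real version is recorded directly.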
- val deltaVersion = if (tableOp == CREATE_TABLE) -1 else postCommitSnapshot.version - + val deltaVersion = postCommitSnapshot.version var updateTxn = txn.updateProperties() updateTxn = updateTxn.set(IcebergConverter.DELTA_VERSION_PROPERTY, deltaVersion.toString) .set(IcebergConverter.DELTA_TIMESTAMP_PROPERTY, postCommitSnapshot.timestamp.toString) @@ -392,19 +392,21 @@ class IcebergConversionTransaction( ) } try { - txn.commitTransaction() if (tableOp == CREATE_TABLE) { // Iceberg CREATE_TABLE reassigns the field id in schema, which // is overwritten by setting Delta schema with Delta generated field id to ensure // consistency between field id in Iceberg schema after conversion and field id in // parquet files written by Delta. - val setSchemaTxn = createIcebergTxn(Some(WRITE_TABLE)) - setSchemaTxn.setSchema(icebergSchema).commit() - setSchemaTxn.updateProperties() - .set(IcebergConverter.DELTA_VERSION_PROPERTY, postCommitSnapshot.version.toString) - .commit() - setSchemaTxn.commitTransaction() + metadataUpdates.add( + new AddSchema(icebergSchema, postCommitSnapshot.metadata.columnMappingMaxId.toInt) + ) + if (postCommitSnapshot.metadata.partitionColumns.nonEmpty) { + metadataUpdates.add( + new AddPartitionSpec(partitionSpec) + ) + } } + txn.commitTransaction() recordIcebergCommit() } catch { case NonFatal(e) => From 1bde7acea9ba87186d61a3f022e5c1deb3bdc228 Mon Sep 17 00:00:00 2001 From: Fred Storage Liu Date: Tue, 25 Feb 2025 13:42:26 -0800 Subject: [PATCH 3/9] add HiveCatalog and HiveTableOperations --- build.sbt | 39 + .../org/apache/iceberg/hive/HiveCatalog.java | 904 ++++++++++++++++++ .../iceberg/hive/HiveTableOperations.java | 585 ++++++++++++ 3 files changed, 1528 insertions(+) create mode 100644 icebergShaded/src/main/java/org/apache/iceberg/hive/HiveCatalog.java create mode 100644 icebergShaded/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java diff --git a/build.sbt b/build.sbt index 77ed9711b41..99486644016 100644 --- a/build.sbt +++ b/build.sbt @@ -867,6 +867,33 @@ lazy val icebergShaded = (project in file("icebergShaded")) ExclusionRule("com.fasterxml.jackson.core"), ExclusionRule("com.fasterxml.jackson.module") ), + "org.apache.hadoop" % "hadoop-client" % "2.7.3" excludeAll ( + ExclusionRule("org.apache.avro"), + ExclusionRule("org.slf4j"), + ExclusionRule("commons-beanutils"), + ExclusionRule("org.datanucleus"), + ), + "org.apache.hive" % "hive-metastore" % "2.3.8" excludeAll ( + ExclusionRule("org.apache.avro"), + ExclusionRule("org.slf4j"), + ExclusionRule("org.pentaho"), + ExclusionRule("org.apache.hbase"), + ExclusionRule("org.apache.logging.log4j"), + ExclusionRule("co.cask.tephra"), + ExclusionRule("com.google.code.findbugs"), + ExclusionRule("org.eclipse.jetty.aggregate"), + ExclusionRule("org.eclipse.jetty.orbit"), + ExclusionRule("org.apache.parquet"), + ExclusionRule("com.tdunning"), + ExclusionRule("javax.transaction"), + ExclusionRule("com.zaxxer"), + ExclusionRule("org.apache.ant"), + ExclusionRule("javax.servlet"), + ExclusionRule("javax.jdo"), + ExclusionRule("commons-beanutils"), + ExclusionRule("org.datanucleus"), + + ), ), // Generated shaded Iceberg JARs @@ -882,6 +909,18 @@ lazy val icebergShaded = (project in file("icebergShaded")) MergeStrategy.first case PathList("shadedForDelta", "org", "apache", "iceberg", "PartitionSpec.class") => MergeStrategy.first + case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveCatalog.class") => + MergeStrategy.first + case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", 
"HiveCatalog$1.class") => + MergeStrategy.first + case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveCatalog$ViewAwareTableBuilder.class") => + MergeStrategy.first + case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveCatalog$TableAwareViewBuilder.class") => + MergeStrategy.first + case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveTableOperations.class") => + MergeStrategy.first + case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveTableOperations$1.class") => + MergeStrategy.first case x => (assemblyMergeStrategy in assembly).value(x) }, assemblyPackageScala / assembleArtifact := false, diff --git a/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveCatalog.java new file mode 100644 index 00000000000..d2f49c92c93 --- /dev/null +++ b/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -0,0 +1,904 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.hive; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchIcebergViewException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.view.BaseMetastoreViewCatalog; +import org.apache.iceberg.view.View; +import org.apache.iceberg.view.ViewBuilder; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewOperations; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HiveCatalog extends BaseMetastoreViewCatalog + implements SupportsNamespaces, Configurable { + public static final String LIST_ALL_TABLES = "list-all-tables"; + public static final String LIST_ALL_TABLES_DEFAULT = "false"; + + public static final String HMS_TABLE_OWNER = "hive.metastore.table.owner"; + public static final String HMS_DB_OWNER = "hive.metastore.database.owner"; + public static final String HMS_DB_OWNER_TYPE = "hive.metastore.database.owner-type"; + + // MetastoreConf is not available with current Hive version + static final String HIVE_CONF_CATALOG = "metastore.catalog.default"; + + private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + + private 
String name; + private Configuration conf; + private FileIO fileIO; + private ClientPool clients; + private boolean listAllTables = false; + private Map catalogProperties; + + private List metadataUpdates = new ArrayList(); + + public HiveCatalog() {} + + public void initialize(String inputName, Map properties, List metadataUpdates) { + initialize(inputName, properties); + this.metadataUpdates = metadataUpdates; + } + + @Override + public void initialize(String inputName, Map properties) { + this.catalogProperties = ImmutableMap.copyOf(properties); + this.name = inputName; + if (conf == null) { + LOG.warn("No Hadoop Configuration was set, using the default environment Configuration"); + this.conf = new Configuration(); + } + + if (properties.containsKey(CatalogProperties.URI)) { + this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get(CatalogProperties.URI)); + } + + if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) { + this.conf.set( + HiveConf.ConfVars.METASTOREWAREHOUSE.varname, + LocationUtil.stripTrailingSlash(properties.get(CatalogProperties.WAREHOUSE_LOCATION))); + } + + this.listAllTables = + Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT)); + + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + this.fileIO = + fileIOImpl == null + ? new HadoopFileIO(conf) + : CatalogUtil.loadFileIO(fileIOImpl, properties, conf); + + this.clients = new CachedClientPool(conf, properties); + } + + @Override + public TableBuilder buildTable(TableIdentifier identifier, Schema schema) { + return new ViewAwareTableBuilder(identifier, schema); + } + + @Override + public ViewBuilder buildView(TableIdentifier identifier) { + return new TableAwareViewBuilder(identifier); + } + + @Override + public List listTables(Namespace namespace) { + Preconditions.checkArgument( + isValidateNamespace(namespace), "Missing database in namespace: %s", namespace); + String database = namespace.level(0); + + try { + List tableNames = clients.run(client -> client.getAllTables(database)); + List tableIdentifiers; + + if (listAllTables) { + tableIdentifiers = + tableNames.stream() + .map(t -> TableIdentifier.of(namespace, t)) + .collect(Collectors.toList()); + } else { + tableIdentifiers = + listIcebergTables( + tableNames, namespace, BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE); + } + + LOG.debug( + "Listing of namespace: {} resulted in the following tables: {}", + namespace, + tableIdentifiers); + return tableIdentifiers; + + } catch (UnknownDBException e) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + + } catch (TException e) { + throw new RuntimeException("Failed to list all tables under namespace " + namespace, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to listTables", e); + } + } + + @Override + public List listViews(Namespace namespace) { + Preconditions.checkArgument( + isValidateNamespace(namespace), "Missing database in namespace: %s", namespace); + + try { + String database = namespace.level(0); + List viewNames = + clients.run(client -> client.getTables(database, "*", TableType.VIRTUAL_VIEW)); + + // Retrieving the Table objects from HMS in batches to avoid OOM + List filteredTableIdentifiers = Lists.newArrayList(); + Iterable> viewNameSets = Iterables.partition(viewNames, 100); + + for (List viewNameSet : viewNameSets) { + filteredTableIdentifiers.addAll( + listIcebergTables(viewNameSet, 
namespace, HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE)); + } + + return filteredTableIdentifiers; + } catch (UnknownDBException e) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + + } catch (TException e) { + throw new RuntimeException("Failed to list all views under namespace " + namespace, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to listViews", e); + } + } + + @Override + public String name() { + return name; + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + if (!isValidIdentifier(identifier)) { + return false; + } + + String database = identifier.namespace().level(0); + + TableOperations ops = newTableOps(identifier); + TableMetadata lastMetadata = null; + if (purge) { + try { + lastMetadata = ops.current(); + } catch (NotFoundException e) { + LOG.warn( + "Failed to load table metadata for table: {}, continuing drop without purge", + identifier, + e); + } + } + + try { + clients.run( + client -> { + client.dropTable( + database, + identifier.name(), + false /* do not delete data */, + false /* throw NoSuchObjectException if the table doesn't exist */); + return null; + }); + + if (purge && lastMetadata != null) { + CatalogUtil.dropTableData(ops.io(), lastMetadata); + } + + LOG.info("Dropped table: {}", identifier); + return true; + + } catch (NoSuchTableException | NoSuchObjectException e) { + LOG.info("Skipping drop, table does not exist: {}", identifier, e); + return false; + + } catch (TException e) { + throw new RuntimeException("Failed to drop " + identifier, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to dropTable", e); + } + } + + @Override + public boolean dropView(TableIdentifier identifier) { + if (!isValidIdentifier(identifier)) { + return false; + } + + try { + String database = identifier.namespace().level(0); + String viewName = identifier.name(); + + HiveViewOperations ops = (HiveViewOperations) newViewOps(identifier); + ViewMetadata lastViewMetadata = null; + try { + lastViewMetadata = ops.current(); + } catch (NotFoundException e) { + LOG.warn("Failed to load view metadata for view: {}", identifier, e); + } + + clients.run( + client -> { + client.dropTable(database, viewName, false, false); + return null; + }); + + if (lastViewMetadata != null) { + CatalogUtil.dropViewMetadata(ops.io(), lastViewMetadata); + } + + LOG.info("Dropped view: {}", identifier); + return true; + } catch (NoSuchObjectException e) { + LOG.info("Skipping drop, view does not exist: {}", identifier, e); + return false; + } catch (TException e) { + throw new RuntimeException("Failed to drop view " + identifier, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to dropView", e); + } + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier originalTo) { + renameTableOrView(from, originalTo, HiveOperationsBase.ContentType.TABLE); + } + + @Override + public void renameView(TableIdentifier from, TableIdentifier to) { + renameTableOrView(from, to, HiveOperationsBase.ContentType.VIEW); + } + + private List listIcebergTables( + List tableNames, Namespace namespace, String tableTypeProp) + throws TException, InterruptedException { + List tableObjects = + clients.run(client -> client.getTableObjectsByName(namespace.level(0), tableNames)); + return 
tableObjects.stream() + .filter( + table -> + table.getParameters() != null + && tableTypeProp.equalsIgnoreCase( + table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP))) + .map(table -> TableIdentifier.of(namespace, table.getTableName())) + .collect(Collectors.toList()); + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + private void renameTableOrView( + TableIdentifier from, + TableIdentifier originalTo, + HiveOperationsBase.ContentType contentType) { + Preconditions.checkArgument(isValidIdentifier(from), "Invalid identifier: %s", from); + + TableIdentifier to = removeCatalogName(originalTo); + Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to); + if (!namespaceExists(to.namespace())) { + throw new NoSuchNamespaceException( + "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace()); + } + + if (tableExists(to)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Cannot rename %s to %s. Table already exists", from, to); + } + + if (viewExists(to)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Cannot rename %s to %s. View already exists", from, to); + } + + String toDatabase = to.namespace().level(0); + String fromDatabase = from.namespace().level(0); + String fromName = from.name(); + + try { + Table table = clients.run(client -> client.getTable(fromDatabase, fromName)); + validateTableIsIcebergTableOrView(contentType, table, CatalogUtil.fullTableName(name, from)); + + table.setDbName(toDatabase); + table.setTableName(to.name()); + + clients.run( + client -> { + MetastoreUtil.alterTable(client, fromDatabase, fromName, table); + return null; + }); + + LOG.info("Renamed {} from {}, to {}", contentType.value(), from, to); + + } catch (NoSuchObjectException e) { + switch (contentType) { + case TABLE: + throw new NoSuchTableException("Cannot rename %s to %s. Table does not exist", from, to); + case VIEW: + throw new NoSuchViewException("Cannot rename %s to %s. View does not exist", from, to); + } + + } catch (InvalidOperationException e) { + if (e.getMessage() != null + && e.getMessage().contains(String.format("new table %s already exists", to))) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Table already exists: %s", to); + } else { + throw new RuntimeException("Failed to rename " + from + " to " + to, e); + } + + } catch (TException e) { + throw new RuntimeException("Failed to rename " + from + " to " + to, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to rename", e); + } + } + + private void validateTableIsIcebergTableOrView( + HiveOperationsBase.ContentType contentType, Table table, String fullName) { + switch (contentType) { + case TABLE: + HiveOperationsBase.validateTableIsIceberg(table, fullName); + break; + case VIEW: + HiveOperationsBase.validateTableIsIcebergView(table, fullName); + } + } + + /** + * Check whether table or metadata table exists. + * + *
<p>
Note: If a hive table with the same identifier exists in catalog, this method will return + * {@code false}. + * + * @param identifier a table identifier + * @return true if the table exists, false otherwise + */ + @Override + public boolean tableExists(TableIdentifier identifier) { + TableIdentifier baseTableIdentifier = identifier; + if (!isValidIdentifier(identifier)) { + if (!isValidMetadataIdentifier(identifier)) { + return false; + } else { + baseTableIdentifier = TableIdentifier.of(identifier.namespace().levels()); + } + } + + String database = baseTableIdentifier.namespace().level(0); + String tableName = baseTableIdentifier.name(); + try { + Table table = clients.run(client -> client.getTable(database, tableName)); + HiveOperationsBase.validateTableIsIceberg(table, fullTableName(name, baseTableIdentifier)); + return true; + } catch (NoSuchTableException | NoSuchObjectException e) { + return false; + } catch (TException e) { + throw new RuntimeException("Failed to check table existence of " + baseTableIdentifier, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to check table existence of " + baseTableIdentifier, e); + } + } + + @Override + public boolean viewExists(TableIdentifier viewIdentifier) { + if (!isValidIdentifier(viewIdentifier)) { + return false; + } + + String database = viewIdentifier.namespace().level(0); + String viewName = viewIdentifier.name(); + try { + Table table = clients.run(client -> client.getTable(database, viewName)); + HiveOperationsBase.validateTableIsIcebergView(table, fullTableName(name, viewIdentifier)); + return true; + } catch (NoSuchIcebergViewException | NoSuchObjectException e) { + return false; + } catch (TException e) { + throw new RuntimeException("Failed to check view existence of " + viewIdentifier, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to check view existence of " + viewIdentifier, e); + } + } + + @Override + public void createNamespace(Namespace namespace, Map meta) { + Preconditions.checkArgument( + !namespace.isEmpty(), "Cannot create namespace with invalid name: %s", namespace); + Preconditions.checkArgument( + isValidateNamespace(namespace), + "Cannot support multi part namespace in Hive Metastore: %s", + namespace); + Preconditions.checkArgument( + meta.get(HMS_DB_OWNER_TYPE) == null || meta.get(HMS_DB_OWNER) != null, + "Create namespace setting %s without setting %s is not allowed", + HMS_DB_OWNER_TYPE, + HMS_DB_OWNER); + try { + clients.run( + client -> { + client.createDatabase(convertToDatabase(namespace, meta)); + return null; + }); + + LOG.info("Created namespace: {}", namespace); + + } catch (AlreadyExistsException e) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + e, "Namespace already exists: %s", namespace); + + } catch (TException e) { + throw new RuntimeException( + "Failed to create namespace " + namespace + " in Hive Metastore", e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to createDatabase(name) " + namespace + " in Hive Metastore", e); + } + } + + @Override + public List listNamespaces(Namespace namespace) { + if (!isValidateNamespace(namespace) && !namespace.isEmpty()) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + if (!namespace.isEmpty()) { + return ImmutableList.of(); + } + try { + List 
namespaces = + clients.run(IMetaStoreClient::getAllDatabases).stream() + .map(Namespace::of) + .collect(Collectors.toList()); + + LOG.debug("Listing namespace {} returned tables: {}", namespace, namespaces); + return namespaces; + + } catch (TException e) { + throw new RuntimeException( + "Failed to list all namespace: " + namespace + " in Hive Metastore", e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to getAllDatabases() " + namespace + " in Hive Metastore", e); + } + } + + @Override + public boolean dropNamespace(Namespace namespace) { + if (!isValidateNamespace(namespace)) { + return false; + } + + try { + clients.run( + client -> { + client.dropDatabase( + namespace.level(0), + false /* deleteData */, + false /* ignoreUnknownDb */, + false /* cascade */); + return null; + }); + + LOG.info("Dropped namespace: {}", namespace); + return true; + + } catch (InvalidOperationException e) { + throw new NamespaceNotEmptyException( + e, "Namespace %s is not empty. One or more tables exist.", namespace); + + } catch (NoSuchObjectException e) { + return false; + + } catch (TException e) { + throw new RuntimeException("Failed to drop namespace " + namespace + " in Hive Metastore", e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to drop dropDatabase(name) " + namespace + " in Hive Metastore", e); + } + } + + @Override + public boolean setProperties(Namespace namespace, Map properties) { + Preconditions.checkArgument( + (properties.get(HMS_DB_OWNER_TYPE) == null) == (properties.get(HMS_DB_OWNER) == null), + "Setting %s and %s has to be performed together or not at all", + HMS_DB_OWNER_TYPE, + HMS_DB_OWNER); + Map parameter = Maps.newHashMap(); + + parameter.putAll(loadNamespaceMetadata(namespace)); + parameter.putAll(properties); + Database database = convertToDatabase(namespace, parameter); + + alterHiveDataBase(namespace, database); + LOG.debug("Successfully set properties {} for {}", properties.keySet(), namespace); + + // Always successful, otherwise exception is thrown + return true; + } + + @Override + public boolean removeProperties(Namespace namespace, Set properties) { + Preconditions.checkArgument( + properties.contains(HMS_DB_OWNER_TYPE) == properties.contains(HMS_DB_OWNER), + "Removing %s and %s has to be performed together or not at all", + HMS_DB_OWNER_TYPE, + HMS_DB_OWNER); + Map parameter = Maps.newHashMap(); + + parameter.putAll(loadNamespaceMetadata(namespace)); + properties.forEach(key -> parameter.put(key, null)); + Database database = convertToDatabase(namespace, parameter); + + alterHiveDataBase(namespace, database); + LOG.debug("Successfully removed properties {} from {}", properties, namespace); + + // Always successful, otherwise exception is thrown + return true; + } + + private void alterHiveDataBase(Namespace namespace, Database database) { + try { + clients.run( + client -> { + client.alterDatabase(namespace.level(0), database); + return null; + }); + + } catch (NoSuchObjectException | UnknownDBException e) { + throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace); + + } catch (TException e) { + throw new RuntimeException( + "Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to getDatabase(name) " + namespace + " 
in Hive Metastore", e); + } + } + + @Override + public Map loadNamespaceMetadata(Namespace namespace) { + if (!isValidateNamespace(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + + try { + Database database = clients.run(client -> client.getDatabase(namespace.level(0))); + Map metadata = convertToMetadata(database); + LOG.debug("Loaded metadata for namespace {} found {}", namespace, metadata.keySet()); + return metadata; + + } catch (NoSuchObjectException | UnknownDBException e) { + throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace); + + } catch (TException e) { + throw new RuntimeException( + "Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e); + } + } + + @Override + protected boolean isValidIdentifier(TableIdentifier tableIdentifier) { + return tableIdentifier.namespace().levels().length == 1; + } + + private TableIdentifier removeCatalogName(TableIdentifier to) { + if (isValidIdentifier(to)) { + return to; + } + + // check if the identifier includes the catalog name and remove it + if (to.namespace().levels().length == 2 && name().equalsIgnoreCase(to.namespace().level(0))) { + return TableIdentifier.of(Namespace.of(to.namespace().level(1)), to.name()); + } + + // return the original unmodified + return to; + } + + private boolean isValidateNamespace(Namespace namespace) { + return namespace.levels().length == 1; + } + + @Override + public TableOperations newTableOps(TableIdentifier tableIdentifier) { + String dbName = tableIdentifier.namespace().level(0); + String tableName = tableIdentifier.name(); + return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName, metadataUpdates); + } + + @Override + protected ViewOperations newViewOps(TableIdentifier identifier) { + return new HiveViewOperations(conf, clients, fileIO, name, identifier); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + // This is a little edgy since we basically duplicate the HMS location generation logic. + // Sadly I do not see a good way around this if we want to keep the order of events, like: + // - Create meta files + // - Create the metadata in HMS, and this way committing the changes + + // Create a new location based on the namespace / database if it is set on database level + try { + Database databaseData = + clients.run(client -> client.getDatabase(tableIdentifier.namespace().levels()[0])); + if (databaseData.getLocationUri() != null) { + // If the database location is set use it as a base. 
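+        // e.g. a database location of hdfs://warehouse/sales.db would yield
+        // hdfs://warehouse/sales.db/<table_name> (path shown here only for illustration)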
+ return String.format("%s/%s", databaseData.getLocationUri(), tableIdentifier.name()); + } + + } catch (NoSuchObjectException e) { + throw new NoSuchNamespaceException( + e, "Namespace does not exist: %s", tableIdentifier.namespace().levels()[0]); + } catch (TException e) { + throw new RuntimeException( + String.format("Metastore operation failed for %s", tableIdentifier), e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during commit", e); + } + + // Otherwise, stick to the {WAREHOUSE_DIR}/{DB_NAME}.db/{TABLE_NAME} path + String databaseLocation = databaseLocation(tableIdentifier.namespace().levels()[0]); + return String.format("%s/%s", databaseLocation, tableIdentifier.name()); + } + + private String databaseLocation(String databaseName) { + String warehouseLocation = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname); + Preconditions.checkNotNull( + warehouseLocation, "Warehouse location is not set: hive.metastore.warehouse.dir=null"); + warehouseLocation = LocationUtil.stripTrailingSlash(warehouseLocation); + return String.format("%s/%s.db", warehouseLocation, databaseName); + } + + private Map convertToMetadata(Database database) { + + Map meta = Maps.newHashMap(); + + meta.putAll(database.getParameters()); + meta.put("location", database.getLocationUri()); + if (database.getDescription() != null) { + meta.put("comment", database.getDescription()); + } + if (database.getOwnerName() != null) { + meta.put(HMS_DB_OWNER, database.getOwnerName()); + if (database.getOwnerType() != null) { + meta.put(HMS_DB_OWNER_TYPE, database.getOwnerType().name()); + } + } + + return meta; + } + + Database convertToDatabase(Namespace namespace, Map meta) { + if (!isValidateNamespace(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + + Database database = new Database(); + Map parameter = Maps.newHashMap(); + + database.setName(namespace.level(0)); + database.setLocationUri(databaseLocation(namespace.level(0))); + + meta.forEach( + (key, value) -> { + if (key.equals("comment")) { + database.setDescription(value); + } else if (key.equals("location")) { + database.setLocationUri(value); + } else if (key.equals(HMS_DB_OWNER)) { + database.setOwnerName(value); + } else if (key.equals(HMS_DB_OWNER_TYPE) && value != null) { + database.setOwnerType(PrincipalType.valueOf(value)); + } else { + if (value != null) { + parameter.put(key, value); + } + } + }); + + if (database.getOwnerName() == null) { + database.setOwnerName(HiveHadoopUtil.currentUser()); + database.setOwnerType(PrincipalType.USER); + } + + database.setParameters(parameter); + + return database; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("name", name) + .add("uri", this.conf == null ? "" : this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname)) + .toString(); + } + + @Override + public void setConf(Configuration conf) { + this.conf = new Configuration(conf); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + protected Map properties() { + return catalogProperties == null ? ImmutableMap.of() : catalogProperties; + } + + @VisibleForTesting + void setListAllTables(boolean listAllTables) { + this.listAllTables = listAllTables; + } + + @VisibleForTesting + ClientPool clientPool() { + return clients; + } + + /** + * The purpose of this class is to add view detection only for Hive-Specific tables. 
Hive catalog + * follows checks at different levels: 1. During refresh, it validates if the table is an iceberg + * table or not. 2. During commit, it validates if there is any concurrent commit with table or + * table-name already exists. This class helps to do the validation on an early basis. + */ + private class ViewAwareTableBuilder extends BaseMetastoreViewCatalogTableBuilder { + + private final TableIdentifier identifier; + + private ViewAwareTableBuilder(TableIdentifier identifier, Schema schema) { + super(identifier, schema); + this.identifier = identifier; + } + + @Override + public Transaction createOrReplaceTransaction() { + if (viewExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "View with same name already exists: %s", identifier); + } + return super.createOrReplaceTransaction(); + } + + @Override + public org.apache.iceberg.Table create() { + if (viewExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "View with same name already exists: %s", identifier); + } + return super.create(); + } + } + + /** + * The purpose of this class is to add table detection only for Hive-Specific view. Hive catalog + * follows checks at different levels: 1. During refresh, it validates if the view is an iceberg + * view or not. 2. During commit, it validates if there is any concurrent commit with view or + * view-name already exists. This class helps to do the validation on an early basis. + */ + private class TableAwareViewBuilder extends BaseViewBuilder { + + private final TableIdentifier identifier; + + private TableAwareViewBuilder(TableIdentifier identifier) { + super(identifier); + this.identifier = identifier; + } + + @Override + public View createOrReplace() { + if (tableExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Table with same name already exists: %s", identifier); + } + return super.createOrReplace(); + } + + @Override + public View create() { + if (tableExists(identifier)) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException( + "Table with same name already exists: %s", identifier); + } + return super.create(); + } + } +} diff --git a/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java new file mode 100644 index 00000000000..4c13ce013f9 --- /dev/null +++ b/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.hive; + +import static org.apache.iceberg.TableProperties.GC_ENABLED; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.iceberg.BaseMetastoreOperations; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.SortOrderParser; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.hadoop.ConfigProperties; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.collect.BiMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.JsonUtil; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TODO we should be able to extract some more commonalities to BaseMetastoreTableOperations to + * avoid code duplication between this class and Metacat Tables. + */ +public class HiveTableOperations extends BaseMetastoreTableOperations + implements HiveOperationsBase { + private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class); + + private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES = + "iceberg.hive.metadata-refresh-max-retries"; + private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2; + private static final BiMap ICEBERG_TO_HMS_TRANSLATION = + ImmutableBiMap.of( + // gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things + // but with different names + GC_ENABLED, "external.table.purge"); + + /** + * Provides key translation where necessary between Iceberg and HMS props. This translation is + * needed because some properties control the same behaviour but are named differently in Iceberg + * and Hive. Therefore changes to these property pairs should be synchronized. + * + *

Example: Deleting data files upon DROP TABLE is enabled using gc.enabled=true in Iceberg and + * external.table.purge=true in Hive. Hive and Iceberg users are unaware of each other's control + * flags, therefore inconsistent behaviour can occur from e.g. a Hive user's point of view if + * external.table.purge=true is set on the HMS table but gc.enabled=false is set on the Iceberg + * table, resulting in no data file deletion. + * + * @param hmsProp The HMS property that should be translated to Iceberg property + * @return Iceberg property equivalent to the hmsProp. If no such translation exists, the original + * hmsProp is returned + */ + public static String translateToIcebergProp(String hmsProp) { + return ICEBERG_TO_HMS_TRANSLATION.inverse().getOrDefault(hmsProp, hmsProp); + } + + private final String fullName; + private final String catalogName; + private final String database; + private final String tableName; + private final Configuration conf; + private final long maxHiveTablePropertySize; + private final int metadataRefreshMaxRetries; + private final FileIO fileIO; + private final ClientPool metaClients; + + private List metadataUpdates = new ArrayList(); + + protected HiveTableOperations( + Configuration conf, + ClientPool metaClients, + FileIO fileIO, + String catalogName, + String database, + String table, + List metadataUpdates) { + this(conf, metaClients, fileIO, catalogName, database, table); + this.metadataUpdates = metadataUpdates; + } + + protected HiveTableOperations( + Configuration conf, + ClientPool metaClients, + FileIO fileIO, + String catalogName, + String database, + String table) { + this.conf = conf; + this.metaClients = metaClients; + this.fileIO = fileIO; + this.fullName = catalogName + "." + database + "." + table; + this.catalogName = catalogName; + this.database = database; + this.tableName = table; + this.metadataRefreshMaxRetries = + conf.getInt( + HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES, + HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT); + this.maxHiveTablePropertySize = + conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT); + } + + @Override + protected String tableName() { + return fullName; + } + + @Override + public FileIO io() { + return fileIO; + } + + @Override + protected void doRefresh() { + String metadataLocation = null; + try { + Table table = metaClients.run(client -> client.getTable(database, tableName)); + HiveOperationsBase.validateTableIsIceberg(table, fullName); + + metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP); + + } catch (NoSuchObjectException e) { + if (currentMetadataLocation() != null) { + throw new NoSuchTableException("No such table: %s.%s", database, tableName); + } + + } catch (TException e) { + String errMsg = + String.format("Failed to get table info from metastore %s.%s", database, tableName); + throw new RuntimeException(errMsg, e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during refresh", e); + } + + refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries); + } + + @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"}) + @Override + protected void doCommit(TableMetadata base, TableMetadata metadata) { + boolean newTable = base == null; + + // Apply metadata updates so adjustedMetadata has field id and partition spec created + // from Delta lake + TableMetadata.Builder builder = TableMetadata.buildFrom(metadata); + Schema lastAddedSchema = metadata.schema(); + 
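+    // Replay the Delta-supplied updates on the metadata builder: AddSchema sets the current
+    // schema and is remembered in lastAddedSchema so that a subsequent AddPartitionSpec is
+    // bound against the schema it was written for; all other update types are applied
+    // unchanged.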
for (MetadataUpdate update : metadataUpdates) { + if (update instanceof MetadataUpdate.AddSchema) { + MetadataUpdate.AddSchema addSchema = (MetadataUpdate.AddSchema) update; + builder.setCurrentSchema(addSchema.schema(), addSchema.lastColumnId()); + lastAddedSchema = addSchema.schema(); + } else if (update instanceof MetadataUpdate.AddPartitionSpec) { + // regard AddPartitionSpec as replace all existing specs as Delta Uniform only + // support one partition spec + PartitionSpec specToAdd = ((MetadataUpdate.AddPartitionSpec) update).spec().bind(lastAddedSchema); + if (!specToAdd.compatibleWith(metadata.spec())) { + HashSet idsToRemove = new HashSet(); + for (PartitionSpec spec : metadata.specs()) { + idsToRemove.add(spec.specId()); + } + builder.setDefaultPartitionSpec(specToAdd); + MetadataUpdate.RemovePartitionSpecs removeSpecs = new MetadataUpdate.RemovePartitionSpecs(idsToRemove); + removeSpecs.applyTo(builder); + } + } else { + update.applyTo(builder); + } + } + TableMetadata adjustedMetadata = builder.build(); + + String newMetadataLocation = writeNewMetadataIfRequired(newTable, adjustedMetadata); + boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf); + boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false); + + BaseMetastoreOperations.CommitStatus commitStatus = + BaseMetastoreOperations.CommitStatus.FAILURE; + boolean updateHiveTable = false; + + HiveLock lock = lockObject(base); + try { + lock.lock(); + + Table tbl = loadHmsTable(); + + if (tbl != null) { + // If we try to create the table but the metadata location is already set, then we had a + // concurrent commit + if (newTable + && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) + != null) { + if (TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(tbl.getTableType())) { + throw new AlreadyExistsException( + "View with same name already exists: %s.%s", database, tableName); + } + throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName); + } + + updateHiveTable = true; + LOG.debug("Committing existing table: {}", fullName); + } else { + tbl = + newHmsTable( + adjustedMetadata.property(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser())); + LOG.debug("Committing new table: {}", fullName); + } + + tbl.setSd( + HiveOperationsBase.storageDescriptor( + adjustedMetadata.schema(), + adjustedMetadata.location(), + hiveEngineEnabled)); // set to pickup any schema changes + + String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP); + String baseMetadataLocation = base != null ? 
base.metadataFileLocation() : null; + if (!Objects.equals(baseMetadataLocation, metadataLocation)) { + throw new CommitFailedException( + "Cannot commit: Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s", + baseMetadataLocation, metadataLocation, database, tableName); + } + + // get Iceberg props that have been removed + Set removedProps = Collections.emptySet(); + if (base != null) { + removedProps = + base.properties().keySet().stream() + .filter(key -> !adjustedMetadata.properties().containsKey(key)) + .collect(Collectors.toSet()); + } + + Map summary = + Optional.ofNullable(adjustedMetadata.currentSnapshot()) + .map(Snapshot::summary) + .orElseGet(ImmutableMap::of); + setHmsTableParameters( + newMetadataLocation, tbl, adjustedMetadata, removedProps, hiveEngineEnabled, summary); + + if (!keepHiveStats) { + tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE); + tbl.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); + } + + lock.ensureActive(); + + try { + persistTable( + tbl, updateHiveTable, hiveLockEnabled(base, conf) ? null : baseMetadataLocation); + lock.ensureActive(); + + commitStatus = BaseMetastoreOperations.CommitStatus.SUCCESS; + } catch (LockException le) { + commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN; + throw new CommitStateUnknownException( + "Failed to heartbeat for hive lock while " + + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. " + + "Please check the commit history. If you are running into this issue, try reducing " + + "iceberg.hive.lock-heartbeat-interval-ms.", + le); + } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { + throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName); + + } catch (InvalidObjectException e) { + throw new ValidationException(e, "Invalid Hive object for %s.%s", database, tableName); + + } catch (CommitFailedException | CommitStateUnknownException e) { + throw e; + + } catch (Throwable e) { + if (e.getMessage() != null + && e.getMessage() + .contains( + "The table has been modified. The parameter value for key '" + + HiveTableOperations.METADATA_LOCATION_PROP + + "' is")) { + throw new CommitFailedException( + e, "The table %s.%s has been modified concurrently", database, tableName); + } + + if (e.getMessage() != null + && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) { + throw new RuntimeException( + "Failed to acquire locks from metastore because the underlying metastore " + + "table 'HIVE_LOCKS' does not exist. This can occur when using an embedded metastore which does not " + + "support transactions. 
To fix this use an alternative metastore.", + e); + } + + LOG.error( + "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", + database, + tableName, + e); + commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN; + commitStatus = + BaseMetastoreOperations.CommitStatus.valueOf( + checkCommitStatus(newMetadataLocation, adjustedMetadata).name()); + switch (commitStatus) { + case SUCCESS: + break; + case FAILURE: + throw e; + case UNKNOWN: + throw new CommitStateUnknownException(e); + } + } + } catch (TException e) { + throw new RuntimeException( + String.format("Metastore operation failed for %s.%s", database, tableName), e); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during commit", e); + + } catch (LockException e) { + throw new CommitFailedException(e); + + } finally { + HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus, newMetadataLocation, lock); + } + + LOG.info( + "Committed to table {} with the new metadata location {}", fullName, newMetadataLocation); + } + + private void setHmsTableParameters( + String newMetadataLocation, + Table tbl, + TableMetadata metadata, + Set obsoleteProps, + boolean hiveEngineEnabled, + Map summary) { + Map parameters = + Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap); + + // push all Iceberg table properties into HMS + metadata.properties().entrySet().stream() + .filter(entry -> !entry.getKey().equalsIgnoreCase(HiveCatalog.HMS_TABLE_OWNER)) + .forEach( + entry -> { + String key = entry.getKey(); + // translate key names between Iceberg and HMS where needed + String hmsKey = ICEBERG_TO_HMS_TRANSLATION.getOrDefault(key, key); + parameters.put(hmsKey, entry.getValue()); + }); + if (metadata.uuid() != null) { + parameters.put(TableProperties.UUID, metadata.uuid()); + } + + // remove any props from HMS that are no longer present in Iceberg table props + obsoleteProps.forEach(parameters::remove); + + parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)); + parameters.put(METADATA_LOCATION_PROP, newMetadataLocation); + + if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) { + parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation()); + } + + // If needed set the 'storage_handler' property to enable query from Hive + if (hiveEngineEnabled) { + parameters.put( + hive_metastoreConstants.META_TABLE_STORAGE, + "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"); + } else { + parameters.remove(hive_metastoreConstants.META_TABLE_STORAGE); + } + + // Set the basic statistics + if (summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP) != null) { + parameters.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP)); + } + if (summary.get(SnapshotSummary.TOTAL_RECORDS_PROP) != null) { + parameters.put(StatsSetupConst.ROW_COUNT, summary.get(SnapshotSummary.TOTAL_RECORDS_PROP)); + } + if (summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP) != null) { + parameters.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP)); + } + + setSnapshotStats(metadata, parameters); + setSchema(metadata.schema(), parameters); + setPartitionSpec(metadata, parameters); + setSortOrder(metadata, parameters); + + tbl.setParameters(parameters); + } + + @VisibleForTesting + void setSnapshotStats(TableMetadata metadata, Map parameters) { + parameters.remove(TableProperties.CURRENT_SNAPSHOT_ID); + 
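+    // The timestamp and summary entries are cleared below as well; all three are republished
+    // further down only when exposeInHmsProperties() is enabled and a current snapshot
+    // exists, so stale snapshot properties do not linger in HMS.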
parameters.remove(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP); + parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY); + + Snapshot currentSnapshot = metadata.currentSnapshot(); + if (exposeInHmsProperties() && currentSnapshot != null) { + parameters.put( + TableProperties.CURRENT_SNAPSHOT_ID, String.valueOf(currentSnapshot.snapshotId())); + parameters.put( + TableProperties.CURRENT_SNAPSHOT_TIMESTAMP, + String.valueOf(currentSnapshot.timestampMillis())); + setSnapshotSummary(parameters, currentSnapshot); + } + + parameters.put(TableProperties.SNAPSHOT_COUNT, String.valueOf(metadata.snapshots().size())); + } + + @VisibleForTesting + void setSnapshotSummary(Map parameters, Snapshot currentSnapshot) { + try { + String summary = JsonUtil.mapper().writeValueAsString(currentSnapshot.summary()); + if (summary.length() <= maxHiveTablePropertySize) { + parameters.put(TableProperties.CURRENT_SNAPSHOT_SUMMARY, summary); + } else { + LOG.warn( + "Not exposing the current snapshot({}) summary in HMS since it exceeds {} characters", + currentSnapshot.snapshotId(), + maxHiveTablePropertySize); + } + } catch (JsonProcessingException e) { + LOG.warn( + "Failed to convert current snapshot({}) summary to a json string", + currentSnapshot.snapshotId(), + e); + } + } + + @VisibleForTesting + void setPartitionSpec(TableMetadata metadata, Map parameters) { + parameters.remove(TableProperties.DEFAULT_PARTITION_SPEC); + if (exposeInHmsProperties() && metadata.spec() != null && metadata.spec().isPartitioned()) { + String spec = PartitionSpecParser.toJson(metadata.spec()); + setField(parameters, TableProperties.DEFAULT_PARTITION_SPEC, spec); + } + } + + @VisibleForTesting + void setSortOrder(TableMetadata metadata, Map parameters) { + parameters.remove(TableProperties.DEFAULT_SORT_ORDER); + if (exposeInHmsProperties() + && metadata.sortOrder() != null + && metadata.sortOrder().isSorted()) { + String sortOrder = SortOrderParser.toJson(metadata.sortOrder()); + setField(parameters, TableProperties.DEFAULT_SORT_ORDER, sortOrder); + } + } + + @Override + public long maxHiveTablePropertySize() { + return maxHiveTablePropertySize; + } + + @Override + public String database() { + return database; + } + + @Override + public String table() { + return tableName; + } + + @Override + public TableType tableType() { + return TableType.EXTERNAL_TABLE; + } + + @Override + public ClientPool metaClients() { + return metaClients; + } + + /** + * Returns if the hive engine related values should be enabled on the table, or not. + * + *

The decision is made like this: + * + *

    + *
  1. Table property value {@link TableProperties#ENGINE_HIVE_ENABLED} + *
  2. If the table property is not set then check the hive-site.xml property value {@link + * ConfigProperties#ENGINE_HIVE_ENABLED} + *
  3. If none of the above is enabled then use the default value {@link + * TableProperties#ENGINE_HIVE_ENABLED_DEFAULT} + *
+ * + * @param metadata Table metadata to use + * @param conf The hive configuration to use + * @return if the hive engine related values should be enabled or not + */ + private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration conf) { + if (metadata.properties().get(TableProperties.ENGINE_HIVE_ENABLED) != null) { + // We know that the property is set, so default value will not be used, + return metadata.propertyAsBoolean(TableProperties.ENGINE_HIVE_ENABLED, false); + } + + return conf.getBoolean( + ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT); + } + + /** + * Returns if the hive locking should be enabled on the table, or not. + * + *

The decision is made like this: + * + *

    + *
  1. Table property value {@link TableProperties#HIVE_LOCK_ENABLED} + *
  2. If the table property is not set then check the hive-site.xml property value {@link + * ConfigProperties#LOCK_HIVE_ENABLED} + *
  3. If none of the above is enabled then use the default value {@link + * TableProperties#HIVE_LOCK_ENABLED_DEFAULT} + *
+ * + * @param metadata Table metadata to use + * @param conf The hive configuration to use + * @return if the hive engine related values should be enabled or not + */ + private static boolean hiveLockEnabled(TableMetadata metadata, Configuration conf) { + if (metadata != null && metadata.properties().get(TableProperties.HIVE_LOCK_ENABLED) != null) { + // We know that the property is set, so default value will not be used, + return metadata.propertyAsBoolean(TableProperties.HIVE_LOCK_ENABLED, false); + } + + return conf.getBoolean( + ConfigProperties.LOCK_HIVE_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT); + } + + @VisibleForTesting + HiveLock lockObject(TableMetadata metadata) { + if (hiveLockEnabled(metadata, conf)) { + return new MetastoreLock(conf, metaClients, catalogName, database, tableName); + } else { + return new NoLock(); + } + } +} From 55c4168b757053ae39bad5ad7e5ae4d21208c7df Mon Sep 17 00:00:00 2001 From: Fred Storage Liu Date: Tue, 25 Feb 2025 14:19:40 -0800 Subject: [PATCH 4/9] add more Exclusion rule --- build.sbt | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 99486644016..02a15febfec 100644 --- a/build.sbt +++ b/build.sbt @@ -858,14 +858,16 @@ lazy val icebergShaded = (project in file("icebergShaded")) libraryDependencies ++= Seq( // Fix Iceberg's legacy java.lang.NoClassDefFoundError: scala/jdk/CollectionConverters$ error // due to legacy scala. - "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1", + "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1" % "provided", "org.apache.iceberg" % "iceberg-core" % "1.8.0" excludeAll ( ExclusionRule("com.fasterxml.jackson.core"), - ExclusionRule("com.fasterxml.jackson.module") + ExclusionRule("com.fasterxml.jackson.module"), + ExclusionRule("com.github.ben-manes.caffeine"), ), "org.apache.iceberg" % "iceberg-hive-metastore" % "1.8.0" excludeAll ( ExclusionRule("com.fasterxml.jackson.core"), - ExclusionRule("com.fasterxml.jackson.module") + ExclusionRule("com.fasterxml.jackson.module"), + ExclusionRule("com.github.ben-manes.caffeine"), ), "org.apache.hadoop" % "hadoop-client" % "2.7.3" excludeAll ( ExclusionRule("org.apache.avro"), @@ -892,7 +894,6 @@ lazy val icebergShaded = (project in file("icebergShaded")) ExclusionRule("javax.jdo"), ExclusionRule("commons-beanutils"), ExclusionRule("org.datanucleus"), - ), ), From ec8d8e7a35b08b15bb96f2b0445fde5a19bf9f16 Mon Sep 17 00:00:00 2001 From: Fred Storage Liu Date: Tue, 25 Feb 2025 16:23:56 -0800 Subject: [PATCH 5/9] update version --- examples/scala/build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/scala/build.sbt b/examples/scala/build.sbt index 3bc6e285353..612f0d66ab8 100644 --- a/examples/scala/build.sbt +++ b/examples/scala/build.sbt @@ -20,7 +20,7 @@ organizationName := "example" val scala212 = "2.12.18" val scala213 = "2.13.13" -val icebergVersion = "1.4.1" +val icebergVersion = "1.8.0" val defaultDeltaVersion = { val versionFileContent = IO.read(file("../../version.sbt")) val versionRegex = """.*version\s*:=\s*"([^"]+)".*""".r From 440ae31e33c25874f569029ae0f9cda162048625 Mon Sep 17 00:00:00 2001 From: Fred Storage Liu Date: Fri, 28 Feb 2025 09:48:55 -0800 Subject: [PATCH 6/9] play-around; still does not work --- build.sbt | 27 ++- examples/scala/build.sbt | 7 +- icebergShaded/generate_iceberg_jars.py | 213 ------------------ ...-schema-evolution-with-correct-field.patch | 186 --------------- ...must-not-delete-any-delta-data-files.patch | 177 
--------------- ...e-must-not-remove-unknown-table-data.patch | 45 ---- ...takes-updated-source-column-field-id.patch | 40 ---- 7 files changed, 28 insertions(+), 667 deletions(-) delete mode 100644 icebergShaded/generate_iceberg_jars.py delete mode 100644 icebergShaded/iceberg_src_patches/0001-schema-evolution-with-correct-field.patch delete mode 100644 icebergShaded/iceberg_src_patches/0002-iceberg-core-must-not-delete-any-delta-data-files.patch delete mode 100644 icebergShaded/iceberg_src_patches/0003-iceberg-hive-metastore-must-not-remove-unknown-table-data.patch delete mode 100644 icebergShaded/iceberg_src_patches/0005-iceberg-takes-updated-source-column-field-id.patch diff --git a/build.sbt b/build.sbt index 02a15febfec..25369a2a5fc 100644 --- a/build.sbt +++ b/build.sbt @@ -148,7 +148,13 @@ lazy val commonSettings = Seq( "-Ddelta.log.cacheSize=3", "-Dspark.databricks.delta.delta.log.cacheSize=3", "-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5", - "-Xmx1024m" + "-Xmx1024m", + // For Java 17 + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.net=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED" ), testOptions += Tests.Argument("-oF"), @@ -860,13 +866,13 @@ lazy val icebergShaded = (project in file("icebergShaded")) // due to legacy scala. "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1" % "provided", "org.apache.iceberg" % "iceberg-core" % "1.8.0" excludeAll ( - ExclusionRule("com.fasterxml.jackson.core"), - ExclusionRule("com.fasterxml.jackson.module"), + ExclusionRule("com.fasterxml.jackson"), + ExclusionRule(organization = "com.fasterxml.jackson.core"), ExclusionRule("com.github.ben-manes.caffeine"), ), "org.apache.iceberg" % "iceberg-hive-metastore" % "1.8.0" excludeAll ( - ExclusionRule("com.fasterxml.jackson.core"), - ExclusionRule("com.fasterxml.jackson.module"), + ExclusionRule("com.fasterxml.jackson"), + ExclusionRule(organization = "com.fasterxml.jackson.core"), ExclusionRule("com.github.ben-manes.caffeine"), ), "org.apache.hadoop" % "hadoop-client" % "2.7.3" excludeAll ( @@ -905,6 +911,17 @@ lazy val icebergShaded = (project in file("icebergShaded")) assembly / assemblyShadeRules := Seq( ShadeRule.rename("org.apache.iceberg.**" -> "shadedForDelta.@0").inAll, ), + assembly / assemblyExcludedJars := { + val cp = (fullClasspath in assembly).value + cp.filter { jar => + val doExclude = jar.data.getName.contains("jackson-annotations") || + jar.data.getName.contains("RoaringBitmap") || + jar.data.getName.contains("jackson") || + jar.data.getName.contains("htrace") + println(s"Excluding jar: ${jar.data.getName} ? 
$doExclude") + doExclude + } + }, assembly / assemblyMergeStrategy := { case PathList("shadedForDelta", "org", "apache", "iceberg", "PartitionSpec$Builder.class") => MergeStrategy.first diff --git a/examples/scala/build.sbt b/examples/scala/build.sbt index 612f0d66ab8..b6d132f6ee7 100644 --- a/examples/scala/build.sbt +++ b/examples/scala/build.sbt @@ -126,7 +126,12 @@ lazy val extraMavenRepo = sys.env.get("EXTRA_MAVEN_REPO").toSeq.map { repo => lazy val java17Settings = Seq( fork := true, javaOptions ++= Seq( - "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED" + "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED", + // For Java 17 + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.net=ALL-UNNAMED", + "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED" ) ) diff --git a/icebergShaded/generate_iceberg_jars.py b/icebergShaded/generate_iceberg_jars.py deleted file mode 100644 index 2e9b37da487..00000000000 --- a/icebergShaded/generate_iceberg_jars.py +++ /dev/null @@ -1,213 +0,0 @@ -#!/usr/bin/env python3 - -# -# Copyright (2021) The Delta Lake Project Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import argparse -import os -import glob -import re -import subprocess -import shlex -import shutil -from os import path - -iceberg_lib_dir_name = "lib" -iceberg_src_dir_name = "iceberg_src" # this is a git dir -iceberg_patches_dir_name = "iceberg_src_patches" - -iceberg_src_commit_hash = "ede085d0f7529f24acd0c81dd0a43f7bb969b763" -iceberg_src_branch = "main" # only this branch will be downloaded - -# Relative to iceberg_src directory. -# We use * because after applying the patches, a random git hash will be appended to each jar name. 
-# This, for all usages below, we must search for these jar files using `glob.glob(pattern)` -iceberg_src_compiled_jar_rel_glob_patterns = [ - "bundled-guava/build/libs/iceberg-bundled-guava-*.jar", - "common/build/libs/iceberg-common-*.jar", - "api/build/libs/iceberg-api-*.jar", - "core/build/libs/iceberg-core-*.jar", - "parquet/build/libs/iceberg-parquet-*.jar", - "hive-metastore/build/libs/iceberg-hive-*.jar", - "data/build/libs/iceberg-data-*.jar" -] - -iceberg_root_dir = path.abspath(path.dirname(__file__)) # this is NOT a git dir -iceberg_src_dir = path.join(iceberg_root_dir, iceberg_src_dir_name) -iceberg_patches_dir = path.join(iceberg_root_dir, iceberg_patches_dir_name) -iceberg_lib_dir = path.join(iceberg_root_dir, iceberg_lib_dir_name) - - -def iceberg_jars_exists(): - for compiled_jar_rel_glob_pattern in iceberg_src_compiled_jar_rel_glob_patterns: - jar_file_name_pattern = path.basename(path.normpath(compiled_jar_rel_glob_pattern)) - lib_jar_abs_pattern = path.join(iceberg_lib_dir, jar_file_name_pattern) - results = glob.glob(lib_jar_abs_pattern) - - if len(results) > 1: - raise Exception("More jars than expected: " + str(results)) - - if len(results) == 0: - return False - - return True - - -def add_google_maven_repo_to_gradle_config(): - with WorkingDirectory(iceberg_src_dir): - file_path = 'build.gradle' - - with open(file_path, 'r') as file: - content = file.read() - - # Define the old and new configurations - old_config = r'repositories {\n' - - new_config = 'repositories {\n maven {\n ' + \ - 'url "https://maven-central.storage-download.googleapis.com/maven2"\n }\n' - - # Replace the old configuration with the new one - updated_content = re.sub(old_config, new_config, content, flags=re.DOTALL) - - # Write the updated content back to the file - with open(file_path, 'w') as file: - file.write(updated_content) - - -def prepare_iceberg_source(): - with WorkingDirectory(iceberg_root_dir): - print(">>> Cloning Iceberg repo") - shutil.rmtree(iceberg_src_dir_name, ignore_errors=True) - - # We just want the shallowest, smallest iceberg clone. We will check out the commit later. 
- run_cmd("git clone --depth 1 --branch %s https://github.com/apache/iceberg.git %s" % - (iceberg_src_branch, iceberg_src_dir_name)) - - with WorkingDirectory(iceberg_src_dir): - run_cmd("git config user.email \"<>\"") - run_cmd("git config user.name \"Anonymous\"") - - # Fetch just the single commit (shallow) - run_cmd("git fetch origin %s --depth 1" % iceberg_src_commit_hash) - run_cmd("git checkout %s" % iceberg_src_commit_hash) - - print(">>> Applying patch files") - patch_files = glob.glob(path.join(iceberg_patches_dir, "*.patch")) - patch_files.sort() - - for patch_file in patch_files: - print(">>> Applying '%s'" % patch_file) - run_cmd("git apply %s" % patch_file) - run_cmd("git add .") - run_cmd("git commit -a -m 'applied %s'" % path.basename(patch_file)) - - add_google_maven_repo_to_gradle_config() - - -def generate_iceberg_jars(): - print(">>> Compiling JARs") - with WorkingDirectory(iceberg_src_dir): - # disable style checks (can fail with patches) and tests - build_args = "-x spotlessCheck -x checkstyleMain -x test -x integrationTest" - run_cmd("./gradlew :iceberg-core:build %s" % build_args) - run_cmd("./gradlew :iceberg-parquet:build %s" % build_args) - run_cmd("./gradlew :iceberg-hive-metastore:build %s" % build_args) - run_cmd("./gradlew :iceberg-data:build %s" % build_args) - - print(">>> Copying JARs to lib directory") - shutil.rmtree(iceberg_lib_dir, ignore_errors=True) - os.mkdir(iceberg_lib_dir) - - # For each relative pattern p ... - for compiled_jar_rel_glob_pattern in iceberg_src_compiled_jar_rel_glob_patterns: - # Get the absolute pattern - compiled_jar_abs_pattern = path.join(iceberg_src_dir, compiled_jar_rel_glob_pattern) - # Search for all glob results - results = glob.glob(compiled_jar_abs_pattern) - # Compiled jars will include tests, sources, javadocs; exclude them - results = list(filter(lambda result: all(x not in result for x in ["tests.jar", "sources.jar", "javadoc.jar"]), results)) - - if len(results) == 0: - raise Exception("Could not find the jar: " + compled_jar_rel_glob_pattern) - if len(results) > 1: - raise Exception("More jars created than expected: " + str(results)) - - # Copy the one jar result into the /lib directory - compiled_jar_abs_path = results[0] - compiled_jar_name = path.basename(path.normpath(compiled_jar_abs_path)) - lib_jar_abs_path = path.join(iceberg_lib_dir, compiled_jar_name) - shutil.copyfile(compiled_jar_abs_path, lib_jar_abs_path) - - if not iceberg_jars_exists(): - raise Exception("JAR copying failed") - - -def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs): - if isinstance(cmd, str): - cmd = shlex.split(cmd) - cmd_env = os.environ.copy() - if env: - cmd_env.update(env) - - if stream_output: - child = subprocess.Popen(cmd, env=cmd_env, **kwargs) - exit_code = child.wait() - if throw_on_error and exit_code != 0: - raise Exception("Non-zero exitcode: %s" % (exit_code)) - print("----\n") - return exit_code - else: - child = subprocess.Popen( - cmd, - env=cmd_env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **kwargs) - (stdout, stderr) = child.communicate() - exit_code = child.wait() - if throw_on_error and exit_code != 0: - raise Exception( - "Non-zero exitcode: %s\n\nSTDOUT:\n%s\n\nSTDERR:%s" % - (exit_code, stdout, stderr)) - return (exit_code, stdout, stderr) - - -# pylint: disable=too-few-public-methods -class WorkingDirectory(object): - def __init__(self, working_directory): - self.working_directory = working_directory - self.old_workdir = os.getcwd() - - def __enter__(self): - 
os.chdir(self.working_directory) - - def __exit__(self, tpe, value, traceback): - os.chdir(self.old_workdir) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument( - "--force", - required=False, - default=False, - action="store_true", - help="Force the generation even if already generated, useful for testing.") - args = parser.parse_args() - - if args.force or not iceberg_jars_exists(): - prepare_iceberg_source() - generate_iceberg_jars() diff --git a/icebergShaded/iceberg_src_patches/0001-schema-evolution-with-correct-field.patch b/icebergShaded/iceberg_src_patches/0001-schema-evolution-with-correct-field.patch deleted file mode 100644 index 8be6a077109..00000000000 --- a/icebergShaded/iceberg_src_patches/0001-schema-evolution-with-correct-field.patch +++ /dev/null @@ -1,186 +0,0 @@ -Creates a new `SetSchema` pending update that will let us set the latest iceberg schema instead of having to apply incremental/delta changes to the existing schema. - -This PR requires that column mapping ID mode be enabled, and uses the same fieldId on the iceberg schema using the delta schema columnIds. - -This PR also blocks MapType or ArrayType (on the iceberg side). Doing so requires more complicated fieldId calculation, which is out of scope of this PR and of the first milestone. TLDR Delta Map and Array types have their inner elements as DataTypes, but iceberg Map and List types have their inner elements as actual fields (which need a field ID). So even though delta column mapping ID mode will assign IDs to each delta field, this is insufficient as it won't assign IDs for these maps/array types. - ---- - .../java/org/apache/iceberg/SetSchema.java | 25 ++ - .../java/org/apache/iceberg/Transaction.java | 7 + - .../org/apache/iceberg/BaseTransaction.java | 8 + - .../iceberg/CommitCallbackTransaction.java | 5 + - .../org/apache/iceberg/SetSchemaImpl.java | 45 ++++ - .../org/apache/iceberg/TableMetadata.java | 14 +- - .../IcebergConversionTransaction.scala | 232 +++++++++--------- - .../tahoe/iceberg/IcebergSchemaUtils.scala | 55 +++-- - .../iceberg/IcebergTransactionUtils.scala | 16 +- - .../IcebergConversionTransactionSuite.scala | 224 ++++++++++++++++- - .../tahoe/iceberg/IcebergConverterSuite.scala | 3 +- - .../iceberg/IcebergSchemaUtilsSuite.scala | 200 ++++++++------- - .../IcebergTransactionUtilsSuite.scala | 25 +- - 13 files changed, 595 insertions(+), 264 deletions(-) - create mode 100644 api/src/main/java/org/apache/iceberg/SetSchema.java - create mode 100644 core/src/main/java/org/apache/iceberg/SetSchemaImpl.java - -diff --git a/api/src/main/java/org/apache/iceberg/SetSchema.java b/connector/iceberg-core/api/src/main/java/org/apache/iceberg/SetSchema.java -new file mode 100644 -index 00000000000..042a594ae5b ---- /dev/null -+++ b/api/src/main/java/org/apache/iceberg/SetSchema.java -@@ -0,0 +1,25 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one -+ * or more contributor license agreements. See the NOTICE file -+ * distributed with this work for additional information -+ * regarding copyright ownership. The ASF licenses this file -+ * to you under the Apache License, Version 2.0 (the -+ * "License"); you may not use this file except in compliance -+ * with the License. 
You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, -+ * software distributed under the License is distributed on an -+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -+ * KIND, either express or implied. See the License for the -+ * specific language governing permissions and limitations -+ * under the License. -+ */ -+ -+package org.apache.iceberg; -+ -+/** -+ * API to set the new, latest Iceberg schema. -+ */ -+public interface SetSchema extends PendingUpdate { } -diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java b/connector/iceberg-core/api/src/main/java/org/apache/iceberg/Transaction.java -index 090b5dfe37c..3879c9a9146 100644 ---- a/api/src/main/java/org/apache/iceberg/Transaction.java -+++ b/api/src/main/java/org/apache/iceberg/Transaction.java -@@ -37,6 +37,13 @@ public interface Transaction { - */ - UpdateSchema updateSchema(); - -+ /** -+ * Create a new {@link SetSchema} to set the new table schema. -+ * -+ * @return a new {@link SetSchema} -+ */ -+ SetSchema setSchema(Schema newSchema); -+ - /** - * Create a new {@link UpdatePartitionSpec} to alter the partition spec of this table. - * -diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/BaseTransaction.java -index 241738fedab..e299d04ebbd 100644 ---- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java -+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java -@@ -113,6 +113,14 @@ public class BaseTransaction implements Transaction { - return schemaChange; - } - -+ @Override -+ public SetSchema setSchema(Schema newSchema) { -+ checkLastOperationCommitted("SetSchema"); -+ SetSchema setSchema = new SetSchemaImpl(transactionOps, transactionOps.current(), newSchema); -+ updates.add(setSchema); -+ return setSchema; -+ } -+ - @Override - public UpdatePartitionSpec updateSpec() { - checkLastOperationCommitted("UpdateSpec"); -diff --git a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java -index 19b74a65eca..6a2d7614a82 100644 ---- a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java -+++ b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java -@@ -41,6 +41,11 @@ class CommitCallbackTransaction implements Transaction { - return wrapped.updateSchema(); - } - -+ @Override -+ public SetSchema setSchema(Schema newSchema) { -+ return wrapped.setSchema(newSchema); -+ } -+ - @Override - public UpdatePartitionSpec updateSpec() { - return wrapped.updateSpec(); -diff --git a/core/src/main/java/org/apache/iceberg/SetSchemaImpl.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/SetSchemaImpl.java -new file mode 100644 -index 00000000000..ce6731a4e13 ---- /dev/null -+++ b/core/src/main/java/org/apache/iceberg/SetSchemaImpl.java -@@ -0,0 +1,45 @@ -+/* -+ * Copyright (2021) The Delta Lake Project Authors. -+ * -+ * Licensed under the Apache License, Version 2.0 (the "License"); -+ * you may not use this file except in compliance with the License. 
-+ * You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+ -+ -+ -+package org.apache.iceberg; -+ -+public class SetSchemaImpl implements SetSchema { -+ -+ private final TableOperations ops; -+ private final TableMetadata base; -+ private final Schema newSchema; -+ -+ public SetSchemaImpl(TableOperations ops, TableMetadata base, Schema newSchema) { -+ this.ops = ops; -+ this.base = base; -+ this.newSchema = newSchema; -+ } -+ -+ @Override -+ public Schema apply() { -+ return newSchema; -+ } -+ -+ @Override -+ public void commit() { -+ // This will override the current schema -+ TableMetadata update = base.updateSchema(apply(), newSchema.highestFieldId()); -+ ops.commit(base, update); -+ } -+} -diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/TableMetadata.java -index afa2c7ac2d5..52546f02a75 100644 ---- a/core/src/main/java/org/apache/iceberg/TableMetadata.java -+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java -@@ -1339,11 +1339,15 @@ public class TableMetadata implements Serializable { - } - - private int addSchemaInternal(Schema schema, int newLastColumnId) { -- Preconditions.checkArgument( -- newLastColumnId >= lastColumnId, -- "Invalid last column ID: %s < %s (previous last column ID)", -- newLastColumnId, -- lastColumnId); -+ // Since we use txn.setSchema instead of txn.updateSchema, we are manually setting the new -+ // schema. Thus, if we drop the last column, it is clearly possible and valid for the -+ // newLastColumnId to be < the previous lastColumnId. Thus, we ignore this check. 
-+ // -+ // Preconditions.checkArgument( -+ // newLastColumnId >= lastColumnId, -+ // "Invalid last column ID: %s < %s (previous last column ID)", -+ // newLastColumnId, -+ // lastColumnId); - - int newSchemaId = reuseOrCreateNewSchemaId(schema); - boolean schemaFound = schemasById.containsKey(newSchemaId); --- -2.39.2 (Apple Git-143) diff --git a/icebergShaded/iceberg_src_patches/0002-iceberg-core-must-not-delete-any-delta-data-files.patch b/icebergShaded/iceberg_src_patches/0002-iceberg-core-must-not-delete-any-delta-data-files.patch deleted file mode 100644 index a181f065040..00000000000 --- a/icebergShaded/iceberg_src_patches/0002-iceberg-core-must-not-delete-any-delta-data-files.patch +++ /dev/null @@ -1,177 +0,0 @@ -iceberg core must NOT delete any delta data files - ---- - .../iceberg/IncrementalFileCleanup.java | 8 +-- - .../apache/iceberg/ReachableFileCleanup.java | 5 +- - .../apache/iceberg/TestRemoveSnapshots.java | 57 +++++++++++-------- - 3 files changed, 40 insertions(+), 30 deletions(-) - -diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java -index d894dcbf36d..ead7ea6b076 100644 ---- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java -+++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java -@@ -256,10 +256,10 @@ class IncrementalFileCleanup extends FileCleanupStrategy { - } - }); - -- Set filesToDelete = -- findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration); -- -- deleteFiles(filesToDelete, "data"); -+ // iceberg core MUST NOT delete any data files which are managed by delta -+ // Set filesToDelete = -+ // findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration); -+ // deleteFiles(filesToDelete, "data"); - LOG.warn("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete)); - LOG.warn("Manifests Lists to delete: {}", Joiner.on(", ").join(manifestListsToDelete)); - deleteFiles(manifestsToDelete, "manifest"); -diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java -index ccbee78e27b..da888a63b3d 100644 ---- a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java -+++ b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java -@@ -72,8 +72,9 @@ class ReachableFileCleanup extends FileCleanupStrategy { - snapshotsAfterExpiration, deletionCandidates, currentManifests::add); - - if (!manifestsToDelete.isEmpty()) { -- Set dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests); -- deleteFiles(dataFilesToDelete, "data"); -+ // iceberg core MUST NOT delete any data files which are managed by delta -+ // Set dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests); -+ // deleteFiles(dataFilesToDelete, "data"); - Set manifestPathsToDelete = - manifestsToDelete.stream().map(ManifestFile::path).collect(Collectors.toSet()); - deleteFiles(manifestPathsToDelete, "manifest"); -diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/connector/iceberg-core/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java -index 53e5af520d9..95fa8e41de1 100644 ---- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java -+++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java -@@ -147,8 +147,9 @@ public class TestRemoveSnapshots extends TableTestBase { - secondSnapshot - 
.allManifests(table.io()) - .get(0) -- .path(), // manifest contained only deletes, was dropped -- FILE_A.path()), // deleted -+ .path() // manifest contained only deletes, was dropped -+ // FILE_A.path() should NOT delete data files -+ ), // deleted - deletedFiles); - } - -@@ -209,8 +210,9 @@ public class TestRemoveSnapshots extends TableTestBase { - .allManifests(table.io()) - .get(0) - .path(), // manifest was rewritten for delete -- secondSnapshot.manifestListLocation(), // snapshot expired -- FILE_A.path()), // deleted -+ secondSnapshot.manifestListLocation() // snapshot expired -+ // FILE_A.path() should not delete any data files -+ ), - deletedFiles); - } - -@@ -309,8 +311,9 @@ public class TestRemoveSnapshots extends TableTestBase { - Sets.newHashSet( - secondSnapshot.manifestListLocation(), // snapshot expired - Iterables.getOnlyElement(secondSnapshotManifests) -- .path(), // manifest is no longer referenced -- FILE_B.path()), // added, but rolled back -+ .path() // manifest is no longer referenced -+ // FILE_B.path() should not delete any data files -+ ), - deletedFiles); - } - -@@ -686,7 +689,8 @@ public class TestRemoveSnapshots extends TableTestBase { - - removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); - -- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); -+ Assert.assertTrue("FILE_A should NOT be deleted", -+ !deletedFiles.contains(FILE_A.path().toString())); - } - - @Test -@@ -712,7 +716,8 @@ public class TestRemoveSnapshots extends TableTestBase { - - removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); - -- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); -+ Assert.assertTrue("FILE_A should NOT be deleted", -+ !deletedFiles.contains(FILE_A.path().toString())); - } - - @Test -@@ -749,8 +754,10 @@ public class TestRemoveSnapshots extends TableTestBase { - - removeSnapshots(table).expireOlderThan(t4).deleteWith(deletedFiles::add).commit(); - -- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); -- Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString())); -+ Assert.assertTrue("FILE_A should NOT be deleted", -+ !deletedFiles.contains(FILE_A.path().toString())); -+ Assert.assertTrue("FILE_B should NOT be deleted", -+ !deletedFiles.contains(FILE_B.path().toString())); - } - - @Test -@@ -824,9 +831,11 @@ public class TestRemoveSnapshots extends TableTestBase { - Sets.newHashSet( - "remove-snapshot-0", "remove-snapshot-1", "remove-snapshot-2", "remove-snapshot-3")); - -- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); -- Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString())); -- Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0); -+ Assert.assertTrue("FILE_A should NOT be deleted", -+ !deletedFiles.contains(FILE_A.path().toString())); -+ Assert.assertTrue("FILE_B should NOT be deleted", -+ !deletedFiles.contains(FILE_B.path().toString())); -+ // Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0); - } - - @Test -@@ -885,13 +894,13 @@ public class TestRemoveSnapshots extends TableTestBase { - Set expectedDeletes = Sets.newHashSet(); - expectedDeletes.add(snapshotA.manifestListLocation()); - -- // Files should be deleted of dangling staged snapshot -- snapshotB -- 
.addedDataFiles(table.io()) -- .forEach( -- i -> { -- expectedDeletes.add(i.path().toString()); -- }); -+ // Files should NOT be deleted of dangling staged snapshot -+ // snapshotB -+ // .addedDataFiles(table.io()) -+ // .forEach( -+ // i -> { -+ // expectedDeletes.add(i.path().toString()); -+ // }); - - // ManifestList should be deleted too - expectedDeletes.add(snapshotB.manifestListLocation()); -@@ -1144,10 +1153,10 @@ public class TestRemoveSnapshots extends TableTestBase { - removeSnapshots(table).expireOlderThan(fourthSnapshotTs).deleteWith(deletedFiles::add).commit(); - - Assert.assertEquals( -- "Should remove old delete files and delete file manifests", -+ "Should only delete file manifests", - ImmutableSet.builder() -- .add(FILE_A.path()) -- .add(FILE_A_DELETES.path()) -+ // .add(FILE_A.path()) -+ // .add(FILE_A_DELETES.path()) - .add(firstSnapshot.manifestListLocation()) - .add(secondSnapshot.manifestListLocation()) - .add(thirdSnapshot.manifestListLocation()) -@@ -1501,7 +1510,7 @@ public class TestRemoveSnapshots extends TableTestBase { - expectedDeletes.addAll(manifestPaths(appendA, table.io())); - expectedDeletes.add(branchDelete.manifestListLocation()); - expectedDeletes.addAll(manifestPaths(branchDelete, table.io())); -- expectedDeletes.add(FILE_A.path().toString()); -+ // expectedDeletes.add(FILE_A.path().toString()); - - Assert.assertEquals(2, Iterables.size(table.snapshots())); - Assert.assertEquals(expectedDeletes, deletedFiles); --- -2.39.2 (Apple Git-143) diff --git a/icebergShaded/iceberg_src_patches/0003-iceberg-hive-metastore-must-not-remove-unknown-table-data.patch b/icebergShaded/iceberg_src_patches/0003-iceberg-hive-metastore-must-not-remove-unknown-table-data.patch deleted file mode 100644 index 23386853c2d..00000000000 --- a/icebergShaded/iceberg_src_patches/0003-iceberg-hive-metastore-must-not-remove-unknown-table-data.patch +++ /dev/null @@ -1,45 +0,0 @@ -HiveTableOperations should have its catalog operations compatible with Delta - -This patch prevent Iceberg HiveTableOperations to overwrite catalog table properties used by Delta. It also writes a dummy schema to metastore to be aligned with Delta's behavior. 
---- -Index: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java -=================================================================== -diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java ---- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java (revision ede085d0f7529f24acd0c81dd0a43f7bb969b763) -+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java (revision 4470b919dd6a97b0f6d6b7d57d1d57348a40c025) -@@ -43,6 +43,7 @@ - import org.apache.hadoop.hive.metastore.IMetaStoreClient; - import org.apache.hadoop.hive.metastore.TableType; - import org.apache.hadoop.hive.metastore.api.InvalidObjectException; -+import org.apache.hadoop.hive.metastore.api.FieldSchema; - import org.apache.hadoop.hive.metastore.api.LockComponent; - import org.apache.hadoop.hive.metastore.api.LockLevel; - import org.apache.hadoop.hive.metastore.api.LockRequest; -@@ -286,7 +287,9 @@ - LOG.debug("Committing new table: {}", fullName); - } - -- tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes -+ StorageDescriptor newsd = storageDescriptor(metadata, hiveEngineEnabled); -+ newsd.getSerdeInfo().setParameters(tbl.getSd().getSerdeInfo().getParameters()); -+ tbl.setSd(newsd); // set to pickup any schema changes - - String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP); - String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; -@@ -393,6 +396,7 @@ - @VisibleForTesting - void persistTable(Table hmsTable, boolean updateHiveTable) - throws TException, InterruptedException { -+ hmsTable.getSd().setCols(Collections.singletonList(new FieldSchema("col", "array", ""))); - if (updateHiveTable) { - metaClients.run( - client -> { -@@ -468,7 +472,7 @@ - } - - // remove any props from HMS that are no longer present in Iceberg table props -- obsoleteProps.forEach(parameters::remove); -+ // obsoleteProps.forEach(parameters::remove); - - parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)); - parameters.put(METADATA_LOCATION_PROP, newMetadataLocation); diff --git a/icebergShaded/iceberg_src_patches/0005-iceberg-takes-updated-source-column-field-id.patch b/icebergShaded/iceberg_src_patches/0005-iceberg-takes-updated-source-column-field-id.patch deleted file mode 100644 index 21b438bdd90..00000000000 --- a/icebergShaded/iceberg_src_patches/0005-iceberg-takes-updated-source-column-field-id.patch +++ /dev/null @@ -1,40 +0,0 @@ -Iceberg PartitionField takes source column field id from latest schema if changed - ---- a/core/src/main/java/org/apache/iceberg/TableMetadata.java -+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java -@@ -664,13 +664,22 @@ public class TableMetadata implements Serializable { - return new Builder(this).upgradeFormatVersion(newFormatVersion).build(); - } - -- private static PartitionSpec updateSpecSchema(Schema schema, PartitionSpec partitionSpec) { -+ private static PartitionSpec updateSpecSchema(Schema newSchema, Schema currSchema, PartitionSpec partitionSpec) { - PartitionSpec.Builder specBuilder = -- PartitionSpec.builderFor(schema).withSpecId(partitionSpec.specId()); -+ PartitionSpec.builderFor(newSchema).withSpecId(partitionSpec.specId()); - -- // add all the fields to the builder. IDs should not change. -+ // add all the fields to the builder. 
IDs may change so it looks up the source field id by -+ // name from the new schema - for (PartitionField field : partitionSpec.fields()) { -- specBuilder.add(field.sourceId(), field.fieldId(), field.name(), field.transform()); -+ String partFieldSourceName = currSchema.findField(field.sourceId()).name(); -+ int partFieldSourceInt; -+ org.apache.iceberg.types.Types.NestedField partSourceFieldInNewSchema = newSchema.findField(partFieldSourceName); -+ if (partSourceFieldInNewSchema == null) { -+ partFieldSourceInt = field.sourceId(); -+ } else { -+ partFieldSourceInt = partSourceFieldInNewSchema.fieldId(); -+ } -+ specBuilder.add(partFieldSourceInt, field.fieldId(), field.name(), field.transform()); - } - - // build without validation because the schema may have changed in a way that makes this spec -@@ -970,7 +979,7 @@ public class TableMetadata implements Serializable { - - // rebuild all the partition specs and sort orders for the new current schema - this.specs = -- Lists.newArrayList(Iterables.transform(specs, spec -> updateSpecSchema(schema, spec))); -+ Lists.newArrayList(Iterables.transform(specs, spec -> updateSpecSchema(schema, schemasById.get(currentSchemaId), spec))); - specsById.clear(); - specsById.putAll(indexSpecs(specs)); - From be5a9310632631ba3d770d4fde5d31c039a92a7c Mon Sep 17 00:00:00 2001 From: Fred Storage Liu Date: Fri, 28 Feb 2025 23:26:00 -0800 Subject: [PATCH 7/9] fix HiveTableOperations --- build.sbt | 6 +++++- examples/scala/build.sbt | 1 + .../icebergShaded/IcebergConverter.scala | 5 ----- .../iceberg/hive/HiveTableOperations.java | 21 +++++++++++++++---- 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/build.sbt b/build.sbt index 25369a2a5fc..92b6db7c011 100644 --- a/build.sbt +++ b/build.sbt @@ -434,7 +434,8 @@ lazy val spark = (project in file("spark")) sparkMimaSettings, releaseSettings, crossSparkSettings(), - libraryDependencies ++= Seq( + dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.18.2", + libraryDependencies ++= Seq( // Adding test classifier seems to break transitive resolution of the core dependencies "org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", @@ -869,17 +870,20 @@ lazy val icebergShaded = (project in file("icebergShaded")) ExclusionRule("com.fasterxml.jackson"), ExclusionRule(organization = "com.fasterxml.jackson.core"), ExclusionRule("com.github.ben-manes.caffeine"), + ExclusionRule("io.netty"), ), "org.apache.iceberg" % "iceberg-hive-metastore" % "1.8.0" excludeAll ( ExclusionRule("com.fasterxml.jackson"), ExclusionRule(organization = "com.fasterxml.jackson.core"), ExclusionRule("com.github.ben-manes.caffeine"), + ExclusionRule("io.netty"), ), "org.apache.hadoop" % "hadoop-client" % "2.7.3" excludeAll ( ExclusionRule("org.apache.avro"), ExclusionRule("org.slf4j"), ExclusionRule("commons-beanutils"), ExclusionRule("org.datanucleus"), + ExclusionRule("io.netty"), ), "org.apache.hive" % "hive-metastore" % "2.3.8" excludeAll ( ExclusionRule("org.apache.avro"), diff --git a/examples/scala/build.sbt b/examples/scala/build.sbt index b6d132f6ee7..7f6b23771e0 100644 --- a/examples/scala/build.sbt +++ b/examples/scala/build.sbt @@ -175,5 +175,6 @@ lazy val root = (project in file(".")) "-deprecation", "-feature" ), + dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.18.2", java17Settings ) diff --git 
a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala index a06836c193f..fc3da3d2cbe 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala @@ -327,11 +327,6 @@ class IcebergConverter(spark: SparkSession) case (None, None) => CREATE_TABLE } - UniversalFormat.enforceSupportInCatalog(cleanedCatalogTable, snapshotToConvert.metadata) match { - case Some(updatedTable) => spark.sessionState.catalog.alterTable(updatedTable) - case _ => - } - val icebergTxn = new IcebergConversionTransaction( spark, cleanedCatalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp, lastConvertedIcebergSnapshotId, lastDeltaVersionConverted) diff --git a/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 4c13ce013f9..2671c3dcb2e 100644 --- a/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/icebergShaded/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -36,8 +36,10 @@ import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.iceberg.BaseMetastoreOperations; @@ -55,6 +57,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.NoSuchIcebergTableException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.ConfigProperties; @@ -174,7 +177,13 @@ protected void doRefresh() { if (currentMetadataLocation() != null) { throw new NoSuchTableException("No such table: %s.%s", database, tableName); } - + } catch (NoSuchIcebergTableException e) { + // NoSuchIcebergTableException is throw when table exists in catalog but not with + // table_type=iceberg; in that case we want to swallow so createTable + // txn can proceed with creating the iceberg table/metadata and set table_type=iceberg + if (currentMetadataLocation() != null) { + throw new NoSuchTableException("No such table: %s.%s", database, tableName); + } } catch (TException e) { String errMsg = String.format("Failed to get table info from metastore %s.%s", database, tableName); @@ -257,11 +266,15 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { LOG.debug("Committing new table: {}", fullName); } - tbl.setSd( - HiveOperationsBase.storageDescriptor( + StorageDescriptor newsd = HiveOperationsBase.storageDescriptor( adjustedMetadata.schema(), adjustedMetadata.location(), - hiveEngineEnabled)); // set to pickup any schema changes + hiveEngineEnabled); + // use storage descriptor from Delta + newsd.getSerdeInfo().setParameters(tbl.getSd().getSerdeInfo().getParameters()); + 
tbl.setSd(newsd); + // set schema to be empty to match Delta behavior + tbl.getSd().setCols(Collections.singletonList(new FieldSchema("col", "array", ""))); String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP); String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; From 2f795ab6c7f50f7aee1eb8091cdbcc711d2fa0ce Mon Sep 17 00:00:00 2001 From: Fred Storage Liu Date: Mon, 3 Mar 2025 09:34:39 -0800 Subject: [PATCH 8/9] improve example --- .../src/main/scala/example/UniForm.scala | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/examples/scala/src/main/scala/example/UniForm.scala b/examples/scala/src/main/scala/example/UniForm.scala index 291f9101bcf..04b5c64e9a5 100644 --- a/examples/scala/src/main/scala/example/UniForm.scala +++ b/examples/scala/src/main/scala/example/UniForm.scala @@ -60,16 +60,36 @@ object UniForm { .config("spark.sql.catalogImplementation", "hive") .getOrCreate() + val schema = + """ + |col0 INT, + |col1 STRUCT< + | col2: MAP, + | col3: ARRAY, + | col4: STRUCT + |>, + |col5 INT, + |col6 INT + |""".stripMargin + def getRowToInsertStr(id: Int): String = { + s""" + |$id, + |struct(map($id, $id), array($id), struct($id)), + |$id, + |$id + |""".stripMargin + } deltaSpark.sql(s"DROP TABLE IF EXISTS ${testTableName}") deltaSpark.sql( - s"""CREATE TABLE `${testTableName}` (col1 INT) using DELTA + s"""CREATE TABLE `${testTableName}` ($schema) using DELTA + |PARTITIONED BY (col0, col5, col6) |TBLPROPERTIES ( | 'delta.columnMapping.mode' = 'name', - | 'delta.enableIcebergCompatV1' = 'true', + | 'delta.enableIcebergCompatV2' = 'true', | 'delta.universalFormat.enabledFormats' = 'iceberg' |)""".stripMargin) - deltaSpark.sql(s"INSERT INTO `$testTableName` VALUES (123)") + deltaSpark.sql(s"INSERT INTO $testTableName VALUES (${getRowToInsertStr(1)})") // Wait for the conversion to be done Thread.sleep(10000) From e14aa4757fc2ce1dd6fd4e305383b55279ca4bf6 Mon Sep 17 00:00:00 2001 From: Fred Storage Liu Date: Mon, 3 Mar 2025 10:38:29 -0800 Subject: [PATCH 9/9] clean build.sbt --- build.sbt | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/build.sbt b/build.sbt index 92b6db7c011..d0a1dad3a92 100644 --- a/build.sbt +++ b/build.sbt @@ -867,14 +867,10 @@ lazy val icebergShaded = (project in file("icebergShaded")) // due to legacy scala. "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1" % "provided", "org.apache.iceberg" % "iceberg-core" % "1.8.0" excludeAll ( - ExclusionRule("com.fasterxml.jackson"), - ExclusionRule(organization = "com.fasterxml.jackson.core"), ExclusionRule("com.github.ben-manes.caffeine"), ExclusionRule("io.netty"), ), "org.apache.iceberg" % "iceberg-hive-metastore" % "1.8.0" excludeAll ( - ExclusionRule("com.fasterxml.jackson"), - ExclusionRule(organization = "com.fasterxml.jackson.core"), ExclusionRule("com.github.ben-manes.caffeine"), ExclusionRule("io.netty"), ), @@ -919,10 +915,7 @@ lazy val icebergShaded = (project in file("icebergShaded")) val cp = (fullClasspath in assembly).value cp.filter { jar => val doExclude = jar.data.getName.contains("jackson-annotations") || - jar.data.getName.contains("RoaringBitmap") || - jar.data.getName.contains("jackson") || - jar.data.getName.contains("htrace") - println(s"Excluding jar: ${jar.data.getName} ? $doExclude") + jar.data.getName.contains("RoaringBitmap") doExclude } },
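
PATCH 9/9 ends by trimming the assembly classpath filter to just the jackson-annotations and RoaringBitmap jars and dropping the debug println. The hunk does not show the enclosing setting, but the shape matches the standard sbt-assembly excluded-jars filter; a stripped-down sketch of that pattern is below, where the assemblyExcludedJars key and the surrounding block are assumed and only the two name checks come from the patch.

    // build.sbt sketch: keep selected jars out of the shaded Iceberg assembly
    assembly / assemblyExcludedJars := {
      val cp = (assembly / fullClasspath).value
      // jars whose names match are returned here and therefore left out of the uber jar
      cp.filter { jar =>
        jar.data.getName.contains("jackson-annotations") ||
          jar.data.getName.contains("RoaringBitmap")
      }
    }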