From bb1caf95fc333ccc6f617344a56c8e89d51054eb Mon Sep 17 00:00:00 2001 From: vernedeng Date: Mon, 9 Oct 2023 10:27:56 +0800 Subject: [PATCH 1/2] [INLONG-8982][Sort] Iceberg sink on flink 1.15 --- .../iceberg/FlinkDynamicTableFactory.java | 2 +- .../sort/iceberg/IcebergWritableMetadata.java | 66 ++ .../sort/iceberg/sink/CommitSummary.java | 96 +++ .../sort/iceberg/sink/DeltaManifests.java | 74 ++ .../sink/DeltaManifestsSerializer.java | 126 ++++ .../sink/EqualityFieldKeySelector.java | 88 +++ .../sort/iceberg/sink/FlinkManifestUtil.java | 141 ++++ .../inlong/sort/iceberg/sink/FlinkSink.java | 690 ++++++++++++++++++ .../iceberg/sink/IcebergFilesCommitter.java | 518 +++++++++++++ .../sink/IcebergFilesCommitterMetrics.java | 101 +++ .../iceberg/sink/IcebergStreamWriter.java | 134 ++++ .../sink/IcebergStreamWriterMetrics.java | 116 +++ .../sort/iceberg/sink/IcebergTableSink.java | 160 ++++ .../sink/ManifestOutputFileFactory.java | 98 +++ .../iceberg/sink/PartitionKeySelector.java | 66 ++ .../InlongIcebergSourceReaderMetrics.java | 2 +- .../source/reader/RowDataRecordFactory.java | 4 +- .../utils/RecyclableJoinedRowData.java | 2 +- .../{source => }/utils/RowDataCloneUtil.java | 2 +- .../sort/iceberg/utils/SinkMetadataUtils.java | 107 +++ licenses/inlong-sort-connectors/LICENSE | 33 +- 21 files changed, 2610 insertions(+), 16 deletions(-) create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergWritableMetadata.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/CommitSummary.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitterMetrics.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergTableSink.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java create mode 100644 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java rename inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/{source => }/utils/RecyclableJoinedRowData.java (99%) rename inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/{source => }/utils/RowDataCloneUtil.java (97%) create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java index ef50b8d3a8..240c718754 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.iceberg; +import org.apache.inlong.sort.iceberg.sink.IcebergTableSink; import org.apache.inlong.sort.iceberg.source.IcebergTableSource; import org.apache.flink.configuration.ConfigOption; @@ -38,7 +39,6 @@ import org.apache.flink.util.Preconditions; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.flink.IcebergTableSink; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergWritableMetadata.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergWritableMetadata.java new file mode 100644 index 0000000000..5a1e8fc031 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergWritableMetadata.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.iceberg; + +import org.apache.inlong.sort.base.Constants; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import java.io.Serializable; + +public enum IcebergWritableMetadata { + + AUDIT_DATA_TIME( + Constants.META_AUDIT_DATA_TIME, + DataTypes.BIGINT().notNull(), + (r, p) -> r.getLong(p)), + + NULL( + "", + DataTypes.NULL(), + (r, p) -> null); + + private final String key; + private final DataType dataType; + private final MetadataConverter converter; + + public String getKey() { + return key; + } + + public DataType getDataType() { + return dataType; + } + + public MetadataConverter getConverter() { + return converter; + } + + IcebergWritableMetadata(String key, DataType dataType, MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + + public interface MetadataConverter extends Serializable { + + Object read(RowData rowData, int pos); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/CommitSummary.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/CommitSummary.java new file mode 100644 index 0000000000..e387b90e3d --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/CommitSummary.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
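A minimal usage sketch, assuming the metadata column is appended after the physical columns: the converter declared on AUDIT_DATA_TIME reads the audit time out of the combined row. The forKey helper, the two physical columns and position 2 are invented for illustration only; in the patch itself this resolution is handled by SinkMetadataUtils.

package org.apache.inlong.sort.iceberg;

import org.apache.inlong.sort.base.Constants;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

import java.util.Arrays;

public class WritableMetadataSketch {

    // Resolve a writable metadata column by its declared key, falling back to the NULL placeholder.
    static IcebergWritableMetadata forKey(String key) {
        return Arrays.stream(IcebergWritableMetadata.values())
                .filter(m -> m.getKey().equals(key))
                .findFirst()
                .orElse(IcebergWritableMetadata.NULL);
    }

    public static void main(String[] args) {
        // Two physical columns (id, name) plus the audit-time metadata column appended at position 2.
        RowData row = GenericRowData.of(1, null, 1696818476000L);
        IcebergWritableMetadata meta = forKey(Constants.META_AUDIT_DATA_TIME);
        long auditDataTime = (long) meta.getConverter().read(row, 2);
        System.out.println("audit data time = " + auditDataTime);
    }
}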
+ */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +import java.util.Arrays; +import java.util.NavigableMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +class CommitSummary { + + private final AtomicLong dataFilesCount = new AtomicLong(); + private final AtomicLong dataFilesRecordCount = new AtomicLong(); + private final AtomicLong dataFilesByteCount = new AtomicLong(); + private final AtomicLong deleteFilesCount = new AtomicLong(); + private final AtomicLong deleteFilesRecordCount = new AtomicLong(); + private final AtomicLong deleteFilesByteCount = new AtomicLong(); + + CommitSummary(NavigableMap pendingResults) { + pendingResults + .values() + .forEach( + writeResult -> { + dataFilesCount.addAndGet(writeResult.dataFiles().length); + Arrays.stream(writeResult.dataFiles()) + .forEach( + dataFile -> { + dataFilesRecordCount.addAndGet(dataFile.recordCount()); + dataFilesByteCount.addAndGet(dataFile.fileSizeInBytes()); + }); + deleteFilesCount.addAndGet(writeResult.deleteFiles().length); + Arrays.stream(writeResult.deleteFiles()) + .forEach( + deleteFile -> { + deleteFilesRecordCount.addAndGet(deleteFile.recordCount()); + deleteFilesByteCount.addAndGet(deleteFile.fileSizeInBytes()); + }); + }); + } + + long dataFilesCount() { + return dataFilesCount.get(); + } + + long dataFilesRecordCount() { + return dataFilesRecordCount.get(); + } + + long dataFilesByteCount() { + return dataFilesByteCount.get(); + } + + long deleteFilesCount() { + return deleteFilesCount.get(); + } + + long deleteFilesRecordCount() { + return deleteFilesRecordCount.get(); + } + + long deleteFilesByteCount() { + return deleteFilesByteCount.get(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("dataFilesCount", dataFilesCount) + .add("dataFilesRecordCount", dataFilesRecordCount) + .add("dataFilesByteCount", dataFilesByteCount) + .add("deleteFilesCount", deleteFilesCount) + .add("deleteFilesRecordCount", deleteFilesRecordCount) + .add("deleteFilesByteCount", deleteFilesByteCount) + .toString(); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java new file mode 100644 index 0000000000..5ce9389a87 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
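A small sketch of how the summary is meant to be used, assuming the constructor receives the committer's pending write results keyed by checkpoint id (as in upstream Iceberg); the logPendingCommit helper and its log format are illustrative, not part of the patch.

package org.apache.inlong.sort.iceberg.sink;

import org.apache.iceberg.io.WriteResult;

import java.util.NavigableMap;

class CommitSummarySketch {

    // Aggregate everything pending for commit (keyed by checkpoint id) and log the totals once,
    // just before the commit is attempted.
    static void logPendingCommit(NavigableMap<Long, WriteResult> pendingResults) {
        CommitSummary summary = new CommitSummary(pendingResults);
        System.out.printf(
                "committing %d data files (%d records, %d bytes) and %d delete files%n",
                summary.dataFilesCount(),
                summary.dataFilesRecordCount(),
                summary.dataFilesByteCount(),
                summary.deleteFilesCount());
    }
}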
+ */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +import java.util.List; + +/** + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +class DeltaManifests { + + private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0]; + + private final ManifestFile dataManifest; + private final ManifestFile deleteManifest; + private final CharSequence[] referencedDataFiles; + + DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) { + this(dataManifest, deleteManifest, EMPTY_REF_DATA_FILES); + } + + DeltaManifests( + ManifestFile dataManifest, ManifestFile deleteManifest, CharSequence[] referencedDataFiles) { + Preconditions.checkNotNull(referencedDataFiles, "Referenced data files shouldn't be null."); + + this.dataManifest = dataManifest; + this.deleteManifest = deleteManifest; + this.referencedDataFiles = referencedDataFiles; + } + + ManifestFile dataManifest() { + return dataManifest; + } + + ManifestFile deleteManifest() { + return deleteManifest; + } + + CharSequence[] referencedDataFiles() { + return referencedDataFiles; + } + + List manifests() { + List manifests = Lists.newArrayListWithCapacity(2); + if (dataManifest != null) { + manifests.add(dataManifest); + } + + if (deleteManifest != null) { + manifests.add(deleteManifest); + } + + return manifests; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java new file mode 100644 index 0000000000..76e031f231 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +class DeltaManifestsSerializer implements SimpleVersionedSerializer { + + private static final int VERSION_1 = 1; + private static final int VERSION_2 = 2; + private static final byte[] EMPTY_BINARY = new byte[0]; + + static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer(); + + @Override + public int getVersion() { + return VERSION_2; + } + + @Override + public byte[] serialize(DeltaManifests deltaManifests) throws IOException { + Preconditions.checkNotNull( + deltaManifests, "DeltaManifests to be serialized should not be null"); + + ByteArrayOutputStream binaryOut = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(binaryOut); + + byte[] dataManifestBinary = EMPTY_BINARY; + if (deltaManifests.dataManifest() != null) { + dataManifestBinary = ManifestFiles.encode(deltaManifests.dataManifest()); + } + + out.writeInt(dataManifestBinary.length); + out.write(dataManifestBinary); + + byte[] deleteManifestBinary = EMPTY_BINARY; + if (deltaManifests.deleteManifest() != null) { + deleteManifestBinary = ManifestFiles.encode(deltaManifests.deleteManifest()); + } + + out.writeInt(deleteManifestBinary.length); + out.write(deleteManifestBinary); + + CharSequence[] referencedDataFiles = deltaManifests.referencedDataFiles(); + out.writeInt(referencedDataFiles.length); + for (CharSequence referencedDataFile : referencedDataFiles) { + out.writeUTF(referencedDataFile.toString()); + } + + return binaryOut.toByteArray(); + } + + @Override + public DeltaManifests deserialize(int version, byte[] serialized) throws IOException { + if (version == VERSION_1) { + return deserializeV1(serialized); + } else if (version == VERSION_2) { + return deserializeV2(serialized); + } else { + throw new RuntimeException("Unknown serialize version: " + version); + } + } + + private DeltaManifests deserializeV1(byte[] serialized) throws IOException { + return new DeltaManifests(ManifestFiles.decode(serialized), null); + } + + private DeltaManifests deserializeV2(byte[] serialized) throws IOException { + ManifestFile dataManifest = null; + ManifestFile deleteManifest = null; + + ByteArrayInputStream binaryIn = new ByteArrayInputStream(serialized); + DataInputStream in = new DataInputStream(binaryIn); + + int dataManifestSize = in.readInt(); + if (dataManifestSize > 0) { + byte[] dataManifestBinary = new byte[dataManifestSize]; + Preconditions.checkState(in.read(dataManifestBinary) == dataManifestSize); + + dataManifest = ManifestFiles.decode(dataManifestBinary); + } + + int deleteManifestSize = in.readInt(); + if (deleteManifestSize > 0) { + byte[] deleteManifestBinary = new byte[deleteManifestSize]; + Preconditions.checkState(in.read(deleteManifestBinary) == deleteManifestSize); + + deleteManifest = ManifestFiles.decode(deleteManifestBinary); + } + + int referenceDataFileNum = in.readInt(); + CharSequence[] referencedDataFiles = new CharSequence[referenceDataFileNum]; + for (int i = 0; i < referenceDataFileNum; i++) { + referencedDataFiles[i] = in.readUTF(); + } + + return new 
DeltaManifests(dataManifest, deleteManifest, referencedDataFiles); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java new file mode 100644 index 0000000000..c3e43585bf --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.StructLikeWrapper; +import org.apache.iceberg.util.StructProjection; + +import java.util.List; + +/** + * Create a {@link KeySelector} to shuffle by equality fields, to ensure same equality fields record + * will be emitted to same writer in order. + * + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +class EqualityFieldKeySelector implements KeySelector { + + private final Schema schema; + private final RowType flinkSchema; + private final Schema deleteSchema; + + private transient RowDataWrapper rowDataWrapper; + private transient StructProjection structProjection; + private transient StructLikeWrapper structLikeWrapper; + + EqualityFieldKeySelector(Schema schema, RowType flinkSchema, List equalityFieldIds) { + this.schema = schema; + this.flinkSchema = flinkSchema; + this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); + } + + /** + * Construct the {@link RowDataWrapper} lazily here because few members in it are not + * serializable. In this way, we don't have to serialize them with forcing. + */ + protected RowDataWrapper lazyRowDataWrapper() { + if (rowDataWrapper == null) { + rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); + } + return rowDataWrapper; + } + + /** Construct the {@link StructProjection} lazily because it is not serializable. */ + protected StructProjection lazyStructProjection() { + if (structProjection == null) { + structProjection = StructProjection.create(schema, deleteSchema); + } + return structProjection; + } + + /** Construct the {@link StructLikeWrapper} lazily because it is not serializable. 
*/ + protected StructLikeWrapper lazyStructLikeWrapper() { + if (structLikeWrapper == null) { + structLikeWrapper = StructLikeWrapper.forType(deleteSchema.asStruct()); + } + return structLikeWrapper; + } + + @Override + public Integer getKey(RowData row) { + RowDataWrapper wrappedRowData = lazyRowDataWrapper().wrap(row); + StructProjection projectedRowData = lazyStructProjection().wrap(wrappedRowData); + StructLikeWrapper wrapper = lazyStructLikeWrapper().set(projectedRowData); + return wrapper.hashCode(); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java new file mode 100644 index 0000000000..ef410ba3d3 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
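The selector only does useful work once it drives a keyed shuffle; FlinkSink#distributeDataStream below performs exactly this wiring. The sketch isolates that single call, with the schema, row type and equality field ids passed in as placeholders.

package org.apache.inlong.sort.iceberg.sink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;

import java.util.List;

class EqualityShuffleSketch {

    // Hash-distribute rows by their equality key so the delete and the insert for the same key
    // are processed, in order, by the same writer subtask.
    static DataStream<RowData> shuffleByEqualityFields(
            DataStream<RowData> input,
            Schema icebergSchema,
            RowType flinkRowType,
            List<Integer> equalityFieldIds) {
        return input.keyBy(new EqualityFieldKeySelector(icebergSchema, flinkRowType, equalityFieldIds));
    }
}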
+ */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestWriter; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +class FlinkManifestUtil { + + private static final int FORMAT_V2 = 2; + private static final Long DUMMY_SNAPSHOT_ID = 0L; + + private FlinkManifestUtil() { + } + + static ManifestFile writeDataFiles( + OutputFile outputFile, PartitionSpec spec, List dataFiles) throws IOException { + ManifestWriter writer = + ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID); + + try (ManifestWriter closeableWriter = writer) { + closeableWriter.addAll(dataFiles); + } + + return writer.toManifestFile(); + } + + static List readDataFiles( + ManifestFile manifestFile, FileIO io, Map specsById) + throws IOException { + try (CloseableIterable dataFiles = ManifestFiles.read(manifestFile, io, specsById)) { + return Lists.newArrayList(dataFiles); + } + } + + static ManifestOutputFileFactory createOutputFileFactory( + Table table, String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) { + TableOperations ops = ((HasTableOperations) table).operations(); + return new ManifestOutputFileFactory( + ops, + table.io(), + table.properties(), + flinkJobId, + operatorUniqueId, + subTaskId, + attemptNumber); + } + + /** + * Write the {@link WriteResult} to temporary manifest files. + * + * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same + * partition spec + */ + static DeltaManifests writeCompletedFiles( + WriteResult result, Supplier outputFileSupplier, PartitionSpec spec) + throws IOException { + + ManifestFile dataManifest = null; + ManifestFile deleteManifest = null; + + // Write the completed data files into a newly created data manifest file. + if (result.dataFiles() != null && result.dataFiles().length > 0) { + dataManifest = + writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles())); + } + + // Write the completed delete files into a newly created delete manifest file. 
+ if (result.deleteFiles() != null && result.deleteFiles().length > 0) { + OutputFile deleteManifestFile = outputFileSupplier.get(); + + ManifestWriter deleteManifestWriter = + ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID); + try (ManifestWriter writer = deleteManifestWriter) { + for (DeleteFile deleteFile : result.deleteFiles()) { + writer.add(deleteFile); + } + } + + deleteManifest = deleteManifestWriter.toManifestFile(); + } + + return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles()); + } + + static WriteResult readCompletedFiles( + DeltaManifests deltaManifests, FileIO io, Map specsById) + throws IOException { + WriteResult.Builder builder = WriteResult.builder(); + + // Read the completed data files from persisted data manifest file. + if (deltaManifests.dataManifest() != null) { + builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io, specsById)); + } + + // Read the completed delete files from persisted delete manifests file. + if (deltaManifests.deleteManifest() != null) { + try (CloseableIterable deleteFiles = + ManifestFiles.readDeleteManifest(deltaManifests.deleteManifest(), io, specsById)) { + builder.addDeleteFiles(deleteFiles); + } + } + + return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build(); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java new file mode 100644 index 0000000000..28d034a06f --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
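The two halves of FlinkManifestUtil are meant to be used as a round trip across a checkpoint: write the manifests, serialize the small DeltaManifests handle into operator state, and read everything back on restore. A sketch of that round trip, assuming the caller supplies the manifest OutputFile (in the committer this comes from ManifestOutputFileFactory); nothing here is part of the patch itself.

package org.apache.inlong.sort.iceberg.sink;

import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.WriteResult;

import java.io.IOException;
import java.util.function.Supplier;

class ManifestRoundTripSketch {

    static WriteResult roundTrip(Table table, WriteResult writeResult, Supplier<OutputFile> manifestFileSupplier)
            throws IOException {
        // Persist the completed data/delete files of one checkpoint as delta manifests.
        DeltaManifests deltaManifests =
                FlinkManifestUtil.writeCompletedFiles(writeResult, manifestFileSupplier, table.spec());

        // The serialized form is what ends up in the committer's checkpointed state.
        byte[] stateBytes =
                SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests);

        // On restore the bytes are turned back into manifests and then into a WriteResult to commit.
        DeltaManifests restored =
                SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, stateBytes);
        return FlinkManifestUtil.readCompletedFiles(restored, table.io(), table.specs());
    }
}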
+ */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.iceberg.utils.SinkMetadataUtils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.FlinkWriteConf; +import org.apache.iceberg.flink.FlinkWriteOptions; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.flink.sink.TaskWriterFactory; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +import static org.apache.iceberg.TableProperties.*; +import static org.apache.inlong.sort.base.Constants.*; + +/** + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +public class FlinkSink { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class); + + private static final String ICEBERG_STREAM_WRITER_NAME = + IcebergStreamWriter.class.getSimpleName(); + private static final String ICEBERG_FILES_COMMITTER_NAME = + IcebergFilesCommitter.class.getSimpleName(); + + private FlinkSink() { + } + + /** + * Initialize a {@link Builder} to export the data from generic input data stream into iceberg + * table. We use {@link RowData} inside the sink connector, so users need to provide a mapper + * function and a {@link TypeInformation} to convert those generic records to a RowData + * DataStream. + * + * @param input the generic source input data stream. + * @param mapper function to convert the generic data to {@link RowData} + * @param outputType to define the {@link TypeInformation} for the input data. + * @param the data type of records. 
+ * @return {@link Builder} to connect the iceberg table. + */ + public static Builder builderFor( + DataStream input, MapFunction mapper, TypeInformation outputType) { + return new Builder().forMapperOutputType(input, mapper, outputType); + } + + /** + * Initialize a {@link Builder} to export the data from input data stream with {@link Row}s into + * iceberg table. We use {@link RowData} inside the sink connector, so users need to provide a + * {@link TableSchema} for builder to convert those {@link Row}s to a {@link RowData} DataStream. + * + * @param input the source input data stream with {@link Row}s. + * @param tableSchema defines the {@link TypeInformation} for input data. + * @return {@link Builder} to connect the iceberg table. + */ + public static Builder forRow(DataStream input, TableSchema tableSchema) { + RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType(); + DataType[] fieldDataTypes = tableSchema.getFieldDataTypes(); + + DataFormatConverters.RowConverter rowConverter = + new DataFormatConverters.RowConverter(fieldDataTypes); + return builderFor(input, rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType)) + .tableSchema(tableSchema); + } + + /** + * Initialize a {@link Builder} to export the data from input data stream with {@link RowData}s + * into iceberg table. + * + * @param input the source input data stream with {@link RowData}s. + * @return {@link Builder} to connect the iceberg table. + */ + public static Builder forRowData(DataStream input) { + return new Builder().forRowData(input); + } + + public static class Builder { + + private Function> inputCreator = null; + private TableLoader tableLoader; + private Table table; + private TableSchema tableSchema; + private List equalityFieldColumns = null; + private String uidPrefix = null; + private final Map snapshotProperties = Maps.newHashMap(); + private ReadableConfig readableConfig = new Configuration(); + private final Map writeOptions = Maps.newHashMap(); + private FlinkWriteConf flinkWriteConf = null; + private SinkMetadataUtils sinkMetadataUtils; + private MetricOption metricOption; + private String inlongAuditAddress; + private String inlongAuditKeys; + private String inlongMetrics; + + private Builder() { + } + + private Builder forRowData(DataStream newRowDataInput) { + this.inputCreator = ignored -> newRowDataInput; + return this; + } + + private Builder forMapperOutputType( + DataStream input, MapFunction mapper, TypeInformation outputType) { + this.inputCreator = + newUidPrefix -> { + // Input stream order is crucial for some situation(e.g. in cdc case). Therefore, we + // need to set the parallelism + // of map operator same as its input to keep map operator chaining its input, and avoid + // rebalanced by default. + SingleOutputStreamOperator inputStream = + input.map(mapper, outputType).setParallelism(input.getParallelism()); + if (newUidPrefix != null) { + inputStream.name(operatorName(newUidPrefix)).uid(newUidPrefix + "-mapper"); + } + return inputStream; + }; + return this; + } + + /** + * This iceberg {@link Table} instance is used for initializing {@link IcebergStreamWriter} + * which will write all the records into {@link DataFile}s and emit them to downstream operator. + * Providing a table would avoid so many table loading from each separate task. + * + * @param newTable the loaded iceberg table instance. + * @return {@link Builder} to connect the iceberg table. 
+ */ + public Builder table(Table newTable) { + this.table = newTable; + return this; + } + + /** + * The table loader is used for loading tables in {@link IcebergFilesCommitter} lazily, we need + * this loader because {@link Table} is not serializable and could not just use the loaded table + * from Builder#table in the remote task manager. + * + * @param newTableLoader to load iceberg table inside tasks. + * @return {@link Builder} to connect the iceberg table. + */ + public Builder tableLoader(TableLoader newTableLoader) { + this.tableLoader = newTableLoader; + return this; + } + + /** + * Set the write properties for Flink sink. View the supported properties in {@link + * FlinkWriteOptions} + */ + public Builder set(String property, String value) { + writeOptions.put(property, value); + return this; + } + + public Builder SinkMetadataUtils(SinkMetadataUtils sinkMetadataUtils) { + this.sinkMetadataUtils = sinkMetadataUtils; + return this; + } + + public Builder metricOption(MetricOption metricOption) { + this.metricOption = metricOption; + return this; + } + + /** + * Set the write properties for Flink sink. View the supported properties in {@link + * FlinkWriteOptions} + */ + public Builder setAll(Map properties) { + writeOptions.putAll(properties); + return this; + } + + public Builder tableSchema(TableSchema newTableSchema) { + this.tableSchema = newTableSchema; + return this; + } + + public Builder overwrite(boolean newOverwrite) { + writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite)); + return this; + } + + public Builder flinkConf(ReadableConfig config) { + this.readableConfig = config; + return this; + } + + /** + * Configure the write {@link DistributionMode} that the flink sink will use. Currently, flink + * support {@link DistributionMode#NONE} and {@link DistributionMode#HASH}. + * + * @param mode to specify the write distribution mode. + * @return {@link Builder} to connect the iceberg table. + */ + public Builder distributionMode(DistributionMode mode) { + Preconditions.checkArgument( + !DistributionMode.RANGE.equals(mode), + "Flink does not support 'range' write distribution mode now."); + if (mode != null) { + writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName()); + } + return this; + } + + /** + * Configuring the write parallel number for iceberg stream writer. + * + * @param newWriteParallelism the number of parallel iceberg stream writer. + * @return {@link Builder} to connect the iceberg table. + */ + public Builder writeParallelism(int newWriteParallelism) { + writeOptions.put( + FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism)); + return this; + } + + /** + * All INSERT/UPDATE_AFTER events from input stream will be transformed to UPSERT events, which + * means it will DELETE the old records and then INSERT the new records. In partitioned table, + * the partition fields should be a subset of equality fields, otherwise the old row that + * located in partition-A could not be deleted by the new row that located in partition-B. + * + * @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT. + * @return {@link Builder} to connect the iceberg table. + */ + public Builder upsert(boolean enabled) { + writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled)); + return this; + } + + /** + * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events. 
+ * + * @param columns defines the iceberg table's key. + * @return {@link Builder} to connect the iceberg table. + */ + public Builder equalityFieldColumns(List columns) { + this.equalityFieldColumns = columns; + return this; + } + + /** + * Set the uid prefix for FlinkSink operators. Note that FlinkSink internally consists of + * multiple operators (like writer, committer, dummy sink etc.) Actually operator uid will be + * appended with a suffix like "uidPrefix-writer".
+ *
+ * If provided, this prefix is also applied to operator names.
+ *
+ * Flink auto generates operator uid if not set explicitly. It is a recommended + * best-practice to set uid for all operators before deploying to production. Flink has an + * option to {@code pipeline.auto-generate-uid=false} to disable auto-generation and force + * explicit setting of all operator uid.
+ *
+ * Be careful with setting this for an existing job, because now we are changing the operator + * uid from an auto-generated one to this new value. When deploying the change with a + * checkpoint, Flink won't be able to restore the previous Flink sink operator state (more + * specifically the committer operator state). You need to use {@code --allowNonRestoredState} + * to ignore the previous sink state. During restore Flink sink state is used to check if last + * commit was actually successful or not. {@code --allowNonRestoredState} can lead to data loss + * if the Iceberg commit failed in the last completed checkpoint. + * + * @param newPrefix prefix for Flink sink operator uid and name + * @return {@link Builder} to connect the iceberg table. + */ + public Builder uidPrefix(String newPrefix) { + this.uidPrefix = newPrefix; + return this; + } + + public Builder setSnapshotProperties(Map properties) { + snapshotProperties.putAll(properties); + return this; + } + + public Builder setSnapshotProperty(String property, String value) { + snapshotProperties.put(property, value); + return this; + } + + public Builder toBranch(String branch) { + writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch); + return this; + } + + private DataStreamSink chainIcebergOperators() { + Preconditions.checkArgument( + inputCreator != null, + "Please use forRowData() or forMapperOutputType() to initialize the input DataStream."); + Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null"); + + DataStream rowDataInput = inputCreator.apply(uidPrefix); + + if (table == null) { + tableLoader.open(); + try (TableLoader loader = tableLoader) { + this.table = loader.loadTable(); + } catch (IOException e) { + throw new UncheckedIOException( + "Failed to load iceberg table from table loader: " + tableLoader, e); + } + } + + flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig); + + // Find out the equality field id list based on the user-provided equality field column names. + List equalityFieldIds = checkAndGetEqualityFieldIds(); + + // Convert the requested flink table schema to flink row type. + RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema); + + // Distribute the records from input data stream based on the write.distribution-mode and + // equality fields. + DataStream distributeStream = + distributeDataStream( + rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType); + + // Add parallel writers that append rows to files + resolveMetricOption(); + SingleOutputStreamOperator writerStream = + appendWriter(distributeStream, flinkRowType, equalityFieldIds, metricOption); + + // Add single-parallelism committer that commits files + // after successful checkpoint or end of input + SingleOutputStreamOperator committerStream = appendCommitter(writerStream); + + // Add dummy discard sink + return appendDummySink(committerStream); + } + + /** + * Append the iceberg sink operators to write records to iceberg table. + * + * @return {@link DataStreamSink} for sink. 
+ */ + public DataStreamSink append() { + return chainIcebergOperators(); + } + + private void resolveMetricOption() { + if (metricOption != null) { + LOG.info("metric option is not null, no need to init it"); + return; + } + + if (StringUtils.isNullOrWhitespaceOnly(inlongAuditAddress)) { + inlongAuditAddress = writeOptions.get(INLONG_AUDIT.key()); + } + if (StringUtils.isNullOrWhitespaceOnly(inlongAuditKeys)) { + inlongAuditKeys = writeOptions.get(AUDIT_KEYS.key()); + } + if (StringUtils.isNullOrWhitespaceOnly(inlongMetrics)) { + inlongMetrics = writeOptions.get(INLONG_METRIC.key()); + } + LOG.info("start to init metric option with audit={}, keys={}, metric={}", + inlongAuditAddress, inlongAuditKeys, inlongMetrics); + this.metricOption = MetricOption.builder() + .withInlongLabels(inlongMetrics) + .withAuditAddress(inlongAuditAddress) + .withAuditKeys(inlongAuditKeys) + .withRegisterMetric(MetricOption.RegisteredMetric.ALL) + .build(); + } + + private String operatorName(String suffix) { + return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; + } + + @VisibleForTesting + List checkAndGetEqualityFieldIds() { + List equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds()); + if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) { + Set equalityFieldSet = + Sets.newHashSetWithExpectedSize(equalityFieldColumns.size()); + for (String column : equalityFieldColumns) { + org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column); + Preconditions.checkNotNull( + field, + "Missing required equality field column '%s' in table schema %s", + column, + table.schema()); + equalityFieldSet.add(field.fieldId()); + } + + if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) { + LOG.warn( + "The configured equality field column IDs {} are not matched with the schema identifier field IDs" + + " {}, use job specified equality field columns as the equality fields by default.", + equalityFieldSet, + table.schema().identifierFieldIds()); + } + equalityFieldIds = Lists.newArrayList(equalityFieldSet); + } + return equalityFieldIds; + } + + @SuppressWarnings("unchecked") + private DataStreamSink appendDummySink( + SingleOutputStreamOperator committerStream) { + DataStreamSink resultStream = + committerStream + .addSink(new DiscardingSink()) + .name(operatorName(String.format("IcebergSink %s", this.table.name()))) + .setParallelism(1); + if (uidPrefix != null) { + resultStream = resultStream.uid(uidPrefix + "-dummysink"); + } + return resultStream; + } + + private SingleOutputStreamOperator appendCommitter( + SingleOutputStreamOperator writerStream) { + IcebergFilesCommitter filesCommitter = + new IcebergFilesCommitter( + tableLoader, + flinkWriteConf.overwriteMode(), + snapshotProperties, + flinkWriteConf.workerPoolSize(), + flinkWriteConf.branch(), + table.spec()); + SingleOutputStreamOperator committerStream = + writerStream + .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter) + .setParallelism(1) + .setMaxParallelism(1); + if (uidPrefix != null) { + committerStream = committerStream.uid(uidPrefix + "-committer"); + } + return committerStream; + } + + private SingleOutputStreamOperator appendWriter( + DataStream input, RowType flinkRowType, List equalityFieldIds, + MetricOption metricOption) { + // Validate the equality fields and partition fields if we enable the upsert mode. 
+ if (flinkWriteConf.upsertMode()) { + Preconditions.checkState( + !flinkWriteConf.overwriteMode(), + "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream."); + Preconditions.checkState( + !equalityFieldIds.isEmpty(), + "Equality field columns shouldn't be empty when configuring to use UPSERT data stream."); + if (!table.spec().isUnpartitioned()) { + for (PartitionField partitionField : table.spec().fields()) { + Preconditions.checkState( + equalityFieldIds.contains(partitionField.sourceId()), + "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'", + partitionField, + equalityFieldColumns); + } + } + } + + IcebergStreamWriter streamWriter = + createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds, sinkMetadataUtils, + metricOption); + + int parallelism = + flinkWriteConf.writeParallelism() == null + ? input.getParallelism() + : flinkWriteConf.writeParallelism(); + SingleOutputStreamOperator writerStream = + input + .transform( + operatorName(ICEBERG_STREAM_WRITER_NAME), + TypeInformation.of(WriteResult.class), + streamWriter) + .setParallelism(parallelism); + if (uidPrefix != null) { + writerStream = writerStream.uid(uidPrefix + "-writer"); + } + return writerStream; + } + + private DataStream distributeDataStream( + DataStream input, + List equalityFieldIds, + PartitionSpec partitionSpec, + Schema iSchema, + RowType flinkRowType) { + DistributionMode writeMode = flinkWriteConf.distributionMode(); + + LOG.info("Write distribution mode is '{}'", writeMode.modeName()); + switch (writeMode) { + case NONE: + if (equalityFieldIds.isEmpty()) { + return input; + } else { + LOG.info("Distribute rows by equality fields, because there are equality fields set"); + return input.keyBy( + new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds)); + } + + case HASH: + if (equalityFieldIds.isEmpty()) { + if (partitionSpec.isUnpartitioned()) { + LOG.warn( + "Fallback to use 'none' distribution mode, because there are no equality fields set " + + "and table is unpartitioned"); + return input; + } else { + return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); + } + } else { + if (partitionSpec.isUnpartitioned()) { + LOG.info( + "Distribute rows by equality fields, because there are equality fields set " + + "and table is unpartitioned"); + return input.keyBy( + new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds)); + } else { + for (PartitionField partitionField : partitionSpec.fields()) { + Preconditions.checkState( + equalityFieldIds.contains(partitionField.sourceId()), + "In 'hash' distribution mode with equality fields set, partition field '%s' " + + "should be included in equality fields: '%s'", + partitionField, + equalityFieldColumns); + } + return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); + } + } + + case RANGE: + if (equalityFieldIds.isEmpty()) { + LOG.warn( + "Fallback to use 'none' distribution mode, because there are no equality fields set " + + "and {}=range is not supported yet in flink", + WRITE_DISTRIBUTION_MODE); + return input; + } else { + LOG.info( + "Distribute rows by equality fields, because there are equality fields set " + + "and{}=range is not supported yet in flink", + WRITE_DISTRIBUTION_MODE); + return input.keyBy( + new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds)); + } + + default: + throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode); + } + } + } + + 
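A minimal end-to-end wiring of the builder above, assuming a checkpointed job, a Hadoop-table location and an upstream changelog source. The warehouse path, key column, parallelism, uid prefix and the placeholder source are invented for illustration; files are only committed on successful checkpoints.

package org.apache.inlong.sort.iceberg.sink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;

import java.util.Collections;

public class FlinkSinkUsageSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // the committer only commits on successful checkpoints

        DataStream<RowData> changelog = buildChangelogSource(env);

        FlinkSink.forRowData(changelog)
                .tableLoader(TableLoader.fromHadoopTable("hdfs:///warehouse/db/target_table"))
                .equalityFieldColumns(Collections.singletonList("id")) // key columns for CDC/UPSERT events
                .upsert(true)                                          // rewrite INSERT/UPDATE_AFTER as upserts
                .writeParallelism(4)
                .uidPrefix("inlong-iceberg-sink")                      // stable operator uids across restarts
                .append();

        env.execute("iceberg-sink-usage-sketch");
    }

    // Placeholder for whatever produces the RowData changelog stream in a real job.
    private static DataStream<RowData> buildChangelogSource(StreamExecutionEnvironment env) {
        throw new UnsupportedOperationException("supply a real RowData source here");
    }
}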
static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) { + if (requestedSchema != null) { + // Convert the flink schema to iceberg schema firstly, then reassign ids to match the existing + // iceberg schema. + Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema); + TypeUtil.validateWriteSchema(schema, writeSchema, true, true); + + // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will + // be promoted to + // iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT (backend by 1 + // 'byte'), we will + // read 4 bytes rather than 1 byte, it will mess up the byte array in BinaryRowData. So here + // we must use flink + // schema. + return (RowType) requestedSchema.toRowDataType().getLogicalType(); + } else { + return FlinkSchemaUtil.convert(schema); + } + } + + static IcebergStreamWriter createStreamWriter( + Table table, + FlinkWriteConf flinkWriteConf, + RowType flinkRowType, + List equalityFieldIds, + SinkMetadataUtils sinkMetadataUtils, + MetricOption metricOption) { + Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null"); + + Table serializableTable = SerializableTable.copyOf(table); + FileFormat format = flinkWriteConf.dataFileFormat(); + TaskWriterFactory taskWriterFactory = + new RowDataTaskWriterFactory( + serializableTable, + flinkRowType, + flinkWriteConf.targetDataFileSize(), + format, + writeProperties(table, format, flinkWriteConf), + equalityFieldIds, + flinkWriteConf.upsertMode()); + return new IcebergStreamWriter<>(table.name(), taskWriterFactory, sinkMetadataUtils, metricOption); + } + + /** + * Based on the {@link FileFormat} overwrites the table level compression properties for the table + * write. 
+ * + * @param table The table to get the table level settings + * @param format The FileFormat to use + * @param conf The write configuration + * @return The properties to use for writing + */ + private static Map writeProperties( + Table table, FileFormat format, FlinkWriteConf conf) { + Map writeProperties = Maps.newHashMap(table.properties()); + + switch (format) { + case PARQUET: + writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec()); + String parquetCompressionLevel = conf.parquetCompressionLevel(); + if (parquetCompressionLevel != null) { + writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel); + } + + break; + case AVRO: + writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec()); + String avroCompressionLevel = conf.avroCompressionLevel(); + if (avroCompressionLevel != null) { + writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel()); + } + + break; + case ORC: + writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec()); + writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy()); + break; + default: + throw new IllegalArgumentException(String.format("Unknown file format %s", format)); + } + + return writeProperties; + } + +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java new file mode 100644 index 0000000000..b79c3974cf --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
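Because writeProperties(...) starts from table.properties() and the Flink write configuration itself falls back to those properties, a compression default can be declared once on the table and still be overridden per job. A short sketch of setting such a table-level default; the catalog and table identifier are placeholders.

package org.apache.inlong.sort.iceberg.sink;

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;

class CompressionDefaultSketch {

    // Declares the Parquet codec on the table itself; writeProperties(...) picks this up as the
    // starting point and FlinkWriteConf may still override it for a single job.
    static void setTableLevelCodec(Catalog catalog, TableIdentifier id) {
        Table table = catalog.loadTable(id);
        table.updateProperties().set(PARQUET_COMPRESSION, "zstd").commit();
    }
}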
+ */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotUpdate; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.SortedMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +class IcebergFilesCommitter extends AbstractStreamOperator + implements + OneInputStreamOperator, + BoundedOneInput { + + private static final long serialVersionUID = 1L; + private static final long INITIAL_CHECKPOINT_ID = -1L; + private static final byte[] EMPTY_MANIFEST_DATA = new byte[0]; + + private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class); + private static final String FLINK_JOB_ID = "flink.job-id"; + private static final String OPERATOR_ID = "flink.operator-id"; + + // The max checkpoint id we've committed to iceberg table. As the flink's checkpoint is always + // increasing, so we could correctly commit all the data files whose checkpoint id is greater than + // the max committed one to iceberg table, for avoiding committing the same data files twice. This + // id will be attached to iceberg's meta when committing the iceberg transaction. + private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id"; + static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits"; + + // TableLoader to load iceberg table lazily. 
+ private final TableLoader tableLoader; + private final boolean replacePartitions; + private final Map snapshotProperties; + + // A sorted map to maintain the completed data files for each pending checkpointId (which have not + // been committed to iceberg table). We need a sorted map here because there's possible that few + // checkpoints snapshot failed, for example: the 1st checkpoint have 2 data files <1, >, the 2st checkpoint have 1 data files <2, >. Snapshot for checkpoint#1 + // interrupted because of network/disk failure etc, while we don't expect any data loss in iceberg + // table. So we keep the finished files <1, > in memory and retry to commit iceberg + // table when the next checkpoint happen. + private final NavigableMap dataFilesPerCheckpoint = Maps.newTreeMap(); + + // The completed files cache for current checkpoint. Once the snapshot barrier received, it will + // be flushed to the 'dataFilesPerCheckpoint'. + private final List writeResultsOfCurrentCkpt = Lists.newArrayList(); + private final String branch; + + // It will have an unique identifier for one job. + private transient String flinkJobId; + private transient String operatorUniqueId; + private transient Table table; + private transient IcebergFilesCommitterMetrics committerMetrics; + private transient ManifestOutputFileFactory manifestOutputFileFactory; + private transient long maxCommittedCheckpointId; + private transient int continuousEmptyCheckpoints; + private transient int maxContinuousEmptyCommits; + // There're two cases that we restore from flink checkpoints: the first case is restoring from + // snapshot created by the same flink job; another case is restoring from snapshot created by + // another different job. For the second case, we need to maintain the old flink job's id in flink + // state backend to find the max-committed-checkpoint-id when traversing iceberg table's + // snapshots. + private static final ListStateDescriptor JOB_ID_DESCRIPTOR = + new ListStateDescriptor<>("iceberg-flink-job-id", BasicTypeInfo.STRING_TYPE_INFO); + private transient ListState jobIdState; + // All pending checkpoints states for this function. + private static final ListStateDescriptor> STATE_DESCRIPTOR = + buildStateDescriptor(); + private transient ListState> checkpointsState; + + private final Integer workerPoolSize; + private final PartitionSpec spec; + private transient ExecutorService workerPool; + + IcebergFilesCommitter( + TableLoader tableLoader, + boolean replacePartitions, + Map snapshotProperties, + Integer workerPoolSize, + String branch, + PartitionSpec spec) { + this.tableLoader = tableLoader; + this.replacePartitions = replacePartitions; + this.snapshotProperties = snapshotProperties; + this.workerPoolSize = workerPoolSize; + this.branch = branch; + this.spec = spec; + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString(); + this.operatorUniqueId = getRuntimeContext().getOperatorUniqueID(); + + // Open the table loader and load the table. 
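+ // Loading here, at initialization/restore time rather than in the constructor, means the committer
+ // sees the table's latest snapshots. A committed snapshot's summary carries entries such as
+ // flink.job-id, flink.operator-id and flink.max-committed-checkpoint-id=42 (illustrative value),
+ // which getMaxCommittedCheckpointId() walks through when restoring.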
+ this.tableLoader.open(); + this.table = tableLoader.loadTable(); + this.committerMetrics = new IcebergFilesCommitterMetrics(super.metrics, table.name()); + + maxContinuousEmptyCommits = + PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10); + Preconditions.checkArgument( + maxContinuousEmptyCommits > 0, MAX_CONTINUOUS_EMPTY_COMMITS + " must be positive"); + + int subTaskId = getRuntimeContext().getIndexOfThisSubtask(); + int attemptId = getRuntimeContext().getAttemptNumber(); + this.manifestOutputFileFactory = + FlinkManifestUtil.createOutputFileFactory( + table, flinkJobId, operatorUniqueId, subTaskId, attemptId); + this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID; + + this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR); + this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR); + if (context.isRestored()) { + Iterable jobIdIterable = jobIdState.get(); + if (jobIdIterable == null || !jobIdIterable.iterator().hasNext()) { + LOG.warn( + "Failed to restore committer state. This can happen when operator uid changed and Flink " + + "allowNonRestoredState is enabled. Best practice is to explicitly set the operator id " + + "via FlinkSink#Builder#uidPrefix() so that the committer operator uid is stable. " + + "Otherwise, Flink auto generate an operator uid based on job topology." + + "With that, operator uid is subjective to change upon topology change."); + return; + } + + String restoredFlinkJobId = jobIdIterable.iterator().next(); + Preconditions.checkState( + !Strings.isNullOrEmpty(restoredFlinkJobId), + "Flink job id parsed from checkpoint snapshot shouldn't be null or empty"); + + // Since flink's checkpoint id will start from the max-committed-checkpoint-id + 1 in the new + // flink job even if it's restored from a snapshot created by another different flink job, so + // it's safe to assign the max committed checkpoint id from restored flink job to the current + // flink job. + this.maxCommittedCheckpointId = + getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId, branch); + + NavigableMap uncommittedDataFiles = + Maps.newTreeMap(checkpointsState.get().iterator().next()) + .tailMap(maxCommittedCheckpointId, false); + if (!uncommittedDataFiles.isEmpty()) { + // Committed all uncommitted data files from the old flink job to iceberg table. + long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey(); + commitUpToCheckpoint( + uncommittedDataFiles, restoredFlinkJobId, operatorUniqueId, maxUncommittedCheckpointId); + } + } + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + long checkpointId = context.getCheckpointId(); + LOG.info( + "Start to flush snapshot state to state backend, table: {}, checkpointId: {}", + table, + checkpointId); + + // Update the checkpoint state. + long startNano = System.nanoTime(); + dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId)); + // Reset the snapshot state to the latest state. + checkpointsState.clear(); + checkpointsState.add(dataFilesPerCheckpoint); + + jobIdState.clear(); + jobIdState.add(flinkJobId); + + // Clear the local buffer for current checkpoint. 
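+ // The buffered results were folded into dataFilesPerCheckpoint by writeToManifest(checkpointId)
+ // above, so clearing the list here does not drop any data.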
+ writeResultsOfCurrentCkpt.clear(); + committerMetrics.checkpointDuration( + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano)); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + super.notifyCheckpointComplete(checkpointId); + // It's possible that we have the following events: + // 1. snapshotState(ckpId); + // 2. snapshotState(ckpId+1); + // 3. notifyCheckpointComplete(ckpId+1); + // 4. notifyCheckpointComplete(ckpId); + // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all + // the files, + // Besides, we need to maintain the max-committed-checkpoint-id to be increasing. + if (checkpointId > maxCommittedCheckpointId) { + LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId); + commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId); + this.maxCommittedCheckpointId = checkpointId; + } else { + LOG.info( + "Skipping committing checkpoint {}. {} is already committed.", + checkpointId, + maxCommittedCheckpointId); + } + } + + private void commitUpToCheckpoint( + NavigableMap deltaManifestsMap, + String newFlinkJobId, + String operatorId, + long checkpointId) + throws IOException { + NavigableMap pendingMap = deltaManifestsMap.headMap(checkpointId, true); + List manifests = Lists.newArrayList(); + NavigableMap pendingResults = Maps.newTreeMap(); + for (Map.Entry e : pendingMap.entrySet()) { + if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) { + // Skip the empty flink manifest. + continue; + } + + DeltaManifests deltaManifests = + SimpleVersionedSerialization.readVersionAndDeSerialize( + DeltaManifestsSerializer.INSTANCE, e.getValue()); + pendingResults.put( + e.getKey(), + FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs())); + manifests.addAll(deltaManifests.manifests()); + } + + CommitSummary summary = new CommitSummary(pendingResults); + commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, checkpointId); + committerMetrics.updateCommitSummary(summary); + pendingMap.clear(); + deleteCommittedManifests(manifests, newFlinkJobId, checkpointId); + } + + private void commitPendingResult( + NavigableMap pendingResults, + CommitSummary summary, + String newFlinkJobId, + String operatorId, + long checkpointId) { + long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount(); + continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0; + if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) { + if (replacePartitions) { + replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId); + } else { + commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId); + } + continuousEmptyCheckpoints = 0; + } else { + LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId); + } + } + + private void deleteCommittedManifests( + List manifests, String newFlinkJobId, long checkpointId) { + for (ManifestFile manifest : manifests) { + try { + table.io().deleteFile(manifest.path()); + } catch (Exception e) { + // The flink manifests cleaning failure shouldn't abort the completed checkpoint. 
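+ // The data files referenced by the committed snapshot are untouched; only the temporary flink
+ // manifest is left behind, and it can be removed later, e.g. by Iceberg's orphan file cleanup.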
+ String details = + MoreObjects.toStringHelper(this) + .add("flinkJobId", newFlinkJobId) + .add("checkpointId", checkpointId) + .add("manifestPath", manifest.path()) + .toString(); + LOG.warn( + "The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}", + details, + e); + } + } + } + + private void replacePartitions( + NavigableMap pendingResults, + CommitSummary summary, + String newFlinkJobId, + String operatorId, + long checkpointId) { + Preconditions.checkState( + summary.deleteFilesCount() == 0, "Cannot overwrite partitions with delete files."); + // Commit the overwrite transaction. + ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool); + for (WriteResult result : pendingResults.values()) { + Preconditions.checkState( + result.referencedDataFiles().length == 0, "Should have no referenced data files."); + Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile); + } + + commitOperation( + dynamicOverwrite, + summary, + "dynamic partition overwrite", + newFlinkJobId, + operatorId, + checkpointId); + } + + private void commitDeltaTxn( + NavigableMap pendingResults, + CommitSummary summary, + String newFlinkJobId, + String operatorId, + long checkpointId) { + if (summary.deleteFilesCount() == 0) { + // To be compatible with iceberg format V1. + AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool); + for (WriteResult result : pendingResults.values()) { + Preconditions.checkState( + result.referencedDataFiles().length == 0, + "Should have no referenced data files for append."); + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); + } + commitOperation(appendFiles, summary, "append", newFlinkJobId, operatorId, checkpointId); + } else { + // To be compatible with iceberg format V2. + for (Map.Entry e : pendingResults.entrySet()) { + // We don't commit the merged result into a single transaction because for the sequential + // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied + // to data files from txn1. Committing the merged one will lead to the incorrect delete + // semantic. + WriteResult result = e.getValue(); + + // Row delta validations are not needed for streaming changes that write equality deletes. + // Equality deletes are applied to data in all previous sequence numbers, so retries may + // push deletes further in the future, but do not affect correctness. Position deletes + // committed to the table in this path are used only to delete rows from data files that are + // being added in this commit. There is no way for data files added along with the delete + // files to be concurrently removed, so there is no need to validate the files referenced by + // the position delete files that are being committed. 
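+ // Each pending checkpoint therefore gets its own RowDelta below, committed in ascending
+ // checkpoint order and tagged with its own checkpoint id (e.getKey()) in the snapshot summary.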
+ RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool); + + Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); + Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes); + commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey()); + } + } + } + + private void commitOperation( + SnapshotUpdate operation, + CommitSummary summary, + String description, + String newFlinkJobId, + String operatorId, + long checkpointId) { + LOG.info( + "Committing {} for checkpoint {} to table {} branch {} with summary: {}", + description, + checkpointId, + table.name(), + branch, + summary); + snapshotProperties.forEach(operation::set); + // custom snapshot metadata properties will be overridden if they conflict with internal ones + // used by the sink. + operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId)); + operation.set(FLINK_JOB_ID, newFlinkJobId); + operation.set(OPERATOR_ID, operatorId); + operation.toBranch(branch); + + long startNano = System.nanoTime(); + operation.commit(); // abort is automatically called if this fails. + long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano); + LOG.info( + "Committed {} to table: {}, branch: {}, checkpointId {} in {} ms", + description, + table.name(), + branch, + checkpointId, + durationMs); + committerMetrics.commitDuration(durationMs); + } + + @Override + public void processElement(StreamRecord element) { + this.writeResultsOfCurrentCkpt.add(element.getValue()); + } + + @Override + public void endInput() throws IOException { + // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly. + long currentCheckpointId = Long.MAX_VALUE; + dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId)); + writeResultsOfCurrentCkpt.clear(); + + commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId); + } + + /** + * Write all the complete data files to a newly created manifest file and return the manifest's + * avro serialized bytes. + */ + private byte[] writeToManifest(long checkpointId) throws IOException { + if (writeResultsOfCurrentCkpt.isEmpty()) { + return EMPTY_MANIFEST_DATA; + } + + WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build(); + DeltaManifests deltaManifests = + FlinkManifestUtil.writeCompletedFiles( + result, () -> manifestOutputFileFactory.create(checkpointId), spec); + + return SimpleVersionedSerialization.writeVersionAndSerialize( + DeltaManifestsSerializer.INSTANCE, deltaManifests); + } + + @Override + public void open() throws Exception { + super.open(); + + final String operatorID = getRuntimeContext().getOperatorUniqueID(); + this.workerPool = + ThreadPools.newWorkerPool("iceberg-worker-pool-" + operatorID, workerPoolSize); + } + + @Override + public void close() throws Exception { + if (tableLoader != null) { + tableLoader.close(); + } + + if (workerPool != null) { + workerPool.shutdown(); + } + } + + @VisibleForTesting + static ListStateDescriptor> buildStateDescriptor() { + Comparator longComparator = Comparators.forType(Types.LongType.get()); + // Construct a SortedMapTypeInfo. 
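+ // State layout: checkpoint id (Long) -> serialized DeltaManifests bytes produced by
+ // writeToManifest(), with a Long comparator so entries replay in checkpoint order after restore.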
+ SortedMapTypeInfo sortedMapTypeInfo = + new SortedMapTypeInfo<>( + BasicTypeInfo.LONG_TYPE_INFO, + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, + longComparator); + return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo); + } + + static long getMaxCommittedCheckpointId( + Table table, String flinkJobId, String operatorId, String branch) { + Snapshot snapshot = table.snapshot(branch); + long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID; + + while (snapshot != null) { + Map summary = snapshot.summary(); + String snapshotFlinkJobId = summary.get(FLINK_JOB_ID); + String snapshotOperatorId = summary.get(OPERATOR_ID); + if (flinkJobId.equals(snapshotFlinkJobId) + && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) { + String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID); + if (value != null) { + lastCommittedCheckpointId = Long.parseLong(value); + break; + } + } + Long parentSnapshotId = snapshot.parentId(); + snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null; + } + + return lastCommittedCheckpointId; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitterMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitterMetrics.java new file mode 100644 index 0000000000..2e68830982 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitterMetrics.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +class IcebergFilesCommitterMetrics { + + private final AtomicLong lastCheckpointDurationMs = new AtomicLong(); + private final AtomicLong lastCommitDurationMs = new AtomicLong(); + private final ElapsedTimeGauge elapsedSecondsSinceLastSuccessfulCommit; + private final Counter committedDataFilesCount; + private final Counter committedDataFilesRecordCount; + private final Counter committedDataFilesByteCount; + private final Counter committedDeleteFilesCount; + private final Counter committedDeleteFilesRecordCount; + private final Counter committedDeleteFilesByteCount; + + IcebergFilesCommitterMetrics(MetricGroup metrics, String fullTableName) { + MetricGroup committerMetrics = + metrics.addGroup("IcebergFilesCommitter").addGroup("table", fullTableName); + committerMetrics.gauge("lastCheckpointDurationMs", lastCheckpointDurationMs::get); + committerMetrics.gauge("lastCommitDurationMs", lastCommitDurationMs::get); + this.elapsedSecondsSinceLastSuccessfulCommit = new ElapsedTimeGauge(TimeUnit.SECONDS); + committerMetrics.gauge( + "elapsedSecondsSinceLastSuccessfulCommit", elapsedSecondsSinceLastSuccessfulCommit); + this.committedDataFilesCount = committerMetrics.counter("committedDataFilesCount"); + this.committedDataFilesRecordCount = committerMetrics.counter("committedDataFilesRecordCount"); + this.committedDataFilesByteCount = committerMetrics.counter("committedDataFilesByteCount"); + this.committedDeleteFilesCount = committerMetrics.counter("committedDeleteFilesCount"); + this.committedDeleteFilesRecordCount = + committerMetrics.counter("committedDeleteFilesRecordCount"); + this.committedDeleteFilesByteCount = committerMetrics.counter("committedDeleteFilesByteCount"); + } + + void checkpointDuration(long checkpointDurationMs) { + lastCheckpointDurationMs.set(checkpointDurationMs); + } + + void commitDuration(long commitDurationMs) { + lastCommitDurationMs.set(commitDurationMs); + } + + /** This is called upon a successful commit. */ + void updateCommitSummary(CommitSummary stats) { + elapsedSecondsSinceLastSuccessfulCommit.refreshLastRecordedTime(); + committedDataFilesCount.inc(stats.dataFilesCount()); + committedDataFilesRecordCount.inc(stats.dataFilesRecordCount()); + committedDataFilesByteCount.inc(stats.dataFilesByteCount()); + committedDeleteFilesCount.inc(stats.deleteFilesCount()); + committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount()); + committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount()); + } + + /** + * This gauge measures the elapsed time between now and last recorded time set by {@link + * ElapsedTimeGauge#refreshLastRecordedTime()}. 
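+ * The elapsed time is converted to the {@code TimeUnit} supplied to the constructor (seconds for
+ * this committer).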
+ */ + private static class ElapsedTimeGauge implements Gauge { + + private final TimeUnit reportUnit; + private volatile long lastRecordedTimeNano; + + ElapsedTimeGauge(TimeUnit timeUnit) { + this.reportUnit = timeUnit; + this.lastRecordedTimeNano = System.nanoTime(); + } + + void refreshLastRecordedTime() { + this.lastRecordedTimeNano = System.nanoTime(); + } + + @Override + public Long getValue() { + return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano, TimeUnit.NANOSECONDS); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java new file mode 100644 index 0000000000..0cf31c206e --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.iceberg.utils.SinkMetadataUtils; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.flink.sink.TaskWriterFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +class IcebergStreamWriter extends AbstractStreamOperator + implements + OneInputStreamOperator, + BoundedOneInput { + + private static final long serialVersionUID = 1L; + + private final String fullTableName; + private final TaskWriterFactory taskWriterFactory; + private final SinkMetadataUtils sinkMetadataUtils; + private final MetricOption metricOption; + private transient TaskWriter writer; + private transient int subTaskId; + private transient int attemptId; + private transient IcebergStreamWriterMetrics writerMetrics; + + IcebergStreamWriter(String fullTableName, TaskWriterFactory taskWriterFactory, + SinkMetadataUtils sinkMetadataUtils, MetricOption metricOption) { + this.fullTableName = fullTableName; + this.taskWriterFactory = taskWriterFactory; + this.sinkMetadataUtils = sinkMetadataUtils; + this.metricOption = metricOption; + setChainingStrategy(ChainingStrategy.ALWAYS); + } + + @Override + public void open() { + this.subTaskId = getRuntimeContext().getIndexOfThisSubtask(); + this.attemptId = getRuntimeContext().getAttemptNumber(); + this.writerMetrics = new IcebergStreamWriterMetrics(super.metrics, fullTableName); + this.writerMetrics.registerMetrics(metricOption); + + // Initialize the task writer factory. + this.taskWriterFactory.initialize(subTaskId, attemptId); + + // Initialize the task writer. + this.writer = taskWriterFactory.create(); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + flush(); + this.writer = taskWriterFactory.create(); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + T data = element.getValue(); + writerMetrics.outputMetricsWithEstimate(sinkMetadataUtils.getDataSize(data), + sinkMetadataUtils.getDataTime(data)); + writer.write(data); + } + + @Override + public void close() throws Exception { + super.close(); + if (writer != null) { + writer.close(); + writer = null; + } + } + + @Override + public void endInput() throws IOException { + // For bounded stream, it may don't enable the checkpoint mechanism so we'd better to emit the + // remaining completed files to downstream before closing the writer so that we won't miss any + // of them. + // Note that if the task is not closed after calling endInput, checkpoint may be triggered again + // causing files to be sent repeatedly, the writer is marked as null after the last file is sent + // to guard against duplicated writes. 
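+ // flush() returns early once the writer has been set to null, so a barrier arriving after
+ // endInput() cannot emit the same files a second time.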
+ flush(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("table_name", fullTableName) + .add("subtask_id", subTaskId) + .add("attempt_id", attemptId) + .toString(); + } + + /** close all open files and emit files to downstream committer operator */ + private void flush() throws IOException { + if (writer == null) { + return; + } + + long startNano = System.nanoTime(); + WriteResult result = writer.complete(); + writerMetrics.updateFlushResult(result); + output.collect(new StreamRecord<>(result)); + writerMetrics.flushDuration(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano)); + + // Set writer to null to prevent duplicate flushes in the corner case of + // prepareSnapshotPreBarrier happening after endInput. + writer = null; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java new file mode 100644 index 0000000000..1d627714bc --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SourceMetricData; + +import com.codahale.metrics.SlidingWindowReservoir; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.MetricGroup; +import org.apache.iceberg.io.WriteResult; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +@Slf4j +class IcebergStreamWriterMetrics { + + // 1,024 reservoir size should cost about 8KB, which is quite small. + // It should also produce good accuracy for histogram distribution (like percentiles). 
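+ // The SlidingWindowReservoir used below keeps only the most recent 1,024 samples, so the file
+ // size histograms describe recent flushes rather than the whole job lifetime.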
+ private static final int HISTOGRAM_RESERVOIR_SIZE = 1024; + + private MetricGroup metrics; + private SourceMetricData sourceMetricData; + + private final Counter flushedDataFiles; + private final Counter flushedDeleteFiles; + private final Counter flushedReferencedDataFiles; + private final AtomicLong lastFlushDurationMs; + private final Histogram dataFilesSizeHistogram; + private final Histogram deleteFilesSizeHistogram; + + IcebergStreamWriterMetrics(MetricGroup metrics, String fullTableName) { + this.metrics = metrics; + MetricGroup writerMetrics = + metrics.addGroup("IcebergStreamWriter").addGroup("table", fullTableName); + this.flushedDataFiles = writerMetrics.counter("flushedDataFiles"); + this.flushedDeleteFiles = writerMetrics.counter("flushedDeleteFiles"); + this.flushedReferencedDataFiles = writerMetrics.counter("flushedReferencedDataFiles"); + this.lastFlushDurationMs = new AtomicLong(); + writerMetrics.gauge("lastFlushDurationMs", lastFlushDurationMs::get); + + com.codahale.metrics.Histogram dropwizardDataFilesSizeHistogram = + new com.codahale.metrics.Histogram(new SlidingWindowReservoir(HISTOGRAM_RESERVOIR_SIZE)); + this.dataFilesSizeHistogram = + writerMetrics.histogram( + "dataFilesSizeHistogram", + new DropwizardHistogramWrapper(dropwizardDataFilesSizeHistogram)); + com.codahale.metrics.Histogram dropwizardDeleteFilesSizeHistogram = + new com.codahale.metrics.Histogram(new SlidingWindowReservoir(HISTOGRAM_RESERVOIR_SIZE)); + this.deleteFilesSizeHistogram = + writerMetrics.histogram( + "deleteFilesSizeHistogram", + new DropwizardHistogramWrapper(dropwizardDeleteFilesSizeHistogram)); + } + + public void registerMetrics(MetricOption metricOption) { + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption, metrics); + } else { + log.warn("failed to init sourceMetricData since the metricOption is null"); + } + } + + void updateFlushResult(WriteResult result) { + flushedDataFiles.inc(result.dataFiles().length); + flushedDeleteFiles.inc(result.deleteFiles().length); + flushedReferencedDataFiles.inc(result.referencedDataFiles().length); + + // For file size distribution histogram, we don't have to update them after successful commits. + // This should works equally well and we avoided the overhead of tracking the list of file sizes + // in the {@link CommitSummary}, which currently stores simple stats for counters and gauges + // metrics. 
+ Arrays.stream(result.dataFiles()) + .forEach( + dataFile -> { + dataFilesSizeHistogram.update(dataFile.fileSizeInBytes()); + }); + Arrays.stream(result.deleteFiles()) + .forEach( + deleteFile -> { + deleteFilesSizeHistogram.update(deleteFile.fileSizeInBytes()); + }); + } + + void flushDuration(long flushDurationMs) { + lastFlushDurationMs.set(flushDurationMs); + } + + void outputMetricsWithEstimate(int size, long time) { + if (sourceMetricData != null) { + sourceMetricData.outputMetrics(1, size, time); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergTableSink.java new file mode 100644 index 0000000000..69167e8b7f --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergTableSink.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.inlong.sort.iceberg.IcebergReadableMetadata; +import org.apache.inlong.sort.iceberg.utils.SinkMetadataUtils; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.constraints.UniqueConstraint; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.ProviderContext; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; +import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; +import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +public class IcebergTableSink + implements + DynamicTableSink, + SupportsPartitioning, + SupportsOverwrite, + SupportsWritingMetadata { + + private final TableLoader tableLoader; + private final TableSchema tableSchema; + private final ReadableConfig readableConfig; + private final Map writeProps; + protected DataType consumedDataType; + protected List metadataKeys; + + private boolean overwrite = false; + + private IcebergTableSink(IcebergTableSink toCopy) { + this.tableLoader = toCopy.tableLoader; + this.tableSchema = toCopy.tableSchema; + this.overwrite = toCopy.overwrite; + this.readableConfig = toCopy.readableConfig; + this.writeProps = toCopy.writeProps; + this.metadataKeys = toCopy.metadataKeys; + this.consumedDataType = toCopy.consumedDataType; + } + + public IcebergTableSink( + TableLoader tableLoader, + TableSchema tableSchema, + ReadableConfig readableConfig, + Map writeProps) { + this.tableLoader = tableLoader; + this.tableSchema = tableSchema; + this.readableConfig = readableConfig; + this.writeProps = writeProps; + this.metadataKeys = ImmutableList.of(); + this.consumedDataType = tableSchema.toPhysicalRowDataType(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + Preconditions.checkState( + !overwrite || context.isBounded(), + "Unbounded data stream doesn't support overwrite operation."); + + List equalityColumns = + tableSchema.getPrimaryKey().map(UniqueConstraint::getColumns).orElseGet(ImmutableList::of); + SinkMetadataUtils sinkMetadataUtils = new SinkMetadataUtils(metadataKeys, consumedDataType); + return new DataStreamSinkProvider() { + + @Override + public DataStreamSink consumeDataStream( + ProviderContext providerContext, DataStream dataStream) { + return FlinkSink.forRowData(dataStream) + .tableLoader(tableLoader) + .tableSchema(tableSchema) + .equalityFieldColumns(equalityColumns) + .overwrite(overwrite) + .setAll(writeProps) + .flinkConf(readableConfig) + .SinkMetadataUtils(sinkMetadataUtils) + .append(); + } + }; + } + + @Override + public void applyStaticPartition(Map partition) { + // The flink's 
PartitionFanoutWriter will handle the static partition write policy + // automatically. + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + ChangelogMode.Builder builder = ChangelogMode.newBuilder(); + for (RowKind kind : requestedMode.getContainedKinds()) { + builder.addContainedKind(kind); + } + return builder.build(); + } + + @Override + public DynamicTableSink copy() { + return new IcebergTableSink(this); + } + + @Override + public String asSummaryString() { + return "Iceberg table sink"; + } + + @Override + public void applyOverwrite(boolean newOverwrite) { + this.overwrite = newOverwrite; + } + + @Override + public Map listWritableMetadata() { + return Stream.of(IcebergReadableMetadata.values()) + .collect( + Collectors.toMap( + IcebergReadableMetadata::getKey, IcebergReadableMetadata::getDataType)); + } + + @Override + public void applyWritableMetadata(List metadataKeys, DataType consumedDataType) { + this.metadataKeys = metadataKeys; + this.consumedDataType = consumedDataType; + } + +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java new file mode 100644 index 0000000000..c2039cdc5a --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Strings; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +class ManifestOutputFileFactory { + + // Users could define their own flink manifests directory by setting this value in table + // properties. 
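+ // For example, setting 'flink.manifests.location'='s3://my-bucket/warehouse/tbl/flink-manifests'
+ // (illustrative path) redirects the temporary manifests there; when it is unset, create() falls
+ // back to the table's metadata file location.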
+ static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location"; + + private final TableOperations ops; + private final FileIO io; + private final Map props; + private final String flinkJobId; + private final String operatorUniqueId; + private final int subTaskId; + private final long attemptNumber; + private final AtomicInteger fileCount = new AtomicInteger(0); + + ManifestOutputFileFactory( + TableOperations ops, + FileIO io, + Map props, + String flinkJobId, + String operatorUniqueId, + int subTaskId, + long attemptNumber) { + this.ops = ops; + this.io = io; + this.props = props; + this.flinkJobId = flinkJobId; + this.operatorUniqueId = operatorUniqueId; + this.subTaskId = subTaskId; + this.attemptNumber = attemptNumber; + } + + private String generatePath(long checkpointId) { + return FileFormat.AVRO.addExtension( + String.format( + "%s-%s-%05d-%d-%d-%05d", + flinkJobId, + operatorUniqueId, + subTaskId, + attemptNumber, + checkpointId, + fileCount.incrementAndGet())); + } + + OutputFile create(long checkpointId) { + String flinkManifestDir = props.get(FLINK_MANIFEST_LOCATION); + + String newManifestFullPath; + if (Strings.isNullOrEmpty(flinkManifestDir)) { + // User don't specify any flink manifest directory, so just use the default metadata path. + newManifestFullPath = ops.metadataFileLocation(generatePath(checkpointId)); + } else { + newManifestFullPath = + String.format("%s/%s", stripTrailingSlash(flinkManifestDir), generatePath(checkpointId)); + } + + return io.newOutputFile(newManifestFullPath); + } + + private static String stripTrailingSlash(String path) { + String result = path; + while (result.endsWith("/")) { + result = result.substring(0, result.length() - 1); + } + return result; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java new file mode 100644 index 0000000000..61b5f705b1 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.iceberg.sink; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.flink.sink.FlinkSink; + +/** + * Create a {@link KeySelector} to shuffle by partition key, then each partition/bucket will be + * wrote by only one task. That will reduce lots of small files in partitioned fanout write policy + * for {@link FlinkSink}. + * + * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1 + */ +class PartitionKeySelector implements KeySelector { + + private final Schema schema; + private final PartitionKey partitionKey; + private final RowType flinkSchema; + + private transient RowDataWrapper rowDataWrapper; + + PartitionKeySelector(PartitionSpec spec, Schema schema, RowType flinkSchema) { + this.schema = schema; + this.partitionKey = new PartitionKey(spec, schema); + this.flinkSchema = flinkSchema; + } + + /** + * Construct the {@link RowDataWrapper} lazily here because few members in it are not + * serializable. In this way, we don't have to serialize them with forcing. + */ + private RowDataWrapper lazyRowDataWrapper() { + if (rowDataWrapper == null) { + rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct()); + } + return rowDataWrapper; + } + + @Override + public String getKey(RowData row) { + partitionKey.partition(lazyRowDataWrapper().wrap(row)); + return partitionKey.toPath(); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java index a2431d3fbd..252ae4580d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java @@ -19,7 +19,7 @@ import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.SourceMetricData; -import org.apache.inlong.sort.iceberg.source.utils.RecyclableJoinedRowData; +import org.apache.inlong.sort.iceberg.utils.RecyclableJoinedRowData; import lombok.extern.slf4j.Slf4j; import org.apache.flink.metrics.MetricGroup; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java index f17186449a..7ab2721f89 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java @@ -18,8 +18,8 @@ package org.apache.inlong.sort.iceberg.source.reader; import org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter; -import 
org.apache.inlong.sort.iceberg.source.utils.RecyclableJoinedRowData; -import org.apache.inlong.sort.iceberg.source.utils.RowDataCloneUtil; +import org.apache.inlong.sort.iceberg.utils.RecyclableJoinedRowData; +import org.apache.inlong.sort.iceberg.utils.RowDataCloneUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.data.RowData; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/RecyclableJoinedRowData.java similarity index 99% rename from inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java rename to inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/RecyclableJoinedRowData.java index 4dcec331d4..dfe589bd15 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/RecyclableJoinedRowData.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.sort.iceberg.source.utils; +package org.apache.inlong.sort.iceberg.utils; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.data.ArrayData; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RowDataCloneUtil.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/RowDataCloneUtil.java similarity index 97% rename from inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RowDataCloneUtil.java rename to inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/RowDataCloneUtil.java index 58e541f0f1..287876f86e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RowDataCloneUtil.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/RowDataCloneUtil.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.sort.iceberg.source.utils; +package org.apache.inlong.sort.iceberg.utils; import org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java new file mode 100644 index 0000000000..cd303c0234 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/utils/SinkMetadataUtils.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.iceberg.utils; + +import org.apache.inlong.sort.base.Constants; +import org.apache.inlong.sort.iceberg.IcebergWritableMetadata; + +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.relocated.com.google.common.collect.BiMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Set; +import java.util.stream.Stream; + +/** + * Use for parse sink metadata by pos or name. + */ +@Slf4j +public class SinkMetadataUtils implements Serializable { + + private static final long serialVersionUID = 1L; + private final int DATA_TIME_INDEX; + private final BiMap field2posMap; + private final BiMap pos2Field; + private final IcebergWritableMetadata.MetadataConverter[] converters; + + public SinkMetadataUtils(List metadataKeys, DataType consumedDataType) { + Set metadataKeySet = ImmutableSet.copyOf(metadataKeys); + List metaFields = ((RowType) consumedDataType.getLogicalType()).getFields(); + + // get related converters by real keys + // the pos of physical column will be replaced by IcebergWritableMetadata.NULL + this.converters = metaFields.stream() + .map(RowType.RowField::getName) + .map(key -> Stream.of(IcebergWritableMetadata.values()) + .filter(m -> m.getKey().equals(key)) + .findFirst() + .orElse(IcebergWritableMetadata.NULL)) + .map(IcebergWritableMetadata::getConverter) + .toArray(IcebergWritableMetadata.MetadataConverter[]::new); + + // parse bimap of key name and pos + ImmutableBiMap.Builder builder = ImmutableBiMap.builder(); + for (int i = 0; i < metaFields.size(); i++) { + String name = metaFields.get(i).getName(); + if (metadataKeySet.contains(name)) { + builder.put(name, i); + } + } + this.field2posMap = builder.build(); + this.pos2Field = field2posMap.inverse(); + + // for audit time + DATA_TIME_INDEX = field2posMap.getOrDefault(Constants.META_AUDIT_DATA_TIME, -1); + } + + public Integer getMetadataPosByName(String name) { + return field2posMap.get(name); + } + + public String getMetadataNameByPos(Integer pos) { + return pos2Field.get(pos); + } + + public Object convertByName(RowData row, String name) { + Integer pos = getMetadataPosByName(name); + return pos == null ? null : converters[pos].read(row, pos); + } + + public Object covertByPos(RowData row, int pos) { + return pos < 0 ? 
null : converters[pos].read(row, pos); + } + + public long getDataTime(Object data) { + if (DATA_TIME_INDEX < 0 || !(data instanceof RowData)) { + return System.currentTimeMillis(); + } + return (Long) covertByPos((RowData) data, DATA_TIME_INDEX); + } + + public int getDataSize(Object data) { + return data.toString().getBytes(StandardCharsets.UTF_8).length; + } + +} diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index e83548b61e..654d625f16 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -768,16 +768,29 @@ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/ArrayBatchRecords.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/ArrayPoolDataIteratorBatcher.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/IcebergSourceReader.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/IcebergSourceRecordEmitter.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/IcebergSourceSplitReader.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/InlongIcebergSourceReaderMetrics.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/MetaDataReaderFunction.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/RecordFactory.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/RowDataReaderFunction.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/RowDataRecordFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayBatchRecords.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayPoolDataIteratorBatcher.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceRecordEmitter.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java + 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/MetaDataReaderFunction.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RecordFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataReaderFunction.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/CommitSummary.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifests.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitterMetrics.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergTableSink.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/PartitionKeySelector.java Source : iceberg-flink:iceberg-flink-1.15:1.3.1 (Please note that the software have been modified.) 
License : https://github.com/apache/iceberg/LICENSE From a7b126bdbb5052e0ddef5aec095a510981245e73 Mon Sep 17 00:00:00 2001 From: vernedeng Date: Wed, 11 Oct 2023 14:25:26 +0800 Subject: [PATCH 2/2] fix --- .../sort/protocol/node/load/IcebergLoadNode.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java index 4d90d99c5a..d39114c3b7 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java @@ -17,8 +17,10 @@ package org.apache.inlong.sort.protocol.node.load; +import org.apache.inlong.common.enums.MetaField; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.InlongMetric; +import org.apache.inlong.sort.protocol.Metadata; import org.apache.inlong.sort.protocol.constant.IcebergConstant; import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType; import org.apache.inlong.sort.protocol.enums.FilterStrategy; @@ -38,14 +40,16 @@ import javax.annotation.Nullable; import java.io.Serializable; +import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Set; @JsonTypeName("icebergLoad") @Data @NoArgsConstructor @EqualsAndHashCode(callSuper = true) -public class IcebergLoadNode extends LoadNode implements InlongMetric, Serializable { +public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata, Serializable { private static final long serialVersionUID = -1L; @@ -128,4 +132,13 @@ public List getPartitionFields() { return super.getPartitionFields(); } + @Override + public boolean isVirtual(MetaField metaField) { + return true; + } + + @Override + public Set supportedMetaFields() { + return EnumSet.of(MetaField.AUDIT_DATA_TIME); + } }