diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index b164b60fe525..eb28759f934f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -484,6 +484,28 @@ public DataFileMeta copy(List newExtraFiles) { externalPath); } + public DataFileMeta newExternalPath(String newExternalPath) { + return new DataFileMeta( + fileName, + fileSize, + rowCount, + minKey, + maxKey, + keyStats, + valueStats, + minSequenceNumber, + maxSequenceNumber, + schemaId, + level, + extraFiles, + creationTime, + deleteRowCount, + embeddedIndex, + fileSource, + valueStatsCols, + newExternalPath); + } + public DataFileMeta copy(byte[] newEmbeddedIndex) { return new DataFileMeta( fileName, diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java index 61b3e8a5173b..2bcca9c906e9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java @@ -116,7 +116,12 @@ public RollingFileWriter createRollingWriter() suggestedFileSize); } - private class ManifestEntryWriter extends SingleFileWriter { + public ManifestEntryWriter createManifestEntryWriter(Path manifestPath) { + return new ManifestEntryWriter(writerFactory, manifestPath, compression); + } + + /** Writer for {@link ManifestEntry}. */ + public class ManifestEntryWriter extends SingleFileWriter { private final SimpleStatsCollector partitionStatsCollector; private final SimpleStatsConverter partitionStatsSerializer; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java index 15b90ec83411..e697a79552fc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java @@ -20,8 +20,9 @@ import org.apache.paimon.flink.clone.CloneFileInfo; import org.apache.paimon.flink.clone.CloneSourceBuilder; -import org.apache.paimon.flink.clone.CopyFileOperator; -import org.apache.paimon.flink.clone.PickFilesForCloneOperator; +import org.apache.paimon.flink.clone.CopyDataFileOperator; +import org.apache.paimon.flink.clone.CopyManifestFileOperator; +import org.apache.paimon.flink.clone.CopyMetaFilesForCloneOperator; import org.apache.paimon.flink.clone.SnapshotHintChannelComputer; import org.apache.paimon.flink.clone.SnapshotHintOperator; import org.apache.paimon.flink.sink.FlinkStreamPartitioner; @@ -105,27 +106,50 @@ private void buildCloneFlinkJob(StreamExecutionEnvironment env) throws Exception targetTableName) .build(); - SingleOutputStreamOperator pickFilesForClone = + SingleOutputStreamOperator copyMetaFiles = cloneSource + .forward() + .process( + new CopyMetaFilesForCloneOperator( + sourceCatalogConfig, targetCatalogConfig)) + .name("Side Output") + .setParallelism(1); + + DataStream indexFilesStream = + copyMetaFiles.getSideOutput(CopyMetaFilesForCloneOperator.INDEX_FILES_TAG); + DataStream dataManifestFilesStream = + copyMetaFiles.getSideOutput(CopyMetaFilesForCloneOperator.DATA_MANIFEST_FILES_TAG); + + SingleOutputStreamOperator copyIndexFiles = + indexFilesStream .transform( - "Pick Files", + "Copy Index Files", TypeInformation.of(CloneFileInfo.class), - new PickFilesForCloneOperator( + new CopyDataFileOperator(sourceCatalogConfig, targetCatalogConfig)) + .setParallelism(parallelism); + + SingleOutputStreamOperator copyDataManifestFiles = + dataManifestFilesStream + .transform( + "Copy Data Manifest Files", + TypeInformation.of(CloneFileInfo.class), + new CopyManifestFileOperator( sourceCatalogConfig, targetCatalogConfig)) - .forceNonParallel(); + .setParallelism(parallelism); - SingleOutputStreamOperator copyFiles = - pickFilesForClone - .rebalance() + SingleOutputStreamOperator copyDataFile = + copyDataManifestFiles .transform( - "Copy Files", + "Copy Data Files", TypeInformation.of(CloneFileInfo.class), - new CopyFileOperator(sourceCatalogConfig, targetCatalogConfig)) + new CopyDataFileOperator(sourceCatalogConfig, targetCatalogConfig)) .setParallelism(parallelism); + DataStream combinedStream = copyDataFile.union(copyIndexFiles); + SingleOutputStreamOperator snapshotHintOperator = FlinkStreamPartitioner.partition( - copyFiles, new SnapshotHintChannelComputer(), parallelism) + combinedStream, new SnapshotHintChannelComputer(), parallelism) .transform( "Recreate Snapshot Hint", TypeInformation.of(CloneFileInfo.class), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java index 5c0ac75e167f..ebe28bd9f86e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java @@ -18,17 +18,18 @@ package org.apache.paimon.flink.clone; +import javax.annotation.Nullable; + /** The information of copy file. */ public class CloneFileInfo { - - private final String sourceFilePath; - private final String filePathExcludeTableRoot; + @Nullable private final String sourceFilePath; + @Nullable private final String filePathExcludeTableRoot; private final String sourceIdentifier; private final String targetIdentifier; public CloneFileInfo( - String sourceFilePath, - String filePathExcludeTableRoot, + @Nullable String sourceFilePath, + @Nullable String filePathExcludeTableRoot, String sourceIdentifier, String targetIdentifier) { this.sourceFilePath = sourceFilePath; @@ -37,10 +38,12 @@ public CloneFileInfo( this.targetIdentifier = targetIdentifier; } + @Nullable public String getSourceFilePath() { return sourceFilePath; } + @Nullable public String getFilePathExcludeTableRoot() { return filePathExcludeTableRoot; } @@ -56,7 +59,7 @@ public String getTargetIdentifier() { @Override public String toString() { return String.format( - "{ sourceFilePath: %s, filePathExcludeTableRoot: %s, sourceIdentifier: %s, targetIdentifier: %s }", + "{ sourceFilePath: %s, filePathExcludeTableRoot: %s, sourceIdentifier: %s, targetIdentifier: %s}", sourceFilePath, filePathExcludeTableRoot, sourceIdentifier, targetIdentifier); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesUtil.java new file mode 100644 index 000000000000..09fa4b307234 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesUtil.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.clone; + +import org.apache.paimon.FileStore; +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.Path; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.manifest.SimpleFileEntry; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.SupplierWithIOException; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** Util class for get used files' paths of a table's latest snapshot. */ +public class CloneFilesUtil { + + private static final int READ_FILE_RETRY_NUM = 3; + private static final int READ_FILE_RETRY_INTERVAL = 5; + + /** + * Retrieves a map of schema file types to their corresponding list of file paths for a given + * snapshot。 The schema file types include: Snapshot, Schema, ManifestList, StatisticFile and + * IndexFile . + * + * @param table The FileStoreTable object representing the table. + * @param snapshotId The ID of the snapshot to retrieve files for. + * @return A map where the key is the FileType and the value is a list of file paths. + * @throws FileNotFoundException If the snapshot file is not found. + */ + public static List getSchemaUsedFilesForSnapshot(FileStoreTable table, long snapshotId) + throws IOException { + FileStore store = table.store(); + SnapshotManager snapshotManager = store.snapshotManager(); + Snapshot snapshot = snapshotManager.tryGetSnapshot(snapshotId); + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + IndexFileHandler indexFileHandler = store.newIndexFileHandler(); + List fileList = new ArrayList<>(); + if (snapshot != null) { + FileStorePathFactory pathFactory = store.pathFactory(); + // 1. add the Snapshot file + fileList.add(snapshotManager.snapshotPath(snapshotId)); + // 2. add the ManifestList files + addManifestList(fileList, snapshot, pathFactory); + + // 3. try to read index files + String indexManifest = snapshot.indexManifest(); + if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) { + fileList.add(pathFactory.indexManifestFileFactory().toPath(indexManifest)); + + List indexManifestEntries = + retryReadingFiles( + () -> indexFileHandler.readManifestWithIOException(indexManifest)); + if (indexManifestEntries != null) { + indexManifestEntries.stream() + .map(IndexManifestEntry::indexFile) + .map(indexFileHandler::filePath) + .forEach(fileList::add); + } + } + + // 4. add statistic file + if (snapshot.statistics() != null) { + fileList.add(pathFactory.statsFileFactory().toPath(snapshot.statistics())); + } + } + + // 5. add the Schema files + for (long id : schemaManager.listAllIds()) { + fileList.add(schemaManager.toSchemaPath(id)); + } + + return fileList; + } + + /** + * Retrieves a map of data file types to their corresponding list of file paths for a given + * snapshot. The data file types include: DataFile and ChangelogFile. + * + * @param table The FileStoreTable object representing the table. + * @param snapshotId The ID of the snapshot to retrieve files for. + * @return A map where the key is the FileType and the value is a list of file paths. the pair + * is the data file's absolute path and data file's relative path. + * @throws FileNotFoundException If the snapshot file is not found. + */ + public static List> getDataUsedFilesForSnapshot( + FileStoreTable table, long snapshotId) throws FileNotFoundException { + FileStore store = table.store(); + SnapshotManager snapshotManager = store.snapshotManager(); + Snapshot snapshot = snapshotManager.tryGetSnapshot(snapshotId); + List> fileList = new ArrayList<>(); + if (snapshot != null) { + // try to read data files + List> dataFiles = new ArrayList<>(); + List simpleFileEntries = + store.newScan().withSnapshot(snapshot).readSimpleEntries(); + for (SimpleFileEntry simpleFileEntry : simpleFileEntries) { + FileStorePathFactory fileStorePathFactory = store.pathFactory(); + Path dataFilePath = + fileStorePathFactory + .createDataFilePathFactory( + simpleFileEntry.partition(), simpleFileEntry.bucket()) + .toPath(simpleFileEntry); + Path relativeBucketPath = + fileStorePathFactory.relativeBucketPath( + simpleFileEntry.partition(), simpleFileEntry.bucket()); + Path relativeTablePath = new Path("/" + relativeBucketPath, dataFilePath.getName()); + dataFiles.add(Pair.of(dataFilePath, relativeTablePath)); + } + + // When scanning, dataFiles are listed from older to newer. + // By reversing dataFiles, newer files will be copied first. + // + // We do this because new files are from the latest partition, and are prone to be + // deleted. Older files however, are from previous partitions and should not be changed + // very often. + Collections.reverse(dataFiles); + fileList.addAll(dataFiles); + } + return fileList; + } + + /** + * Retrieves a map of manifest file types to their corresponding list of file paths for a given + * snapshot. The manifest file types include: ManifestFile. + * + * @param table The FileStoreTable object representing the table. + * @param snapshotId The ID of the snapshot to retrieve files for. + * @return A map where the key is the FileType and the value is a list of file paths. + * @throws FileNotFoundException If the snapshot file is not found. + */ + public static List getManifestUsedFilesForSnapshot(FileStoreTable table, long snapshotId) + throws IOException { + FileStore store = table.store(); + SnapshotManager snapshotManager = store.snapshotManager(); + Snapshot snapshot = snapshotManager.tryGetSnapshot(snapshotId); + ManifestList manifestList = store.manifestListFactory().create(); + List fileList = new ArrayList<>(); + // try to read manifests + List manifestFileMetas = + retryReadingFiles(() -> readAllManifestsWithIOException(snapshot, manifestList)); + if (manifestFileMetas == null) { + return fileList; + } + List manifestFileName = + manifestFileMetas.stream() + .map(ManifestFileMeta::fileName) + .collect(Collectors.toList()); + fileList.addAll( + manifestFileName.stream() + .map(store.pathFactory()::toManifestFilePath) + .collect(Collectors.toList())); + return fileList; + } + + private static void addManifestList( + List fileList, Snapshot snapshot, FileStorePathFactory pathFactory) { + fileList.add(pathFactory.toManifestListPath(snapshot.baseManifestList())); + fileList.add(pathFactory.toManifestListPath(snapshot.deltaManifestList())); + String changelogManifestList = snapshot.changelogManifestList(); + if (changelogManifestList != null) { + fileList.add(pathFactory.toManifestListPath(changelogManifestList)); + } + } + + private static List readAllManifestsWithIOException( + Snapshot snapshot, ManifestList manifestList) throws IOException { + List result = new ArrayList<>(); + + result.addAll(manifestList.readWithIOException(snapshot.baseManifestList())); + result.addAll(manifestList.readWithIOException(snapshot.deltaManifestList())); + + String changelogManifestList = snapshot.changelogManifestList(); + if (changelogManifestList != null) { + result.addAll(manifestList.readWithIOException(changelogManifestList)); + } + + return result; + } + + @Nullable + public static T retryReadingFiles(SupplierWithIOException reader) throws IOException { + int retryNumber = 0; + IOException caught = null; + while (retryNumber++ < READ_FILE_RETRY_NUM) { + try { + return reader.get(); + } catch (FileNotFoundException e) { + return null; + } catch (IOException e) { + caught = e; + } + try { + TimeUnit.MILLISECONDS.sleep(READ_FILE_RETRY_INTERVAL); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + throw caught; + } + + public static List toCloneFileInfos( + List fileList, + Path sourceTableRoot, + String sourceIdentifier, + String targetIdentifier) { + List result = new ArrayList<>(); + for (Path file : fileList) { + Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot); + result.add( + new CloneFileInfo( + file.toUri().toString(), + relativePath.toString(), + sourceIdentifier, + targetIdentifier)); + } + return result; + } + + public static List toCloneFileInfos( + List> fileList, String sourceIdentifier, String targetIdentifier) { + List result = new ArrayList<>(); + for (Pair file : fileList) { + result.add( + new CloneFileInfo( + file.getLeft().toUri().toString(), + file.getRight().toString(), + sourceIdentifier, + targetIdentifier)); + } + return result; + } + + public static Path getPathExcludeTableRoot(Path absolutePath, Path sourceTableRoot) { + String fileAbsolutePath = absolutePath.toUri().toString(); + String sourceTableRootPath = sourceTableRoot.toString(); + + Preconditions.checkState( + fileAbsolutePath.startsWith(sourceTableRootPath), + "File absolute path does not start with source table root path. This is unexpected. " + + "fileAbsolutePath is: " + + fileAbsolutePath + + ", sourceTableRootPath is: " + + sourceTableRootPath); + + return new Path(fileAbsolutePath.substring(sourceTableRootPath.length())); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyDataFileOperator.java similarity index 70% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyDataFileOperator.java index e7002cce1eec..71e1de84669c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyDataFileOperator.java @@ -34,14 +34,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.FileNotFoundException; import java.util.HashMap; import java.util.Map; /** A Operator to copy files. */ -public class CopyFileOperator extends AbstractStreamOperator +public class CopyDataFileOperator extends AbstractStreamOperator implements OneInputStreamOperator { - private static final Logger LOG = LoggerFactory.getLogger(CopyFileOperator.class); + private static final Logger LOG = LoggerFactory.getLogger(CopyDataFileOperator.class); private final Map sourceCatalogConfig; private final Map targetCatalogConfig; @@ -49,10 +50,9 @@ public class CopyFileOperator extends AbstractStreamOperator private transient Catalog sourceCatalog; private transient Catalog targetCatalog; - private transient Map srcLocations; private transient Map targetLocations; - public CopyFileOperator( + public CopyDataFileOperator( Map sourceCatalogConfig, Map targetCatalogConfig) { this.sourceCatalogConfig = sourceCatalogConfig; this.targetCatalogConfig = targetCatalogConfig; @@ -64,7 +64,6 @@ public void open() throws Exception { FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig)); targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig)); - srcLocations = new HashMap<>(); targetLocations = new HashMap<>(); } @@ -74,18 +73,6 @@ public void processElement(StreamRecord streamRecord) throws Exce FileIO sourceTableFileIO = sourceCatalog.fileIO(); FileIO targetTableFileIO = targetCatalog.fileIO(); - - Path sourceTableRootPath = - srcLocations.computeIfAbsent( - cloneFileInfo.getSourceIdentifier(), - key -> { - try { - return pathOfTable( - sourceCatalog.getTable(Identifier.fromString(key))); - } catch (Catalog.TableNotExistException e) { - throw new RuntimeException(e); - } - }); Path targetTableRootPath = targetLocations.computeIfAbsent( cloneFileInfo.getTargetIdentifier(), @@ -102,32 +89,41 @@ public void processElement(StreamRecord streamRecord) throws Exce Path sourcePath = new Path(cloneFileInfo.getSourceFilePath()); Path targetPath = new Path(targetTableRootPath + filePathExcludeTableRoot); - if (targetTableFileIO.exists(targetPath) - && targetTableFileIO.getFileSize(targetPath) - == sourceTableFileIO.getFileSize(sourcePath)) { - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Skipping clone target file {} because it already exists and has the same size.", - targetPath); + try { + if (targetTableFileIO.exists(targetPath) + && targetTableFileIO.getFileSize(targetPath) + == sourceTableFileIO.getFileSize(sourcePath)) { + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Skipping clone target file {} because it already exists and has the same size.", + targetPath); + } + + // We still send record to SnapshotHintOperator to avoid the following corner case: + // + // When cloning two tables under a catalog, after clone table A is completed, + // the job fails due to snapshot expiration when cloning table B. + // If we don't re-send file information of table A to SnapshotHintOperator, + // the snapshot hint file of A will not be created after the restart. + output.collect(streamRecord); + return; } - - // We still send record to SnapshotHintOperator to avoid the following corner case: - // - // When cloning two tables under a catalog, after clone table A is completed, - // the job fails due to snapshot expiration when cloning table B. - // If we don't re-send file information of table A to SnapshotHintOperator, - // the snapshot hint file of A will not be created after the restart. - output.collect(streamRecord); - return; + } catch (FileNotFoundException e) { + LOG.warn("File {} does not exist. ignore it", sourcePath, e); } if (LOG.isDebugEnabled()) { LOG.debug("Begin copy file from {} to {}.", sourcePath, targetPath); } - IOUtils.copyBytes( - sourceTableFileIO.newInputStream(sourcePath), - targetTableFileIO.newOutputStream(targetPath, true)); + try { + IOUtils.copyBytes( + sourceTableFileIO.newInputStream(sourcePath), + targetTableFileIO.newOutputStream(targetPath, true)); + } catch (FileNotFoundException e) { + LOG.warn("File {} does not exist. ignore it", sourcePath, e); + } + if (LOG.isDebugEnabled()) { LOG.debug("End copy file from {} to {}.", sourcePath, targetPath); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyManifestFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyManifestFileOperator.java new file mode 100644 index 000000000000..a0021003bdcd --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyManifestFileOperator.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.clone; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.FileStore; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestFile.ManifestEntryWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.IOUtils; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** A Operator to copy files. */ +public class CopyManifestFileOperator extends AbstractStreamOperator + implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(CopyManifestFileOperator.class); + + private final Map sourceCatalogConfig; + private final Map targetCatalogConfig; + + private transient Catalog sourceCatalog; + private transient Catalog targetCatalog; + + private transient Map targetLocations; + + public CopyManifestFileOperator( + Map sourceCatalogConfig, Map targetCatalogConfig) { + this.sourceCatalogConfig = sourceCatalogConfig; + this.targetCatalogConfig = targetCatalogConfig; + } + + @Override + public void open() throws Exception { + sourceCatalog = + FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig)); + targetCatalog = + FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig)); + targetLocations = new HashMap<>(); + } + + @Override + public void processElement(StreamRecord streamRecord) throws Exception { + CloneFileInfo cloneFileInfo = streamRecord.getValue(); + FileIO sourceTableFileIO = sourceCatalog.fileIO(); + FileIO targetTableFileIO = targetCatalog.fileIO(); + Path targetTableRootPath = + targetLocations.computeIfAbsent( + cloneFileInfo.getTargetIdentifier(), + key -> { + try { + return pathOfTable( + targetCatalog.getTable(Identifier.fromString(key))); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + }); + + String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot(); + Path sourcePath = new Path(cloneFileInfo.getSourceFilePath()); + Path targetPath = new Path(targetTableRootPath + filePathExcludeTableRoot); + + if (targetTableFileIO.exists(targetPath) + && targetTableFileIO.getFileSize(targetPath) + == sourceTableFileIO.getFileSize(sourcePath)) { + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Skipping clone target file {} because it already exists and has the same size.", + targetPath); + } + + // in this case,we don't need to copy the manifest file, just pick the data files + copyOrRewriteManifestFile( + sourceTableFileIO, + targetTableFileIO, + sourcePath, + targetPath, + cloneFileInfo, + false); + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Begin copy file from {} to {}.", sourcePath, targetPath); + } + copyOrRewriteManifestFile( + sourceTableFileIO, targetTableFileIO, sourcePath, targetPath, cloneFileInfo, true); + if (LOG.isDebugEnabled()) { + LOG.debug("End copy file from {} to {}.", sourcePath, targetPath); + } + } + + private void copyOrRewriteManifestFile( + FileIO sourceTableFileIO, + FileIO targetTableFileIO, + Path sourcePath, + Path targetPath, + CloneFileInfo cloneFileInfo, + boolean needCopyManifestFile) + throws IOException, Catalog.TableNotExistException { + Identifier sourceIdentifier = Identifier.fromString(cloneFileInfo.getSourceIdentifier()); + FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier); + FileStore store = sourceTable.store(); + ManifestFile manifestFile = store.manifestFileFactory().create(); + + List manifestEntries = + manifestFile.readWithIOException(sourcePath.getName()); + List targetManifestEntries = new ArrayList<>(manifestEntries.size()); + + if (needCopyManifestFile) { + if (containsExternalPath(manifestEntries)) { + // rewrite it, clone job will clone the source path to target warehouse path, so the + // target external + // path is null + for (ManifestEntry manifestEntry : manifestEntries) { + ManifestEntry newManifestEntry = + new ManifestEntry( + manifestEntry.kind(), + manifestEntry.partition(), + manifestEntry.bucket(), + manifestEntry.totalBuckets(), + manifestEntry.file().newExternalPath(null)); + targetManifestEntries.add(newManifestEntry); + } + ManifestEntryWriter manifestEntryWriter = + manifestFile.createManifestEntryWriter(targetPath); + manifestEntryWriter.write(targetManifestEntries); + manifestEntryWriter.close(); + } else { + // copy it + IOUtils.copyBytes( + sourceTableFileIO.newInputStream(sourcePath), + targetTableFileIO.newOutputStream(targetPath, true)); + } + } + // pick data files + pickDataFilesForClone(manifestEntries, store, cloneFileInfo); + } + + private void pickDataFilesForClone( + List manifestEntries, FileStore store, CloneFileInfo cloneFileInfo) { + // pick the data files + for (ManifestEntry manifestEntry : manifestEntries) { + FileStorePathFactory fileStorePathFactory = store.pathFactory(); + Path dataFilePath = + fileStorePathFactory + .createDataFilePathFactory( + manifestEntry.partition(), manifestEntry.bucket()) + .toPath(manifestEntry); + Path relativeBucketPath = + fileStorePathFactory.relativeBucketPath( + manifestEntry.partition(), manifestEntry.bucket()); + Path relativeTablePath = new Path("/" + relativeBucketPath, dataFilePath.getName()); + + output.collect( + new StreamRecord<>( + new CloneFileInfo( + dataFilePath.toString(), + relativeTablePath.toString(), + cloneFileInfo.getSourceIdentifier(), + cloneFileInfo.getTargetIdentifier()))); + } + } + + private boolean containsExternalPath(List manifestEntries) { + boolean result = false; + for (ManifestEntry manifestEntry : manifestEntries) { + if (manifestEntry.file().externalPath().isPresent()) { + result = true; + break; + } + } + return result; + } + + private Path pathOfTable(Table table) { + return new Path(table.options().get(CoreOptions.PATH.key())); + } + + @Override + public void close() throws Exception { + if (sourceCatalog != null) { + sourceCatalog.close(); + } + if (targetCatalog != null) { + targetCatalog.close(); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyMetaFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyMetaFilesForCloneOperator.java new file mode 100644 index 000000000000..4065bfe4c357 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyMetaFilesForCloneOperator.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.clone; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.FileStore; +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.fs.Path; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.SnapshotManager; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; +import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Copy the meta files of a table for clone operation. and output the index files and data manifest + * files of the table to the next operator. + */ +public class CopyMetaFilesForCloneOperator extends ProcessFunction, Void> { + + public static final OutputTag INDEX_FILES_TAG = + new OutputTag("index-files") {}; + public static final OutputTag DATA_MANIFEST_FILES_TAG = + new OutputTag("data-manifest-files") {}; + + private static final Logger LOG = LoggerFactory.getLogger(CopyMetaFilesForCloneOperator.class); + + private final Map sourceCatalogConfig; + private final Map targetCatalogConfig; + + private Catalog sourceCatalog; + private Catalog targetCatalog; + + public CopyMetaFilesForCloneOperator( + Map sourceCatalogConfig, Map targetCatalogConfig) { + this.sourceCatalogConfig = sourceCatalogConfig; + this.targetCatalogConfig = targetCatalogConfig; + } + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + sourceCatalog = + FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig)); + targetCatalog = + FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig)); + } + + @Override + public void processElement( + Tuple2 tuple, + ProcessFunction, Void>.Context context, + Collector collector) + throws Exception { + String sourceIdentifierStr = tuple.f0; + Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr); + String targetIdentifierStr = tuple.f1; + Identifier targetIdentifier = Identifier.fromString(targetIdentifierStr); + + FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier); + + // 1. create target table + targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true); + targetCatalog.createTable( + targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), true); + FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier); + + // 2. copy all schema files + SchemaManager sourceSchemaManager = sourceTable.schemaManager(); + SchemaManager targetSchemaManager = targetTable.schemaManager(); + for (long schemaId : sourceSchemaManager.listAllIds()) { + targetTable + .fileIO() + .copyFile( + sourceSchemaManager.toSchemaPath(schemaId), + targetSchemaManager.toSchemaPath(schemaId), + true); + } + + // 3. copy latest snapshot files + FileStore sourceStore = sourceTable.store(); + FileStore targetStore = targetTable.store(); + SnapshotManager sourceSnapshotManager = sourceStore.snapshotManager(); + SnapshotManager targetSnapshotManager = targetStore.snapshotManager(); + Snapshot latestSnapshot = sourceSnapshotManager.latestSnapshot(); + if (latestSnapshot != null) { + long snapshotId = latestSnapshot.id(); + targetTable + .fileIO() + .copyFile( + sourceSnapshotManager.snapshotPath(snapshotId), + targetSnapshotManager.snapshotPath(snapshotId), + true); + } + + FileStorePathFactory sourcePathFactory = sourceStore.pathFactory(); + FileStorePathFactory targetPathFactory = targetStore.pathFactory(); + // 4. copy manifest list files + if (latestSnapshot != null) { + targetTable + .fileIO() + .copyFile( + sourcePathFactory.toManifestListPath(latestSnapshot.baseManifestList()), + targetPathFactory.toManifestListPath(latestSnapshot.baseManifestList()), + true); + + targetTable + .fileIO() + .copyFile( + sourcePathFactory.toManifestListPath( + latestSnapshot.deltaManifestList()), + targetPathFactory.toManifestListPath( + latestSnapshot.deltaManifestList()), + true); + + String changelogManifestList = latestSnapshot.changelogManifestList(); + if (changelogManifestList != null) { + targetTable + .fileIO() + .copyFile( + sourcePathFactory.toManifestListPath(changelogManifestList), + targetPathFactory.toManifestListPath(changelogManifestList), + true); + } + } + + // 5. copy index manifest files + List indexFiles = new ArrayList<>(); + if (latestSnapshot != null) { + IndexFileHandler indexFileHandler = sourceStore.newIndexFileHandler(); + String indexManifest = latestSnapshot.indexManifest(); + if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) { + targetTable + .fileIO() + .copyFile( + sourcePathFactory.indexManifestFileFactory().toPath(indexManifest), + targetPathFactory.indexManifestFileFactory().toPath(indexManifest), + true); + + // read index files + List indexManifestEntries = + CloneFilesUtil.retryReadingFiles( + () -> indexFileHandler.readManifestWithIOException(indexManifest)); + + List indexFileList = new ArrayList<>(); + if (indexManifestEntries != null) { + indexManifestEntries.stream() + .map(IndexManifestEntry::indexFile) + .map(indexFileHandler::filePath) + .forEach(indexFileList::add); + } + + indexFiles = + CloneFilesUtil.toCloneFileInfos( + indexFileList, + sourceTable.location(), + sourceIdentifierStr, + targetIdentifierStr); + for (CloneFileInfo info : indexFiles) { + context.output(INDEX_FILES_TAG, info); + } + } + } + + // 6. copy statistics file + if (latestSnapshot != null && latestSnapshot.statistics() != null) { + targetTable + .fileIO() + .copyFile( + sourcePathFactory + .statsFileFactory() + .toPath(latestSnapshot.statistics()), + targetPathFactory + .statsFileFactory() + .toPath(latestSnapshot.statistics()), + true); + } + + // pick manifest files + List dataManifestFiles = new ArrayList<>(); + if (latestSnapshot != null) { + List list = + CloneFilesUtil.getManifestUsedFilesForSnapshot( + sourceTable, latestSnapshot.id()); + dataManifestFiles = + CloneFilesUtil.toCloneFileInfos( + list, sourceTable.location(), sourceIdentifierStr, targetIdentifierStr); + } + + for (CloneFileInfo info : dataManifestFiles) { + context.output(DATA_MANIFEST_FILES_TAG, info); + } + if (LOG.isDebugEnabled()) { + LOG.debug( + "The CloneFileInfo of table {} is: indexFiles={}, dataManifestFiles={}", + sourceTable.location(), + indexFiles, + dataManifestFiles); + } + } + + private static Schema newSchemaFromTableSchema(TableSchema tableSchema) { + return new Schema( + ImmutableList.copyOf(tableSchema.fields()), + ImmutableList.copyOf(tableSchema.partitionKeys()), + ImmutableList.copyOf(tableSchema.primaryKeys()), + ImmutableMap.copyOf( + Iterables.filter( + tableSchema.options().entrySet(), + entry -> !Objects.equals(entry.getKey(), CoreOptions.PATH.key()))), + tableSchema.comment()); + } + + @Override + public void close() throws Exception { + if (sourceCatalog != null) { + sourceCatalog.close(); + } + if (targetCatalog != null) { + targetCatalog.close(); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java deleted file mode 100644 index 512d48170f4a..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.clone; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.flink.FlinkCatalogFactory; -import org.apache.paimon.fs.Path; -import org.apache.paimon.options.Options; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.utils.Preconditions; - -import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; -import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; -import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -/** - * Pick the files to be cloned of a table based on the input record. The record type it produce is - * CloneFileInfo that indicate the information of copy file. - */ -public class PickFilesForCloneOperator extends AbstractStreamOperator - implements OneInputStreamOperator, CloneFileInfo> { - - private static final Logger LOG = LoggerFactory.getLogger(PickFilesForCloneOperator.class); - - private final Map sourceCatalogConfig; - private final Map targetCatalogConfig; - - private Catalog sourceCatalog; - private Catalog targetCatalog; - - public PickFilesForCloneOperator( - Map sourceCatalogConfig, Map targetCatalogConfig) { - this.sourceCatalogConfig = sourceCatalogConfig; - this.targetCatalogConfig = targetCatalogConfig; - } - - @Override - public void open() throws Exception { - sourceCatalog = - FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig)); - targetCatalog = - FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig)); - } - - @Override - public void processElement(StreamRecord> streamRecord) throws Exception { - String sourceIdentifierStr = streamRecord.getValue().f0; - Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr); - String targetIdentifierStr = streamRecord.getValue().f1; - Identifier targetIdentifier = Identifier.fromString(targetIdentifierStr); - - FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier); - targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true); - targetCatalog.createTable( - targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), true); - FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier); - - // Make sure that latest schema file of source and target table are the same, - // so latest schema won't be overwritten in `CopyFileOperator` and the target table can - // always be retrieved from catalog. - SchemaManager sourceSchemaManager = sourceTable.schemaManager(); - SchemaManager targetSchemaManager = targetTable.schemaManager(); - long schemaId = sourceTable.schema().id(); - targetTable - .fileIO() - .copyFile( - sourceSchemaManager.toSchemaPath(schemaId), - targetSchemaManager.toSchemaPath(schemaId), - true); - - List result = - toCloneFileInfos( - PickFilesUtil.getUsedFilesForLatestSnapshot(sourceTable), - sourceTable.location(), - sourceIdentifierStr, - targetIdentifierStr); - - if (LOG.isDebugEnabled()) { - LOG.debug("The CloneFileInfo of table {} is {} : ", sourceTable.location(), result); - } - - for (CloneFileInfo info : result) { - output.collect(new StreamRecord<>(info)); - } - } - - private static Schema newSchemaFromTableSchema(TableSchema tableSchema) { - return new Schema( - ImmutableList.copyOf(tableSchema.fields()), - ImmutableList.copyOf(tableSchema.partitionKeys()), - ImmutableList.copyOf(tableSchema.primaryKeys()), - ImmutableMap.copyOf( - Iterables.filter( - tableSchema.options().entrySet(), - entry -> !Objects.equals(entry.getKey(), CoreOptions.PATH.key()))), - tableSchema.comment()); - } - - private List toCloneFileInfos( - List files, - Path sourceTableRoot, - String sourceIdentifier, - String targetIdentifier) { - List result = new ArrayList<>(); - for (Path file : files) { - Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot); - result.add( - new CloneFileInfo( - file.toUri().toString(), - relativePath.toString(), - sourceIdentifier, - targetIdentifier)); - } - return result; - } - - private Path getPathExcludeTableRoot(Path absolutePath, Path sourceTableRoot) { - String fileAbsolutePath = absolutePath.toUri().toString(); - String sourceTableRootPath = sourceTableRoot.toString(); - - Preconditions.checkState( - fileAbsolutePath.startsWith(sourceTableRootPath), - "File absolute path does not start with source table root path. This is unexpected. " - + "fileAbsolutePath is: " - + fileAbsolutePath - + ", sourceTableRootPath is: " - + sourceTableRootPath); - - return new Path(fileAbsolutePath.substring(sourceTableRootPath.length())); - } - - @Override - public void close() throws Exception { - if (sourceCatalog != null) { - sourceCatalog.close(); - } - if (targetCatalog != null) { - targetCatalog.close(); - } - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java deleted file mode 100644 index c36a6cd18668..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.clone; - -import org.apache.paimon.FileStore; -import org.apache.paimon.Snapshot; -import org.apache.paimon.fs.Path; -import org.apache.paimon.index.IndexFileHandler; -import org.apache.paimon.manifest.IndexManifestEntry; -import org.apache.paimon.manifest.ManifestFileMeta; -import org.apache.paimon.manifest.ManifestList; -import org.apache.paimon.manifest.SimpleFileEntry; -import org.apache.paimon.operation.FileStoreScan; -import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.utils.FileStorePathFactory; -import org.apache.paimon.utils.SnapshotManager; -import org.apache.paimon.utils.SupplierWithIOException; - -import javax.annotation.Nullable; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -/** Util class for get used files' paths of a table's latest snapshot. */ -public class PickFilesUtil { - - private static final int READ_FILE_RETRY_NUM = 3; - private static final int READ_FILE_RETRY_INTERVAL = 5; - - public static List getUsedFilesForLatestSnapshot(FileStoreTable table) { - FileStore store = table.store(); - SnapshotManager snapshotManager = store.snapshotManager(); - Snapshot snapshot = snapshotManager.latestSnapshot(); - ManifestList manifestList = store.manifestListFactory().create(); - SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); - IndexFileHandler indexFileHandler = store.newIndexFileHandler(); - - List files = new ArrayList<>(); - if (snapshot != null) { - files.add(snapshotManager.snapshotPath(snapshot.id())); - files.addAll( - getUsedFilesInternal( - snapshot, - store.pathFactory(), - store.newScan(), - manifestList, - indexFileHandler)); - } - for (long id : schemaManager.listAllIds()) { - files.add(schemaManager.toSchemaPath(id)); - } - return files; - } - - private static List getUsedFilesInternal( - Snapshot snapshot, - FileStorePathFactory pathFactory, - FileStoreScan scan, - ManifestList manifestList, - IndexFileHandler indexFileHandler) { - List files = new ArrayList<>(); - addManifestList(files, snapshot, pathFactory); - - try { - // try to read manifests - List manifestFileMetas = - retryReadingFiles( - () -> readAllManifestsWithIOException(snapshot, manifestList)); - if (manifestFileMetas == null) { - return Collections.emptyList(); - } - List manifestFileName = - manifestFileMetas.stream() - .map(ManifestFileMeta::fileName) - .collect(Collectors.toList()); - files.addAll( - manifestFileName.stream() - .map(pathFactory::toManifestFilePath) - .collect(Collectors.toList())); - - // try to read data files - List dataFiles = new ArrayList<>(); - List simpleFileEntries = - scan.withSnapshot(snapshot).readSimpleEntries(); - for (SimpleFileEntry simpleFileEntry : simpleFileEntries) { - Path dataFilePath = - pathFactory - .createDataFilePathFactory( - simpleFileEntry.partition(), simpleFileEntry.bucket()) - .toPath(simpleFileEntry); - dataFiles.add(dataFilePath); - } - - // When scanning, dataFiles are listed from older to newer. - // By reversing dataFiles, newer files will be copied first. - // - // We do this because new files are from the latest partition, and are prone to be - // deleted. Older files however, are from previous partitions and should not be changed - // very often. - Collections.reverse(dataFiles); - files.addAll(dataFiles); - - // try to read index files - String indexManifest = snapshot.indexManifest(); - if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) { - files.add(pathFactory.indexManifestFileFactory().toPath(indexManifest)); - - List indexManifestEntries = - retryReadingFiles( - () -> indexFileHandler.readManifestWithIOException(indexManifest)); - if (indexManifestEntries == null) { - return Collections.emptyList(); - } - - indexManifestEntries.stream() - .map(IndexManifestEntry::indexFile) - .map(indexFileHandler::filePath) - .forEach(files::add); - } - - // add statistic file - if (snapshot.statistics() != null) { - files.add(pathFactory.statsFileFactory().toPath(snapshot.statistics())); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - - return files; - } - - private static void addManifestList( - List used, Snapshot snapshot, FileStorePathFactory pathFactory) { - used.add(pathFactory.toManifestListPath(snapshot.baseManifestList())); - used.add(pathFactory.toManifestListPath(snapshot.deltaManifestList())); - String changelogManifestList = snapshot.changelogManifestList(); - if (changelogManifestList != null) { - used.add(pathFactory.toManifestListPath(changelogManifestList)); - } - } - - private static List readAllManifestsWithIOException( - Snapshot snapshot, ManifestList manifestList) throws IOException { - List result = new ArrayList<>(); - - result.addAll(manifestList.readWithIOException(snapshot.baseManifestList())); - result.addAll(manifestList.readWithIOException(snapshot.deltaManifestList())); - - String changelogManifestList = snapshot.changelogManifestList(); - if (changelogManifestList != null) { - result.addAll(manifestList.readWithIOException(changelogManifestList)); - } - - return result; - } - - @Nullable - private static T retryReadingFiles(SupplierWithIOException reader) throws IOException { - int retryNumber = 0; - IOException caught = null; - while (retryNumber++ < READ_FILE_RETRY_NUM) { - try { - return reader.get(); - } catch (FileNotFoundException e) { - return null; - } catch (IOException e) { - caught = e; - } - try { - TimeUnit.MILLISECONDS.sleep(READ_FILE_RETRY_INTERVAL); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - - throw caught; - } -} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java index a55b01cc203b..50ddb3cea723 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java @@ -18,25 +18,33 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.FileStore; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.flink.clone.PickFilesUtil; +import org.apache.paimon.flink.clone.CloneFilesUtil; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TraceableFileIO; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -120,6 +128,69 @@ public void testCloneTable(String invoker) throws Exception { compareCloneFiles(sourceWarehouse, "db1", "t1", targetWarehouse, "mydb", "myt"); } + @ParameterizedTest(name = "invoker = {0}") + @ValueSource(strings = {"action", "procedure_indexed", "procedure_named"}) + public void testCloneTableWithSourceTableExternalPath(String invoker) throws Exception { + String sourceWarehouse = getTempDirPath("source-ware"); + prepareDataWithExternalPath(sourceWarehouse); + + String targetWarehouse = getTempDirPath("target-ware"); + switch (invoker) { + case "action": + String[] args = + new String[] { + "clone", + "--warehouse", + sourceWarehouse, + "--database", + "db1", + "--table", + "t1", + "--target_warehouse", + targetWarehouse, + "--target_database", + "mydb", + "--target_table", + "myt" + }; + ActionFactory.createAction(args).get().run(); + break; + case "procedure_indexed": + executeSQL( + String.format( + "CALL sys.clone('%s', 'db1', 't1', '', '%s', 'mydb', 'myt')", + sourceWarehouse, targetWarehouse), + true, + true); + break; + case "procedure_named": + executeSQL( + String.format( + "CALL sys.clone(warehouse => '%s', database => 'db1', `table` => 't1', target_warehouse => '%s', target_database => 'mydb', target_table => 'myt')", + sourceWarehouse, targetWarehouse), + true, + true); + break; + default: + throw new UnsupportedOperationException(invoker); + } + + // check result + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql( + "CREATE CATALOG targetcat WITH (\n" + + " 'type' = 'paimon',\n" + + String.format(" 'warehouse' = '%s'\n", targetWarehouse) + + ")"); + tEnv.executeSql("USE CATALOG targetcat"); + + List actual = collect(tEnv, "SELECT pt, k, v FROM mydb.myt ORDER BY pt, k"); + assertThat(actual) + .containsExactly( + "+I[one, 1, 10]", "+I[one, 2, 21]", "+I[two, 1, 101]", "+I[two, 2, 200]"); + compareCloneFiles(sourceWarehouse, "db1", "t1", targetWarehouse, "mydb", "myt"); + } + @ParameterizedTest(name = "invoker = {0}") @ValueSource(strings = {"action", "procedure_indexed", "procedure_named"}) public void testCloneDatabase(String invoker) throws Exception { @@ -362,6 +433,130 @@ private void prepareData(String sourceWarehouse) throws Exception { .await(); } + private void prepareDataWithExternalPath(String sourceWarehouse) throws Exception { + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql( + "CREATE CATALOG sourcecat WITH (\n" + + " 'type' = 'paimon',\n" + + String.format(" 'warehouse' = '%s'\n", sourceWarehouse) + + ")"); + tEnv.executeSql("USE CATALOG sourcecat"); + + tEnv.executeSql("CREATE DATABASE db1"); + tEnv.executeSql("CREATE DATABASE db2"); + + String db1T1ExternalPath = TraceableFileIO.SCHEME + "://" + getTempDirPath(); + String db1T2ExternalPath = TraceableFileIO.SCHEME + "://" + getTempDirPath(); + String db2T3ExternalPath = TraceableFileIO.SCHEME + "://" + getTempDirPath(); + String db2T4ExternalPath = TraceableFileIO.SCHEME + "://" + getTempDirPath(); + + // prepare data: db1.t1 + tEnv.executeSql( + "CREATE TABLE db1.t1 (\n" + + " pt STRING,\n" + + " k INT,\n" + + " v INT,\n" + + " PRIMARY KEY (pt, k) NOT ENFORCED\n" + + ") PARTITIONED BY (pt) WITH (\n" + + " 'changelog-producer' = 'lookup',\n" + + " 'data-file.external-paths' = '" + + db1T1ExternalPath + + "',\n" + + " 'data-file.external-paths.strategy' = 'round-robin'\n" + + ")"); + tEnv.executeSql( + "INSERT INTO db1.t1 VALUES " + + "('one', 1, 10), " + + "('one', 2, 20), " + + "('two', 1, 100)") + .await(); + tEnv.executeSql( + "INSERT INTO db1.t1 VALUES " + + "('one', 2, 21), " + + "('two', 1, 101), " + + "('two', 2, 200)") + .await(); + + // prepare data: db1.t2 + tEnv.executeSql( + "CREATE TABLE db1.t2 (\n" + + " k INT,\n" + + " v INT,\n" + + " PRIMARY KEY (k) NOT ENFORCED\n" + + ") WITH (\n" + + " 'changelog-producer' = 'lookup',\n" + + " 'data-file.external-paths' = '" + + db1T2ExternalPath + + "',\n" + + " 'data-file.external-paths.strategy' = 'round-robin'\n" + + ")"); + tEnv.executeSql( + "INSERT INTO db1.t2 VALUES " + + "(10, 100), " + + "(20, 200), " + + "(100, 1000)") + .await(); + tEnv.executeSql( + "INSERT INTO db1.t2 VALUES " + + "(20, 201), " + + "(100, 1001), " + + "(200, 2000)") + .await(); + + // prepare data: db2.t3 + tEnv.executeSql( + "CREATE TABLE db2.t3 (\n" + + " pt INT,\n" + + " k INT,\n" + + " v STRING,\n" + + " PRIMARY KEY (pt, k) NOT ENFORCED\n" + + ") PARTITIONED BY (pt) WITH (\n" + + " 'changelog-producer' = 'lookup',\n" + + " 'data-file.external-paths' = '" + + db2T3ExternalPath + + "',\n" + + " 'data-file.external-paths.strategy' = 'round-robin'\n" + + ")"); + tEnv.executeSql( + "INSERT INTO db2.t3 VALUES " + + "(1, 1, 'one'), " + + "(1, 2, 'two'), " + + "(2, 1, 'apple')") + .await(); + tEnv.executeSql( + "INSERT INTO db2.t3 VALUES " + + "(1, 2, 'twenty'), " + + "(2, 1, 'banana'), " + + "(2, 2, 'orange')") + .await(); + + // prepare data: db2.t4 + tEnv.executeSql( + "CREATE TABLE db2.t4 (\n" + + " k INT,\n" + + " v STRING,\n" + + " PRIMARY KEY (k) NOT ENFORCED\n" + + ") WITH (\n" + + " 'changelog-producer' = 'lookup',\n" + + " 'data-file.external-paths' = '" + + db2T4ExternalPath + + "',\n" + + " 'data-file.external-paths.strategy' = 'round-robin'\n" + + ")"); + tEnv.executeSql( + "INSERT INTO db2.t4 VALUES " + + "(10, 'one'), " + + "(20, 'two'), " + + "(100, 'apple')") + .await(); + tEnv.executeSql( + "INSERT INTO db2.t4 VALUES " + + "(20, 'twenty'), " + + "(100, 'banana'), " + + "(200, 'orange')") + .await(); + } + @ParameterizedTest(name = "invoker = {0}") @ValueSource(strings = {"action", "procedure_indexed", "procedure_named"}) public void testCloneWithSchemaEvolution(String invoker) throws Exception { @@ -457,9 +652,20 @@ private void compareCloneFiles( String targetTableName) throws Exception { FileStoreTable targetTable = getFileStoreTable(targetWarehouse, targetDb, targetTableName); - List targetTableFiles = PickFilesUtil.getUsedFilesForLatestSnapshot(targetTable); + + FileStore store = targetTable.store(); + SnapshotManager snapshotManager = store.snapshotManager(); + Snapshot latestSnapshot = snapshotManager.latestSnapshot(); + assertThat(latestSnapshot).isNotNull(); + long snapshotId = latestSnapshot.id(); + FileStoreTable sourceTable = getFileStoreTable(sourceWarehouse, sourceDb, sourceTableName); + Path tableLocation = sourceTable.location(); + + // 1. check the schema files + List targetTableSchemaFiles = + CloneFilesUtil.getSchemaUsedFilesForSnapshot(targetTable, snapshotId); List> filesPathInfoList = - targetTableFiles.stream() + targetTableSchemaFiles.stream() .map( absolutePath -> Pair.of( @@ -467,15 +673,71 @@ private void compareCloneFiles( getPathExcludeTableRoot( absolutePath, targetTable.location()))) .collect(Collectors.toList()); - - FileStoreTable sourceTable = getFileStoreTable(sourceWarehouse, sourceDb, sourceTableName); - Path tableLocation = sourceTable.location(); for (Pair filesPathInfo : filesPathInfoList) { Path sourceTableFile = new Path(tableLocation.toString() + filesPathInfo.getRight()); assertThat(sourceTable.fileIO().exists(sourceTableFile)).isTrue(); assertThat(targetTable.fileIO().getFileSize(filesPathInfo.getLeft())) .isEqualTo(sourceTable.fileIO().getFileSize(sourceTableFile)); } + + // 2. check the manifest files + List targetTableManifestFiles = + CloneFilesUtil.getManifestUsedFilesForSnapshot(targetTable, snapshotId); + filesPathInfoList = + targetTableManifestFiles.stream() + .map( + absolutePath -> + Pair.of( + absolutePath, + getPathExcludeTableRoot( + absolutePath, targetTable.location()))) + .collect(Collectors.toList()); + boolean isExternalPath = + sourceTable.options().containsKey(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key()); + for (Pair filesPathInfo : filesPathInfoList) { + Path sourceTableFile = new Path(tableLocation.toString() + filesPathInfo.getRight()); + assertThat(sourceTable.fileIO().exists(sourceTableFile)).isTrue(); + if (!isExternalPath) { + assertThat(targetTable.fileIO().getFileSize(filesPathInfo.getLeft())) + .isEqualTo(sourceTable.fileIO().getFileSize(sourceTableFile)); + } else { + // todo need to check the content of manifest files + } + } + + // 3. check the data files + filesPathInfoList = CloneFilesUtil.getDataUsedFilesForSnapshot(targetTable, snapshotId); + isExternalPath = + sourceTable.options().containsKey(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key()); + String externalPaths = null; + if (isExternalPath) { + externalPaths = sourceTable.options().get(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key()); + } + + for (Pair filesPathInfo : filesPathInfoList) { + List paths = new ArrayList<>(); + if (externalPaths == null) { + paths.add(new Path(tableLocation.toString() + filesPathInfo.getRight())); + } else { + for (String externalPath : externalPaths.split(",")) { + paths.add(new Path(externalPath + filesPathInfo.getRight())); + } + } + + Pair result = pathExist(targetTable.fileIO(), paths); + assertThat(result.getRight()).isTrue(); + assertThat(targetTable.fileIO().getFileSize(filesPathInfo.getLeft())) + .isEqualTo(sourceTable.fileIO().getFileSize(result.getLeft())); + } + } + + private Pair pathExist(FileIO fileIO, List paths) throws IOException { + for (Path path : paths) { + if (fileIO.exists(path)) { + return Pair.of(path, true); + } + } + return Pair.of(null, false); } private Path getPathExcludeTableRoot(Path absolutePath, Path sourceTableRoot) { @@ -504,6 +766,7 @@ private FileStoreTable getFileStoreTable(String warehouse, String db, String tab // Random Tests // ------------------------------------------------------------------------ + @Disabled @ParameterizedTest(name = "invoker = {0}") @ValueSource(strings = {"action", "procedure_indexed", "procedure_named"}) @Timeout(180)