diff --git a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
index 5438c4b65a46..4d3e7dbeef8f 100644
--- a/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
+++ b/api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
@@ -60,6 +60,16 @@ default MigrateTable backupTableName(String tableName) {
     throw new UnsupportedOperationException("Backup table name cannot be specified");
   }
 
+  /**
+   * Sets the number of threads to use for file reading. The default is 1.
+   *
+   * @param numThreads the number of threads
+   * @return this for method chaining
+   */
+  default MigrateTable parallelism(int numThreads) {
+    throw new UnsupportedOperationException("Setting parallelism is not supported");
+  }
+
   /** The action result that contains a summary of the execution. */
   interface Result {
     /** Returns the number of migrated data files. */
diff --git a/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java b/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
index 37c600ab0392..81294739248b 100644
--- a/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
+++ b/api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
@@ -57,6 +57,16 @@ public interface SnapshotTable extends Action<SnapshotTable, SnapshotTable.Result>
+  /**
+   * Sets the number of threads to use for file reading. The default is 1.
+   *
+   * @param numThreads the number of threads
+   * @return this for method chaining
+   */
+  default SnapshotTable parallelism(int numThreads) {
+    throw new UnsupportedOperationException("Setting parallelism is not supported");
+  }
+
diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md
--- a/docs/docs/spark-procedures.md
+++ b/docs/docs/spark-procedures.md
 | `properties` | ️ | map | Properties to add to the newly created table |
+| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) |
 
 #### Output
 
@@ -588,6 +589,7 @@ By default, the original table is retained with the name `table_BACKUP_`.
 | `properties` | ️ | map | Properties for the new Iceberg table |
 | `drop_backup` | | boolean | When true, the original table will not be retained as backup (defaults to false) |
 | `backup_table_name` | | string | Name of the table that will be retained as backup (defaults to `table_BACKUP_`) |
+| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) |
 
 #### Output
 
@@ -629,7 +631,7 @@ will then treat these files as if they are part of the set of files owned by Ic
 | `source_table` | ✔️ | string | Table where files should come from, paths are also possible in the form of \`file_format\`.\`path\` |
 | `partition_filter` | ️ | map | A map of partitions in the source table to import from |
 | `check_duplicate_files` | ️ | boolean | Whether to prevent files existing in the table from being added (defaults to true) |
-| `parallelism` | | int | number of threads to use for file reading (defaults to 1) |
+| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) |
 
 Warning : Schema is not validated, adding files with different schema to the Iceberg table will cause issues.
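For context on how the new option is used, here is a minimal caller sketch. It is not part of this patch; it assumes an active Spark session and relies on the existing SparkActions entry point and Result.migratedDataFilesCount() accessor, with the table name as a placeholder:

import org.apache.iceberg.actions.MigrateTable;
import org.apache.iceberg.spark.actions.SparkActions;

public class MigrateParallelismExample {
  public static void main(String[] args) {
    // Read the source table's data files with 8 threads instead of the default 1.
    // Per the tests below, values <= 1 keep the current single-threaded path.
    MigrateTable.Result result =
        SparkActions.get()
            .migrateTable("spark_catalog.db.sample")
            .parallelism(8)
            .execute();
    System.out.println("Migrated " + result.migratedDataFilesCount() + " data files");
  }
}

The same knob is reachable from SQL, e.g. CALL spark_catalog.system.migrate(table => 'db.sample', parallelism => 8), which is the path the procedure tests below exercise.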
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
index de4acd74a7ed..3438a061bb09 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
@@ -21,10 +21,15 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
 import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.jetbrains.annotations.NotNull;
 
 public class ProcedureUtil {
 
@@ -51,4 +56,29 @@ static String statsFileLocation(String tableLocation) {
     String statsFileName = "stats-file-" + UUID.randomUUID();
     return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
   }
+
+  static class TestExecutorService extends ThreadPoolExecutor {
+
+    private int executedTasks = 0;
+
+    TestExecutorService() {
+      super(
+          1,
+          1,
+          0L,
+          TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<>(),
+          new ThreadFactoryBuilder().build());
+    }
+
+    @Override
+    public void execute(@NotNull Runnable task) {
+      super.execute(task);
+      executedTasks++;
+    }
+
+    public int getExecutedTasks() {
+      return executedTasks;
+    }
+  }
 }
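TestExecutorService is a single-threaded pool whose execute() override counts submissions. Note that executedTasks++ runs on the submitting thread, before the task itself executes, so the counter is reliable only while a single thread submits; that holds in the procedure tests below. A same-package demo of the counting behavior (hypothetical, not part of the patch):

package org.apache.iceberg.spark.extensions;

import java.util.concurrent.TimeUnit;

public class CountingExecutorDemo {
  public static void main(String[] args) throws InterruptedException {
    ProcedureUtil.TestExecutorService service = new ProcedureUtil.TestExecutorService();
    service.execute(() -> {}); // counted at submission time
    service.execute(() -> {});
    service.shutdown();
    service.awaitTermination(1, TimeUnit.MINUTES);
    System.out.println(service.getExecutedTasks()); // prints 2
  }
}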
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 26a888356ca6..c1ecee472c4c 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -20,19 +20,24 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mockStatic;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.spark.sql.AnalysisException;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
 
 @ExtendWith(ParameterizedTestExtension.class)
 public class TestMigrateTableProcedure extends ExtensionsTestBase {
@@ -232,4 +237,42 @@ public void testMigrateEmptyTable() throws Exception {
     Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
     assertThat(result).isEqualTo(0L);
   }
+
+  @TestTemplate
+  public void testMigrateWithParallelism() throws IOException {
+    assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+
+    testWithParallelism(-1);
+    testWithParallelism(0);
+    testWithParallelism(1);
+    testWithParallelism(5);
+  }
+
+  private void testWithParallelism(int parallelism) throws IOException {
+    String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    try (MockedStatic<MoreExecutors> executors = mockStatic(MoreExecutors.class)) {
+      ProcedureUtil.TestExecutorService testService = new ProcedureUtil.TestExecutorService();
+      executors
+          .when(() -> MoreExecutors.getExitingExecutorService(any(ThreadPoolExecutor.class)))
+          .thenReturn(testService);
+
+      Object result =
+          scalarSql(
+              "CALL %s.system.migrate(table => '%s', parallelism => %d)",
+              catalogName, tableName, parallelism);
+      assertThat(result).as("Should have added two files").isEqualTo(2L);
+
+      assertThat(testService.getExecutedTasks()).isEqualTo(parallelism > 1 ? 2 : 0);
+      testService.shutdown();
+    }
+
+    sql("DROP TABLE IF EXISTS %s PURGE", tableName);
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+  }
 }
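The static stub works because the action is expected to obtain its worker pool through Guava's exiting-executor factory; with that call intercepted, the counting executor receives every file-read task, and the parallelism > 1 ? 2 : 0 assertion pins down that only parallelism greater than 1 routes the two data files through a pool. A hedged sketch of the production wiring this implies (the actual MigrateTableSparkAction hunk starts at the end of this excerpt):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;

class PoolWiringSketch {
  // Assumed shape only: a fixed pool wrapped so its threads don't block JVM exit.
  static ExecutorService migrationService(int parallelism) {
    return MoreExecutors.getExitingExecutorService(
        (ThreadPoolExecutor) Executors.newFixedThreadPool(parallelism));
  }
}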
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
index 69bc35e9ea33..fb1d86e5d582 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
@@ -20,19 +20,27 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.spark.sql.AnalysisException;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
 
 @ExtendWith(ParameterizedTestExtension.class)
 public class TestSnapshotTableProcedure extends ExtensionsTestBase {
@@ -223,4 +231,53 @@ public void testInvalidSnapshotsCases() throws IOException {
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Cannot handle an empty identifier for argument table");
   }
+
+  @TestTemplate
+  public void testSnapshotWithParallelism() throws IOException {
+    testWithParallelism(-1);
+    testWithParallelism(0);
+    testWithParallelism(1);
+    testWithParallelism(5);
+  }
+
+  private void testWithParallelism(int parallelism) throws IOException {
+    String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        sourceName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceName);
+
+    try (MockedStatic<MoreExecutors> executors = mockStatic(MoreExecutors.class)) {
+      ProcedureUtil.TestExecutorService testService = new ProcedureUtil.TestExecutorService();
+      executors
+          .when(() -> MoreExecutors.getExitingExecutorService(any(ThreadPoolExecutor.class)))
+          .thenReturn(testService);
+      executors
+          .when(
+              () ->
+                  MoreExecutors.getExitingScheduledExecutorService(
+                      any(ScheduledThreadPoolExecutor.class)))
+          .thenReturn(mock(ScheduledExecutorService.class));
+      Object result =
+          scalarSql(
+              "CALL %s.system.snapshot(source_table => '%s', table => '%s', parallelism => %d)",
+              catalogName, sourceName, tableName, parallelism);
+
+      assertThat(result).as("Should have added two files").isEqualTo(2L);
+      assertThat(testService.getExecutedTasks()).isEqualTo(parallelism > 1 ? 2 : 0);
+    }
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+    String tableLocation = createdTable.location();
+    assertThat(tableLocation)
+        .as("Table should not have the original location")
+        .isNotEqualTo(location);
+
+    assertThat(sql("SELECT * FROM %s ORDER BY id", tableName))
+        .containsExactly(row(1L, "a"), row(2L, "b"));
+
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s PURGE", sourceName);
+  }
 }
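One subtlety: inside a mockStatic scope, every unstubbed static method of the mocked class answers with Mockito defaults, i.e. null for reference types. The snapshot path evidently also asks MoreExecutors for a scheduled executor, so the test stubs that factory as well; otherwise the code under test would receive null. A standalone illustration of the pitfall (assumes mockito-inline, which provides static mocking):

import static org.mockito.Mockito.mockStatic;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.mockito.MockedStatic;

public class StaticMockPitfall {
  public static void main(String[] args) {
    try (MockedStatic<MoreExecutors> executors = mockStatic(MoreExecutors.class)) {
      // No stubbing configured: the call returns the default answer, null.
      ScheduledExecutorService service =
          MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
      System.out.println(service); // null -- a later NPE in code under test
    }
  }
}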
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 3a2324d89184..2dd530050404 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -388,6 +388,34 @@ public static void importSparkTable(
         spark, sourceTableIdent, targetTable, stagingDir, partitionFilter, checkDuplicateFiles, 1);
   }
 
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param parallelism number of threads to use for file reading
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      int parallelism) {
+    importSparkTable(
+        spark,
+        sourceTableIdent,
+        targetTable,
+        stagingDir,
+        Collections.emptyMap(),
+        false,
+        parallelism);
+  }
+
   /**
    * Import files from an existing Spark table to an Iceberg table.
    *
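The new overload keeps the defaults of the existing four-argument variant, an empty partition filter and checkDuplicateFiles set to false, and only adds the parallelism knob. A hedged sketch of calling it directly (class, table, and directory names are illustrative):

import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import scala.Option;

public class ImportSketch {
  // targetTable must already exist (e.g., created through an Iceberg catalog);
  // stagingDir holds the temporary manifests written during the import.
  static void importWithFourThreads(
      SparkSession spark, Table targetTable, String stagingDir) {
    TableIdentifier source = new TableIdentifier("sample", Option.apply("db"));
    SparkTableUtil.importSparkTable(spark, source, targetTable, stagingDir, 4);
  }
}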
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
index 5f3cdd3f035c..66e1a756ba13 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
@@ -59,6 +59,7 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction