Spark 3.5: Parallelize reading files in snapshot and migrate procedures
manuzhang committed Apr 2, 2024
1 parent fa80c85 commit 23e7062
Showing 11 changed files with 158 additions and 6 deletions.
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/MigrateTable.java
@@ -60,6 +60,16 @@ default MigrateTable backupTableName(String tableName) {
     throw new UnsupportedOperationException("Backup table name cannot be specified");
   }

+  /**
+   * Sets the number of threads to use for file reading. The default is 1.
+   *
+   * @param numThreads the number of threads
+   * @return this for method chaining
+   */
+  default MigrateTable parallelism(int numThreads) {
+    throw new UnsupportedOperationException("Setting parallelism is not supported");
+  }
+
   /** The action result that contains a summary of the execution. */
   interface Result {
     /** Returns the number of migrated data files. */
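For callers of the action API, the new option chains like any other setter. A minimal usage sketch, assuming Iceberg's standard SparkActions entry point and a placeholder table name:

    // Sketch: migrate a Spark table to Iceberg, reading file metadata with 8 threads.
    // "db.sample" is a placeholder table name.
    MigrateTable.Result result =
        SparkActions.get(spark)
            .migrateTable("db.sample")
            .parallelism(8)
            .execute();
    long migratedFiles = result.migratedDataFilesCount();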
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
@@ -57,6 +57,16 @@ public interface SnapshotTable extends Action<SnapshotTable, SnapshotTable.Result> {
    */
   SnapshotTable tableProperty(String key, String value);

+  /**
+   * Sets the number of threads to use for file reading. The default is 1.
+   *
+   * @param numThreads the number of threads
+   * @return this for method chaining
+   */
+  default SnapshotTable parallelism(int numThreads) {
+    throw new UnsupportedOperationException("Setting parallelism is not supported");
+  }
+
   /** The action result that contains a summary of the execution. */
   interface Result {
     /** Returns the number of imported data files. */
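The snapshot action gets the same chaining option. A hedged sketch with placeholder names:

    // Sketch: snapshot a Spark table as a new Iceberg table, leaving the source
    // untouched; file metadata is read with 8 threads. Names are placeholders.
    SnapshotTable.Result result =
        SparkActions.get(spark)
            .snapshotTable("db.sample")
            .as("db.sample_snapshot")
            .parallelism(8)
            .execute();
    long importedFiles = result.importedDataFilesCount();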
org/apache/iceberg/data/TableMigrationUtil.java

@@ -52,6 +52,8 @@ public class TableMigrationUtil {
   private static final PathFilter HIDDEN_PATH_FILTER =
       p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");

+  private static int fileReadingParallelism = 1;
+
   private TableMigrationUtil() {}

   /**
@@ -131,8 +133,11 @@ public static List<DataFile> listPartition(
         Tasks.range(fileStatus.size()).stopOnFailure().throwFailureWhenFinished();

     if (parallelism > 1) {
-      service = migrationService(parallelism);
+      fileReadingParallelism = parallelism;
+      service = migrationService(fileReadingParallelism);
       task.executeWith(service);
+    } else {
+      fileReadingParallelism = 1;
     }

     if (format.contains("avro")) {
@@ -171,6 +176,10 @@ public static List<DataFile> listPartition(
     }
   }

+  public static boolean isFileReadingParallelized() {
+    return fileReadingParallelism > 1;
+  }
+
   private static Metrics getAvroMetrics(Path path, Configuration conf) {
     try {
       InputFile file = HadoopInputFile.fromPath(path, conf);
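The pattern above is what makes the reads concurrent: Iceberg's Tasks utility runs one unit of work per file, and wiring in an executor service fans those units out across threads. A simplified sketch of the mechanism, assuming a plain fixed-size pool in place of migrationService and placeholder work and counts:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.iceberg.util.Tasks;

    // Sketch: run one metadata read per file, on a pool when parallelism > 1.
    int parallelism = 4;  // placeholder value
    int fileCount = 100;  // placeholder value
    ExecutorService pool =
        parallelism > 1 ? Executors.newFixedThreadPool(parallelism) : null;
    Tasks.Builder<Integer> task =
        Tasks.range(fileCount).stopOnFailure().throwFailureWhenFinished();
    if (pool != null) {
      task.executeWith(pool);  // file reads now proceed concurrently on the pool
    }
    task.run(index -> System.out.println("reading file " + index));  // placeholder work
    if (pool != null) {
      pool.shutdown();
    }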
4 changes: 3 additions & 1 deletion docs/docs/spark-procedures.md
@@ -546,6 +546,7 @@ See [`migrate`](#migrate) to replace an existing table with an Iceberg table.
 | `table` | ✔️ | string | Name of the new Iceberg table to create |
 | `location` | | string | Table location for the new table (delegated to the catalog by default) |
 | `properties` || map<string, string> | Properties to add to the newly created table |
+| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) |

 #### Output

@@ -588,6 +589,7 @@ By default, the original table is retained with the name `table_BACKUP_`.
 | `properties` || map<string, string> | Properties for the new Iceberg table |
 | `drop_backup` | | boolean | When true, the original table will not be retained as backup (defaults to false) |
 | `backup_table_name` | | string | Name of the table that will be retained as backup (defaults to `table_BACKUP_`) |
+| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) |

 #### Output

@@ -629,7 +631,7 @@ will then treat these files as if they are part of the set of files owned by Iceberg.
 | `source_table` | ✔️ | string | Table where files should come from, paths are also possible in the form of \`file_format\`.\`path\` |
 | `partition_filter` || map<string, string> | A map of partitions in the source table to import from |
 | `check_duplicate_files` || boolean | Whether to prevent files existing in the table from being added (defaults to true) |
-| `parallelism` | | int | number of threads to use for file reading (defaults to 1) |
+| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) |

 Warning : Schema is not validated, adding files with different schema to the Iceberg table will cause issues.
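From SQL, the new argument slots into the existing CALL syntax. A hedged sketch of invoking the procedures from Java; the catalog and table names are placeholders:

    // Sketch: call the procedures with the new `parallelism` argument.
    spark.sql("CALL spark_catalog.system.migrate(table => 'db.sample', parallelism => 4)");
    spark.sql(
        "CALL spark_catalog.system.snapshot("
            + "source_table => 'db.sample', table => 'db.sample_snapshot', parallelism => 4)");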
org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java

@@ -26,6 +26,7 @@
 import java.util.Map;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.data.TableMigrationUtil;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.spark.sql.AnalysisException;
@@ -232,4 +233,32 @@ public void testMigrateEmptyTable() throws Exception {
     Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
     assertThat(result).isEqualTo(0L);
   }
+
+  @TestTemplate
+  public void testMigrateWithParallelism() throws IOException {
+    assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+
+    testWithParallelism(-1, false);
+    testWithParallelism(0, false);
+    testWithParallelism(1, false);
+    testWithParallelism(5, true);
+  }
+
+  private void testWithParallelism(int parallelism, boolean parallelized) throws IOException {
+    String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        tableName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Object result =
+        scalarSql(
+            "CALL %s.system.migrate(table => '%s', parallelism => %d)",
+            catalogName, tableName, parallelism);
+    assertThat(result).as("Should have added two files").isEqualTo(2L);
+    assertThat(TableMigrationUtil.isFileReadingParallelized()).isEqualTo(parallelized);
+    sql("DROP TABLE IF EXISTS %s PURGE", tableName);
+    sql("DROP TABLE IF EXISTS %s_BACKUP_", tableName);
+  }
 }
org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java

@@ -27,6 +27,7 @@
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.TableMigrationUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.spark.sql.AnalysisException;
 import org.assertj.core.api.Assertions;
@@ -223,4 +224,41 @@ public void testInvalidSnapshotsCases() throws IOException {
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Cannot handle an empty identifier for argument table");
   }
+
+  @TestTemplate
+  public void testSnapshotWithParallelism() throws IOException {
+    testWithParallelism(-1, false);
+    testWithParallelism(0, false);
+    testWithParallelism(1, false);
+    testWithParallelism(5, true);
+  }
+
+  private void testWithParallelism(int parallelism, boolean parallelized) throws IOException {
+    String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        sourceName, location);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", sourceName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", sourceName);
+
+    Object result =
+        scalarSql(
+            "CALL %s.system.snapshot(source_table => '%s', table => '%s', parallelism => %d)",
+            catalogName, sourceName, tableName, parallelism);
+
+    assertThat(result).as("Should have added two files").isEqualTo(2L);
+    assertThat(TableMigrationUtil.isFileReadingParallelized()).isEqualTo(parallelized);
+
+    Table createdTable = validationCatalog.loadTable(tableIdent);
+    String tableLocation = createdTable.location();
+    assertThat(tableLocation)
+        .as("Table should not have the original location")
+        .isNotEqualTo(location);
+
+    assertThat(sql("SELECT * FROM %s ORDER BY id", tableName))
+        .containsExactly(row(1L, "a"), row(2L, "b"));
+
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s PURGE", sourceName);
+  }
 }
org/apache/iceberg/spark/SparkTableUtil.java

@@ -388,6 +388,34 @@ public static void importSparkTable(
         spark, sourceTableIdent, targetTable, stagingDir, partitionFilter, checkDuplicateFiles, 1);
   }

+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no operation is going
+   * on the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param parallelism number of threads to use for file reading
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      int parallelism) {
+    importSparkTable(
+        spark,
+        sourceTableIdent,
+        targetTable,
+        stagingDir,
+        Collections.emptyMap(),
+        false,
+        parallelism);
+  }
+
   /**
    * Import files from an existing Spark table to an Iceberg table.
    *
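A hedged usage sketch of the new overload; the identifier type here is Spark's catalyst TableIdentifier, and targetTable, stagingDir, and the table name are placeholders assumed to exist in the caller's scope:

    // Sketch: import a Spark table's files into an existing Iceberg table,
    // reading file metadata with 4 threads. "source" is a placeholder name.
    TableIdentifier sourceIdent = new TableIdentifier("source");
    SparkTableUtil.importSparkTable(spark, sourceIdent, targetTable, stagingDir, 4);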
org/apache/iceberg/spark/actions/MigrateTableSparkAction.java

@@ -59,6 +59,7 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<MigrateTableSparkAction> implements MigrateTable {

   private Identifier backupIdent;
   private boolean dropBackup = false;
+  private int parallelism = 1;

   MigrateTableSparkAction(
       SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
@@ -108,6 +109,12 @@ public MigrateTableSparkAction backupTableName(String tableName) {
     return this;
   }

+  @Override
+  public MigrateTableSparkAction parallelism(int numThreads) {
+    this.parallelism = numThreads;
+    return this;
+  }
+
   @Override
   public MigrateTable.Result execute() {
     String desc = String.format("Migrating table %s", destTableIdent().toString());
@@ -137,7 +144,8 @@ private MigrateTable.Result doExecute() {
     TableIdentifier v1BackupIdent = new TableIdentifier(backupIdent.name(), backupNamespace);
     String stagingLocation = getMetadataLocation(icebergTable);
     LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
-    SparkTableUtil.importSparkTable(spark(), v1BackupIdent, icebergTable, stagingLocation);
+    SparkTableUtil.importSparkTable(
+        spark(), v1BackupIdent, icebergTable, stagingLocation, parallelism);

     LOG.info("Committing staged changes to {}", destTableIdent());
     stagedTable.commitStagedChanges();
org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java

@@ -54,6 +54,7 @@ public class SnapshotTableSparkAction extends BaseTableCreationSparkAction<SnapshotTableSparkAction> implements SnapshotTable {
   private StagingTableCatalog destCatalog;
   private Identifier destTableIdent;
   private String destTableLocation = null;
+  private int parallelism = 1;

   SnapshotTableSparkAction(
       SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
@@ -98,6 +99,12 @@ public SnapshotTableSparkAction tableProperty(String property, String value) {
     return this;
   }

+  @Override
+  public SnapshotTable parallelism(int numThreads) {
+    this.parallelism = numThreads;
+    return this;
+  }
+
   @Override
   public SnapshotTable.Result execute() {
     String desc = String.format("Snapshotting table %s as %s", sourceTableIdent(), destTableIdent);
@@ -126,7 +133,8 @@ private SnapshotTable.Result doExecute() {
     TableIdentifier v1TableIdent = v1SourceTable().identifier();
     String stagingLocation = getMetadataLocation(icebergTable);
     LOG.info("Generating Iceberg metadata for {} in {}", destTableIdent(), stagingLocation);
-    SparkTableUtil.importSparkTable(spark(), v1TableIdent, icebergTable, stagingLocation);
+    SparkTableUtil.importSparkTable(
+        spark(), v1TableIdent, icebergTable, stagingLocation, parallelism);

     LOG.info("Committing staged changes to {}", destTableIdent());
     stagedTable.commitStagedChanges();
org/apache/iceberg/spark/procedures/MigrateTableProcedure.java

@@ -40,7 +40,8 @@ class MigrateTableProcedure extends BaseProcedure {
         ProcedureParameter.required("table", DataTypes.StringType),
         ProcedureParameter.optional("properties", STRING_MAP),
         ProcedureParameter.optional("drop_backup", DataTypes.BooleanType),
-        ProcedureParameter.optional("backup_table_name", DataTypes.StringType)
+        ProcedureParameter.optional("backup_table_name", DataTypes.StringType),
+        ProcedureParameter.optional("parallelism", DataTypes.IntegerType)
       };

   private static final StructType OUTPUT_TYPE =
@@ -105,6 +106,10 @@ public InternalRow[] call(InternalRow args) {
       migrateTableSparkAction = migrateTableSparkAction.backupTableName(backupTableName);
     }

+    if (!args.isNullAt(4)) {
+      migrateTableSparkAction = migrateTableSparkAction.parallelism(args.getInt(4));
+    }
+
     MigrateTable.Result result = migrateTableSparkAction.execute();
     return new InternalRow[] {newInternalRow(result.migratedDataFilesCount())};
   }
org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java

@@ -38,7 +38,8 @@ class SnapshotTableProcedure extends BaseProcedure {
         ProcedureParameter.required("source_table", DataTypes.StringType),
         ProcedureParameter.required("table", DataTypes.StringType),
         ProcedureParameter.optional("location", DataTypes.StringType),
-        ProcedureParameter.optional("properties", STRING_MAP)
+        ProcedureParameter.optional("properties", STRING_MAP),
+        ProcedureParameter.optional("parallelism", DataTypes.IntegerType)
       };

   private static final StructType OUTPUT_TYPE =
@@ -102,6 +103,10 @@ public InternalRow[] call(InternalRow args) {
       action.tableLocation(snapshotLocation);
     }

+    if (!args.isNullAt(4)) {
+      action = action.parallelism(args.getInt(4));
+    }
+
     SnapshotTable.Result result = action.tableProperties(properties).execute();
     return new InternalRow[] {newInternalRow(result.importedDataFilesCount())};
   }
