From 9c344f96c874dcdb88c57dff15adba0647ce47e5 Mon Sep 17 00:00:00 2001
From: Anton Okolnychyi
Date: Thu, 29 Aug 2024 08:38:32 -0700
Subject: [PATCH] Spark 3.5: Use FileGenerationUtil in PlanningBenchmark
 (#11027)

---
 .../iceberg/spark/PlanningBenchmark.java      | 118 ++++--------------
 1 file changed, 26 insertions(+), 92 deletions(-)

diff --git a/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java b/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
index e2ce5e956348..ed97e6b08414 100644
--- a/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
+++ b/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
@@ -20,49 +20,41 @@
 
 import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
 import static org.apache.iceberg.PlanningMode.LOCAL;
-import static org.apache.spark.sql.functions.lit;
 
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.BatchScan;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileMetadata;
-import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.FileGenerationUtil;
 import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.ScanTask;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.SparkDistributedDataScan;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.data.RandomData;
 import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.ParseException;
-import org.apache.spark.sql.types.StructType;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -108,10 +100,8 @@ public class PlanningBenchmark {
       Expressions.and(PARTITION_PREDICATE, SORT_KEY_PREDICATE);
 
   private static final int NUM_PARTITIONS = 30;
-  private static final int NUM_REAL_DATA_FILES_PER_PARTITION = 25;
-  private static final int NUM_REPLICA_DATA_FILES_PER_PARTITION = 50_000;
+  private static final int NUM_DATA_FILES_PER_PARTITION = 50_000;
   private static final int NUM_DELETE_FILES_PER_PARTITION = 50;
-  private static final int NUM_ROWS_PER_DATA_FILE = 500;
 
   private final Configuration hadoopConf = new Configuration();
   private SparkSession spark;
@@ -285,99 +275,43 @@ private void dropTable() {
     sql("DROP TABLE IF EXISTS %s PURGE", TABLE_NAME);
   }
 
-  private DataFile loadAddedDataFile() {
-    table.refresh();
-
-    Iterable<DataFile> dataFiles = table.currentSnapshot().addedDataFiles(table.io());
-    return Iterables.getOnlyElement(dataFiles);
-  }
-
-  private DeleteFile loadAddedDeleteFile() {
-    table.refresh();
-
-    Iterable<DeleteFile> deleteFiles = table.currentSnapshot().addedDeleteFiles(table.io());
-    return Iterables.getOnlyElement(deleteFiles);
-  }
-
-  private void initDataAndDeletes() throws NoSuchTableException {
-    Schema schema = table.schema();
-    PartitionSpec spec = table.spec();
-    LocationProvider locations = table.locationProvider();
-
+  private void initDataAndDeletes() {
     for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
-      Dataset<Row> inputDF =
-          randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
-              .drop(PARTITION_COLUMN)
-              .withColumn(PARTITION_COLUMN, lit(partitionOrdinal))
-              .drop(SORT_KEY_COLUMN)
-              .withColumn(SORT_KEY_COLUMN, lit(Integer.MIN_VALUE));
-
-      for (int fileOrdinal = 0; fileOrdinal < NUM_REAL_DATA_FILES_PER_PARTITION; fileOrdinal++) {
-        appendAsFile(inputDF);
-      }
+      StructLike partition = TestHelpers.Row.of(partitionOrdinal);
 
-      DataFile dataFile = loadAddedDataFile();
-
-      sql(
-          "DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
-          TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);
-
-      DeleteFile deleteFile = loadAddedDeleteFile();
-
-      AppendFiles append = table.newFastAppend();
+      RowDelta rowDelta = table.newRowDelta();
 
-      for (int fileOrdinal = 0; fileOrdinal < NUM_REPLICA_DATA_FILES_PER_PARTITION; fileOrdinal++) {
-        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
-        DataFile replicaDataFile =
-            DataFiles.builder(spec)
-                .copy(dataFile)
-                .withPath(locations.newDataLocation(spec, dataFile.partition(), replicaFileName))
-                .build();
-        append.appendFile(replicaDataFile);
+      for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+        DataFile dataFile = generateDataFile(partition, Integer.MIN_VALUE, Integer.MIN_VALUE);
+        rowDelta.addRows(dataFile);
       }
 
-      append.commit();
-
-      RowDelta rowDelta = table.newRowDelta();
+      // add one data file that would match the sort key predicate
+      DataFile sortKeyDataFile = generateDataFile(partition, SORT_KEY_VALUE, SORT_KEY_VALUE);
+      rowDelta.addRows(sortKeyDataFile);
 
       for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
-        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
-        DeleteFile replicaDeleteFile =
-            FileMetadata.deleteFileBuilder(spec)
-                .copy(deleteFile)
-                .withPath(locations.newDataLocation(spec, deleteFile.partition(), replicaFileName))
-                .build();
-        rowDelta.addDeletes(replicaDeleteFile);
+        DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, partition);
+        rowDelta.addDeletes(deleteFile);
      }
 
       rowDelta.commit();
-
-      Dataset<Row> sortedInputDF =
-          randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
-              .drop(SORT_KEY_COLUMN)
-              .withColumn(SORT_KEY_COLUMN, lit(SORT_KEY_VALUE))
-              .drop(PARTITION_COLUMN)
-              .withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
-      appendAsFile(sortedInputDF);
     }
   }
 
-  private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
-    df.coalesce(1).writeTo(TABLE_NAME).append();
+  private DataFile generateDataFile(StructLike partition, int sortKeyMin, int sortKeyMax) {
+    int sortKeyFieldId = table.schema().findField(SORT_KEY_COLUMN).fieldId();
+    ByteBuffer lower = Conversions.toByteBuffer(Types.IntegerType.get(), sortKeyMin);
+    Map<Integer, ByteBuffer> lowerBounds = ImmutableMap.of(sortKeyFieldId, lower);
+    ByteBuffer upper = Conversions.toByteBuffer(Types.IntegerType.get(), sortKeyMax);
+    Map<Integer, ByteBuffer> upperBounds = ImmutableMap.of(sortKeyFieldId, upper);
+    return FileGenerationUtil.generateDataFile(table, partition, lowerBounds, upperBounds);
  }
 
   private String newWarehouseDir() {
     return hadoopConf.get("hadoop.tmp.dir") + UUID.randomUUID();
   }
 
-  private Dataset<Row> randomDataDF(Schema schema, int numRows) {
-    Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
-    JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
-    JavaRDD<InternalRow> rowRDD = context.parallelize(Lists.newArrayList(rows));
-    StructType rowSparkType = SparkSchemaUtil.convert(schema);
-    return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
-  }
-
   private List<ScanTask> planFilesWithoutColumnStats(BatchScan scan, Expression predicate) {
     return planFiles(scan, predicate, false);
   }
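
For context, FileGenerationUtil (as the patch uses it) fabricates DataFile and DeleteFile metadata, including per-column bounds, without writing any real data files, which is what makes registering 50,000 files per partition cheap enough for a planning benchmark. Below is a minimal, illustrative sketch of the same utilities outside the benchmark: it commits two metadata-only files with different bounds for an integer column and plans a filtered scan, which should prune the file whose bounds cannot match. It assumes an existing Iceberg Table partitioned by a single value (as in the benchmark) and the same test utilities the patch imports; the class and method names (MetadataOnlyPlanningExample, seedAndPlan, fileWithBounds) are hypothetical, not part of the patch.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileGenerationUtil;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

public class MetadataOnlyPlanningExample {

  // Commit two metadata-only data files into partition 0 and count how many
  // tasks a filtered scan plans. Only file metadata is created; no data is written.
  static long seedAndPlan(Table table, String columnName, int matchValue) throws IOException {
    StructLike partition = TestHelpers.Row.of(0);

    // One file whose stats claim the column always equals matchValue, one that never matches.
    DataFile matching = fileWithBounds(table, partition, columnName, matchValue, matchValue);
    DataFile nonMatching =
        fileWithBounds(table, partition, columnName, Integer.MIN_VALUE, Integer.MIN_VALUE);
    table.newRowDelta().addRows(matching).addRows(nonMatching).commit();

    // Planning evaluates the predicate against the fabricated bounds and should
    // skip the file that cannot contain matchValue.
    try (CloseableIterable<ScanTask> tasks =
        table.newBatchScan().filter(Expressions.equal(columnName, matchValue)).planFiles()) {
      long count = 0;
      for (ScanTask task : tasks) {
        count++;
      }
      return count;
    }
  }

  // Build a data file whose lower/upper bounds for the given int column are [min, max].
  private static DataFile fileWithBounds(
      Table table, StructLike partition, String columnName, int min, int max) {
    int fieldId = table.schema().findField(columnName).fieldId();
    Map<Integer, ByteBuffer> lower =
        ImmutableMap.of(fieldId, Conversions.toByteBuffer(Types.IntegerType.get(), min));
    Map<Integer, ByteBuffer> upper =
        ImmutableMap.of(fieldId, Conversions.toByteBuffer(Types.IntegerType.get(), max));
    return FileGenerationUtil.generateDataFile(table, partition, lower, upper);
  }
}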