From 629614ef68e901cdcc64248acae944da20d99371 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Wed, 22 Jan 2025 00:14:38 +0800 Subject: [PATCH] Test Spark broadcast --- .../main/java/org/apache/iceberg/SparkDistributedDataScan.java | 2 +- .../java/org/apache/iceberg/spark/actions/BaseSparkAction.java | 2 +- .../iceberg/spark/actions/RewriteManifestsSparkAction.java | 3 +-- .../org/apache/iceberg/spark/source/SparkMicroBatchStream.java | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java index 43ce2a303e2b..0a99e8e21b04 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java @@ -190,7 +190,7 @@ private List toBeans(List manifests) { private Broadcast tableBroadcast() { if (tableBroadcast == null) { Table serializableTable = SerializableTableWithSize.copyOf(table()); - this.tableBroadcast = sparkContext.broadcast(serializableTable); + this.tableBroadcast = sparkContext.broadcast(table()); } return tableBroadcast; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index a8e82d101fbf..2ea3c29b261f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -150,7 +150,7 @@ protected Dataset contentFileDS(Table table) { protected Dataset contentFileDS(Table table, Set snapshotIds) { Table serializableTable = SerializableTableWithSize.copyOf(table); - Broadcast
tableBroadcast = sparkContext.broadcast(serializableTable); + Broadcast
tableBroadcast = sparkContext.broadcast(table); int numShufflePartitions = spark.sessionState().conf().numShufflePartitions(); Dataset manifestBeanDS = diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 60e2b11881cb..a15446028889 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -59,7 +59,6 @@ import org.apache.iceberg.spark.SparkContentFile; import org.apache.iceberg.spark.SparkDataFile; import org.apache.iceberg.spark.SparkDeleteFile; -import org.apache.iceberg.spark.source.SerializableTableWithSize; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.ThreadPools; @@ -378,7 +377,7 @@ private void deleteFiles(Iterable locations) { private ManifestWriterFactory manifestWriters() { return new ManifestWriterFactory( - sparkContext().broadcast(SerializableTableWithSize.copyOf(table)), + sparkContext().broadcast(table), formatVersion, spec.specId(), outputLocation, diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 49180e07c465..d5297a385c4f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -97,7 +97,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio this.caseSensitive = readConf.caseSensitive(); this.expectedSchema = SchemaParser.toJson(expectedSchema); this.localityPreferred = readConf.localityEnabled(); - this.tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + this.tableBroadcast = sparkContext.broadcast(table); this.splitSize = readConf.splitSize(); this.splitLookback = readConf.splitLookback(); this.splitOpenFileCost = readConf.splitOpenFileCost();