[HUDI-8800] Introduce SingleSparkConsistentBucketClusteringExecutionStrategy to improve performance #12537
Conversation
@hudi-bot run azure
One subtask will be finished in another PR:
…y to avoid shuffle
1. introduce SingleSparkConsistentBucketClusteringExecutionStrategy to avoid shuffle
Signed-off-by: TheR1sing3un <[email protected]>
force-pushed from 2cf056a to 665d201
int readParallelism = Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());

return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism).mapPartitions(clusteringOpsPartition -> {
  List<Supplier<ClosableIterator<HoodieRecord<T>>>> suppliers = new ArrayList<>();
  clusteringOpsPartition.forEachRemaining(clusteringOp -> {

    Supplier<ClosableIterator<HoodieRecord<T>>> iteratorSupplier = () -> {
      long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
This deleted code was simply moved to SparkJobExecutionStrategy, which now provides a common reading method.
int readParallelism = Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());

return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism)
    .mapPartitions(clusteringOpsPartition -> {
      List<Supplier<ClosableIterator<HoodieRecord<T>>>> iteratorGettersForPartition = new ArrayList<>();
      clusteringOpsPartition.forEachRemaining(clusteringOp -> {
        Supplier<ClosableIterator<HoodieRecord<T>>> recordIteratorGetter = () -> {
This deleted code was simply moved to SparkJobExecutionStrategy, which now provides a common reading method.
        iteratorGettersForPartition.add(recordIteratorGetter);
      });

      return new LazyConcatenatingIterator<>(iteratorGettersForPartition);
    }));
}
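For context on the pattern in this common reading method: each clustering operation contributes a Supplier that only opens its record iterator when the consumer actually reaches it, and a concatenating iterator chains the suppliers so readers are opened one at a time. Below is a minimal, self-contained sketch of that idea; LazyConcatIterator and openRecords are hypothetical stand-ins for illustration, not the actual Hudi classes.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Hypothetical stand-in for the lazy concatenating iterator: chains iterators
// produced by suppliers, invoking each supplier only when its turn comes.
class LazyConcatIterator<T> implements Iterator<T> {
  private final Iterator<Supplier<Iterator<T>>> suppliers;
  private Iterator<T> current = null;

  LazyConcatIterator(List<Supplier<Iterator<T>>> suppliers) {
    this.suppliers = suppliers.iterator();
  }

  @Override
  public boolean hasNext() {
    // Move to the next supplier only when the current iterator is exhausted,
    // so each underlying reader is opened lazily.
    while ((current == null || !current.hasNext()) && suppliers.hasNext()) {
      current = suppliers.next().get();
    }
    return current != null && current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}

public class LazyReadSketch {
  // Hypothetical reader; in Hudi this would open the base/log files of one clustering operation.
  static Iterator<String> openRecords(String clusteringOp) {
    System.out.println("opening " + clusteringOp);
    return Arrays.asList(clusteringOp + "-r1", clusteringOp + "-r2").iterator();
  }

  public static void main(String[] args) {
    List<Supplier<Iterator<String>>> gettersForPartition = new ArrayList<>();
    for (String op : Arrays.asList("op1", "op2", "op3")) {
      gettersForPartition.add(() -> openRecords(op));
    }
    // Records stream out group by group; no reader is opened before it is needed.
    new LazyConcatIterator<>(gettersForPartition).forEachRemaining(System.out::println);
  }
}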
private HoodieFileReader getBaseOrBootstrapFileReader(StorageConfiguration<?> storageConf, String bootstrapBasePath, Option<String[]> partitionFields, ClusteringOperation clusteringOp) |
This deleted code was simply moved to SparkJobExecutionStrategy, which now provides a common reading method.
final TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
final SerializableSchema serializableSchema = new SerializableSchema(schema);
final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(ClusteringGroupInfo::create).collect(Collectors.toList());

String umask = engineContext.hadoopConfiguration().get("fs.permissions.umask-mode"); |
None of this logic is valid, and the class hasn't been used anywhere before, so I refactored this part of the code.
@danny0405 Hi, Danny. In my opinion, it is not necessary to adopt the current logic for consistent-bucket-resizing clustering. We only need to process it like compaction: no …
Two subtasks:
1. refactor clustering related code for better readability Signed-off-by: TheR1sing3un <[email protected]>
1. fix ut Signed-off-by: TheR1sing3un <[email protected]>
@@ -110,7 +115,7 @@ public void setup(int maxFileSize, Map<String, String> options) throws IOException
         .withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(maxFileSize).build())
         .withClusteringConfig(HoodieClusteringConfig.newBuilder()
             .withClusteringPlanStrategyClass(SparkConsistentBucketClusteringPlanStrategy.class.getName())
-            .withClusteringExecutionStrategyClass(SparkConsistentBucketClusteringExecutionStrategy.class.getName())
+            .withClusteringExecutionStrategyClass(singleJob ? SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY : SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY)
Is SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY always better than SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY? Why do we need two execution strategies?
> Is SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY always better than SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY? Why do we need two execution strategies?
I'm not sure if any user is already using SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY; if so, should we keep it for compatibility? If not, we can deprecate it.
1. remove unused SparkJobExecutionStrategy Signed-off-by: TheR1sing3un <[email protected]>
1. fix ut Signed-off-by: TheR1sing3un <[email protected]>
For consistent bucket resizing, the current strategy creates a Spark job for each clustering group, uses Spark's datasource path to read the file slices that need to be merged or split inside each job, and then writes the data to the new file groups with a datasource bulk insert. That process is fine for other types of clustering, but for bucket resizing it causes performance problems.
For bucket resizing, the clustering plan already lists the mapping explicitly for each resizing group: we know exactly which file slices each clustering group reads and which file groups it writes to, so the data can be routed to its target without any shuffle, as sketched below.
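To make that concrete, here is a minimal sketch in plain Spark (not the actual API added in this PR) of processing all resizing groups in a single job with no shuffle: the plan is parallelized so each task reads the file slices of its own group and writes directly to the pre-assigned target file group. ClusteringGroup, readGroup and writeToFileGroup are hypothetical placeholders for the plan entries and the real Hudi I/O.

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SingleJobResizeSketch {

  // Hypothetical, simplified view of one entry in the clustering plan:
  // which file slices to read and which target file group to write.
  static class ClusteringGroup implements Serializable {
    final List<String> inputFileSlices;
    final String targetFileGroupId;

    ClusteringGroup(List<String> inputFileSlices, String targetFileGroupId) {
      this.inputFileSlices = inputFileSlices;
      this.targetFileGroupId = targetFileGroupId;
    }
  }

  // Hypothetical reader: would return the records of the group's file slices.
  static Iterator<String> readGroup(ClusteringGroup group) {
    return group.inputFileSlices.iterator();
  }

  // Hypothetical writer: would bulk-write the records into the pre-assigned file group.
  static String writeToFileGroup(ClusteringGroup group, Iterator<String> records) {
    int count = 0;
    while (records.hasNext()) {
      records.next();
      count++;
    }
    return group.targetFileGroupId + " <- " + count + " records";
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("single-job-resize-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      List<ClusteringGroup> plan = Arrays.asList(
          new ClusteringGroup(Arrays.asList("fs-1", "fs-2"), "fg-A"),   // two slices merged into one bucket
          new ClusteringGroup(Arrays.asList("fs-3"), "fg-B"));          // one slice routed to a new bucket

      // One job over all groups: each task reads its group and writes to its
      // pre-assigned file group, so there is no shuffle or sort stage at all.
      List<String> results = jsc.parallelize(plan, plan.size())
          .map(group -> writeToFileGroup(group, readGroup(group)))
          .collect();

      results.forEach(System.out::println);
    }
  }
}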
Validation in our production environment:
Change Logs
Describe context and summary for this change. Highlight if any code was copied.
Impact
none
Describe any public API or user-facing feature change or any performance impact.
Risk level (write none, low medium or high below)
low
If medium or high, explain what verification was done to mitigate the risks.
Documentation Update
none
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
Attach the ticket number here and follow the instruction to make changes to the website.
Contributor's checklist