
[HUDI-8800] Introduce SingleSparkConsistentBucketClusteringExecutionStrategy to improve performance #12537

Open · wants to merge 7 commits into master from feat_local_clustering_execution_strategy
Conversation

TheR1sing3un
Member

@TheR1sing3un TheR1sing3un commented Dec 24, 2024

For consistent bucket resizing, the current execution strategy creates a Spark job for each clustering group, uses Spark's datasource path inside each job to read the file slices that need to be merged or split, and then writes the data to the new file groups with a datasource bulk insert. That flow is fine for other kinds of clustering, but for bucket resizing it causes performance problems.

For bucket resizing, the clustering plan already lists the mapping explicitly: we know up front which file slices each clustering group reads and which file groups it writes to. Therefore:

  1. we can avoid the unnecessary ser/deser between Avro and InternalRow
  2. we can eliminate the unnecessary shuffle between reading and writing
  3. we can avoid heavy job management (see the sketch right after this list)
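
To make the idea concrete, here is a rough, hypothetical sketch (the class and helper names below are illustrative placeholders, not the code in this PR): because the plan already fixes which file slices feed which target file group, a single Spark job can treat every resizing group as an independent, compaction-style task.

// Hypothetical sketch, not the exact code in this PR: one Spark job, one task
// per resizing group, compaction-style, with no shuffle in between.
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaSparkContext;

public class SingleJobResizingSketch {

  /** One entry of the clustering plan: which slices to read, which file group to write. */
  public static class ResizingGroup implements Serializable {
    public List<String> sourceFileSliceIds;
    public String targetFileGroupId;
  }

  public static void run(JavaSparkContext jsc, List<ResizingGroup> groups, int parallelism) {
    // The plan already pins the read -> write mapping, so each group is an
    // independent task: no repartition/shuffle and no Avro <-> InternalRow hop.
    jsc.parallelize(groups, Math.min(parallelism, groups.size()))
        .foreach(group -> {
          Iterator<Object> records = readMergedRecords(group.sourceFileSliceIds); // placeholder reader
          writeToFileGroup(group.targetFileGroupId, records);                     // placeholder writer
        });
  }

  // Placeholders standing in for Hudi's record-merging reader and file-group writer.
  static Iterator<Object> readMergedRecords(List<String> fileSliceIds) {
    throw new UnsupportedOperationException("sketch only");
  }

  static void writeToFileGroup(String fileGroupId, Iterator<Object> records) {
    throw new UnsupportedOperationException("sketch only");
  }
}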

Validation in our production environment:

table

  • MOR
  • consistent bucket index
  • initial bucket number = 64
  • 64 buckets per partition
  • 20MB per bucket
  • 15 partitions

operation

  • try to resize to 128 buckets per partition
  • executor memory = 10G
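
For reference, here is a hedged sketch of how a table like this and the new execution strategy might be wired together through write options. The option keys are written from memory and the strategy package paths are assumed, so they should be verified against the Hudi configuration reference rather than taken as-is.

// Assumed option keys and package paths; verify before use.
import java.util.HashMap;
import java.util.Map;

public class ConsistentBucketResizeOptionsSketch {
  public static Map<String, String> options() {
    Map<String, String> opts = new HashMap<>();
    opts.put("hoodie.index.type", "BUCKET");
    opts.put("hoodie.index.bucket.engine", "CONSISTENT_HASHING");
    opts.put("hoodie.bucket.index.num.buckets", "64");      // initial buckets per partition
    opts.put("hoodie.bucket.index.max.num.buckets", "128"); // resize target per partition
    opts.put("hoodie.clustering.plan.strategy.class",
        "org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy");
    // Execution strategy introduced by this PR; package path assumed.
    opts.put("hoodie.clustering.execution.strategy.class",
        "org.apache.hudi.client.clustering.run.strategy.SingleSparkConsistentBucketClusteringExecutionStrategy");
    return opts;
  }
}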

original clustering

[image]

optimized clustering

[image]

Change Logs

  1. introduce SingleSparkConsistentBucketClusteringExecutionStrategy to avoid shuffle

Impact

none

Risk level (write none, low medium or high below)

low

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:L label (PR with lines of changes in (300, 1000]) Dec 24, 2024
@TheR1sing3un TheR1sing3un changed the title [DNM] Introduce SingleSparkConsistentBucketClusteringExecutionStrategy to avoid shuffle [HUDI-8800] Introduce SingleSparkConsistentBucketClusteringExecutionStrategy to improve performance Dec 30, 2024
@TheR1sing3un
Member Author

@hudi-bot run azure

@TheR1sing3un
Member Author

TheR1sing3un commented Dec 30, 2024

One subtask will be finished in another PR:

  • Separate the resizing and sort logic of the consistent bucket index to keep the clustering semantics clear

…y to avoid shuffle

1. introduce SingleSparkConsistentBucketClusteringExecutionStrategy to avoid shuffle

Signed-off-by: TheR1sing3un <[email protected]>
@TheR1sing3un TheR1sing3un force-pushed the feat_local_clustering_execution_strategy branch from 2cf056a to 665d201 Compare January 2, 2025 03:47
int readParallelism = Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());

return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism).mapPartitions(clusteringOpsPartition -> {
  List<Supplier<ClosableIterator<HoodieRecord<T>>>> suppliers = new ArrayList<>();
  clusteringOpsPartition.forEachRemaining(clusteringOp -> {
    Supplier<ClosableIterator<HoodieRecord<T>>> iteratorSupplier = () -> {
      long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
Member Author

This deleted code was simply moved to SparkJobExecutionStrategy, which now provides a common reading method.

int readParallelism = Math.min(writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());

return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism)
    .mapPartitions(clusteringOpsPartition -> {
      List<Supplier<ClosableIterator<HoodieRecord<T>>>> iteratorGettersForPartition = new ArrayList<>();
      clusteringOpsPartition.forEachRemaining(clusteringOp -> {
        Supplier<ClosableIterator<HoodieRecord<T>>> recordIteratorGetter = () -> {
Member Author

This deleted code was simply moved to SparkJobExecutionStrategy, which now provides a common reading method.

        iteratorGettersForPartition.add(recordIteratorGetter);
      });

      return new LazyConcatenatingIterator<>(iteratorGettersForPartition);
    }));
}
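
The lazy-supplier pattern in the hunk above opens each clustering operation's reader only when the previous one has been fully consumed, so a task holds at most one file-slice reader at a time. A self-contained illustration of that pattern follows; it is illustrative only, not Hudi's actual LazyConcatenatingIterator.

// Illustrative sketch of lazy concatenation over iterator suppliers.
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

public class LazyConcatIteratorSketch<T> implements Iterator<T> {
  private final Iterator<Supplier<Iterator<T>>> suppliers;
  private Iterator<T> current;

  public LazyConcatIteratorSketch(List<Supplier<Iterator<T>>> suppliers) {
    this.suppliers = suppliers.iterator();
  }

  @Override
  public boolean hasNext() {
    // Open the next underlying iterator only when the current one is exhausted.
    while ((current == null || !current.hasNext()) && suppliers.hasNext()) {
      current = suppliers.next().get();
    }
    return current != null && current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}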

private HoodieFileReader getBaseOrBootstrapFileReader(StorageConfiguration<?> storageConf, String bootstrapBasePath, Option<String[]> partitionFields, ClusteringOperation clusteringOp)
Member Author

This deleted code was simply moved to SparkJobExecutionStrategy, which now provides a common reading method.

final TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
final SerializableSchema serializableSchema = new SerializableSchema(schema);
final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(ClusteringGroupInfo::create).collect(Collectors.toList());

String umask = engineContext.hadoopConfiguration().get("fs.permissions.umask-mode");
Member Author

None of this logic is valid anymore, and the class hasn't been used anywhere before, so I refactored this part of the code.

@TheR1sing3un
Member Author

@danny0405 Hi, Danny. In my opinion, we don't need to keep the current logic for consistent-bucket-resizing clustering. We only need to process it like compaction: no InternalRow conversion and no Spark shuffle. We should also separate the resizing and sort logic of the consistent bucket index to keep the clustering semantics clear.

@TheR1sing3un
Member Author

Two subtasks:

  • Allow sorting tables that use the consistent hashing index with an existing sort strategy.
  • Deprecate the previous hash-resizing execution strategy and use the single-job mode by default.

1. refactor clustering related code for better readability

Signed-off-by: TheR1sing3un <[email protected]>
@TheR1sing3un TheR1sing3un requested a review from danny0405 January 9, 2025 03:53
1. fix ut

Signed-off-by: TheR1sing3un <[email protected]>
@@ -110,7 +115,7 @@ public void setup(int maxFileSize, Map<String, String> options) throws IOExcepti
.withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(maxFileSize).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withClusteringPlanStrategyClass(SparkConsistentBucketClusteringPlanStrategy.class.getName())
.withClusteringExecutionStrategyClass(SparkConsistentBucketClusteringExecutionStrategy.class.getName())
.withClusteringExecutionStrategyClass(singleJob ? SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY : SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY)
Contributor

Is SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY always better than SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY? Why do we need two execution strategies?

Member Author

Is SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY always better than SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY? Why do we need two execution strategies?

I'm not sure whether any users are already relying on SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY; if so, should we keep it for compatibility? If not, we can deprecate it.

1. remove unused SparkJobExecutionStrategy

Signed-off-by: TheR1sing3un <[email protected]>
@TheR1sing3un TheR1sing3un requested a review from danny0405 January 9, 2025 08:19
1. fix ut

Signed-off-by: TheR1sing3un <[email protected]>
@hudi-bot

hudi-bot commented Jan 9, 2025

CI report:

@hudi-bot supports the following bot commands:
  • @hudi-bot run azure: re-run the last Azure build
