Spark 3.5: Parallelize reading files in snapshot and migrate procedures #10037

Merged
merged 2 commits into apache:main from the parallelize-migrate-procedures branch
Jun 26, 2024

Collaborator

@manuzhang manuzhang commented Mar 25, 2024

Similar to #9274 for snapshot and migrate procedures.

@manuzhang manuzhang force-pushed the parallelize-migrate-procedures branch 2 times, most recently from 475c51e to fef975d Compare March 27, 2024 07:40
@manuzhang manuzhang force-pushed the parallelize-migrate-procedures branch from fef975d to 23e7062 Compare April 2, 2024 04:17
@github-actions github-actions bot added the data label Apr 2, 2024
@manuzhang manuzhang force-pushed the parallelize-migrate-procedures branch 2 times, most recently from 4f90cbe to 6159067 Compare April 9, 2024 10:19
@manuzhang manuzhang requested a review from nastra April 9, 2024 11:03
Member

@RussellSpitzer RussellSpitzer left a comment

IMHO, the code here should check the number of threads passed to the executor service constructor. That would be sufficient as a test for me.

I also think we shouldn't allow 0 or -1. Those values should have been forbidden before, and I don't see why we would allow them now.

I'm approving on the understanding that once Eduard is satisfied, I'm good to go as well.
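
For illustration, the two points above (reject non-positive values, and let a test inspect the thread count handed to the pool) might be sketched as follows. This is a minimal sketch using only `java.util.concurrent`; the class `ParallelismCheck` and the method `newPool` are hypothetical names and are not taken from this PR.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Iceberg.
final class ParallelismCheck {
  private ParallelismCheck() {}

  // Rejects 0 and negative values before any pool is created.
  static ThreadPoolExecutor newPool(int parallelism) {
    if (parallelism <= 0) {
      throw new IllegalArgumentException("Parallelism should be larger than 0");
    }
    // Fixed-size pool: core and maximum size both equal the requested parallelism.
    return new ThreadPoolExecutor(
        parallelism, parallelism, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
  }
}
```

A test can then assert on `getCorePoolSize()` of the returned pool instead of observing timing or thread names.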

Contributor

@aokolnychyi aokolnychyi left a comment

LGTM. Nothing to add except what was already noted by others.

@manuzhang manuzhang force-pushed the parallelize-migrate-procedures branch 3 times, most recently from bee975e to 63d8fc0 Compare April 12, 2024 17:21
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;

/** Have a separate class for getting ExecutorService to make it testable with static mock */
Contributor

Are we sure it is a good idea to add another public class for this? How do we see it being used in the future? Why not use ThreadPools instead?

Collaborator Author

When we use a static mock, all invocations need to be mocked as well. That includes the ThreadPools calls used by SnapshotProducer and other places. Having a separate class and mocking it is the cleanest way.

Contributor

@aokolnychyi aokolnychyi Apr 16, 2024

I am not convinced it justifies adding another public class. What if we overload listPartition or whatever method we need in TableMigrationUtil to accept ExecutorService?

It is an open question whether we should accept ExecutorService or int in the actions (not procedures). I think we have both approaches in our APIs.
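
To make the `ExecutorService` versus `int` trade-off concrete, here is a rough sketch of the two overload styles side by side. It assumes nothing about the real `TableMigrationUtil` signatures: `PartitionLister`, `listPartitions`, and `describe` are hypothetical stand-ins, and the per-partition work is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical utility, not the Iceberg API.
final class PartitionLister {
  private PartitionLister() {}

  // Placeholder for the real per-partition work (listing files, reading metrics).
  static String describe(String partition) {
    return "files-of-" + partition;
  }

  // int overload: the method owns the pool, so it must also shut it down.
  static List<String> listPartitions(List<String> partitions, int parallelism) {
    if (parallelism <= 0) {
      throw new IllegalArgumentException("Parallelism should be larger than 0");
    }
    ExecutorService service = Executors.newFixedThreadPool(parallelism);
    try {
      return listPartitions(partitions, service);
    } finally {
      service.shutdown();
    }
  }

  // ExecutorService overload: the caller owns the pool and its lifecycle,
  // which also lets a test pass in its own service without static mocking.
  static List<String> listPartitions(List<String> partitions, ExecutorService service) {
    List<Future<String>> futures = new ArrayList<>();
    for (String partition : partitions) {
      Callable<String> task = () -> describe(partition);
      futures.add(service.submit(task));
    }
    List<String> results = new ArrayList<>();
    for (Future<String> future : futures) {
      try {
        results.add(future.get()); // results keep the input order
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
      }
    }
    return results;
  }
}
```

With the `ExecutorService` overload the question of who shuts the pool down moves to the caller, which is the concern raised later in this review for the migrate path.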

Collaborator Author

@manuzhang manuzhang May 18, 2024

@aokolnychyi I finally got time to refactor a bit, so the actions, SparkTableUtil and TableMigrationUtil now accept an ExecutorService. Please take another look.

@manuzhang manuzhang force-pushed the parallelize-migrate-procedures branch from 63d8fc0 to c524c75 Compare April 16, 2024 16:08
@manuzhang manuzhang force-pushed the parallelize-migrate-procedures branch from c524c75 to aa3aef1 Compare April 17, 2024 02:05
@manuzhang manuzhang force-pushed the parallelize-migrate-procedures branch from ae7bd62 to f702e55 Compare May 20, 2024 04:16
@manuzhang manuzhang force-pushed the parallelize-migrate-procedures branch from f702e55 to 3de9fd2 Compare June 12, 2024 03:35
@manuzhang
Collaborator Author

@nastra @RussellSpitzer @aokolnychyi Could you please take another look?

@nastra
Contributor

nastra commented Jun 25, 2024

@manuzhang could you please rebase the PR? There were some changes that might affect it.

@manuzhang manuzhang force-pushed the parallelize-migrate-procedures branch 2 times, most recently from 2fb2eb9 to 27cf7b7 Compare June 25, 2024 17:20
@manuzhang
Collaborator Author

@nastra Done!

@@ -215,11 +250,7 @@ private static DataFile buildDataFile(
        .build();
  }

  private static ExecutorService migrationService(int parallelism) {
    return MoreExecutors.getExitingExecutorService(
Contributor

Why are we switching away from an exiting executor service here?

Collaborator Author

ThreadPools.newWorkerPool also returns an exiting executor service.
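
For readers unfamiliar with the term, an "exiting" executor service is one that does not keep the JVM alive: its workers are daemon threads, and a shutdown hook gives in-flight tasks a chance to finish. The sketch below approximates that idea with the JDK alone. It is not the Guava or Iceberg implementation; `ExitingPools`, `newExitingPool`, and the 5-second wait are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only approximation of an "exiting" pool.
final class ExitingPools {
  private ExitingPools() {}

  static ExecutorService newExitingPool(int size, String prefix) {
    AtomicInteger counter = new AtomicInteger();
    // Daemon threads never block JVM exit on their own.
    ThreadFactory factory =
        runnable -> {
          Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
          thread.setDaemon(true);
          return thread;
        };
    ExecutorService pool = Executors.newFixedThreadPool(size, factory);
    // On JVM shutdown, stop accepting work and wait briefly for running tasks.
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  pool.shutdown();
                  try {
                    pool.awaitTermination(5, TimeUnit.SECONDS); // wait time is an assumption
                  } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                  }
                }));
    return pool;
  }
}
```

A pool built this way still holds threads until it is shut down, so a caller that creates one per procedure call would still want an explicit `shutdown()` when the work is done.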

if (!args.isNullAt(4)) {
  int parallelism = args.getInt(4);
  Preconditions.checkArgument(parallelism > 0, "Parallelism should be larger than 0");
  action = action.executeSnapshotWith(executorService(parallelism, "table-snapshot"));
Contributor

Shutdown of the executor should already be handled in executorService, but we need to do the same for the executor that is used for migrate. See also my other comment in https://github.com/apache/iceberg/pull/10037/files#r1654342618

@manuzhang manuzhang force-pushed the parallelize-migrate-procedures branch from 27cf7b7 to a138c0c Compare June 26, 2024 14:46
Contributor

@nastra nastra left a comment

LGTM, thanks for getting this done @manuzhang

@nastra nastra merged commit 87a988f into apache:main Jun 26, 2024
46 checks passed
@puchengy
Contributor

Leaving an idea for a further speed-up for tables with partition-level data skew: we could further divide the files of a given partition into X buckets, so that one large partition no longer becomes a single long-running task.
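
One possible shape for that idea, shown only as a sketch: split a partition's file list round-robin into a fixed number of buckets and submit each bucket as its own task. `FileBuckets` and `split` are hypothetical names, and nothing like this was implemented in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating per-partition bucketing.
final class FileBuckets {
  private FileBuckets() {}

  // Distributes items round-robin so bucket sizes differ by at most one.
  static <T> List<List<T>> split(List<T> files, int buckets) {
    if (buckets <= 0) {
      throw new IllegalArgumentException("Number of buckets should be larger than 0");
    }
    List<List<T>> result = new ArrayList<>();
    for (int i = 0; i < buckets; i++) {
      result.add(new ArrayList<>());
    }
    for (int i = 0; i < files.size(); i++) {
      result.get(i % buckets).add(files.get(i));
    }
    return result;
  }
}
```

Each bucket could then be handed to the executor as a separate unit of work, which spreads a skewed partition across several threads.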

@manuzhang
Collaborator Author

@puchengy Feel free to open a new issue to track.

jasonf20 pushed a commit to jasonf20/iceberg that referenced this pull request Aug 4, 2024
manuzhang added a commit to manuzhang/iceberg that referenced this pull request Aug 29, 2024
manuzhang added a commit to manuzhang/iceberg that referenced this pull request Aug 29, 2024
manuzhang added a commit to manuzhang/iceberg that referenced this pull request Aug 29, 2024
manuzhang added a commit to manuzhang/iceberg that referenced this pull request Aug 30, 2024
amogh-jahagirdar pushed a commit that referenced this pull request Sep 5, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
5 participants