-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Spark 3.5: Parallelize reading files in snapshot and migrate procedures #10037
Conversation
...extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSnapshotTableProcedure.java
Show resolved
Hide resolved
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java
Outdated
Show resolved
Hide resolved
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
Outdated
Show resolved
Hide resolved
api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
Outdated
Show resolved
Hide resolved
...-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
Show resolved
Hide resolved
475c51e
to
fef975d
Compare
...-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
Outdated
Show resolved
Hide resolved
fef975d
to
23e7062
Compare
data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
Outdated
Show resolved
Hide resolved
4f90cbe
to
6159067
Compare
...k/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
Outdated
Show resolved
Hide resolved
...-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code here IMHO should be checking what number of threads is passed to the executor service constructor. That's sufficient to test for me.
I also think that we shouldn't allow 0 or -1. Seems like those should have been forbidden before and I don't see why we would allow them now.
I'm also approving this in terms that when Eduard is satisfied i'm good to go as well.
...-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Nothing to add except what was already noted by others.
bee975e
to
63d8fc0
Compare
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
Outdated
Show resolved
Hide resolved
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; | ||
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
|
||
/** Have a separate class for getting ExecutorService to make it testable with static mock */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure it is a good idea to add another public class for this? How do we see it being used in the future? Why not use ThreadPools
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we do static mock, all invocations need to be mocked as well. That includes ThreadPools
used for SnapshotProducer
and other places. Having a separate class and mock it is the cleanest way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not convinced it justifies adding another public class. What if we overload listPartition
or whatever method we need in TableMigrationUtil
to accept ExecutorService
?
It is an open question whether we should accept ExecutorService
or int
in the actions (not procedures). I think we have both approaches in our APIs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aokolnychyi I finally get time to refactor a bit, accepting ExecutorService
in the actions, SparkTableUtil
and TableMigrationUtil
. Please take another look.
63d8fc0
to
c524c75
Compare
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
Outdated
Show resolved
Hide resolved
c524c75
to
aa3aef1
Compare
ae7bd62
to
f702e55
Compare
f702e55
to
3de9fd2
Compare
@nastra @RussellSpitzer @aokolnychyi Could you please take another look? |
@manuzhang could you rebase the PR please as there were some changes that might affect it |
2fb2eb9
to
27cf7b7
Compare
@nastra Done! |
api/src/main/java/org/apache/iceberg/actions/SnapshotTable.java
Outdated
Show resolved
Hide resolved
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
Outdated
Show resolved
Hide resolved
@@ -215,11 +250,7 @@ private static DataFile buildDataFile( | |||
.build(); | |||
} | |||
|
|||
private static ExecutorService migrationService(int parallelism) { | |||
return MoreExecutors.getExitingExecutorService( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we switching here from an exiting executor service?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ThreadPools.newWorkerPool
also returns an exiting executor service.
if (!args.isNullAt(4)) { | ||
int parallelism = args.getInt(4); | ||
Preconditions.checkArgument(parallelism > 0, "Parallelism should be larger than 0"); | ||
action = action.executeSnapshotWith(executorService(parallelism, "table-snapshot")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shutdown of the executor should have been handled in executorService
, but we need to do the same for the executor that is used for migrate. See also my other comment in https://github.com/apache/iceberg/pull/10037/files#r1654342618
27cf7b7
to
a138c0c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for getting this done @manuzhang
Leaving an idea for a further speed up for table w/ data skewness on partition level: we can further divide files from a given partition into X number of buckets. |
@puchengy Feel free to open a new issue to track. |
…te procedures Back-port of apache#10037
…te procedures Back-port of apache#10037
…cedures Back-port of apache#10037
…cedures Back-port of apache#10037
…he#11043) Back-port of apache#9274 Back-port of apache#10037
Similar to #9274 for
snapshot
andmigrate
procedures.