-
Notifications
You must be signed in to change notification settings - Fork 327
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Ray] task based shuffle for ray #3040
[Ray] task based shuffle for ray #3040
Conversation
73f466d
to
8427dd7
Compare
37b50b3
to
fd866b2
Compare
ec143bb
to
0c32530
Compare
…shuffle # Conflicts: # mars/core/operand/shuffle.py # mars/services/task/analyzer/analyzer.py # mars/services/task/execution/api.py # mars/tests/test_utils.py
0148219
to
7e0d239
Compare
85423b1
to
7f9e8e8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some comments
@@ -112,3 +128,11 @@ def iter_mapper_data( | |||
ctx, input_id, mapper_id, pop=pop, skip_none=skip_none | |||
): | |||
yield data | |||
|
|||
def execute(self, ctx, op): | |||
"""The mapper stage must ensure all mapper blocks are inserted into ctx and no blocks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clarify, no block missing for both key and index, or just for index?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for fetch_by_index
28b4ad4
to
e5aa38b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
What do these changes do?
This PR implements pull-based shuffle for ray task backend, which will be the basis for later push-based shuffle for ray. Main changes include:
ShuffleFetchType
which containsFETCH_BY_KEY
/FETCH_BY_INDEX
. IfFETCH_BY_INDEX
is used,source_keys/source_idxes/source_mappers
will be skipped. In this way the meta and scheduling cost for supervisor will be much smaller, about 1000~10000x smaller for big shuffle. This will make mars able to schedule large-scale shuffle, even PB-scale data shuffle.FetchShuffle
, and skipsource_keys/source_idxes/source_mappers
whenshuffle_fetch_type
isFETCH_BY_INDEX
.FETCH_BY_INDEX
.TensorReshape
/TensorBincount
PSRSAlign
.data_keys
for subtask execution on rayShuffleManager
for shuffle execution on rayBefore this PR, for a shuffle with 100000 partitions, every reducer subtask meta will took about 16M memory, which make mars to handle only less than 1000 partitions, i.e. less than 1 T data.
data:image/s3,"s3://crabby-images/e1179/e1179347c8bc15a7fac57d1b84ab4b7d3059c0fa" alt=""
data:image/s3,"s3://crabby-images/3213b/3213be498e2a0e68e99edecd8c27f026865ccb52" alt=""
With this PR, every reducer subtask took only 2Kb memory, which make mars can handle 8000_000 partitions. If every partition has 1Gb data, this PR will enable mars handle 8PB data shuffle.
#2916
#3039
#2112
Check code requirements